- Complete MPU-MCU communication setup - LED Matrix text display with configurable parameters - Configuration management with personal data protection - Git template with .gitignore for sensitive data - Automated build and deployment scripts - SSH key management and service scripts
176 lines
6.4 KiB
Python
176 lines
6.4 KiB
Python
"""
|
|
Arduino UNO Q Library
|
|
Bridge communication between MPU (Linux) and MCU (STM32)
|
|
"""
|
|
|
|
import time
|
|
import json
|
|
import sys
|
|
import os
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ArduinoUNOQ:
    """Bridge communication between the Linux MPU and the STM32 MCU.

    The real Bridge library import is still commented out in
    :meth:`setup_bridge`, so ``bridge_available`` is always ``False`` and
    every MCU operation runs in simulation mode (it is only logged).

    External processes talk to :meth:`main_loop` through two files:

    * ``/tmp/unoq_cmd.pipe`` -- named pipe carrying JSON commands.
    * ``/tmp/unoq_status.json`` -- status snapshot rewritten every cycle.
    """

    # Directory searched for the system-wide Bridge package.
    BRIDGE_PACKAGE_DIR = '/usr/lib/python3/dist-packages'

    def __init__(self):
        """Record the start time and try to bring up the Bridge."""
        self.bridge_available = False
        # Monotonic reference used to report a real uptime in "status" replies.
        self._started_at = time.monotonic()
        self.setup_bridge()

    def setup_bridge(self):
        """Setup Arduino Bridge communication.

        Sets ``self.bridge_available``; never raises.
        """
        try:
            # Make the distro package directory importable, but only once so
            # repeated calls do not keep growing sys.path.
            if self.BRIDGE_PACKAGE_DIR not in sys.path:
                sys.path.append(self.BRIDGE_PACKAGE_DIR)
            # Note: Uncomment when Bridge library is available
            # import bridge
            # self.bridge = bridge.Bridge()
            # self.bridge_available = True
            logger.info("Arduino Bridge simulation mode (library not available)")
            self.bridge_available = False
        except ImportError:
            logger.warning("Arduino Bridge not available, running in simulation mode")
            self.bridge_available = False
        except Exception as e:
            logger.error("Failed to initialize Bridge: %s", e)
            self.bridge_available = False

    def read_mcu_data(self):
        """Read data from MCU via Bridge.

        Returns:
            dict: a placeholder ``{"status": "simulated", ...}`` when the
            Bridge is unavailable, otherwise the collected readings
            (currently empty), or ``{}`` if reading failed.
        """
        # TODO: Implement your MCU data reading logic here
        if not self.bridge_available:
            return {"status": "simulated", "message": "Bridge not available"}

        try:
            # Example: Read digital pins
            data = {}
            # TODO: Add your pin reading logic
            # for pin in range(2, 14):
            #     value = self.bridge.digitalRead(pin)
            #     data[f"d{pin}"] = value
            return data
        except Exception as e:
            logger.error("Error reading MCU data: %s", e)
            return {}

    def write_mcu_pin(self, pin, value):
        """Write to MCU pin via Bridge.

        Args:
            pin: pin identifier, passed through unchanged.
            value: level to write; the strings ``"high"``/``"low"`` (any
                case) are mapped to ``1``/``0`` when the Bridge is available.

        Returns:
            bool: ``True`` on success (always in simulation mode),
            ``False`` if the write raised.
        """
        # TODO: Implement your MCU pin writing logic here
        if not self.bridge_available:
            logger.info("Simulated: Pin %s set to %s", pin, value)
            return True

        try:
            if isinstance(value, str) and value.lower() in ['high', 'low']:
                value = 1 if value.lower() == 'high' else 0

            # TODO: Add your pin writing logic
            # self.bridge.digitalWrite(pin, value)
            logger.info("Pin %s set to %s", pin, value)
            return True
        except Exception as e:
            logger.error("Error writing to pin %s: %s", pin, e)
            return False

    def send_led_matrix_command(self, text, speed=150, display_time=5000, direction=0):
        """Send LED Matrix command to MCU.

        The command is formatted as ``"<text>",<speed>,<display_time>,<direction>``.

        Returns:
            bool: ``True`` on success (always in simulation mode),
            ``False`` if sending raised.
        """
        # NOTE(review): a double quote inside `text` is not escaped; confirm
        # how the MCU-side parser treats embedded quotes.
        command = f'"{text}",{speed},{display_time},{direction}'

        if not self.bridge_available:
            logger.info("Simulated LED Matrix command: %s", command)
            return True

        try:
            # Send command via serial to MCU
            # This would use actual Bridge serial communication
            # self.bridge.serial.write(command.encode())
            logger.info("LED Matrix command sent: %s", command)
            return True
        except Exception as e:
            logger.error("Error sending LED Matrix command: %s", e)
            return False

    def process_command(self, command):
        """Process commands from external interface.

        Args:
            command: JSON text of an object whose ``"type"`` is one of
                ``"read"``, ``"write"``, ``"led_matrix"`` or ``"status"``.

        Returns:
            dict: the command result, or ``{"error": ...}`` for malformed,
            incomplete or unknown commands. Never raises.
        """
        try:
            cmd_data = json.loads(command)
            # Valid JSON that is not an object (list, number, ...) has no
            # .get(); report it clearly instead of leaking an AttributeError.
            if not isinstance(cmd_data, dict):
                return {"error": "Command must be a JSON object"}

            cmd_type = cmd_data.get('type')

            if cmd_type == 'read':
                return self.read_mcu_data()
            elif cmd_type == 'write':
                pin = cmd_data.get('pin')
                value = cmd_data.get('value')
                # Without both fields there is nothing meaningful to write.
                if pin is None or value is None:
                    return {"error": "Write command requires 'pin' and 'value'"}
                return {"success": self.write_mcu_pin(pin, value)}
            elif cmd_type == 'led_matrix':
                text = cmd_data.get('text', 'Hallo World')
                speed = cmd_data.get('speed', 150)
                display_time = cmd_data.get('time', 5000)
                direction = cmd_data.get('direction', 0)
                return {"success": self.send_led_matrix_command(text, speed, display_time, direction)}
            elif cmd_type == 'status':
                return {
                    "bridge_available": self.bridge_available,
                    # Seconds since this object was created, not the epoch.
                    "uptime": time.monotonic() - self._started_at,
                    "status": "running"
                }
            else:
                return {"error": "Unknown command type"}

        except json.JSONDecodeError:
            return {"error": "Invalid JSON command"}
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _drain_pipe(fd):
        """Return all text currently readable from non-blocking descriptor *fd*."""
        chunks = []
        while True:
            try:
                chunk = os.read(fd, 65536)
            except BlockingIOError:
                # A writer is connected but has nothing more for us yet.
                break
            if not chunk:
                # End of file: no writer currently has the pipe open.
                break
            chunks.append(chunk)
        return b"".join(chunks).decode("utf-8", errors="replace")

    @staticmethod
    def _split_commands(raw):
        """Split *raw* into the JSON documents it contains, in order.

        Several writers may have queued commands between two polls; each
        complete JSON value becomes one entry. An unparseable remainder is
        returned as a final entry so the caller still gets an error result.
        """
        decoder = json.JSONDecoder()
        text = raw.strip()
        commands = []
        pos = 0
        while pos < len(text):
            try:
                _, end = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                commands.append(text[pos:])
                break
            commands.append(text[pos:end])
            pos = end
            # raw_decode does not skip leading whitespace itself.
            while pos < len(text) and text[pos].isspace():
                pos += 1
        return commands

    def main_loop(self):
        """Main application loop.

        Once per second: process any commands queued on the command pipe,
        then rewrite the status file with fresh MCU data. The most recent
        command result is kept in the status file under ``"last_result"``.
        Runs until interrupted with Ctrl-C.
        """
        logger.info("Arduino UNO Q MPU Application Started")

        # Create command pipe for external communication
        cmd_pipe = Path('/tmp/unoq_cmd.pipe')
        status_file = Path('/tmp/unoq_status.json')

        try:
            os.mkfifo(cmd_pipe)
        except FileExistsError:
            pass  # Left over from a previous run; reuse it.
        except (OSError, AttributeError) as e:
            # AttributeError: os.mkfifo does not exist on non-POSIX platforms.
            logger.warning("Could not create command pipe %s: %s", cmd_pipe, e)

        # A blocking open() on a FIFO waits for a writer, which would stall
        # the periodic status update. Keep one non-blocking descriptor open
        # instead so writers never block and polling never waits.
        open_flags = os.O_RDONLY | getattr(os, "O_NONBLOCK", 0)
        pipe_fd = None
        last_result = None

        try:
            while True:
                try:
                    # TODO: Add your main application logic here

                    # Check for commands
                    raw = ""
                    try:
                        if pipe_fd is None:
                            pipe_fd = os.open(cmd_pipe, open_flags)
                        raw = self._drain_pipe(pipe_fd)
                    except FileNotFoundError:
                        pass  # No pipe yet; try again next cycle.
                    except OSError as e:
                        logger.warning("Command pipe read failed: %s", e)
                        if pipe_fd is not None:
                            try:
                                os.close(pipe_fd)
                            except OSError:
                                pass
                            pipe_fd = None

                    for command in self._split_commands(raw):
                        last_result = self.process_command(command)

                    # Periodic status update
                    current_data = self.read_mcu_data()
                    current_data["timestamp"] = str(int(time.time()))
                    current_data["status"] = "running"
                    if last_result is not None:
                        current_data["last_result"] = last_result

                    with open(str(status_file), 'w') as f:
                        json.dump(current_data, f)

                    time.sleep(1)  # Update every second

                except KeyboardInterrupt:
                    logger.info("Shutting down...")
                    break
                except Exception as e:
                    logger.error("Error in main loop: %s", e)
                    time.sleep(5)  # Wait before retrying
        finally:
            if pipe_fd is not None:
                try:
                    os.close(pipe_fd)
                except OSError:
                    pass