diff --git a/python_rewrite/src/tempering_machine/services/web/routers/websockets.py b/python_rewrite/src/tempering_machine/services/web/routers/websockets.py new file mode 100644 index 0000000..780994c --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/web/routers/websockets.py @@ -0,0 +1,71 @@ +""" +WebSocket endpoints for real-time communication. +""" + +import json +import logging +from typing import Optional + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query + +from ..websocket import connection_manager, handle_websocket_message, websocket_connection + + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.websocket("/ws") +async def websocket_endpoint( + websocket: WebSocket, + client_id: Optional[str] = Query(None, description="Client identifier") +): + """Main WebSocket endpoint for real-time data streaming.""" + async with websocket_connection(websocket, client_id): + try: + while True: + # Receive message from client + data = await websocket.receive_text() + + try: + message = json.loads(data) + await handle_websocket_message(websocket, message) + except json.JSONDecodeError: + await connection_manager._send_message(websocket, { + "type": "error", + "message": "Invalid JSON message", + "timestamp": connection_manager._get_timestamp() + }) + except Exception as e: + logger.error(f"Error handling WebSocket message: {e}") + await connection_manager._send_message(websocket, { + "type": "error", + "message": "Message processing error", + "timestamp": connection_manager._get_timestamp() + }) + + except WebSocketDisconnect: + logger.info(f"WebSocket client {client_id or 'anonymous'} disconnected") + except Exception as e: + logger.error(f"WebSocket endpoint error: {e}") + + +@router.get("/ws/stats") +async def websocket_stats(): + """Get WebSocket connection statistics.""" + return connection_manager.get_connection_stats() + + +@router.post("/ws/broadcast") +async def broadcast_message(message: dict): + """Broadcast a message to all connected WebSocket clients.""" + await connection_manager.broadcast_to_all(message) + return {"success": True, "message": "Message broadcasted to all clients"} + + +@router.post("/ws/broadcast/{channel}") +async def broadcast_to_channel(channel: str, message: dict): + """Broadcast a message to clients subscribed to a specific channel.""" + await connection_manager.broadcast_to_channel(channel, message) + return {"success": True, "message": f"Message broadcasted to {channel} channel"} \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/web/websocket.py b/python_rewrite/src/tempering_machine/services/web/websocket.py new file mode 100644 index 0000000..ab1530e --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/web/websocket.py @@ -0,0 +1,358 @@ +""" +WebSocket server for real-time data streaming to UI clients. +Provides live updates for temperature, process status, and alarms. +""" + +import asyncio +import json +import logging +from typing import Dict, List, Set, Optional, Any +from datetime import datetime +from contextlib import asynccontextmanager + +from fastapi import WebSocket, WebSocketDisconnect +from fastapi.websockets import WebSocketState + +from ..hardware.hardware_manager import hardware_manager +from ..recipe.recipe_controller import recipe_controller +from ..safety.safety_monitor import safety_monitor + + +logger = logging.getLogger(__name__) + + +class ConnectionManager: + """Manages WebSocket connections and broadcasts.""" + + def __init__(self): + self.active_connections: Set[WebSocket] = set() + self.client_subscriptions: Dict[WebSocket, Set[str]] = {} + self.broadcast_task: Optional[asyncio.Task] = None + self._running = False + + async def connect(self, websocket: WebSocket, client_id: Optional[str] = None): + """Accept and register a new WebSocket connection.""" + await websocket.accept() + self.active_connections.add(websocket) + self.client_subscriptions[websocket] = set() + + logger.info(f"WebSocket client connected: {client_id or 'anonymous'}") + + # Send initial data + await self._send_initial_data(websocket) + + # Start broadcast task if not running + if not self._running: + await self.start_broadcasting() + + async def disconnect(self, websocket: WebSocket): + """Remove a WebSocket connection.""" + if websocket in self.active_connections: + self.active_connections.remove(websocket) + + if websocket in self.client_subscriptions: + del self.client_subscriptions[websocket] + + logger.info("WebSocket client disconnected") + + # Stop broadcasting if no active connections + if not self.active_connections and self._running: + await self.stop_broadcasting() + + async def subscribe(self, websocket: WebSocket, channels: List[str]): + """Subscribe a client to specific data channels.""" + if websocket in self.client_subscriptions: + self.client_subscriptions[websocket].update(channels) + await self._send_message(websocket, { + "type": "subscription_confirmed", + "channels": channels, + "timestamp": datetime.now().isoformat() + }) + + async def unsubscribe(self, websocket: WebSocket, channels: List[str]): + """Unsubscribe a client from specific data channels.""" + if websocket in self.client_subscriptions: + for channel in channels: + self.client_subscriptions[websocket].discard(channel) + + await self._send_message(websocket, { + "type": "unsubscription_confirmed", + "channels": channels, + "timestamp": datetime.now().isoformat() + }) + + async def broadcast_to_channel(self, channel: str, data: Dict[str, Any]): + """Broadcast data to all clients subscribed to a channel.""" + message = { + "channel": channel, + "data": data, + "timestamp": datetime.now().isoformat() + } + + disconnected_clients = set() + + for websocket in self.active_connections.copy(): + if channel in self.client_subscriptions.get(websocket, set()): + try: + if websocket.client_state == WebSocketState.CONNECTED: + await self._send_message(websocket, message) + else: + disconnected_clients.add(websocket) + except Exception as e: + logger.error(f"Error broadcasting to client: {e}") + disconnected_clients.add(websocket) + + # Clean up disconnected clients + for websocket in disconnected_clients: + await self.disconnect(websocket) + + async def broadcast_to_all(self, data: Dict[str, Any]): + """Broadcast data to all connected clients.""" + message = { + "type": "broadcast", + "data": data, + "timestamp": datetime.now().isoformat() + } + + disconnected_clients = set() + + for websocket in self.active_connections.copy(): + try: + if websocket.client_state == WebSocketState.CONNECTED: + await self._send_message(websocket, message) + else: + disconnected_clients.add(websocket) + except Exception as e: + logger.error(f"Error broadcasting to client: {e}") + disconnected_clients.add(websocket) + + # Clean up disconnected clients + for websocket in disconnected_clients: + await self.disconnect(websocket) + + async def start_broadcasting(self): + """Start periodic data broadcasting.""" + if self._running: + return + + self._running = True + self.broadcast_task = asyncio.create_task(self._broadcast_loop()) + logger.info("WebSocket broadcasting started") + + async def stop_broadcasting(self): + """Stop periodic data broadcasting.""" + self._running = False + + if self.broadcast_task: + self.broadcast_task.cancel() + try: + await self.broadcast_task + except asyncio.CancelledError: + pass + + logger.info("WebSocket broadcasting stopped") + + async def _broadcast_loop(self): + """Main broadcasting loop for periodic updates.""" + while self._running: + try: + # Broadcast temperature data + await self._broadcast_temperatures() + + # Broadcast process status + await self._broadcast_process_status() + + # Broadcast system status + await self._broadcast_system_status() + + # Broadcast alarms + await self._broadcast_alarms() + + # Wait for next broadcast cycle + await asyncio.sleep(2.0) # Broadcast every 2 seconds + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in broadcast loop: {e}") + await asyncio.sleep(1.0) + + async def _broadcast_temperatures(self): + """Broadcast current temperature readings.""" + try: + temperatures = await hardware_manager.get_all_temperatures() + + temp_data = {} + for sensor_name, reading in temperatures.items(): + temp_data[sensor_name] = { + "value": reading.value, + "units": reading.units, + "is_valid": reading.is_valid, + "timestamp": reading.timestamp.isoformat(), + "error_message": reading.error_message + } + + await self.broadcast_to_channel("temperatures", temp_data) + + except Exception as e: + logger.error(f"Error broadcasting temperatures: {e}") + + async def _broadcast_process_status(self): + """Broadcast current process status.""" + try: + process_status = await recipe_controller.get_process_status() + + if process_status: + await self.broadcast_to_channel("process", process_status) + + except Exception as e: + logger.error(f"Error broadcasting process status: {e}") + + async def _broadcast_system_status(self): + """Broadcast system health status.""" + try: + hw_status = hardware_manager.get_hardware_status() + + system_data = { + "communication_health": hw_status.communication_health, + "system_status": hw_status.system_status.value, + "last_update": hw_status.last_update.isoformat() if hw_status.last_update else None, + "motor_count": len(hw_status.motors), + "temperature_sensor_count": len(hw_status.temperatures), + "emergency_stop_active": hw_status.safety.emergency_stop_active, + "cover_sensor_closed": hw_status.safety.cover_sensor_closed + } + + await self.broadcast_to_channel("system", system_data) + + except Exception as e: + logger.error(f"Error broadcasting system status: {e}") + + async def _broadcast_alarms(self): + """Broadcast active alarms.""" + try: + alarm_summary = safety_monitor.get_alarm_summary() + active_alarms = safety_monitor.get_active_alarms() + + alarm_data = { + "summary": alarm_summary, + "active_alarms": [ + { + "id": alarm.id, + "title": alarm.title, + "message": alarm.message, + "category": alarm.category.value, + "priority": alarm.priority.value, + "state": alarm.state.value, + "timestamp": alarm.timestamp.isoformat(), + "occurrence_count": alarm.occurrence_count + } + for alarm in active_alarms + ] + } + + await self.broadcast_to_channel("alarms", alarm_data) + + except Exception as e: + logger.error(f"Error broadcasting alarms: {e}") + + async def _send_initial_data(self, websocket: WebSocket): + """Send initial data to a newly connected client.""" + try: + # Send welcome message + await self._send_message(websocket, { + "type": "welcome", + "message": "Connected to Chocolate Tempering Machine", + "timestamp": datetime.now().isoformat(), + "available_channels": [ + "temperatures", "process", "system", "alarms", "motors", "hardware" + ] + }) + + except Exception as e: + logger.error(f"Error sending initial data: {e}") + + async def _send_message(self, websocket: WebSocket, message: Dict[str, Any]): + """Send a message to a specific WebSocket client.""" + try: + await websocket.send_text(json.dumps(message, default=str)) + except Exception as e: + logger.error(f"Error sending WebSocket message: {e}") + raise + + def get_connection_stats(self) -> Dict[str, Any]: + """Get WebSocket connection statistics.""" + return { + "active_connections": len(self.active_connections), + "total_subscriptions": sum(len(subs) for subs in self.client_subscriptions.values()), + "is_broadcasting": self._running, + "channels": { + "temperatures": sum(1 for subs in self.client_subscriptions.values() if "temperatures" in subs), + "process": sum(1 for subs in self.client_subscriptions.values() if "process" in subs), + "system": sum(1 for subs in self.client_subscriptions.values() if "system" in subs), + "alarms": sum(1 for subs in self.client_subscriptions.values() if "alarms" in subs), + } + } + + +# Global connection manager instance +connection_manager = ConnectionManager() + + +async def handle_websocket_message(websocket: WebSocket, message: Dict[str, Any]): + """Handle incoming WebSocket messages from clients.""" + try: + message_type = message.get("type") + + if message_type == "subscribe": + channels = message.get("channels", []) + await connection_manager.subscribe(websocket, channels) + + elif message_type == "unsubscribe": + channels = message.get("channels", []) + await connection_manager.unsubscribe(websocket, channels) + + elif message_type == "ping": + await connection_manager._send_message(websocket, { + "type": "pong", + "timestamp": datetime.now().isoformat() + }) + + elif message_type == "get_status": + # Send current status immediately + process_status = await recipe_controller.get_process_status() + if process_status: + await connection_manager._send_message(websocket, { + "type": "status_response", + "data": process_status, + "timestamp": datetime.now().isoformat() + }) + + else: + await connection_manager._send_message(websocket, { + "type": "error", + "message": f"Unknown message type: {message_type}", + "timestamp": datetime.now().isoformat() + }) + + except Exception as e: + logger.error(f"Error handling WebSocket message: {e}") + await connection_manager._send_message(websocket, { + "type": "error", + "message": "Internal server error", + "timestamp": datetime.now().isoformat() + }) + + +@asynccontextmanager +async def websocket_connection(websocket: WebSocket, client_id: Optional[str] = None): + """Context manager for WebSocket connections.""" + try: + await connection_manager.connect(websocket, client_id) + yield + except WebSocketDisconnect: + logger.info("WebSocket client disconnected") + except Exception as e: + logger.error(f"WebSocket connection error: {e}") + finally: + await connection_manager.disconnect(websocket) \ No newline at end of file