Add WebSocket endpoints for real-time communication and message handling
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
"""
|
||||
WebSocket endpoints for real-time communication.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
|
||||
|
||||
from ..websocket import connection_manager, handle_websocket_message, websocket_connection
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.websocket("/ws")
|
||||
async def websocket_endpoint(
|
||||
websocket: WebSocket,
|
||||
client_id: Optional[str] = Query(None, description="Client identifier")
|
||||
):
|
||||
"""Main WebSocket endpoint for real-time data streaming."""
|
||||
async with websocket_connection(websocket, client_id):
|
||||
try:
|
||||
while True:
|
||||
# Receive message from client
|
||||
data = await websocket.receive_text()
|
||||
|
||||
try:
|
||||
message = json.loads(data)
|
||||
await handle_websocket_message(websocket, message)
|
||||
except json.JSONDecodeError:
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "error",
|
||||
"message": "Invalid JSON message",
|
||||
"timestamp": connection_manager._get_timestamp()
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling WebSocket message: {e}")
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "error",
|
||||
"message": "Message processing error",
|
||||
"timestamp": connection_manager._get_timestamp()
|
||||
})
|
||||
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"WebSocket client {client_id or 'anonymous'} disconnected")
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket endpoint error: {e}")
|
||||
|
||||
|
||||
@router.get("/ws/stats")
|
||||
async def websocket_stats():
|
||||
"""Get WebSocket connection statistics."""
|
||||
return connection_manager.get_connection_stats()
|
||||
|
||||
|
||||
@router.post("/ws/broadcast")
|
||||
async def broadcast_message(message: dict):
|
||||
"""Broadcast a message to all connected WebSocket clients."""
|
||||
await connection_manager.broadcast_to_all(message)
|
||||
return {"success": True, "message": "Message broadcasted to all clients"}
|
||||
|
||||
|
||||
@router.post("/ws/broadcast/{channel}")
|
||||
async def broadcast_to_channel(channel: str, message: dict):
|
||||
"""Broadcast a message to clients subscribed to a specific channel."""
|
||||
await connection_manager.broadcast_to_channel(channel, message)
|
||||
return {"success": True, "message": f"Message broadcasted to {channel} channel"}
|
||||
358
python_rewrite/src/tempering_machine/services/web/websocket.py
Normal file
358
python_rewrite/src/tempering_machine/services/web/websocket.py
Normal file
@@ -0,0 +1,358 @@
|
||||
"""
|
||||
WebSocket server for real-time data streaming to UI clients.
|
||||
Provides live updates for temperature, process status, and alarms.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List, Set, Optional, Any
|
||||
from datetime import datetime
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import WebSocket, WebSocketDisconnect
|
||||
from fastapi.websockets import WebSocketState
|
||||
|
||||
from ..hardware.hardware_manager import hardware_manager
|
||||
from ..recipe.recipe_controller import recipe_controller
|
||||
from ..safety.safety_monitor import safety_monitor
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionManager:
|
||||
"""Manages WebSocket connections and broadcasts."""
|
||||
|
||||
def __init__(self):
|
||||
self.active_connections: Set[WebSocket] = set()
|
||||
self.client_subscriptions: Dict[WebSocket, Set[str]] = {}
|
||||
self.broadcast_task: Optional[asyncio.Task] = None
|
||||
self._running = False
|
||||
|
||||
async def connect(self, websocket: WebSocket, client_id: Optional[str] = None):
|
||||
"""Accept and register a new WebSocket connection."""
|
||||
await websocket.accept()
|
||||
self.active_connections.add(websocket)
|
||||
self.client_subscriptions[websocket] = set()
|
||||
|
||||
logger.info(f"WebSocket client connected: {client_id or 'anonymous'}")
|
||||
|
||||
# Send initial data
|
||||
await self._send_initial_data(websocket)
|
||||
|
||||
# Start broadcast task if not running
|
||||
if not self._running:
|
||||
await self.start_broadcasting()
|
||||
|
||||
async def disconnect(self, websocket: WebSocket):
|
||||
"""Remove a WebSocket connection."""
|
||||
if websocket in self.active_connections:
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
if websocket in self.client_subscriptions:
|
||||
del self.client_subscriptions[websocket]
|
||||
|
||||
logger.info("WebSocket client disconnected")
|
||||
|
||||
# Stop broadcasting if no active connections
|
||||
if not self.active_connections and self._running:
|
||||
await self.stop_broadcasting()
|
||||
|
||||
async def subscribe(self, websocket: WebSocket, channels: List[str]):
|
||||
"""Subscribe a client to specific data channels."""
|
||||
if websocket in self.client_subscriptions:
|
||||
self.client_subscriptions[websocket].update(channels)
|
||||
await self._send_message(websocket, {
|
||||
"type": "subscription_confirmed",
|
||||
"channels": channels,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
async def unsubscribe(self, websocket: WebSocket, channels: List[str]):
|
||||
"""Unsubscribe a client from specific data channels."""
|
||||
if websocket in self.client_subscriptions:
|
||||
for channel in channels:
|
||||
self.client_subscriptions[websocket].discard(channel)
|
||||
|
||||
await self._send_message(websocket, {
|
||||
"type": "unsubscription_confirmed",
|
||||
"channels": channels,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
async def broadcast_to_channel(self, channel: str, data: Dict[str, Any]):
|
||||
"""Broadcast data to all clients subscribed to a channel."""
|
||||
message = {
|
||||
"channel": channel,
|
||||
"data": data,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
disconnected_clients = set()
|
||||
|
||||
for websocket in self.active_connections.copy():
|
||||
if channel in self.client_subscriptions.get(websocket, set()):
|
||||
try:
|
||||
if websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._send_message(websocket, message)
|
||||
else:
|
||||
disconnected_clients.add(websocket)
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting to client: {e}")
|
||||
disconnected_clients.add(websocket)
|
||||
|
||||
# Clean up disconnected clients
|
||||
for websocket in disconnected_clients:
|
||||
await self.disconnect(websocket)
|
||||
|
||||
async def broadcast_to_all(self, data: Dict[str, Any]):
|
||||
"""Broadcast data to all connected clients."""
|
||||
message = {
|
||||
"type": "broadcast",
|
||||
"data": data,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
disconnected_clients = set()
|
||||
|
||||
for websocket in self.active_connections.copy():
|
||||
try:
|
||||
if websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._send_message(websocket, message)
|
||||
else:
|
||||
disconnected_clients.add(websocket)
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting to client: {e}")
|
||||
disconnected_clients.add(websocket)
|
||||
|
||||
# Clean up disconnected clients
|
||||
for websocket in disconnected_clients:
|
||||
await self.disconnect(websocket)
|
||||
|
||||
async def start_broadcasting(self):
|
||||
"""Start periodic data broadcasting."""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self.broadcast_task = asyncio.create_task(self._broadcast_loop())
|
||||
logger.info("WebSocket broadcasting started")
|
||||
|
||||
async def stop_broadcasting(self):
|
||||
"""Stop periodic data broadcasting."""
|
||||
self._running = False
|
||||
|
||||
if self.broadcast_task:
|
||||
self.broadcast_task.cancel()
|
||||
try:
|
||||
await self.broadcast_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
logger.info("WebSocket broadcasting stopped")
|
||||
|
||||
async def _broadcast_loop(self):
|
||||
"""Main broadcasting loop for periodic updates."""
|
||||
while self._running:
|
||||
try:
|
||||
# Broadcast temperature data
|
||||
await self._broadcast_temperatures()
|
||||
|
||||
# Broadcast process status
|
||||
await self._broadcast_process_status()
|
||||
|
||||
# Broadcast system status
|
||||
await self._broadcast_system_status()
|
||||
|
||||
# Broadcast alarms
|
||||
await self._broadcast_alarms()
|
||||
|
||||
# Wait for next broadcast cycle
|
||||
await asyncio.sleep(2.0) # Broadcast every 2 seconds
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in broadcast loop: {e}")
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
async def _broadcast_temperatures(self):
|
||||
"""Broadcast current temperature readings."""
|
||||
try:
|
||||
temperatures = await hardware_manager.get_all_temperatures()
|
||||
|
||||
temp_data = {}
|
||||
for sensor_name, reading in temperatures.items():
|
||||
temp_data[sensor_name] = {
|
||||
"value": reading.value,
|
||||
"units": reading.units,
|
||||
"is_valid": reading.is_valid,
|
||||
"timestamp": reading.timestamp.isoformat(),
|
||||
"error_message": reading.error_message
|
||||
}
|
||||
|
||||
await self.broadcast_to_channel("temperatures", temp_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting temperatures: {e}")
|
||||
|
||||
async def _broadcast_process_status(self):
|
||||
"""Broadcast current process status."""
|
||||
try:
|
||||
process_status = await recipe_controller.get_process_status()
|
||||
|
||||
if process_status:
|
||||
await self.broadcast_to_channel("process", process_status)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting process status: {e}")
|
||||
|
||||
async def _broadcast_system_status(self):
|
||||
"""Broadcast system health status."""
|
||||
try:
|
||||
hw_status = hardware_manager.get_hardware_status()
|
||||
|
||||
system_data = {
|
||||
"communication_health": hw_status.communication_health,
|
||||
"system_status": hw_status.system_status.value,
|
||||
"last_update": hw_status.last_update.isoformat() if hw_status.last_update else None,
|
||||
"motor_count": len(hw_status.motors),
|
||||
"temperature_sensor_count": len(hw_status.temperatures),
|
||||
"emergency_stop_active": hw_status.safety.emergency_stop_active,
|
||||
"cover_sensor_closed": hw_status.safety.cover_sensor_closed
|
||||
}
|
||||
|
||||
await self.broadcast_to_channel("system", system_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting system status: {e}")
|
||||
|
||||
async def _broadcast_alarms(self):
|
||||
"""Broadcast active alarms."""
|
||||
try:
|
||||
alarm_summary = safety_monitor.get_alarm_summary()
|
||||
active_alarms = safety_monitor.get_active_alarms()
|
||||
|
||||
alarm_data = {
|
||||
"summary": alarm_summary,
|
||||
"active_alarms": [
|
||||
{
|
||||
"id": alarm.id,
|
||||
"title": alarm.title,
|
||||
"message": alarm.message,
|
||||
"category": alarm.category.value,
|
||||
"priority": alarm.priority.value,
|
||||
"state": alarm.state.value,
|
||||
"timestamp": alarm.timestamp.isoformat(),
|
||||
"occurrence_count": alarm.occurrence_count
|
||||
}
|
||||
for alarm in active_alarms
|
||||
]
|
||||
}
|
||||
|
||||
await self.broadcast_to_channel("alarms", alarm_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error broadcasting alarms: {e}")
|
||||
|
||||
async def _send_initial_data(self, websocket: WebSocket):
|
||||
"""Send initial data to a newly connected client."""
|
||||
try:
|
||||
# Send welcome message
|
||||
await self._send_message(websocket, {
|
||||
"type": "welcome",
|
||||
"message": "Connected to Chocolate Tempering Machine",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"available_channels": [
|
||||
"temperatures", "process", "system", "alarms", "motors", "hardware"
|
||||
]
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending initial data: {e}")
|
||||
|
||||
async def _send_message(self, websocket: WebSocket, message: Dict[str, Any]):
|
||||
"""Send a message to a specific WebSocket client."""
|
||||
try:
|
||||
await websocket.send_text(json.dumps(message, default=str))
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending WebSocket message: {e}")
|
||||
raise
|
||||
|
||||
def get_connection_stats(self) -> Dict[str, Any]:
|
||||
"""Get WebSocket connection statistics."""
|
||||
return {
|
||||
"active_connections": len(self.active_connections),
|
||||
"total_subscriptions": sum(len(subs) for subs in self.client_subscriptions.values()),
|
||||
"is_broadcasting": self._running,
|
||||
"channels": {
|
||||
"temperatures": sum(1 for subs in self.client_subscriptions.values() if "temperatures" in subs),
|
||||
"process": sum(1 for subs in self.client_subscriptions.values() if "process" in subs),
|
||||
"system": sum(1 for subs in self.client_subscriptions.values() if "system" in subs),
|
||||
"alarms": sum(1 for subs in self.client_subscriptions.values() if "alarms" in subs),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# Global connection manager instance
|
||||
connection_manager = ConnectionManager()
|
||||
|
||||
|
||||
async def handle_websocket_message(websocket: WebSocket, message: Dict[str, Any]):
|
||||
"""Handle incoming WebSocket messages from clients."""
|
||||
try:
|
||||
message_type = message.get("type")
|
||||
|
||||
if message_type == "subscribe":
|
||||
channels = message.get("channels", [])
|
||||
await connection_manager.subscribe(websocket, channels)
|
||||
|
||||
elif message_type == "unsubscribe":
|
||||
channels = message.get("channels", [])
|
||||
await connection_manager.unsubscribe(websocket, channels)
|
||||
|
||||
elif message_type == "ping":
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "pong",
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
elif message_type == "get_status":
|
||||
# Send current status immediately
|
||||
process_status = await recipe_controller.get_process_status()
|
||||
if process_status:
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "status_response",
|
||||
"data": process_status,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
else:
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "error",
|
||||
"message": f"Unknown message type: {message_type}",
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling WebSocket message: {e}")
|
||||
await connection_manager._send_message(websocket, {
|
||||
"type": "error",
|
||||
"message": "Internal server error",
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def websocket_connection(websocket: WebSocket, client_id: Optional[str] = None):
|
||||
"""Context manager for WebSocket connections."""
|
||||
try:
|
||||
await connection_manager.connect(websocket, client_id)
|
||||
yield
|
||||
except WebSocketDisconnect:
|
||||
logger.info("WebSocket client disconnected")
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket connection error: {e}")
|
||||
finally:
|
||||
await connection_manager.disconnect(websocket)
|
||||
Reference in New Issue
Block a user