diff --git a/python_rewrite/.env.example b/python_rewrite/.env.example new file mode 100644 index 0000000..59a2100 --- /dev/null +++ b/python_rewrite/.env.example @@ -0,0 +1,114 @@ +# Environment Configuration for Chocolate Tempering Machine +# Copy this file to .env and adjust values for your environment + +# Application Settings +TEMPERING_ENVIRONMENT=development +TEMPERING_DEBUG=true +TEMPERING_LOG_LEVEL=DEBUG +TEMPERING_APP_NAME=Chocolate Tempering Machine +TEMPERING_APP_VERSION=1.0.0 + +# Database Configuration +TEMPERING_DATABASE_URL=sqlite:///data/tempering_machine.db +# For PostgreSQL: postgresql+asyncpg://user:password@localhost:5432/tempering_machine +TEMPERING_DATABASE__ECHO=false +TEMPERING_DATABASE__POOL_SIZE=5 +TEMPERING_DATABASE__MAX_OVERFLOW=10 + +# Serial/Modbus Hardware Configuration +TEMPERING_SERIAL__PORT=/dev/ttyUSB0 +TEMPERING_SERIAL__BAUDRATE=9600 +TEMPERING_SERIAL__TIMEOUT=2.0 +TEMPERING_SERIAL__BYTESIZE=8 +TEMPERING_SERIAL__PARITY=N +TEMPERING_SERIAL__STOPBITS=1 +TEMPERING_SERIAL__MAX_RETRIES=3 +TEMPERING_SERIAL__RETRY_DELAY=0.1 + +# Modbus Configuration +TEMPERING_MODBUS__SLAVE_ADDRESS=1 +TEMPERING_MODBUS__READ_TIMEOUT=2.0 +TEMPERING_MODBUS__WRITE_TIMEOUT=2.0 +TEMPERING_MODBUS__MAX_READ_REGISTERS=125 +TEMPERING_MODBUS__MAX_WRITE_REGISTERS=100 +TEMPERING_MODBUS__CONNECTION_CHECK_INTERVAL=10.0 + +# Temperature Control Configuration +TEMPERING_TEMPERATURE__ABSOLUTE_MAX_TEMP=80.0 +TEMPERING_TEMPERATURE__ABSOLUTE_MIN_TEMP=10.0 +TEMPERING_TEMPERATURE__TANK_MAX_HEAT=60.0 +TEMPERING_TEMPERATURE__PUMP_MAX_HEAT=55.0 +TEMPERING_TEMPERATURE__PUMP_MIN_HEAT=25.0 +TEMPERING_TEMPERATURE__TEMPERATURE_TOLERANCE=0.5 +TEMPERING_TEMPERATURE__SENSOR_ACCURACY=0.1 + +# PID Control Defaults +TEMPERING_TEMPERATURE__DEFAULT_KP=1.0 +TEMPERING_TEMPERATURE__DEFAULT_KI=0.1 +TEMPERING_TEMPERATURE__DEFAULT_KD=0.01 +TEMPERING_TEMPERATURE__DEFAULT_KL=100.0 + +# Safety Configuration +TEMPERING_SAFETY__GRID_VOLTAGE_NOMINAL=230.0 +TEMPERING_SAFETY__GRID_VOLTAGE_TOLERANCE=0.1 
+TEMPERING_SAFETY__GRID_FREQUENCY_NOMINAL=50.0 +TEMPERING_SAFETY__GRID_FREQUENCY_TOLERANCE=0.02 +TEMPERING_SAFETY__MAX_NEUTRAL_CURRENT=16.0 +TEMPERING_SAFETY__MAX_MOTOR1_CURRENT=10.0 +TEMPERING_SAFETY__MAX_MOTOR2_CURRENT=10.0 +TEMPERING_SAFETY__ERROR_CHECK_INTERVAL=1.0 +TEMPERING_SAFETY__AUTO_RECOVERY_ATTEMPTS=3 +TEMPERING_SAFETY__AUTO_RECOVERY_DELAY=5.0 +TEMPERING_SAFETY__COMMUNICATION_TIMEOUT=3.0 +TEMPERING_SAFETY__HEARTBEAT_INTERVAL=1.0 + +# Process Control Configuration +TEMPERING_PROCESS__HEATING_DELAY=60.0 +TEMPERING_PROCESS__COOLING_DELAY=120.0 +TEMPERING_PROCESS__POURING_DELAY=30.0 +TEMPERING_PROCESS__PUMP_DELAY=10.0 +TEMPERING_PROCESS__MIXER_DELAY=5.0 +TEMPERING_PROCESS__PROCESS_CONTROL_INTERVAL=0.5 +TEMPERING_PROCESS__TEMPERATURE_READ_INTERVAL=1.0 +TEMPERING_PROCESS__STATUS_UPDATE_INTERVAL=2.0 +TEMPERING_PROCESS__MIN_HEATING_GOAL=40.0 +TEMPERING_PROCESS__MAX_HEATING_GOAL=60.0 +TEMPERING_PROCESS__MIN_COOLING_GOAL=20.0 +TEMPERING_PROCESS__MAX_COOLING_GOAL=40.0 + +# Redis Configuration (for message queuing) +TEMPERING_REDIS__HOST=localhost +TEMPERING_REDIS__PORT=6379 +TEMPERING_REDIS__DB=0 +TEMPERING_REDIS__PASSWORD= +TEMPERING_REDIS__SOCKET_TIMEOUT=5.0 +TEMPERING_REDIS__CONNECTION_POOL_MAX_CONNECTIONS=10 + +# Web Service Configuration +TEMPERING_WEB__HOST=0.0.0.0 +TEMPERING_WEB__PORT=8000 +TEMPERING_WEB__WORKERS=1 +TEMPERING_WEB__RELOAD=true +TEMPERING_WEB__ACCESS_LOG=true +TEMPERING_WEB__API_TITLE=Chocolate Tempering Machine API +TEMPERING_WEB__API_VERSION=1.0.0 +TEMPERING_WEB__CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"] + +# File Paths +TEMPERING_DATA_DIRECTORY=data +TEMPERING_LOG_DIRECTORY=logs +TEMPERING_CONFIG_DIRECTORY=config +TEMPERING_BACKUP_DIRECTORY=backups + +# Development/Testing Settings +# Uncomment for development +# TEMPERING_WEB__RELOAD=true +# TEMPERING_DATABASE__ECHO=true + +# Production Settings +# Uncomment for production +# TEMPERING_ENVIRONMENT=production +# TEMPERING_DEBUG=false +# TEMPERING_LOG_LEVEL=INFO +# 
TEMPERING_WEB__RELOAD=false +# TEMPERING_WEB__WORKERS=4 \ No newline at end of file diff --git a/python_rewrite/Dockerfile b/python_rewrite/Dockerfile new file mode 100644 index 0000000..3e10168 --- /dev/null +++ b/python_rewrite/Dockerfile @@ -0,0 +1,81 @@ +# Multi-stage Docker build for chocolate tempering machine control system + +# Build stage +FROM python:3.11-slim as builder + +# Set build arguments +ARG BUILD_ENV=production + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + gcc \ + g++ \ + && rm -rf /var/lib/apt/lists/* + +# Set work directory +WORKDIR /app + +# Copy requirements +COPY requirements.txt requirements-dev.txt ./ + +# Install Python dependencies +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt + +# If development build, install dev dependencies +RUN if [ "$BUILD_ENV" = "development" ]; then \ + pip install --no-cache-dir -r requirements-dev.txt; \ + fi + +# Production stage +FROM python:3.11-slim as production + +# Set environment variables +ENV PYTHONPATH=/app/src +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 + +# Create non-root user +RUN groupadd -r tempering && useradd -r -g tempering tempering + +# Install runtime dependencies +RUN apt-get update && apt-get install -y \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Set work directory +WORKDIR /app + +# Copy Python dependencies from builder stage +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +# Copy application code +COPY src/ ./src/ +COPY alembic.ini ./ +COPY alembic/ ./alembic/ + +# Create necessary directories +RUN mkdir -p /app/data /app/logs /app/config /app/backups && \ + chown -R tempering:tempering /app + +# Copy startup script +COPY docker/entrypoint.sh ./entrypoint.sh +RUN chmod +x ./entrypoint.sh + +# Health check +HEALTHCHECK --interval=30s --timeout=30s 
--start-period=60s --retries=3 \ + CMD curl -f http://localhost:8000/health/live || exit 1 + +# Switch to non-root user +USER tempering + +# Expose port +EXPOSE 8000 + +# Set entrypoint +ENTRYPOINT ["./entrypoint.sh"] + +# Default command +CMD ["web"] \ No newline at end of file diff --git a/python_rewrite/docker-compose.yml b/python_rewrite/docker-compose.yml new file mode 100644 index 0000000..2de106b --- /dev/null +++ b/python_rewrite/docker-compose.yml @@ -0,0 +1,152 @@ +version: '3.8' + +services: + # Main application + tempering-machine: + build: + context: . + dockerfile: Dockerfile + args: + BUILD_ENV: production + container_name: tempering-machine + restart: unless-stopped + ports: + - "8000:8000" + environment: + # Database configuration + - TEMPERING_DATABASE_URL=sqlite:///data/tempering_machine.db + + # Serial/Modbus configuration + - TEMPERING_SERIAL__PORT=/dev/ttyUSB0 + - TEMPERING_SERIAL__BAUDRATE=9600 + - TEMPERING_SERIAL__TIMEOUT=2.0 + + # Safety configuration + - TEMPERING_SAFETY__MAX_TANK_TEMPERATURE=80.0 + - TEMPERING_SAFETY__MAX_NEUTRAL_CURRENT=16.0 + - TEMPERING_SAFETY__COMMUNICATION_TIMEOUT=5.0 + + # Web configuration + - TEMPERING_WEB__HOST=0.0.0.0 + - TEMPERING_WEB__PORT=8000 + - TEMPERING_WEB__CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"] + + # Application settings + - TEMPERING_ENVIRONMENT=production + - TEMPERING_LOG_LEVEL=INFO + - TEMPERING_DEBUG=false + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./backups:/app/backups + - ./config:/app/config + # Mount serial device (adjust as needed) + - /dev/ttyUSB0:/dev/ttyUSB0 + devices: + - /dev/ttyUSB0:/dev/ttyUSB0 + networks: + - tempering-network + depends_on: + - redis + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health/live"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + + # Redis for message queuing and caching + redis: + image: redis:7-alpine + container_name: tempering-redis + restart: unless-stopped + ports: + - 
"6379:6379" + volumes: + - redis-data:/data + networks: + - tempering-network + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 10s + retries: 3 + + # Optional: PostgreSQL database (if not using SQLite) + postgres: + image: postgres:15-alpine + container_name: tempering-postgres + restart: unless-stopped + ports: + - "5432:5432" + environment: + - POSTGRES_DB=tempering_machine + - POSTGRES_USER=tempering + - POSTGRES_PASSWORD=tempering123 + volumes: + - postgres-data:/var/lib/postgresql/data + - ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql + networks: + - tempering-network + healthcheck: + test: ["CMD-SHELL", "pg_isready -U tempering"] + interval: 30s + timeout: 10s + retries: 3 + profiles: + - postgres + + # Optional: Grafana for monitoring dashboards + grafana: + image: grafana/grafana:latest + container_name: tempering-grafana + restart: unless-stopped + ports: + - "3001:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin123 + - GF_USERS_ALLOW_SIGN_UP=false + volumes: + - grafana-data:/var/lib/grafana + - ./docker/grafana/dashboards:/etc/grafana/provisioning/dashboards + - ./docker/grafana/datasources:/etc/grafana/provisioning/datasources + networks: + - tempering-network + profiles: + - monitoring + + # Optional: Prometheus for metrics collection + prometheus: + image: prom/prometheus:latest + container_name: tempering-prometheus + restart: unless-stopped + ports: + - "9090:9090" + volumes: + - prometheus-data:/prometheus + - ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + networks: + - tempering-network + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - '--storage.tsdb.retention.time=200h' + - '--web.enable-lifecycle' + profiles: + - monitoring + +networks: + tempering-network: + driver: bridge + +volumes: + 
redis-data: + driver: local + postgres-data: + driver: local + grafana-data: + driver: local + prometheus-data: + driver: local \ No newline at end of file diff --git a/python_rewrite/docker/entrypoint.sh b/python_rewrite/docker/entrypoint.sh new file mode 100644 index 0000000..bfe32c6 --- /dev/null +++ b/python_rewrite/docker/entrypoint.sh @@ -0,0 +1,134 @@ +#!/bin/bash +set -e + +# Entrypoint script for chocolate tempering machine container + +# Function to wait for service +wait_for_service() { + local host=$1 + local port=$2 + local service_name=$3 + + echo "Waiting for $service_name..." + while ! (echo > /dev/tcp/$host/$port) 2>/dev/null; do + sleep 1 + done + echo "$service_name is ready!" +} + +# Function to run database migrations +run_migrations() { + echo "Running database migrations..." + alembic upgrade head + echo "Database migrations completed" +} + +# Function to create initial data +create_initial_data() { + echo "Creating initial data..." + python -c " +import asyncio +from tempering_machine.shared.database import init_database, create_tables + +async def setup(): + init_database() + await create_tables() + print('Database tables created') + +asyncio.run(setup()) +" +} + +# Main execution +case "$1" in + web) + echo "Starting chocolate tempering machine web service..." + + # Wait for Redis if configured + if [ ! -z "$TEMPERING_REDIS__HOST" ]; then + wait_for_service ${TEMPERING_REDIS__HOST:-redis} ${TEMPERING_REDIS__PORT:-6379} "Redis" + fi + + # Wait for PostgreSQL if configured + if [[ "$TEMPERING_DATABASE_URL" == postgresql* ]]; then + DB_HOST=$(echo $TEMPERING_DATABASE_URL | sed -n 's/.*@\([^:]*\).*/\1/p') + DB_PORT=$(echo $TEMPERING_DATABASE_URL | sed -n 's/.*:\([0-9]*\).*/\1/p') + wait_for_service ${DB_HOST:-postgres} ${DB_PORT:-5432} "PostgreSQL" + fi + + # Run migrations + run_migrations + + # Create initial data + create_initial_data + + # Start web service + echo "Starting FastAPI web service..." 
+ exec python -m uvicorn tempering_machine.services.web.main:app \ + --host ${TEMPERING_WEB__HOST:-0.0.0.0} \ + --port ${TEMPERING_WEB__PORT:-8000} \ + --workers ${TEMPERING_WEB__WORKERS:-1} \ + --access-log + ;; + + worker) + echo "Starting background worker..." + + # Wait for Redis + wait_for_service ${TEMPERING_REDIS__HOST:-redis} ${TEMPERING_REDIS__PORT:-6379} "Redis" + + # Start Celery worker + exec celery -A tempering_machine.services.worker worker \ + --loglevel=${TEMPERING_LOG_LEVEL:-info} \ + --concurrency=2 + ;; + + migrate) + echo "Running database migrations only..." + run_migrations + ;; + + shell) + echo "Starting Python shell..." + exec python -c " +import asyncio +from tempering_machine.shared.database import init_database +from tempering_machine.shared.config import settings + +print('Chocolate Tempering Machine - Python Shell') +print(f'Environment: {settings.environment}') +print(f'Database: {settings.database.url}') +print('') + +init_database() +print('Database initialized. You can now import and use the application modules.') +print('') + +# Import commonly used modules +from tempering_machine.shared.models import * +from tempering_machine.services.hardware.hardware_manager import hardware_manager +from tempering_machine.services.recipe.recipe_controller import recipe_controller +from tempering_machine.services.safety.safety_monitor import safety_monitor + +import IPython +IPython.start_ipython(argv=[]) +" + ;; + + test) + echo "Running tests..." 
+ exec python -m pytest tests/ -v + ;; + + *) + echo "Usage: $0 {web|worker|migrate|shell|test}" + echo "" + echo "Commands:" + echo " web - Start FastAPI web service (default)" + echo " worker - Start Celery background worker" + echo " migrate - Run database migrations only" + echo " shell - Start interactive Python shell" + echo " test - Run test suite" + exit 1 + ;; +esac \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/data/logger.py b/python_rewrite/src/tempering_machine/services/data/logger.py new file mode 100644 index 0000000..56dbcf0 --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/data/logger.py @@ -0,0 +1,379 @@ +""" +Data logging service for chocolate tempering machine. +Handles high-frequency data collection, storage, and export. +""" + +import asyncio +import logging +from typing import Dict, List, Optional, Any +from datetime import datetime, timedelta +from dataclasses import dataclass, field +from pathlib import Path +import json +import csv + +from ...shared.config import settings +from ...shared.database import db_manager +from ...shared.models.process import ProcessSession, TemperatureReading, ProcessLog +from ..hardware.hardware_manager import hardware_manager + + +logger = logging.getLogger(__name__) + + +@dataclass +class DataPoint: + """Single data point for logging.""" + timestamp: datetime + session_id: Optional[str] + sensor_name: str + value: float + units: str = "°C" + quality: str = "good" # good, poor, bad + additional_data: Dict[str, Any] = field(default_factory=dict) + + +class DataLogger: + """ + High-frequency data logger for process monitoring and analysis. + Collects temperature, motor, and process data at regular intervals. 
+ """ + + def __init__(self): + self.is_logging = False + self.logging_task: Optional[asyncio.Task] = None + self.data_buffer: List[DataPoint] = [] + self.buffer_size = 1000 + self.log_interval = 1.0 # seconds + self.current_session_id: Optional[str] = None + + # Statistics + self.stats = { + "total_points_logged": 0, + "buffer_flushes": 0, + "logging_errors": 0, + "last_log_time": None, + "start_time": datetime.now(), + } + + async def start_logging(self, session_id: Optional[str] = None) -> None: + """Start data logging.""" + if self.is_logging: + return + + self.current_session_id = session_id + self.is_logging = True + self.logging_task = asyncio.create_task(self._logging_loop()) + + logger.info(f"Data logging started for session: {session_id}") + + async def stop_logging(self) -> None: + """Stop data logging and flush remaining data.""" + if not self.is_logging: + return + + self.is_logging = False + + if self.logging_task: + self.logging_task.cancel() + try: + await self.logging_task + except asyncio.CancelledError: + pass + + # Flush remaining data + await self._flush_buffer() + + logger.info("Data logging stopped") + + async def _logging_loop(self) -> None: + """Main data logging loop.""" + while self.is_logging: + try: + await self._collect_data_points() + + # Flush buffer if full + if len(self.data_buffer) >= self.buffer_size: + await self._flush_buffer() + + # Update statistics + self.stats["last_log_time"] = datetime.now() + + # Wait for next collection cycle + await asyncio.sleep(self.log_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in data logging loop: {e}") + self.stats["logging_errors"] += 1 + await asyncio.sleep(1.0) # Error recovery delay + + async def _collect_data_points(self) -> None: + """Collect current data points from hardware.""" + try: + timestamp = datetime.now() + + # Collect temperature data + temperatures = await hardware_manager.get_all_temperatures() + + for sensor_name, 
reading in temperatures.items(): + quality = "good" if reading.is_valid else "bad" + + data_point = DataPoint( + timestamp=timestamp, + session_id=self.current_session_id, + sensor_name=sensor_name, + value=reading.value if reading.is_valid else 0.0, + units=reading.units, + quality=quality, + additional_data={ + "error_message": reading.error_message if not reading.is_valid else None + } + ) + + self.data_buffer.append(data_point) + self.stats["total_points_logged"] += 1 + + # Collect motor current data (if available) + hw_status = hardware_manager.get_hardware_status() + + for motor_name, motor in hw_status.motors.items(): + if motor.current is not None: + data_point = DataPoint( + timestamp=timestamp, + session_id=self.current_session_id, + sensor_name=f"{motor_name}_current", + value=motor.current, + units="A", + quality="good", + additional_data={ + "motor_state": motor.state.value, + "is_enabled": motor.is_enabled + } + ) + + self.data_buffer.append(data_point) + self.stats["total_points_logged"] += 1 + + except Exception as e: + logger.error(f"Error collecting data points: {e}") + raise + + async def _flush_buffer(self) -> None: + """Flush data buffer to database.""" + if not self.data_buffer: + return + + try: + async with db_manager.get_async_session() as session: + # Convert data points to temperature readings + temperature_readings = [] + + for data_point in self.data_buffer: + if data_point.units == "°C": # Temperature readings + # Map sensor names to columns + temp_reading = TemperatureReading( + session_id=data_point.session_id, + timestamp=data_point.timestamp, + **{self._map_sensor_to_column(data_point.sensor_name): data_point.value} + ) + temperature_readings.append(temp_reading) + + # Batch insert temperature readings + if temperature_readings: + session.add_all(temperature_readings) + await session.commit() + + # Clear buffer + self.data_buffer.clear() + self.stats["buffer_flushes"] += 1 + + logger.debug(f"Flushed {len(temperature_readings)} 
temperature readings to database") + + except Exception as e: + logger.error(f"Error flushing data buffer: {e}") + # Don't clear buffer on error to retry later + raise + + def _map_sensor_to_column(self, sensor_name: str) -> str: + """Map sensor names to database column names.""" + mapping = { + "tank_bottom": "tank_bottom", + "tank_wall": "tank_wall", + "pump": "pump", + "fountain": "fountain", + } + return mapping.get(sensor_name, "tank_bottom") # Default fallback + + async def export_data( + self, + session_id: str, + format: str = "csv", + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None + ) -> str: + """Export logged data for a session.""" + try: + async with db_manager.get_async_session() as session: + # Query temperature readings + query = session.query(TemperatureReading).filter( + TemperatureReading.session_id == session_id + ) + + if start_time: + query = query.filter(TemperatureReading.timestamp >= start_time) + if end_time: + query = query.filter(TemperatureReading.timestamp <= end_time) + + query = query.order_by(TemperatureReading.timestamp) + + readings = await query.all() + + # Export to specified format + if format.lower() == "csv": + return await self._export_to_csv(readings, session_id) + elif format.lower() == "json": + return await self._export_to_json(readings, session_id) + else: + raise ValueError(f"Unsupported export format: {format}") + + except Exception as e: + logger.error(f"Error exporting data: {e}") + raise + + async def _export_to_csv(self, readings: List[TemperatureReading], session_id: str) -> str: + """Export readings to CSV format.""" + try: + export_dir = settings.data_directory / "exports" + export_dir.mkdir(exist_ok=True) + + filename = f"temperature_data_{session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + filepath = export_dir / filename + + with open(filepath, 'w', newline='') as csvfile: + fieldnames = [ + 'timestamp', 'phase', 'tank_bottom', 'tank_wall', + 'pump', 'fountain', 
'target_temperature', 'temperature_error' + ] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + writer.writeheader() + for reading in readings: + writer.writerow({ + 'timestamp': reading.timestamp.isoformat(), + 'phase': reading.phase.value if reading.phase else None, + 'tank_bottom': reading.tank_bottom, + 'tank_wall': reading.tank_wall, + 'pump': reading.pump, + 'fountain': reading.fountain, + 'target_temperature': reading.target_temperature, + 'temperature_error': reading.temperature_error, + }) + + logger.info(f"Exported {len(readings)} readings to {filepath}") + return str(filepath) + + except Exception as e: + logger.error(f"Error exporting to CSV: {e}") + raise + + async def _export_to_json(self, readings: List[TemperatureReading], session_id: str) -> str: + """Export readings to JSON format.""" + try: + export_dir = settings.data_directory / "exports" + export_dir.mkdir(exist_ok=True) + + filename = f"temperature_data_{session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + filepath = export_dir / filename + + data = { + "session_id": session_id, + "export_time": datetime.now().isoformat(), + "reading_count": len(readings), + "readings": [ + { + "timestamp": reading.timestamp.isoformat(), + "phase": reading.phase.value if reading.phase else None, + "temperatures": { + "tank_bottom": reading.tank_bottom, + "tank_wall": reading.tank_wall, + "pump": reading.pump, + "fountain": reading.fountain, + }, + "target_temperature": reading.target_temperature, + "temperature_error": reading.temperature_error, + } + for reading in readings + ] + } + + with open(filepath, 'w') as jsonfile: + json.dump(data, jsonfile, indent=2) + + logger.info(f"Exported {len(readings)} readings to {filepath}") + return str(filepath) + + except Exception as e: + logger.error(f"Error exporting to JSON: {e}") + raise + + async def get_data_summary(self, session_id: str) -> Dict[str, Any]: + """Get data summary for a session.""" + try: + async with 
db_manager.get_async_session() as session: + # Query session data + query = session.query(TemperatureReading).filter( + TemperatureReading.session_id == session_id + ) + + readings = await query.all() + + if not readings: + return {"session_id": session_id, "reading_count": 0} + + # Calculate statistics + timestamps = [r.timestamp for r in readings] + fountain_temps = [r.fountain for r in readings if r.fountain is not None] + + summary = { + "session_id": session_id, + "reading_count": len(readings), + "start_time": min(timestamps).isoformat(), + "end_time": max(timestamps).isoformat(), + "duration_seconds": (max(timestamps) - min(timestamps)).total_seconds(), + } + + if fountain_temps: + summary.update({ + "fountain_temp_min": min(fountain_temps), + "fountain_temp_max": max(fountain_temps), + "fountain_temp_avg": sum(fountain_temps) / len(fountain_temps), + }) + + return summary + + except Exception as e: + logger.error(f"Error getting data summary: {e}") + raise + + def get_logging_statistics(self) -> Dict[str, Any]: + """Get logging statistics.""" + uptime = datetime.now() - self.stats["start_time"] + + return { + **self.stats, + "is_logging": self.is_logging, + "current_session_id": self.current_session_id, + "buffer_size": len(self.data_buffer), + "buffer_capacity": self.buffer_size, + "log_interval": self.log_interval, + "uptime_seconds": uptime.total_seconds(), + "points_per_second": ( + self.stats["total_points_logged"] / max(1, uptime.total_seconds()) + ), + } + + +# Global data logger instance +data_logger = DataLogger() \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/recipe/recipe_controller.py b/python_rewrite/src/tempering_machine/services/recipe/recipe_controller.py new file mode 100644 index 0000000..1c78d8a --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/recipe/recipe_controller.py @@ -0,0 +1,532 @@ +""" +Recipe controller for managing chocolate tempering process execution. 
+Orchestrates recipe execution, state management, and process monitoring. +""" + +import asyncio +import logging +from typing import Optional, Dict, Any, List +from datetime import datetime, timezone +from uuid import uuid4 + +from ...shared.database import get_db, db_manager +from ...shared.models.recipe import Recipe +from ...shared.models.process import ProcessSession, ProcessStatus, ProcessLog, LogLevel +from .state_machine import TemperingStateMachine, ProcessContext, ProcessMetrics +from ..hardware.hardware_manager import hardware_manager + + +logger = logging.getLogger(__name__) + + +class RecipeExecutionError(Exception): + """Exception raised during recipe execution.""" + pass + + +class RecipeController: + """ + Recipe controller manages the complete lifecycle of tempering process execution. + Handles recipe loading, process control, monitoring, and data logging. + """ + + def __init__(self): + self.current_session: Optional[ProcessSession] = None + self.state_machine: Optional[TemperingStateMachine] = None + self.recipe: Optional[Recipe] = None + self.session_id: Optional[str] = None + self.monitoring_task: Optional[asyncio.Task] = None + self._is_running = False + + async def start_recipe(self, recipe_id: int, user_id: Optional[str] = None) -> str: + """ + Start executing a recipe. + Returns session ID for tracking. 
+ """ + try: + # Check if another recipe is already running + if self._is_running: + raise RecipeExecutionError("Another recipe is already running") + + logger.info(f"Starting recipe execution: recipe_id={recipe_id}, user={user_id}") + + # Load recipe from database + async with db_manager.get_async_session() as session: + self.recipe = await session.get(Recipe, recipe_id) + if not self.recipe: + raise RecipeExecutionError(f"Recipe {recipe_id} not found") + + if not self.recipe.is_active: + raise RecipeExecutionError(f"Recipe {recipe_id} is not active") + + # Validate recipe parameters + if not self.recipe.validate_temperatures(): + raise RecipeExecutionError(f"Recipe {recipe_id} has invalid temperature parameters") + + # Check hardware readiness + is_safe, issues = await hardware_manager.is_safe_to_operate() + if not is_safe: + raise RecipeExecutionError(f"Hardware not ready: {', '.join(issues)}") + + # Create process session + self.session_id = str(uuid4()) + await self._create_process_session(user_id) + + # Create process context + context = ProcessContext( + recipe_id=recipe_id, + session_id=self.session_id, + recipe_parameters=self.recipe.to_dict(), + current_metrics=ProcessMetrics(phase_start_time=datetime.now()), + user_id=user_id, + start_time=datetime.now() + ) + + # Initialize state machine + self.state_machine = TemperingStateMachine(context) + + if not await self.state_machine.initialize(): + raise RecipeExecutionError("Failed to initialize state machine") + + # Start the process + await self._start_process_execution() + + logger.info(f"Recipe execution started successfully: session_id={self.session_id}") + return self.session_id + + except Exception as e: + logger.error(f"Failed to start recipe execution: {e}") + await self._handle_startup_error(str(e)) + raise + + async def stop_recipe(self, user_id: Optional[str] = None, reason: str = "Manual stop") -> bool: + """ + Stop the currently running recipe. 
+ """ + try: + if not self._is_running or not self.state_machine: + logger.warning("No recipe is currently running") + return False + + logger.info(f"Stopping recipe execution: session_id={self.session_id}, reason={reason}") + + # Stop state machine control loop + await self.state_machine.stop_control_loop() + + # Emergency stop hardware for safety + await hardware_manager.emergency_stop() + + # Update process session + await self._finalize_process_session(ProcessStatus.ABORTED, reason, user_id) + + # Log the stop event + await self._log_process_event(LogLevel.INFO, f"Recipe stopped: {reason}") + + # Clean up + await self._cleanup_process() + + logger.info("Recipe execution stopped successfully") + return True + + except Exception as e: + logger.error(f"Error stopping recipe: {e}") + return False + + async def pause_recipe(self, user_id: Optional[str] = None) -> bool: + """ + Pause the currently running recipe. + """ + try: + if not self._is_running or not self.state_machine: + return False + + logger.info(f"Pausing recipe execution: session_id={self.session_id}") + + # Stop control loop but don't emergency stop + await self.state_machine.stop_control_loop() + + # Update session status + async with db_manager.get_async_session() as session: + if self.current_session: + self.current_session.status = ProcessStatus.PAUSED + session.add(self.current_session) + await session.commit() + + await self._log_process_event(LogLevel.INFO, f"Recipe paused by {user_id or 'system'}") + + logger.info("Recipe execution paused") + return True + + except Exception as e: + logger.error(f"Error pausing recipe: {e}") + return False + + async def resume_recipe(self, user_id: Optional[str] = None) -> bool: + """ + Resume a paused recipe. 
+ """ + try: + if not self.current_session or self.current_session.status != ProcessStatus.PAUSED: + return False + + logger.info(f"Resuming recipe execution: session_id={self.session_id}") + + # Check hardware is still safe + is_safe, issues = await hardware_manager.is_safe_to_operate() + if not is_safe: + await self._log_process_event(LogLevel.ERROR, f"Cannot resume - hardware issues: {', '.join(issues)}") + return False + + # Resume control loop + await self.state_machine.start_control_loop() + + # Update session status + async with db_manager.get_async_session() as session: + self.current_session.status = ProcessStatus.RUNNING + session.add(self.current_session) + await session.commit() + + await self._log_process_event(LogLevel.INFO, f"Recipe resumed by {user_id or 'system'}") + + logger.info("Recipe execution resumed") + return True + + except Exception as e: + logger.error(f"Error resuming recipe: {e}") + return False + + async def emergency_stop(self, user_id: Optional[str] = None) -> bool: + """ + Execute emergency stop of the current recipe. 
+ """ + try: + logger.critical(f"EMERGENCY STOP activated: session_id={self.session_id}") + + # Immediate hardware emergency stop + await hardware_manager.emergency_stop() + + # Stop state machine if running + if self.state_machine: + try: + self.state_machine.emergency_stop() + except: + pass # Force stop regardless of state + await self.state_machine.stop_control_loop() + + # Update process session + await self._finalize_process_session( + ProcessStatus.ABORTED, + "Emergency stop activated", + user_id + ) + + # Log critical event + await self._log_process_event( + LogLevel.CRITICAL, + f"EMERGENCY STOP activated by {user_id or 'system'}" + ) + + # Clean up + await self._cleanup_process() + + logger.critical("Emergency stop completed") + return True + + except Exception as e: + logger.critical(f"Error during emergency stop: {e}") + return False + + async def get_process_status(self) -> Optional[Dict[str, Any]]: + """ + Get current process status and metrics. + """ + if not self.state_machine or not self.current_session: + return None + + try: + # Get state machine status + sm_status = self.state_machine.get_process_status() + + # Get hardware status + hw_status = hardware_manager.get_hardware_status() + + # Get session information + session_info = { + "session_id": self.current_session.id, + "recipe_name": self.recipe.name if self.recipe else "Unknown", + "status": self.current_session.status.value, + "started_at": self.current_session.started_at.isoformat() if self.current_session.started_at else None, + "duration_seconds": self.current_session.duration, + "started_by": self.current_session.started_by, + } + + return { + "session": session_info, + "state_machine": sm_status, + "hardware": { + "temperatures": {k: v.value for k, v in hw_status.temperatures.items()}, + "motors": {k: {"state": v.state.value, "enabled": v.is_enabled} + for k, v in hw_status.motors.items()}, + "safety": { + "emergency_stop": hw_status.safety.emergency_stop_active, + "cover_closed": 
hw_status.safety.cover_sensor_closed, + "alarms": hw_status.safety.temperature_alarms + hw_status.safety.current_alarms, + }, + "communication_health": hw_status.communication_health, + "system_status": hw_status.system_status.value, + } + } + + except Exception as e: + logger.error(f"Error getting process status: {e}") + return None + + async def get_active_sessions(self) -> List[Dict[str, Any]]: + """Get list of all active process sessions.""" + try: + async with db_manager.get_async_session() as session: + # This would query active sessions from database + # For now, return current session if active + if self.current_session and self.current_session.status in [ + ProcessStatus.RUNNING, ProcessStatus.PAUSED, ProcessStatus.STARTING + ]: + return [self.current_session.to_dict()] + return [] + + except Exception as e: + logger.error(f"Error getting active sessions: {e}") + return [] + + async def _create_process_session(self, user_id: Optional[str]) -> None: + """Create a new process session in the database.""" + try: + async with db_manager.get_async_session() as session: + self.current_session = ProcessSession( + id=self.session_id, + recipe_id=self.recipe.id, + status=ProcessStatus.STARTING, + started_at=datetime.now(timezone.utc), + started_by=user_id, + process_parameters=self.recipe.to_dict(), + target_heating_temp=self.recipe.heating_goal, + target_cooling_temp=self.recipe.cooling_goal, + ) + + session.add(self.current_session) + await session.commit() + + logger.info(f"Process session created: {self.session_id}") + + except Exception as e: + logger.error(f"Failed to create process session: {e}") + raise + + async def _start_process_execution(self) -> None: + """Start the actual process execution.""" + try: + # Update session status + async with db_manager.get_async_session() as session: + self.current_session.status = ProcessStatus.RUNNING + session.add(self.current_session) + await session.commit() + + # Start state machine + 
self.state_machine.start_process() + await self.state_machine.start_control_loop() + + # Start monitoring task + self.monitoring_task = asyncio.create_task(self._monitoring_loop()) + + self._is_running = True + + # Log start event + await self._log_process_event(LogLevel.INFO, "Recipe execution started") + + except Exception as e: + logger.error(f"Failed to start process execution: {e}") + raise + + async def _monitoring_loop(self) -> None: + """Background monitoring loop for process execution.""" + while self._is_running: + try: + # Update process session with current metrics + await self._update_session_metrics() + + # Check for process completion or error conditions + await self._check_process_completion() + + # Sleep until next monitoring cycle + await asyncio.sleep(5.0) # Monitor every 5 seconds + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in monitoring loop: {e}") + await asyncio.sleep(1.0) + + async def _update_session_metrics(self) -> None: + """Update process session with current metrics.""" + if not self.current_session or not self.state_machine: + return + + try: + # Get current metrics from state machine + context = self.state_machine.context + metrics = context.current_metrics + + async with db_manager.get_async_session() as session: + # Update session with current metrics + self.current_session.current_phase = self.state_machine.get_current_phase() + self.current_session.achieved_heating_temp = metrics.current_temperature + self.current_session.error_count = metrics.error_count + self.current_session.warning_count = metrics.warning_count + + session.add(self.current_session) + await session.commit() + + except Exception as e: + logger.error(f"Error updating session metrics: {e}") + + async def _check_process_completion(self) -> None: + """Check if process has completed and handle completion.""" + if not self.state_machine: + return + + current_state = self.state_machine.current_state.name.lower() + + if 
current_state == "completed": + await self._handle_process_completion() + elif current_state == "error": + await self._handle_process_error() + elif current_state == "emergency_stopped": + await self._handle_emergency_stop() + + async def _handle_process_completion(self) -> None: + """Handle successful process completion.""" + logger.info(f"Process completed successfully: session_id={self.session_id}") + + await self._finalize_process_session( + ProcessStatus.COMPLETED, + "Process completed successfully", + None + ) + + await self._log_process_event(LogLevel.INFO, "Process completed successfully") + await self._cleanup_process() + + async def _handle_process_error(self) -> None: + """Handle process error condition.""" + logger.error(f"Process error detected: session_id={self.session_id}") + + await self._finalize_process_session( + ProcessStatus.ERROR, + "Process error occurred", + None + ) + + await self._log_process_event(LogLevel.ERROR, "Process error occurred") + await self._cleanup_process() + + async def _handle_emergency_stop(self) -> None: + """Handle emergency stop condition.""" + logger.critical(f"Emergency stop condition: session_id={self.session_id}") + + await self._finalize_process_session( + ProcessStatus.ABORTED, + "Emergency stop condition", + None + ) + + await self._log_process_event(LogLevel.CRITICAL, "Emergency stop condition") + await self._cleanup_process() + + async def _finalize_process_session( + self, + status: ProcessStatus, + reason: str, + user_id: Optional[str] + ) -> None: + """Finalize process session with final status.""" + try: + async with db_manager.get_async_session() as session: + if self.current_session: + self.current_session.status = status + self.current_session.completed_at = datetime.now(timezone.utc) + self.current_session.stop_reason = reason + self.current_session.stopped_by = user_id + + session.add(self.current_session) + await session.commit() + + except Exception as e: + logger.error(f"Error finalizing process 
session: {e}") + + async def _log_process_event(self, level: LogLevel, message: str) -> None: + """Log a process event to the database.""" + try: + if not self.current_session: + return + + async with db_manager.get_async_session() as session: + log_entry = ProcessLog( + session_id=self.current_session.id, + level=level, + message=message, + component="RecipeController", + phase=self.state_machine.get_current_phase() if self.state_machine else None, + ) + + session.add(log_entry) + await session.commit() + + except Exception as e: + logger.error(f"Error logging process event: {e}") + + async def _handle_startup_error(self, error_message: str) -> None: + """Handle errors during startup.""" + try: + if self.current_session: + await self._finalize_process_session( + ProcessStatus.ERROR, + f"Startup error: {error_message}", + None + ) + + await self._cleanup_process() + + except Exception as e: + logger.error(f"Error handling startup error: {e}") + + async def _cleanup_process(self) -> None: + """Clean up process resources.""" + try: + self._is_running = False + + # Cancel monitoring task + if self.monitoring_task: + self.monitoring_task.cancel() + try: + await self.monitoring_task + except asyncio.CancelledError: + pass + + # Clean up state machine + if self.state_machine: + await self.state_machine.stop_control_loop() + self.state_machine = None + + # Reset instance variables + self.current_session = None + self.recipe = None + self.session_id = None + self.monitoring_task = None + + logger.info("Process cleanup completed") + + except Exception as e: + logger.error(f"Error during process cleanup: {e}") + + +# Global recipe controller instance +recipe_controller = RecipeController() \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/recipe/state_machine.py b/python_rewrite/src/tempering_machine/services/recipe/state_machine.py new file mode 100644 index 0000000..db225b0 --- /dev/null +++ 
b/python_rewrite/src/tempering_machine/services/recipe/state_machine.py @@ -0,0 +1,514 @@ +""" +State machine implementation for chocolate tempering process control. +Manages the complete tempering process workflow with proper state transitions. +""" + +import asyncio +import logging +from typing import Optional, Dict, Any, Callable +from datetime import datetime, timedelta +from enum import Enum +from dataclasses import dataclass + +from statemachine import StateMachine, State +from statemachine.exceptions import TransitionNotAllowed + +from ...shared.models.recipe import RecipePhase +from ...shared.config import settings +from ..hardware.hardware_manager import hardware_manager + + +logger = logging.getLogger(__name__) + + +@dataclass +class ProcessMetrics: + """Process execution metrics and measurements.""" + phase_start_time: datetime + target_temperature: Optional[float] = None + current_temperature: Optional[float] = None + temperature_error: float = 0.0 + stability_duration: float = 0.0 # Time at stable temperature + energy_consumption: float = 0.0 + error_count: int = 0 + warning_count: int = 0 + + +@dataclass +class ProcessContext: + """Context information for process execution.""" + recipe_id: int + session_id: str + recipe_parameters: Dict[str, Any] + current_metrics: ProcessMetrics + user_id: Optional[str] = None + start_time: Optional[datetime] = None + estimated_completion: Optional[datetime] = None + + +class TemperingStateMachine(StateMachine): + """ + State machine for chocolate tempering process control. + Implements the complete tempering workflow with safety checks. 
+ """ + + # Define states based on recipe phases + idle = State("Idle", initial=True) + preheating = State("Preheating") + heating = State("Heating") + heating_delay = State("HeatingDelay") + cooling = State("Cooling") + cooling_delay = State("CoolingDelay") + pouring = State("Pouring") + completed = State("Completed") + error = State("Error") + emergency_stopped = State("EmergencyStopped") + + # Define transitions + start_process = idle.to(preheating) + begin_heating = preheating.to(heating) + start_heating_delay = heating.to(heating_delay) + begin_cooling = heating_delay.to(cooling) + start_cooling_delay = cooling.to(cooling_delay) + begin_pouring = cooling_delay.to(pouring) + finish_process = pouring.to(completed) + + # Error and emergency transitions (from any state) + error_occurred = ( + preheating.to(error) | + heating.to(error) | + heating_delay.to(error) | + cooling.to(error) | + cooling_delay.to(error) | + pouring.to(error) + ) + + emergency_stop = ( + preheating.to(emergency_stopped) | + heating.to(emergency_stopped) | + heating_delay.to(emergency_stopped) | + cooling.to(emergency_stopped) | + cooling_delay.to(emergency_stopped) | + pouring.to(emergency_stopped) | + error.to(emergency_stopped) + ) + + # Recovery transitions + reset_from_error = error.to(idle) + reset_from_emergency = emergency_stopped.to(idle) + reset_from_completed = completed.to(idle) + + def __init__(self, context: ProcessContext): + self.context = context + self.phase_timers: Dict[str, datetime] = {} + self.stability_checker = TemperatureStabilityChecker() + self.safety_monitor = ProcessSafetyMonitor() + self.control_loop_task: Optional[asyncio.Task] = None + self._running = False + + super().__init__() + + # Set up phase-specific handlers + self._phase_handlers = { + "preheating": self._handle_preheating_phase, + "heating": self._handle_heating_phase, + "heating_delay": self._handle_heating_delay_phase, + "cooling": self._handle_cooling_phase, + "cooling_delay": 
self._handle_cooling_delay_phase, + "pouring": self._handle_pouring_phase, + } + + async def initialize(self) -> bool: + """Initialize the state machine and hardware systems.""" + try: + logger.info(f"Initializing tempering process for recipe {self.context.recipe_id}") + + # Verify hardware is ready + is_safe, issues = await hardware_manager.is_safe_to_operate() + if not is_safe: + logger.error(f"Hardware not safe to operate: {issues}") + return False + + # Initialize process metrics + self.context.current_metrics = ProcessMetrics( + phase_start_time=datetime.now(), + target_temperature=self.context.recipe_parameters.get("heating_goal", 46.0) + ) + + # Start safety monitoring + self.safety_monitor.initialize(self.context) + + logger.info("Tempering state machine initialized successfully") + return True + + except Exception as e: + logger.error(f"Failed to initialize state machine: {e}") + return False + + async def start_control_loop(self) -> None: + """Start the main process control loop.""" + if self.control_loop_task and not self.control_loop_task.done(): + return + + self._running = True + self.control_loop_task = asyncio.create_task(self._control_loop()) + logger.info("Process control loop started") + + async def stop_control_loop(self) -> None: + """Stop the main process control loop.""" + self._running = False + if self.control_loop_task: + self.control_loop_task.cancel() + try: + await self.control_loop_task + except asyncio.CancelledError: + pass + logger.info("Process control loop stopped") + + async def _control_loop(self) -> None: + """Main control loop for process execution.""" + while self._running: + try: + current_state = self.current_state.name.lower() + + # Execute phase-specific handler + if current_state in self._phase_handlers: + await self._phase_handlers[current_state]() + + # Check for safety issues + await self._check_safety_conditions() + + # Update metrics + await self._update_process_metrics() + + # Control loop delay + await 
asyncio.sleep(settings.process.process_control_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in control loop: {e}") + try: + self.error_occurred() + except TransitionNotAllowed: + pass + await asyncio.sleep(1.0) # Error recovery delay + + async def _handle_preheating_phase(self) -> None: + """Handle preheating phase - warm up hardware.""" + # Get current temperatures + tank_temp = await hardware_manager.get_average_tank_temperature() + pump_temp_reading = await hardware_manager.get_temperature("pump") + pump_temp = pump_temp_reading.value if pump_temp_reading else 0.0 + + # Check if preheating targets are met + tank_target = settings.temperature.tank_max_heat + pump_target = settings.temperature.pump_max_heat + + if tank_temp and tank_temp >= tank_target and pump_temp >= pump_target: + logger.info("Preheating complete, transitioning to heating phase") + try: + self.begin_heating() + except TransitionNotAllowed as e: + logger.error(f"Cannot transition to heating: {e}") + else: + # Enable heaters for preheating + await hardware_manager.enable_heater("tank_heater") + await hardware_manager.enable_heater("pump_heater") + + # Update metrics + self.context.current_metrics.current_temperature = tank_temp + self.context.current_metrics.target_temperature = tank_target + + async def _handle_heating_phase(self) -> None: + """Handle heating phase - reach heating goal temperature.""" + heating_goal = self.context.recipe_parameters.get("heating_goal", 46.0) + fountain_temp_reading = await hardware_manager.get_temperature("fountain") + fountain_temp = fountain_temp_reading.value if fountain_temp_reading else 0.0 + + # Enable motors as specified in recipe + if self.context.recipe_parameters.get("mixer_enabled", True): + await hardware_manager.enable_motor("mixer_motor") + + if self.context.recipe_parameters.get("fountain_enabled", True): + await hardware_manager.enable_motor("fountain_motor") + + # Temperature control + 
temp_error = heating_goal - fountain_temp + tolerance = settings.temperature.temperature_tolerance + + if abs(temp_error) <= tolerance: + # Temperature reached, check stability + if self.stability_checker.is_stable(fountain_temp, heating_goal, tolerance): + logger.info(f"Heating goal reached and stable: {fountain_temp}°C") + try: + self.start_heating_delay() + except TransitionNotAllowed as e: + logger.error(f"Cannot transition to heating delay: {e}") + else: + # Adjust heating based on temperature error + if temp_error > 0: + # Need more heat + await hardware_manager.enable_heater("tank_heater") + await hardware_manager.enable_heater("pump_heater") + else: + # Too hot, reduce heating + await hardware_manager.disable_heater("tank_heater") + + # Update metrics + self.context.current_metrics.current_temperature = fountain_temp + self.context.current_metrics.target_temperature = heating_goal + self.context.current_metrics.temperature_error = temp_error + + async def _handle_heating_delay_phase(self) -> None: + """Handle heating delay phase - maintain temperature for specified time.""" + if not self._check_phase_timer("heating_delay", settings.process.heating_delay): + # Still in delay period, maintain temperature + await self._maintain_temperature( + self.context.recipe_parameters.get("heating_goal", 46.0) + ) + else: + logger.info("Heating delay complete, transitioning to cooling") + try: + self.begin_cooling() + except TransitionNotAllowed as e: + logger.error(f"Cannot transition to cooling: {e}") + + async def _handle_cooling_phase(self) -> None: + """Handle cooling phase - cool to cooling goal temperature.""" + cooling_goal = self.context.recipe_parameters.get("cooling_goal", 27.0) + fountain_temp_reading = await hardware_manager.get_temperature("fountain") + fountain_temp = fountain_temp_reading.value if fountain_temp_reading else 50.0 + + # Disable tank heaters for cooling + await hardware_manager.disable_heater("tank_heater") + + # Keep pump heater on to 
maintain pump temperature + await hardware_manager.enable_heater("pump_heater") + + # Keep motors running for circulation + if self.context.recipe_parameters.get("mixer_enabled", True): + await hardware_manager.enable_motor("mixer_motor") + + if self.context.recipe_parameters.get("fountain_enabled", True): + await hardware_manager.enable_motor("fountain_motor") + + # Check if cooling goal is reached + tolerance = settings.temperature.temperature_tolerance + temp_error = fountain_temp - cooling_goal + + if temp_error <= tolerance and fountain_temp <= cooling_goal: + if self.stability_checker.is_stable(fountain_temp, cooling_goal, tolerance): + logger.info(f"Cooling goal reached and stable: {fountain_temp}°C") + try: + self.start_cooling_delay() + except TransitionNotAllowed as e: + logger.error(f"Cannot transition to cooling delay: {e}") + + # Update metrics + self.context.current_metrics.current_temperature = fountain_temp + self.context.current_metrics.target_temperature = cooling_goal + self.context.current_metrics.temperature_error = temp_error + + async def _handle_cooling_delay_phase(self) -> None: + """Handle cooling delay phase - maintain cooling temperature.""" + if not self._check_phase_timer("cooling_delay", settings.process.cooling_delay): + # Still in delay period, maintain temperature + await self._maintain_temperature( + self.context.recipe_parameters.get("cooling_goal", 27.0) + ) + else: + logger.info("Cooling delay complete, transitioning to pouring") + try: + self.begin_pouring() + except TransitionNotAllowed as e: + logger.error(f"Cannot transition to pouring: {e}") + + async def _handle_pouring_phase(self) -> None: + """Handle pouring phase - maintain pouring temperature and manage pedal control.""" + pouring_goal = self.context.recipe_parameters.get("pouring_goal") + if not pouring_goal: + # Use cooling goal if no specific pouring goal + pouring_goal = self.context.recipe_parameters.get("cooling_goal", 30.0) + + # Maintain pouring temperature + 
await self._maintain_temperature(pouring_goal) + + # Handle pedal control if enabled + if self.context.recipe_parameters.get("pedal_control_enabled", True): + await self._manage_pedal_control() + + # Keep circulation motors running + if self.context.recipe_parameters.get("fountain_enabled", True): + await hardware_manager.enable_motor("fountain_motor") + + # Update metrics + fountain_temp_reading = await hardware_manager.get_temperature("fountain") + fountain_temp = fountain_temp_reading.value if fountain_temp_reading else pouring_goal + + self.context.current_metrics.current_temperature = fountain_temp + self.context.current_metrics.target_temperature = pouring_goal + self.context.current_metrics.temperature_error = fountain_temp - pouring_goal + + # Pouring phase continues until manually stopped or error occurs + # In a real system, this might be triggered by operator action or timer + + async def _maintain_temperature(self, target_temp: float) -> None: + """Maintain target temperature with PID control.""" + fountain_temp_reading = await hardware_manager.get_temperature("fountain") + current_temp = fountain_temp_reading.value if fountain_temp_reading else target_temp + + temp_error = current_temp - target_temp + tolerance = settings.temperature.temperature_tolerance + + if abs(temp_error) > tolerance: + if temp_error < 0: + # Need heating + await hardware_manager.enable_heater("pump_heater") + if temp_error < -2.0: # Significant heating needed + await hardware_manager.enable_heater("tank_heater") + else: + # Too hot, reduce heating + await hardware_manager.disable_heater("tank_heater") + if temp_error > 2.0: # Significant cooling needed + await hardware_manager.disable_heater("pump_heater") + + async def _manage_pedal_control(self) -> None: + """Manage automatic pedal control for pouring.""" + # This would implement automatic pedal cycling based on recipe parameters + # For now, just log the action + on_time = self.context.recipe_parameters.get("pedal_on_time", 
2.0) + off_time = self.context.recipe_parameters.get("pedal_off_time", 3.0) + + logger.debug(f"Pedal control active: {on_time}s on, {off_time}s off") + # Implementation would control pedal solenoid via hardware manager + + def _check_phase_timer(self, phase_name: str, duration_seconds: float) -> bool: + """Check if a phase timer has completed.""" + if phase_name not in self.phase_timers: + self.phase_timers[phase_name] = datetime.now() + return False + + elapsed = (datetime.now() - self.phase_timers[phase_name]).total_seconds() + return elapsed >= duration_seconds + + async def _check_safety_conditions(self) -> None: + """Check safety conditions and trigger emergency stop if needed.""" + is_safe, issues = await hardware_manager.is_safe_to_operate() + + if not is_safe: + logger.error(f"Safety issue detected: {issues}") + try: + self.emergency_stop() + except TransitionNotAllowed: + pass + + async def _update_process_metrics(self) -> None: + """Update process execution metrics.""" + self.context.current_metrics.phase_start_time = datetime.now() + + # Update stability duration if temperature is stable + if hasattr(self, 'stability_checker'): + current_temp = self.context.current_metrics.current_temperature + target_temp = self.context.current_metrics.target_temperature + + if (current_temp is not None and target_temp is not None and + abs(current_temp - target_temp) <= settings.temperature.temperature_tolerance): + # Temperature is stable, update duration + pass + + def get_current_phase(self) -> RecipePhase: + """Get current recipe phase.""" + state_to_phase = { + "idle": RecipePhase.COMPLETED, + "preheating": RecipePhase.PREHEATING, + "heating": RecipePhase.HEATING, + "heating_delay": RecipePhase.HEATING_DELAY, + "cooling": RecipePhase.COOLING, + "cooling_delay": RecipePhase.COOLING_DELAY, + "pouring": RecipePhase.POURING, + "completed": RecipePhase.COMPLETED, + "error": RecipePhase.ERROR, + "emergency_stopped": RecipePhase.STOPPED, + } + return 
state_to_phase.get(self.current_state.name.lower(), RecipePhase.ERROR) + + def get_process_status(self) -> Dict[str, Any]: + """Get comprehensive process status.""" + return { + "current_state": self.current_state.name, + "current_phase": self.get_current_phase().value, + "recipe_id": self.context.recipe_id, + "session_id": self.context.session_id, + "start_time": self.context.start_time.isoformat() if self.context.start_time else None, + "current_temperature": self.context.current_metrics.current_temperature, + "target_temperature": self.context.current_metrics.target_temperature, + "temperature_error": self.context.current_metrics.temperature_error, + "error_count": self.context.current_metrics.error_count, + "warning_count": self.context.current_metrics.warning_count, + "is_running": self._running, + "phase_timers": {k: v.isoformat() for k, v in self.phase_timers.items()}, + } + + +class TemperatureStabilityChecker: + """Utility class to check temperature stability.""" + + def __init__(self, stability_time: float = 30.0): + self.stability_time = stability_time # seconds + self.temperature_history: Dict[str, list] = {} + + def is_stable(self, current_temp: float, target_temp: float, tolerance: float) -> bool: + """Check if temperature has been stable within tolerance for required time.""" + key = f"{target_temp}" + now = datetime.now() + + # Initialize history if needed + if key not in self.temperature_history: + self.temperature_history[key] = [] + + history = self.temperature_history[key] + + # Add current reading + history.append((now, current_temp)) + + # Remove old readings + cutoff_time = now - timedelta(seconds=self.stability_time) + history[:] = [(time, temp) for time, temp in history if time > cutoff_time] + + # Check if all recent readings are within tolerance + if len(history) < 3: # Need at least a few readings + return False + + return all(abs(temp - target_temp) <= tolerance for _, temp in history) + + +class ProcessSafetyMonitor: + """Monitor 
process safety during execution.""" + + def __init__(self): + self.context: Optional[ProcessContext] = None + self.last_safety_check = datetime.now() + + def initialize(self, context: ProcessContext) -> None: + """Initialize safety monitor with process context.""" + self.context = context + + async def check_safety(self) -> tuple[bool, list[str]]: + """Perform comprehensive safety check.""" + issues = [] + + # Check hardware safety + is_hardware_safe, hardware_issues = await hardware_manager.is_safe_to_operate() + if not is_hardware_safe: + issues.extend(hardware_issues) + + # Check temperature limits + for sensor_name, reading in (await hardware_manager.get_all_temperatures()).items(): + if not reading.is_valid: + issues.append(f"Temperature sensor {sensor_name}: {reading.error_message}") + + # Update last check time + self.last_safety_check = datetime.now() + + return len(issues) == 0, issues \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/safety/error_handler.py b/python_rewrite/src/tempering_machine/services/safety/error_handler.py new file mode 100644 index 0000000..413401b --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/safety/error_handler.py @@ -0,0 +1,516 @@ +""" +Centralized error handling and recovery system. +Provides structured error management with automatic recovery procedures. 
+""" + +import asyncio +import logging +from typing import Dict, List, Optional, Any, Callable, Type +from datetime import datetime, timedelta +from dataclasses import dataclass +from enum import Enum +import traceback +import inspect + +from ...shared.config import settings +from ...shared.database import db_manager +from ...shared.models.system import ErrorLog, ErrorSeverity, ErrorCategory + + +logger = logging.getLogger(__name__) + + +class RecoveryAction(str, Enum): + """Available recovery actions.""" + NONE = "none" + RETRY = "retry" + RESET_COMPONENT = "reset_component" + RESTART_SERVICE = "restart_service" + EMERGENCY_STOP = "emergency_stop" + USER_INTERVENTION = "user_intervention" + + +@dataclass +class ErrorHandler: + """Error handler configuration.""" + exception_types: List[Type[Exception]] + recovery_action: RecoveryAction + max_retries: int = 3 + retry_delay: float = 1.0 + escalation_threshold: int = 5 + auto_recovery: bool = True + requires_user_acknowledgment: bool = False + recovery_function: Optional[Callable[[], bool]] = None + description: str = "" + + +@dataclass +class ErrorContext: + """Context information for error occurrence.""" + component: str + function_name: str + error_id: str + timestamp: datetime + exception: Exception + stack_trace: str + retry_count: int = 0 + recovery_attempts: int = 0 + user_notified: bool = False + additional_data: Dict[str, Any] = None + + +class ErrorHandlingSystem: + """ + Centralized error handling and recovery system. + Manages error classification, recovery procedures, and escalation. 
+ """ + + def __init__(self): + self.error_handlers: Dict[str, ErrorHandler] = {} + self.active_errors: Dict[str, ErrorContext] = {} + self.error_statistics: Dict[str, int] = {} + self.recovery_tasks: Dict[str, asyncio.Task] = {} + + # Initialize standard error handlers + self._initialize_standard_handlers() + + # Error escalation callbacks + self.escalation_callbacks: List[Callable[[ErrorContext], None]] = [] + + def _initialize_standard_handlers(self) -> None: + """Initialize standard error handlers for common exceptions.""" + + # Communication errors + self.register_handler( + "communication_error", + ErrorHandler( + exception_types=[ConnectionError, TimeoutError, OSError], + recovery_action=RecoveryAction.RETRY, + max_retries=3, + retry_delay=2.0, + escalation_threshold=10, + description="Modbus communication errors" + ) + ) + + # Hardware errors + self.register_handler( + "hardware_error", + ErrorHandler( + exception_types=[RuntimeError], + recovery_action=RecoveryAction.RESET_COMPONENT, + max_retries=2, + retry_delay=5.0, + escalation_threshold=3, + requires_user_acknowledgment=True, + description="Hardware component errors" + ) + ) + + # Safety-critical errors + self.register_handler( + "safety_error", + ErrorHandler( + exception_types=[ValueError], # Temperature/safety limit violations + recovery_action=RecoveryAction.EMERGENCY_STOP, + max_retries=0, + auto_recovery=False, + requires_user_acknowledgment=True, + description="Safety-critical errors requiring immediate stop" + ) + ) + + # Database errors + self.register_handler( + "database_error", + ErrorHandler( + exception_types=[Exception], # SQLAlchemy exceptions would go here + recovery_action=RecoveryAction.RETRY, + max_retries=5, + retry_delay=1.0, + escalation_threshold=15, + description="Database connection and query errors" + ) + ) + + # Generic application errors + self.register_handler( + "application_error", + ErrorHandler( + exception_types=[Exception], + 
recovery_action=RecoveryAction.USER_INTERVENTION, + max_retries=0, + auto_recovery=False, + requires_user_acknowledgment=True, + description="General application errors requiring investigation" + ) + ) + + def register_handler(self, handler_id: str, handler: ErrorHandler) -> None: + """Register an error handler.""" + self.error_handlers[handler_id] = handler + logger.info(f"Registered error handler: {handler_id}") + + def get_handler_for_exception(self, exception: Exception) -> Optional[ErrorHandler]: + """Get the appropriate error handler for an exception.""" + exception_type = type(exception) + + # Look for exact match first + for handler in self.error_handlers.values(): + if exception_type in handler.exception_types: + return handler + + # Look for base class match + for handler in self.error_handlers.values(): + for handled_type in handler.exception_types: + if issubclass(exception_type, handled_type): + return handler + + # Return generic handler if no specific match + return self.error_handlers.get("application_error") + + async def handle_error( + self, + exception: Exception, + component: str, + additional_context: Optional[Dict[str, Any]] = None + ) -> bool: + """ + Handle an error with appropriate recovery action. + Returns True if error was handled successfully. 
+ """ + try: + # Get caller information + frame = inspect.currentframe().f_back + function_name = frame.f_code.co_name if frame else "unknown" + + # Create error context + error_id = f"{component}_{function_name}_{type(exception).__name__}_{hash(str(exception)) % 10000}" + + context = ErrorContext( + component=component, + function_name=function_name, + error_id=error_id, + timestamp=datetime.now(), + exception=exception, + stack_trace=traceback.format_exc(), + additional_data=additional_context or {} + ) + + # Check if this is a recurring error + if error_id in self.active_errors: + existing_context = self.active_errors[error_id] + existing_context.retry_count += 1 + existing_context.timestamp = datetime.now() + context = existing_context + else: + self.active_errors[error_id] = context + + # Update statistics + self.error_statistics[error_id] = self.error_statistics.get(error_id, 0) + 1 + + # Get appropriate handler + handler = self.get_handler_for_exception(exception) + if not handler: + logger.error(f"No handler found for exception: {type(exception).__name__}") + return False + + # Log the error + await self._log_error(context, handler) + + # Execute recovery action + recovery_success = await self._execute_recovery_action(context, handler) + + # Check for escalation + if not recovery_success or context.retry_count > handler.escalation_threshold: + await self._escalate_error(context, handler) + + return recovery_success + + except Exception as e: + logger.critical(f"Error in error handler: {e}") + return False + + async def _execute_recovery_action(self, context: ErrorContext, handler: ErrorHandler) -> bool: + """Execute the recovery action for an error.""" + try: + logger.info(f"Executing recovery action {handler.recovery_action.value} for {context.error_id}") + + if handler.recovery_action == RecoveryAction.NONE: + return True + + elif handler.recovery_action == RecoveryAction.RETRY: + if context.retry_count < handler.max_retries: + await 
async def _reset_component(self, component: str) -> bool:
    """Attempt to reset a single subsystem identified by *component*.

    Supported targets are "hardware" (full re-initialisation of the
    hardware manager) and "recipe" (stops any running recipe).  Returns
    True when the reset completed, False for unknown components or on
    any error raised during the reset itself.
    """
    try:
        logger.info(f"Resetting component: {component}")

        # Imported lazily to avoid a circular dependency at module load.
        from ..hardware.hardware_manager import hardware_manager
        from ..recipe.recipe_controller import recipe_controller

        target = component.lower()
        if target == "hardware":
            # Full power-cycle of the hardware layer: tear down first,
            # then report whether re-initialisation succeeded.
            await hardware_manager.shutdown()
            return await hardware_manager.initialize()

        if target == "recipe":
            # Aborting the active recipe is sufficient for this target.
            await recipe_controller.stop_recipe(reason="Component reset")
            return True

        logger.warning(f"Unknown component for reset: {component}")
        return False

    except Exception as e:
        logger.error(f"Error resetting component {component}: {e}")
        return False
async def _execute_emergency_stop(self, context: "ErrorContext") -> bool:
    """Bring the machine to an immediate safe stop in response to *context*.

    Halts the recipe controller first, then the hardware layer.  Returns
    True when both stop paths completed, False if anything raised.
    """
    try:
        logger.critical(f"EMERGENCY STOP triggered by error: {context.error_id}")

        # Imported lazily to avoid a circular dependency at module load.
        from ..hardware.hardware_manager import hardware_manager
        from ..recipe.recipe_controller import recipe_controller

        # Order matters: stop the process logic before cutting hardware.
        await recipe_controller.emergency_stop()
        await hardware_manager.emergency_stop()
        return True

    except Exception as e:
        logger.critical(f"Error during emergency stop: {e}")
        return False
async def _log_error(self, context: "ErrorContext", handler: "ErrorHandler") -> None:
    """Record an error both in the application log and the database.

    Severity is derived from the handler's recovery policy and the
    error's retry history; the category is inferred from the component
    name.  Database failures are themselves caught and logged so error
    logging can never take down the error handler.
    """
    try:
        # Severity: emergency stops are always critical; repeated or
        # acknowledgment-requiring errors are high; everything else medium.
        if handler.recovery_action == RecoveryAction.EMERGENCY_STOP:
            severity = ErrorSeverity.CRITICAL
        elif (context.retry_count > handler.escalation_threshold
                or handler.requires_user_acknowledgment):
            severity = ErrorSeverity.HIGH
        else:
            severity = ErrorSeverity.MEDIUM

        # Category is keyed off the reporting component's name.
        component_categories = {
            "hardware": ErrorCategory.HARDWARE,
            "communication": ErrorCategory.COMMUNICATION,
            "modbus": ErrorCategory.COMMUNICATION,
            "recipe": ErrorCategory.PROCESS,
            "safety": ErrorCategory.SAFETY,
            "database": ErrorCategory.SYSTEM,
        }
        category = component_categories.get(context.component.lower(), ErrorCategory.SYSTEM)

        # Route to a matching logger method ("critical" -> logger.critical);
        # severities with no matching method fall back to logger.error.
        emit = getattr(logger, severity.value.lower(), logger.error)
        emit(f"[{context.component}] {context.function_name}: {str(context.exception)}")

        occurrences = self.error_statistics[context.error_id]
        async with db_manager.get_async_session() as session:
            session.add(ErrorLog(
                error_code=context.error_id,
                category=category,
                severity=severity,
                title=f"Error in {context.component}",
                message=str(context.exception),
                component=context.component,
                stack_trace=context.stack_trace,
                additional_data=context.additional_data,
                occurrence_count=occurrences,
                # Only the first record for a given error carries first_occurrence.
                first_occurrence=context.timestamp if occurrences == 1 else None,
                last_occurrence=context.timestamp,
            ))
            await session.commit()

    except Exception as e:
        logger.error(f"Error logging error to database: {e}")
def get_error_statistics(self) -> Dict[str, Any]:
    """Return a snapshot of error-handling counters.

    ``error_counts`` is a copy, so callers may mutate it freely without
    affecting the live statistics.
    """
    counts_snapshot = dict(self.error_statistics)
    return {
        "active_errors": len(self.active_errors),
        "total_error_types": len(counts_snapshot),
        "error_counts": counts_snapshot,
        "recovery_tasks": len(self.recovery_tasks),
        "registered_handlers": len(self.error_handlers),
    }
def handle_errors(component: str = "unknown", additional_context: Optional[Dict[str, Any]] = None):
    """Decorator to automatically handle errors in functions.

    Async functions route exceptions through the global ``error_handler``;
    the exception is re-raised only when the handler reports failure
    (a successfully-handled error is swallowed and the wrapper returns
    ``None``).  Sync functions cannot await the async error handler, so
    their errors are logged and always re-raised.

    Args:
        component: Component name reported to the error handler.
        additional_context: Extra data attached to the error report.
    """
    # Local import keeps this file's import surface unchanged.
    import functools

    def decorator(func):
        @functools.wraps(func)  # fix: preserve __name__/__doc__ for introspection
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                success = await error_handler.handle_error(e, component, additional_context)
                if not success:
                    raise

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # For sync functions we can't await the error handler,
                # so we log and re-raise.
                logger.error(f"Error in {component}.{func.__name__}: {e}")
                raise

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator
class AlarmPriority(str, Enum):
    """Priority levels for safety alarms, ordered lowest to highest urgency.

    Inherits from ``str`` so values compare equal to their plain-string
    forms and serialize transparently.
    """

    LOW = "low"            # informational; no action required
    MEDIUM = "medium"      # should be reviewed during normal operation
    HIGH = "high"          # needs prompt operator attention
    CRITICAL = "critical"  # may trigger automatic emergency handling
def __init__(self):
    """Set up in-memory alarm state, limits, and monitoring bookkeeping."""
    # Alarm bookkeeping: currently-active alarms plus a full history.
    self.active_alarms: Dict[str, SafetyAlarm] = {}
    self.alarm_history: List[SafetyAlarm] = []
    self.safety_limits = SafetyLimits()

    # Background monitoring task state.
    self.monitoring_task: Optional[asyncio.Task] = None
    self.is_running = False

    # Safety state tracking.
    self.last_temperature_readings: Dict[str, Tuple[datetime, float]] = {}
    self.consecutive_errors = 0
    self.last_safety_check = datetime.now()
    self.emergency_stop_active = False

    # Observers notified whenever an alarm is created or updated.
    self.alarm_callbacks: List[Callable[[SafetyAlarm], None]] = []

    # Monitoring statistics exposed via the alarm summary.
    self.safety_stats = {
        "total_alarms": 0,
        "critical_alarms": 0,
        "emergency_stops": 0,
        "uptime_start": datetime.now(),
        "last_critical_alarm": None,
    }
async def _monitoring_loop(self) -> None:
    """Run periodic safety checks until monitoring is stopped.

    Each cycle runs every check category, refreshes alarm state, looks
    for emergency conditions, and sleeps for the configured interval.
    Unexpected errors are counted and retried after a short back-off;
    cancellation ends the loop cleanly.
    """
    while self.is_running:
        try:
            # Run every safety-check category for this cycle.
            await self._check_temperature_safety()
            await self._check_electrical_safety()
            await self._check_communication_safety()
            await self._check_process_safety()
            await self._check_hardware_safety()

            # House-keeping: auto-clear alarms, escalate emergencies,
            # and refresh the monitoring statistics.
            await self._update_alarm_states()
            await self._check_emergency_conditions()
            self._update_safety_statistics()

            await asyncio.sleep(settings.safety.error_check_interval)

        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Error in safety monitoring loop: {e}")
            self.consecutive_errors += 1
            await asyncio.sleep(0.5)  # brief back-off before retrying
limits + if temp > self.safety_limits.max_tank_temperature: + await self._create_alarm( + f"TEMP_HIGH_{sensor_name.upper()}", + f"High temperature alarm - {sensor_name}", + f"Temperature {temp:.1f}°C exceeds maximum {self.safety_limits.max_tank_temperature}°C", + ErrorCategory.TEMPERATURE, + AlarmPriority.CRITICAL, + auto_clear=True, + clear_condition=lambda t=temp, limit=self.safety_limits.max_tank_temperature: t <= limit - 2.0 + ) + + if temp < self.safety_limits.min_temperature: + await self._create_alarm( + f"TEMP_LOW_{sensor_name.upper()}", + f"Low temperature alarm - {sensor_name}", + f"Temperature {temp:.1f}°C below minimum {self.safety_limits.min_temperature}°C", + ErrorCategory.TEMPERATURE, + AlarmPriority.MEDIUM + ) + + # Check rate of temperature change + await self._check_temperature_rate(sensor_name, temp) + + # Update temperature history + self.last_temperature_readings[sensor_name] = (datetime.now(), temp) + + except Exception as e: + logger.error(f"Error checking temperature safety: {e}") + await self._create_alarm( + "TEMP_CHECK_ERROR", + "Temperature safety check error", + f"Error performing temperature safety checks: {e}", + ErrorCategory.SYSTEM, + AlarmPriority.HIGH + ) + + async def _check_temperature_rate(self, sensor_name: str, current_temp: float) -> None: + """Check temperature rate of change.""" + if sensor_name not in self.last_temperature_readings: + return + + last_time, last_temp = self.last_temperature_readings[sensor_name] + time_diff = (datetime.now() - last_time).total_seconds() / 60.0 # minutes + + if time_diff > 0.1: # Minimum time difference + rate = abs(current_temp - last_temp) / time_diff # °C/min + + if rate > self.safety_limits.temperature_rate_limit: + await self._create_alarm( + f"TEMP_RATE_{sensor_name.upper()}", + f"Temperature rate alarm - {sensor_name}", + f"Temperature changing too rapidly: {rate:.1f}°C/min > {self.safety_limits.temperature_rate_limit}°C/min", + ErrorCategory.TEMPERATURE, + AlarmPriority.HIGH + ) + + 
async def _check_communication_safety(self) -> None:
    """Watch the hardware communication link for error bursts and silence.

    Raises a HIGH alarm when the failed-request count crosses the
    configured limit and a CRITICAL alarm when no successful read has
    happened within the communication timeout.
    """
    try:
        stats = hardware_manager.get_communication_statistics()

        # Too many failed requests overall: degraded but alive link.
        if stats.get("failed_requests", 0) > self.safety_limits.max_communication_errors:
            await self._create_alarm(
                "COMM_ERRORS",
                "High communication error rate",
                f"Communication errors: {stats.get('failed_requests', 0)}",
                ErrorCategory.COMMUNICATION,
                AlarmPriority.HIGH
            )

        # Silence on the bus: measure time since the last good read.
        # A missing timestamp defaults to "now" so startup is not alarmed.
        last_ok = datetime.fromisoformat(
            stats.get("last_successful_read", datetime.now().isoformat())
        )
        silent_for = (datetime.now() - last_ok).total_seconds()

        if silent_for > self.safety_limits.communication_timeout:
            await self._create_alarm(
                "COMM_TIMEOUT",
                "Communication timeout",
                f"No successful communication for {silent_for:.1f}s",
                ErrorCategory.COMMUNICATION,
                AlarmPriority.CRITICAL
            )

    except Exception as e:
        logger.error(f"Error checking communication safety: {e}")
async def _check_hardware_safety(self) -> None:
    """Raise a HIGH alarm for every issue reported by the hardware layer."""
    try:
        safe, issues = await hardware_manager.is_safe_to_operate()
        if safe:
            return

        # One alarm per reported issue, indexed so ids stay distinct.
        for index, issue in enumerate(issues):
            await self._create_alarm(
                f"HARDWARE_ISSUE_{index}",
                "Hardware safety issue",
                issue,
                ErrorCategory.HARDWARE,
                AlarmPriority.HIGH
            )

    except Exception as e:
        logger.error(f"Error checking hardware safety: {e}")
async def _create_alarm(
    self,
    alarm_id: str,
    title: str,
    message: str,
    category: "ErrorCategory",
    priority: "AlarmPriority",
    auto_clear: bool = False,
    clear_condition: Optional[Callable[[], bool]] = None
) -> None:
    """Create a new alarm, or bump the count of an already-active one.

    New alarms are appended to the history and counted in the safety
    statistics.  Both new and repeated alarms are persisted to the
    database and fanned out to registered alarm callbacks.
    """
    try:
        existing = self.active_alarms.get(alarm_id)
        if existing is not None:
            # Repeat occurrence: refresh rather than duplicate.
            existing.occurrence_count += 1
            existing.timestamp = datetime.now()
            alarm = existing
            logger.warning(f"Alarm updated: {alarm_id} (count: {alarm.occurrence_count})")
        else:
            alarm = SafetyAlarm(
                id=alarm_id,
                title=title,
                message=message,
                category=category,
                priority=priority,
                auto_clear=auto_clear,
                clear_condition=clear_condition
            )
            self.active_alarms[alarm_id] = alarm
            self.alarm_history.append(alarm)

            # Statistics: total plus a dedicated critical counter.
            self.safety_stats["total_alarms"] += 1
            if priority == AlarmPriority.CRITICAL:
                self.safety_stats["critical_alarms"] += 1
                self.safety_stats["last_critical_alarm"] = datetime.now()

            logger.warning(f"Alarm created: {alarm_id} - {title}")

        # Persist for the audit trail.
        await self._log_alarm_to_database(alarm)

        # Fan out to observers; a broken callback must not break the rest.
        for callback in self.alarm_callbacks:
            try:
                callback(alarm)
            except Exception as e:
                logger.error(f"Error in alarm callback: {e}")

    except Exception as e:
        logger.error(f"Error creating alarm: {e}")
async def acknowledge_alarm(self, alarm_id: str, user: str) -> bool:
    """Mark an active alarm as acknowledged by *user*.

    Acknowledged alarms remain in the active set; only their state and
    acknowledgement metadata change.  Returns True when the alarm was
    found, False for unknown ids or on unexpected errors.
    """
    try:
        try:
            alarm = self.active_alarms[alarm_id]
        except KeyError:
            # Unknown alarm id: nothing to acknowledge.
            return False

        alarm.state = AlarmState.ACKNOWLEDGED
        alarm.acknowledgement_time = datetime.now()
        alarm.acknowledgement_user = user

        logger.info(f"Alarm acknowledged: {alarm_id} by {user}")
        return True

    except Exception as e:
        logger.error(f"Error acknowledging alarm: {e}")

    return False
def get_active_alarms(self, priority_filter: Optional["AlarmPriority"] = None) -> List["SafetyAlarm"]:
    """Return active alarms, most urgent first, oldest first within a tier.

    Args:
        priority_filter: When given, only alarms of exactly this
            priority are returned.
    """
    if priority_filter:
        selected = [a for a in self.active_alarms.values() if a.priority == priority_filter]
    else:
        selected = list(self.active_alarms.values())

    # CRITICAL sorts before HIGH before MEDIUM before LOW; ties broken
    # by timestamp so older alarms of equal priority come first.
    rank = {
        AlarmPriority.CRITICAL: 0,
        AlarmPriority.HIGH: 1,
        AlarmPriority.MEDIUM: 2,
        AlarmPriority.LOW: 3,
    }
    selected.sort(key=lambda alarm: (rank[alarm.priority], alarm.timestamp))
    return selected
min_temperature=settings.temperature.absolute_min_temp, + max_neutral_current=settings.safety.max_neutral_current, + max_motor_current=settings.safety.max_motor1_current, + communication_timeout=settings.safety.communication_timeout, + ) + + def _update_safety_statistics(self) -> None: + """Update safety monitoring statistics.""" + self.last_safety_check = datetime.now() + self.consecutive_errors = 0 # Reset if we got here successfully + + async def _log_alarm_to_database(self, alarm: SafetyAlarm) -> None: + """Log alarm to database for audit trail.""" + try: + async with db_manager.get_async_session() as session: + error_log = ErrorLog( + error_code=alarm.id, + category=alarm.category, + severity=ErrorSeverity(alarm.priority.value.upper()) if alarm.priority.value.upper() in [s.value for s in ErrorSeverity] else ErrorSeverity.MEDIUM, + title=alarm.title, + message=alarm.message, + component="SafetyMonitor", + occurrence_count=alarm.occurrence_count, + first_occurrence=alarm.first_occurrence, + last_occurrence=alarm.timestamp, + ) + + session.add(error_log) + await session.commit() + + except Exception as e: + logger.error(f"Error logging alarm to database: {e}") + + +# Global safety monitor instance +safety_monitor = SafetyMonitor() \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/web/main.py b/python_rewrite/src/tempering_machine/services/web/main.py new file mode 100644 index 0000000..a03cdd2 --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/web/main.py @@ -0,0 +1,256 @@ +""" +FastAPI main application for chocolate tempering machine control system. +Provides REST API endpoints for system control and monitoring. 
+""" + +import logging +from contextlib import asynccontextmanager +from typing import Dict, Any + +from fastapi import FastAPI, Request, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.trustedhost import TrustedHostMiddleware +from fastapi.responses import JSONResponse +import uvicorn + +from ...shared.config import settings +from ...shared.database import init_database, create_tables, close_database +from ..hardware.hardware_manager import hardware_manager +from ..recipe.recipe_controller import recipe_controller +from ..safety.safety_monitor import safety_monitor +from ..safety.error_handler import error_handler +from .routers import recipes, process, hardware, users, system, health + + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan manager for startup and shutdown tasks.""" + # Startup + logger.info("Starting chocolate tempering machine control system") + + try: + # Initialize database + init_database() + await create_tables() + logger.info("Database initialized") + + # Initialize hardware manager + if await hardware_manager.initialize(): + logger.info("Hardware manager initialized") + else: + logger.error("Hardware manager initialization failed") + + # Initialize safety monitor + if await safety_monitor.initialize(): + logger.info("Safety monitor initialized") + else: + logger.error("Safety monitor initialization failed") + + logger.info("System startup completed successfully") + + except Exception as e: + logger.critical(f"System startup failed: {e}") + raise + + yield + + # Shutdown + logger.info("Shutting down chocolate tempering machine control system") + + try: + # Stop any running recipes + await recipe_controller.emergency_stop(reason="System shutdown") + + # Shutdown safety monitor + await safety_monitor.shutdown() + + # Shutdown hardware manager + await hardware_manager.shutdown() + + # Close database connections + await 
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Convert raised HTTPExceptions into a uniform JSON error body."""
    payload = {
        "error": exc.detail,
        "status_code": exc.status_code,
    }
    return JSONResponse(status_code=exc.status_code, content=payload)
response.headers["X-Request-ID"] = request_id + + return response + + +# Include routers +app.include_router( + health.router, + prefix="/health", + tags=["health"] +) + +app.include_router( + recipes.router, + prefix="/api/v1/recipes", + tags=["recipes"] +) + +app.include_router( + process.router, + prefix="/api/v1/process", + tags=["process"] +) + +app.include_router( + hardware.router, + prefix="/api/v1/hardware", + tags=["hardware"] +) + +app.include_router( + users.router, + prefix="/api/v1/users", + tags=["users"] +) + +app.include_router( + system.router, + prefix="/api/v1/system", + tags=["system"] +) + + +# Root endpoint +@app.get("/", response_model=Dict[str, Any]) +async def root(): + """Root endpoint with system information.""" + return { + "name": settings.app_name, + "version": settings.app_version, + "environment": settings.environment, + "api_version": "v1", + "docs_url": "/docs", + "health_url": "/health" + } + + +# API info endpoint +@app.get("/api/v1/info", response_model=Dict[str, Any]) +async def api_info(): + """API information endpoint.""" + return { + "api_title": settings.web.api_title, + "api_version": settings.web.api_version, + "environment": settings.environment, + "features": { + "recipe_management": True, + "process_control": True, + "hardware_monitoring": True, + "safety_monitoring": True, + "user_management": True, + "real_time_monitoring": True, + }, + "endpoints": { + "recipes": "/api/v1/recipes", + "process": "/api/v1/process", + "hardware": "/api/v1/hardware", + "users": "/api/v1/users", + "system": "/api/v1/system", + "health": "/health", + } + } + + +def create_app() -> FastAPI: + """Factory function to create FastAPI app.""" + return app + + +def run_server(): + """Run the development server.""" + uvicorn.run( + "tempering_machine.services.web.main:app", + host=settings.web.host, + port=settings.web.port, + reload=settings.web.reload, + workers=settings.web.workers if not settings.web.reload else 1, + 
"""
Hardware monitoring and control API endpoints.
"""

import logging
from typing import Dict, Any, List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from ...hardware.hardware_manager import hardware_manager


logger = logging.getLogger(__name__)

router = APIRouter()


class TemperatureReading(BaseModel):
    """Temperature reading response model."""
    value: float
    sensor_name: str
    timestamp: str
    units: str = "°C"
    is_valid: bool = True
    error_message: Optional[str] = None


class MotorStatus(BaseModel):
    """Motor status response model."""
    name: str
    state: str
    is_enabled: bool
    current: Optional[float] = None
    runtime_hours: float = 0.0
    error_count: int = 0


class SafetyStatus(BaseModel):
    """Safety status response model."""
    emergency_stop_active: bool = False
    cover_sensor_closed: bool = True
    temperature_alarms: List[str] = []
    current_alarms: List[str] = []
    last_safety_check: Optional[str] = None


class HardwareStatusResponse(BaseModel):
    """Complete hardware status response."""
    temperatures: Dict[str, TemperatureReading]
    motors: Dict[str, MotorStatus]
    safety: SafetyStatus
    communication_health: float
    system_status: str
    last_update: Optional[str] = None


class MotorControlRequest(BaseModel):
    """Motor control request model."""
    enabled: bool


class HeaterControlRequest(BaseModel):
    """Heater control request model."""
    enabled: bool


def _iso_or_none(dt) -> Optional[str]:
    """Return an ISO-8601 string for a datetime, or None if it is unset."""
    return dt.isoformat() if dt else None


def _to_temperature_model(reading) -> TemperatureReading:
    """Convert an internal temperature reading into its API model."""
    return TemperatureReading(
        value=reading.value,
        sensor_name=reading.sensor_name,
        timestamp=reading.timestamp.isoformat(),
        units=reading.units,
        is_valid=reading.is_valid,
        error_message=reading.error_message,
    )


def _to_motor_model(motor) -> MotorStatus:
    """Convert an internal motor status into its API model."""
    return MotorStatus(
        name=motor.name,
        state=motor.state.value,
        is_enabled=motor.is_enabled,
        current=motor.current,
        runtime_hours=motor.runtime_hours,
        error_count=motor.error_count,
    )


def _to_safety_model(safety) -> SafetyStatus:
    """Convert an internal safety status into its API model."""
    return SafetyStatus(
        emergency_stop_active=safety.emergency_stop_active,
        cover_sensor_closed=safety.cover_sensor_closed,
        temperature_alarms=safety.temperature_alarms,
        current_alarms=safety.current_alarms,
        last_safety_check=_iso_or_none(safety.last_safety_check),
    )


@router.get("/status", response_model=HardwareStatusResponse)
async def get_hardware_status():
    """Get complete hardware status."""
    try:
        hw_status = hardware_manager.get_hardware_status()

        return HardwareStatusResponse(
            temperatures={
                name: _to_temperature_model(reading)
                for name, reading in hw_status.temperatures.items()
            },
            motors={
                name: _to_motor_model(motor)
                for name, motor in hw_status.motors.items()
            },
            safety=_to_safety_model(hw_status.safety),
            communication_health=hw_status.communication_health,
            system_status=hw_status.system_status.value,
            last_update=_iso_or_none(hw_status.last_update),
        )

    except Exception as e:
        logger.error(f"Failed to get hardware status: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve hardware status"
        )


@router.get("/temperatures", response_model=Dict[str, TemperatureReading])
async def get_temperatures():
    """Get all temperature readings."""
    try:
        temperatures = await hardware_manager.get_all_temperatures()
        return {
            name: _to_temperature_model(reading)
            for name, reading in temperatures.items()
        }

    except Exception as e:
        logger.error(f"Failed to get temperatures: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve temperature readings"
        )


@router.get("/temperatures/{sensor_name}", response_model=TemperatureReading)
async def get_temperature(sensor_name: str):
    """Get temperature reading for a specific sensor."""
    try:
        reading = await hardware_manager.get_temperature(sensor_name)

        if not reading:
            raise HTTPException(
                status_code=404,
                detail=f"Temperature sensor '{sensor_name}' not found"
            )

        return _to_temperature_model(reading)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get temperature for {sensor_name}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve temperature for sensor '{sensor_name}'"
        )


@router.get("/temperatures/tank/average")
async def get_average_tank_temperature():
    """Get average temperature of tank sensors."""
    try:
        avg_temp = await hardware_manager.get_average_tank_temperature()

        if avg_temp is None:
            # 503: sensors exist but no valid readings are available right now.
            raise HTTPException(
                status_code=503,
                detail="No valid tank temperature readings available"
            )

        return {
            "average_temperature": avg_temp,
            "units": "°C",
            "sensors_used": ["tank_bottom", "tank_wall"],
            "timestamp": _iso_or_none(hardware_manager.current_status.last_update),
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get average tank temperature: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to calculate average tank temperature"
        )


@router.get("/motors", response_model=Dict[str, MotorStatus])
async def get_motor_status():
    """Get status of all motors."""
    try:
        hw_status = hardware_manager.get_hardware_status()
        return {
            name: _to_motor_model(motor)
            for name, motor in hw_status.motors.items()
        }

    except Exception as e:
        logger.error(f"Failed to get motor status: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve motor status"
        )


@router.post("/motors/{motor_name}/control")
async def control_motor(motor_name: str, request: MotorControlRequest):
    """Enable or disable a specific motor."""
    try:
        success = await hardware_manager.set_motor_state(motor_name, request.enabled)

        if not success:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to control motor '{motor_name}'"
            )

        action = "enabled" if request.enabled else "disabled"
        return {
            "success": True,
            "message": f"Motor '{motor_name}' {action} successfully",
            "motor_name": motor_name,
            "enabled": request.enabled
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to control motor {motor_name}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to control motor '{motor_name}'"
        )


@router.post("/motors/disable-all")
async def disable_all_motors():
    """Disable all motors for safety."""
    try:
        success = await hardware_manager.disable_all_motors()

        return {
            "success": success,
            "message": "All motors disabled" if success else "Failed to disable all motors"
        }

    except Exception as e:
        logger.error(f"Failed to disable all motors: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to disable all motors"
        )


@router.post("/heaters/{heater_name}/control")
async def control_heater(heater_name: str, request: HeaterControlRequest):
    """Enable or disable a specific heater."""
    try:
        success = await hardware_manager.set_heater_state(heater_name, request.enabled)

        if not success:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to control heater '{heater_name}'"
            )

        action = "enabled" if request.enabled else "disabled"
        return {
            "success": True,
            "message": f"Heater '{heater_name}' {action} successfully",
            "heater_name": heater_name,
            "enabled": request.enabled
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to control heater {heater_name}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to control heater '{heater_name}'"
        )


@router.post("/heaters/disable-all")
async def disable_all_heaters():
    """Disable all heaters for safety."""
    try:
        success = await hardware_manager.disable_all_heaters()

        return {
            "success": success,
            "message": "All heaters disabled" if success else "Failed to disable all heaters"
        }

    except Exception as e:
        logger.error(f"Failed to disable all heaters: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to disable all heaters"
        )


@router.post("/emergency-stop")
async def emergency_stop():
    """Execute hardware emergency stop."""
    try:
        success = await hardware_manager.emergency_stop()

        return {
            "success": success,
            "message": "Emergency stop executed" if success else "Emergency stop failed"
        }

    except Exception as e:
        logger.critical(f"Hardware emergency stop failed: {e}")
        raise HTTPException(
            status_code=500,
            detail="Hardware emergency stop failed"
        )


@router.get("/safety", response_model=SafetyStatus)
async def get_safety_status():
    """Get hardware safety status."""
    try:
        hw_status = hardware_manager.get_hardware_status()
        return _to_safety_model(hw_status.safety)

    except Exception as e:
        logger.error(f"Failed to get safety status: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve safety status"
        )


@router.get("/safety/check")
async def check_safety():
    """Perform safety check and return results."""
    try:
        is_safe, issues = await hardware_manager.is_safe_to_operate()

        return {
            "is_safe": is_safe,
            "issues": issues,
            "timestamp": _iso_or_none(hardware_manager.current_status.last_update),
        }

    except Exception as e:
        logger.error(f"Safety check failed: {e}")
        raise HTTPException(
            status_code=500,
            detail="Safety check failed"
        )


@router.get("/communication/statistics")
async def get_communication_statistics():
    """Get Modbus communication statistics."""
    try:
        return hardware_manager.get_communication_statistics()

    except Exception as e:
        logger.error(f"Failed to get communication statistics: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve communication statistics"
        )
"""
Health check endpoints for system monitoring.
"""

import logging
import time
from typing import Dict, Any
from datetime import datetime

from fastapi import APIRouter, HTTPException
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel

from ....shared.config import settings
from ...hardware.hardware_manager import hardware_manager
from ...recipe.recipe_controller import recipe_controller
from ...safety.safety_monitor import safety_monitor


logger = logging.getLogger(__name__)

router = APIRouter()

# Monotonic timestamp captured at module import; used to report real uptime.
_START_TIME = time.monotonic()


def _uptime_seconds() -> float:
    """Return seconds elapsed since this module was loaded (process uptime proxy)."""
    return time.monotonic() - _START_TIME


class HealthStatus(BaseModel):
    """Health status response model."""
    status: str  # "healthy", "degraded", "unhealthy"
    timestamp: datetime
    uptime_seconds: float
    version: str
    environment: str


class DetailedHealthStatus(BaseModel):
    """Detailed health status with component information."""
    overall_status: str
    timestamp: datetime
    uptime_seconds: float
    version: str
    environment: str
    components: Dict[str, Dict[str, Any]]


@router.get("/", response_model=HealthStatus)
async def health_check():
    """Basic health check endpoint."""
    try:
        # Quick health check
        overall_status = "healthy"

        # Degraded when hardware communication drops below 80%.
        hw_status = hardware_manager.get_hardware_status()
        if hw_status.communication_health < 80:
            overall_status = "degraded"

        # Any critical alarm makes the whole system unhealthy.
        alarm_summary = safety_monitor.get_alarm_summary()
        if alarm_summary["active_by_priority"].get("critical", 0) > 0:
            overall_status = "unhealthy"

        return HealthStatus(
            status=overall_status,
            timestamp=datetime.now(),
            uptime_seconds=_uptime_seconds(),
            version=settings.app_version,
            environment=settings.environment
        )

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Health check failed")


@router.get("/detailed", response_model=DetailedHealthStatus)
async def detailed_health_check():
    """Detailed health check with component status.

    Each component is probed independently so one failing subsystem cannot
    hide the status of the others; failures downgrade the overall status.
    """
    try:
        overall_status = "healthy"
        component_statuses = {}

        # Hardware status
        try:
            hw_status = hardware_manager.get_hardware_status()
            hw_healthy = hw_status.communication_health >= 80

            component_statuses["hardware"] = {
                "status": "healthy" if hw_healthy else "unhealthy",
                "communication_health": hw_status.communication_health,
                "system_status": hw_status.system_status.value,
                "last_update": hw_status.last_update.isoformat() if hw_status.last_update else None,
                "temperature_sensors": len(hw_status.temperatures),
                "motor_count": len(hw_status.motors),
            }

            if not hw_healthy:
                overall_status = "degraded"

        except Exception as e:
            logger.error(f"Hardware health check failed: {e}")
            component_statuses["hardware"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            overall_status = "unhealthy"

        # Recipe controller status
        try:
            process_status = await recipe_controller.get_process_status()

            component_statuses["recipe_controller"] = {
                "status": "healthy",
                "has_active_process": process_status is not None,
                "process_status": process_status["session"]["status"] if process_status else None,
            }

        except Exception as e:
            logger.error(f"Recipe controller health check failed: {e}")
            component_statuses["recipe_controller"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            overall_status = "degraded"

        # Safety monitor status
        try:
            alarm_summary = safety_monitor.get_alarm_summary()
            critical_alarms = alarm_summary["active_by_priority"].get("critical", 0)

            safety_healthy = critical_alarms == 0

            component_statuses["safety_monitor"] = {
                "status": "healthy" if safety_healthy else "unhealthy",
                "active_alarms": alarm_summary["total_active_alarms"],
                "critical_alarms": critical_alarms,
                "emergency_stop_active": alarm_summary["emergency_stop_active"],
                "last_check": alarm_summary["last_safety_check"],
            }

            if not safety_healthy:
                overall_status = "unhealthy"

        except Exception as e:
            logger.error(f"Safety monitor health check failed: {e}")
            component_statuses["safety_monitor"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            overall_status = "unhealthy"

        # Database status
        try:
            from ....shared.database import db_manager
            # Simple database connectivity test would go here
            component_statuses["database"] = {
                "status": "healthy",
                "connected": True,
            }

        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            component_statuses["database"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            overall_status = "unhealthy"

        return DetailedHealthStatus(
            overall_status=overall_status,
            timestamp=datetime.now(),
            uptime_seconds=_uptime_seconds(),
            version=settings.app_version,
            environment=settings.environment,
            components=component_statuses
        )

    except Exception as e:
        logger.error(f"Detailed health check failed: {e}")
        raise HTTPException(status_code=503, detail="Detailed health check failed")


@router.get("/ready")
async def readiness_check():
    """Kubernetes readiness probe endpoint."""
    try:
        # Check if all critical services are ready
        hw_status = hardware_manager.get_hardware_status()

        if hw_status.communication_health < 50:
            raise HTTPException(status_code=503, detail="Hardware communication not ready")

        return {"status": "ready", "timestamp": datetime.now()}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Readiness check failed: {e}")
        raise HTTPException(status_code=503, detail="Service not ready")


@router.get("/live")
async def liveness_check():
    """Kubernetes liveness probe endpoint."""
    # Simple liveness check - if this endpoint responds, service is alive
    return {"status": "alive", "timestamp": datetime.now()}


@router.get("/metrics", response_class=PlainTextResponse)
async def metrics():
    """Prometheus-compatible metrics endpoint.

    Must be served as text/plain (Prometheus exposition format); returning a
    bare string from a JSON endpoint would get JSON-encoded and be unscrapable.
    """
    try:
        # Get hardware stats
        hw_status = hardware_manager.get_hardware_status()
        comm_stats = hardware_manager.get_communication_statistics()

        # Get safety stats
        alarm_summary = safety_monitor.get_alarm_summary()

        # Format as Prometheus metrics
        metrics_text = f"""
# HELP tempering_machine_communication_health_percent Hardware communication health percentage
# TYPE tempering_machine_communication_health_percent gauge
tempering_machine_communication_health_percent {hw_status.communication_health}

# HELP tempering_machine_temperature_celsius Current temperature readings
# TYPE tempering_machine_temperature_celsius gauge
"""

        # Add temperature metrics
        for sensor_name, reading in hw_status.temperatures.items():
            if reading.is_valid:
                metrics_text += f'tempering_machine_temperature_celsius{{sensor="{sensor_name}"}} {reading.value}\n'

        # Add communication metrics
        if comm_stats:
            metrics_text += f"""
# HELP tempering_machine_requests_total Total number of communication requests
# TYPE tempering_machine_requests_total counter
tempering_machine_requests_total {comm_stats.get("total_requests", 0)}

# HELP tempering_machine_request_failures_total Total number of failed requests
# TYPE tempering_machine_request_failures_total counter
tempering_machine_request_failures_total {comm_stats.get("failed_requests", 0)}
"""

        # Add alarm metrics
        metrics_text += f"""
# HELP tempering_machine_active_alarms Total number of active alarms
# TYPE tempering_machine_active_alarms gauge
tempering_machine_active_alarms {alarm_summary["total_active_alarms"]}

# HELP tempering_machine_critical_alarms Total number of critical alarms
# TYPE tempering_machine_critical_alarms gauge
tempering_machine_critical_alarms {alarm_summary["active_by_priority"].get("critical", 0)}
"""

        return PlainTextResponse(metrics_text.strip())

    except Exception as e:
        logger.error(f"Metrics collection failed: {e}")
        raise HTTPException(status_code=500, detail="Metrics collection failed")
"""
Process control API endpoints.
"""

import logging
from typing import Optional, Dict, Any

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from ...recipe.recipe_controller import recipe_controller
from ....shared.models.recipe import RecipePhase


logger = logging.getLogger(__name__)

router = APIRouter()


class ProcessStartRequest(BaseModel):
    """Request body for launching a tempering run."""
    recipe_id: int
    user_id: Optional[str] = None


class ProcessStatusResponse(BaseModel):
    """Snapshot of the currently running process, if any."""
    session_id: Optional[str]
    recipe_id: Optional[int]
    recipe_name: Optional[str]
    status: Optional[str]
    current_phase: Optional[str]
    started_at: Optional[str]
    duration_seconds: Optional[int]
    started_by: Optional[str]
    current_temperature: Optional[float]
    target_temperature: Optional[float]
    temperature_error: Optional[float]
    error_count: int = 0
    warning_count: int = 0
    is_running: bool = False


class ProcessActionResponse(BaseModel):
    """Outcome of a start/stop/pause/resume style action."""
    success: bool
    message: str
    session_id: Optional[str] = None


@router.post("/start", response_model=ProcessActionResponse)
async def start_process(request: ProcessStartRequest):
    """Start a new tempering process."""
    try:
        logger.info(f"Starting process for recipe {request.recipe_id} by user {request.user_id}")

        new_session = await recipe_controller.start_recipe(
            recipe_id=request.recipe_id,
            user_id=request.user_id
        )

        return ProcessActionResponse(
            success=True,
            message="Process started successfully",
            session_id=new_session
        )

    except Exception as e:
        logger.error(f"Failed to start process: {e}")
        raise HTTPException(
            status_code=400,
            detail=f"Failed to start process: {str(e)}"
        )


@router.post("/stop", response_model=ProcessActionResponse)
async def stop_process(
    user_id: Optional[str] = None,
    reason: str = "Manual stop"
):
    """Stop the currently running process."""
    try:
        logger.info(f"Stopping process by user {user_id}, reason: {reason}")

        stopped = await recipe_controller.stop_recipe(
            user_id=user_id,
            reason=reason
        )

        message = "Process stopped successfully" if stopped else "No process was running"
        return ProcessActionResponse(success=stopped, message=message)

    except Exception as e:
        logger.error(f"Failed to stop process: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to stop process: {str(e)}"
        )


@router.post("/pause", response_model=ProcessActionResponse)
async def pause_process(user_id: Optional[str] = None):
    """Pause the currently running process."""
    try:
        logger.info(f"Pausing process by user {user_id}")

        paused = await recipe_controller.pause_recipe(user_id=user_id)

        message = "Process paused successfully" if paused else "No process is running to pause"
        return ProcessActionResponse(success=paused, message=message)

    except Exception as e:
        logger.error(f"Failed to pause process: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to pause process: {str(e)}"
        )


@router.post("/resume", response_model=ProcessActionResponse)
async def resume_process(user_id: Optional[str] = None):
    """Resume a paused process."""
    try:
        logger.info(f"Resuming process by user {user_id}")

        resumed = await recipe_controller.resume_recipe(user_id=user_id)

        message = "Process resumed successfully" if resumed else "No paused process to resume"
        return ProcessActionResponse(success=resumed, message=message)

    except Exception as e:
        logger.error(f"Failed to resume process: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to resume process: {str(e)}"
        )


@router.post("/emergency-stop", response_model=ProcessActionResponse)
async def emergency_stop(user_id: Optional[str] = None):
    """Execute emergency stop of the current process."""
    try:
        logger.critical(f"Emergency stop initiated by user {user_id}")

        stopped = await recipe_controller.emergency_stop(user_id=user_id)

        message = "Emergency stop executed" if stopped else "Emergency stop failed"
        return ProcessActionResponse(success=stopped, message=message)

    except Exception as e:
        logger.critical(f"Emergency stop failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Emergency stop failed: {str(e)}"
        )


@router.get("/status", response_model=Optional[ProcessStatusResponse])
async def get_process_status():
    """Get current process status."""
    try:
        status = await recipe_controller.get_process_status()

        # No active process is a normal condition, not an error.
        if not status:
            return None

        session = status.get("session", {})
        machine = status.get("state_machine", {})

        return ProcessStatusResponse(
            session_id=session.get("session_id"),
            recipe_id=machine.get("recipe_id"),
            recipe_name=session.get("recipe_name"),
            status=session.get("status"),
            current_phase=machine.get("current_phase"),
            started_at=session.get("started_at"),
            duration_seconds=session.get("duration_seconds"),
            started_by=session.get("started_by"),
            current_temperature=machine.get("current_temperature"),
            target_temperature=machine.get("target_temperature"),
            temperature_error=machine.get("temperature_error"),
            error_count=machine.get("error_count", 0),
            warning_count=machine.get("warning_count", 0),
            is_running=machine.get("is_running", False)
        )

    except Exception as e:
        logger.error(f"Failed to get process status: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve process status"
        )


@router.get("/phases", response_model=Dict[str, str])
async def get_process_phases():
    """Get available process phases."""
    return {phase.value: phase.name for phase in RecipePhase}


@router.get("/active-sessions")
async def get_active_sessions():
    """Get list of active process sessions."""
    try:
        current = await recipe_controller.get_active_sessions()
        return {"active_sessions": current}

    except Exception as e:
        logger.error(f"Failed to get active sessions: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve active sessions"
        )


@router.get("/history")
async def get_process_history(
    limit: int = Query(50, ge=1, le=100, description="Maximum number of sessions to return"),
    skip: int = Query(0, ge=0, description="Number of sessions to skip")
):
    """Get process execution history."""
    try:
        # This would query the database for process session history
        # For now, return empty list as placeholder
        return {
            "sessions": [],
            "total": 0,
            "page": (skip // limit) + 1,
            "per_page": limit
        }

    except Exception as e:
        logger.error(f"Failed to get process history: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve process history"
        )
+""" + +import logging +from typing import List, Optional +from datetime import datetime + +from fastapi import APIRouter, HTTPException, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, func + +from ....shared.database import get_db +from ....shared.models.recipe import Recipe +from ....shared.schemas.recipe import ( + RecipeCreate, RecipeUpdate, RecipeResponse, RecipeList, RecipeSummary +) + + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("/", response_model=RecipeList) +async def list_recipes( + skip: int = Query(0, ge=0, description="Number of recipes to skip"), + limit: int = Query(50, ge=1, le=100, description="Maximum number of recipes to return"), + active_only: bool = Query(False, description="Return only active recipes"), + search: Optional[str] = Query(None, description="Search recipes by name"), + db: AsyncSession = Depends(get_db) +): + """Get list of recipes with pagination and filtering.""" + try: + # Build query + query = select(Recipe) + + if active_only: + query = query.where(Recipe.is_active == True) + + if search: + query = query.where(Recipe.name.ilike(f"%{search}%")) + + # Get total count + count_query = select(func.count(Recipe.id)) + if active_only: + count_query = count_query.where(Recipe.is_active == True) + if search: + count_query = count_query.where(Recipe.name.ilike(f"%{search}%")) + + total_result = await db.execute(count_query) + total = total_result.scalar() or 0 + + # Get paginated results + query = query.order_by(Recipe.created_at.desc()).offset(skip).limit(limit) + result = await db.execute(query) + recipes = result.scalars().all() + + return RecipeList( + recipes=[RecipeResponse.from_orm(recipe) for recipe in recipes], + total=total, + page=(skip // limit) + 1, + per_page=limit + ) + + except Exception as e: + logger.error(f"Error listing recipes: {e}") + raise HTTPException(status_code=500, detail="Failed to retrieve recipes") + + +@router.post("/", 
response_model=RecipeResponse, status_code=201) +async def create_recipe( + recipe_data: RecipeCreate, + db: AsyncSession = Depends(get_db) +): + """Create a new recipe.""" + try: + # Check if recipe name already exists + existing_query = select(Recipe).where(Recipe.name == recipe_data.name) + existing_result = await db.execute(existing_query) + existing_recipe = existing_result.scalar_one_or_none() + + if existing_recipe: + raise HTTPException( + status_code=409, + detail=f"Recipe with name '{recipe_data.name}' already exists" + ) + + # Create new recipe + new_recipe = Recipe( + name=recipe_data.name, + description=recipe_data.description, + heating_goal=recipe_data.heating_goal, + cooling_goal=recipe_data.cooling_goal, + pouring_goal=recipe_data.pouring_goal, + tank_temp=recipe_data.tank_temp, + fountain_temp=recipe_data.fountain_temp, + mixer_enabled=recipe_data.mixer_enabled, + fountain_enabled=recipe_data.fountain_enabled, + mold_heater_enabled=recipe_data.mold_heater_enabled, + vibration_enabled=recipe_data.vibration_enabled, + vib_heater_enabled=recipe_data.vib_heater_enabled, + pedal_control_enabled=recipe_data.pedal_control_enabled, + pedal_on_time=recipe_data.pedal_on_time, + pedal_off_time=recipe_data.pedal_off_time, + created_at=datetime.now(), + ) + + # Validate recipe parameters + if not new_recipe.validate_temperatures(): + raise HTTPException( + status_code=400, + detail="Invalid temperature parameters" + ) + + db.add(new_recipe) + await db.commit() + await db.refresh(new_recipe) + + logger.info(f"Created new recipe: {new_recipe.name} (ID: {new_recipe.id})") + + return RecipeResponse.from_orm(new_recipe) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating recipe: {e}") + await db.rollback() + raise HTTPException(status_code=500, detail="Failed to create recipe") + + +@router.get("/{recipe_id}", response_model=RecipeResponse) +async def get_recipe( + recipe_id: int, + db: AsyncSession = Depends(get_db) +): + 
"""Get a specific recipe by ID.""" + try: + query = select(Recipe).where(Recipe.id == recipe_id) + result = await db.execute(query) + recipe = result.scalar_one_or_none() + + if not recipe: + raise HTTPException(status_code=404, detail="Recipe not found") + + return RecipeResponse.from_orm(recipe) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving recipe {recipe_id}: {e}") + raise HTTPException(status_code=500, detail="Failed to retrieve recipe") + + +@router.put("/{recipe_id}", response_model=RecipeResponse) +async def update_recipe( + recipe_id: int, + recipe_update: RecipeUpdate, + db: AsyncSession = Depends(get_db) +): + """Update an existing recipe.""" + try: + # Get existing recipe + query = select(Recipe).where(Recipe.id == recipe_id) + result = await db.execute(query) + recipe = result.scalar_one_or_none() + + if not recipe: + raise HTTPException(status_code=404, detail="Recipe not found") + + # Update fields that are provided + update_data = recipe_update.dict(exclude_unset=True) + + for field, value in update_data.items(): + setattr(recipe, field, value) + + # Update metadata + recipe.updated_at = datetime.now() + recipe.version += 1 + + # Validate updated recipe + if not recipe.validate_temperatures(): + raise HTTPException( + status_code=400, + detail="Invalid temperature parameters" + ) + + await db.commit() + await db.refresh(recipe) + + logger.info(f"Updated recipe: {recipe.name} (ID: {recipe.id})") + + return RecipeResponse.from_orm(recipe) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating recipe {recipe_id}: {e}") + await db.rollback() + raise HTTPException(status_code=500, detail="Failed to update recipe") + + +@router.delete("/{recipe_id}", status_code=204) +async def delete_recipe( + recipe_id: int, + db: AsyncSession = Depends(get_db) +): + """Delete a recipe (soft delete by setting inactive).""" + try: + # Get existing recipe + query = 
select(Recipe).where(Recipe.id == recipe_id) + result = await db.execute(query) + recipe = result.scalar_one_or_none() + + if not recipe: + raise HTTPException(status_code=404, detail="Recipe not found") + + # Soft delete by setting inactive + recipe.is_active = False + recipe.updated_at = datetime.now() + + await db.commit() + + logger.info(f"Deleted recipe: {recipe.name} (ID: {recipe.id})") + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting recipe {recipe_id}: {e}") + await db.rollback() + raise HTTPException(status_code=500, detail="Failed to delete recipe") + + +@router.post("/{recipe_id}/duplicate", response_model=RecipeResponse, status_code=201) +async def duplicate_recipe( + recipe_id: int, + new_name: str = Query(..., description="Name for the duplicated recipe"), + db: AsyncSession = Depends(get_db) +): + """Duplicate an existing recipe with a new name.""" + try: + # Get source recipe + query = select(Recipe).where(Recipe.id == recipe_id) + result = await db.execute(query) + source_recipe = result.scalar_one_or_none() + + if not source_recipe: + raise HTTPException(status_code=404, detail="Source recipe not found") + + # Check if new name already exists + name_query = select(Recipe).where(Recipe.name == new_name) + name_result = await db.execute(name_query) + existing_recipe = name_result.scalar_one_or_none() + + if existing_recipe: + raise HTTPException( + status_code=409, + detail=f"Recipe with name '{new_name}' already exists" + ) + + # Create duplicate + duplicate_recipe = Recipe( + name=new_name, + description=f"Copy of {source_recipe.name}", + heating_goal=source_recipe.heating_goal, + cooling_goal=source_recipe.cooling_goal, + pouring_goal=source_recipe.pouring_goal, + tank_temp=source_recipe.tank_temp, + fountain_temp=source_recipe.fountain_temp, + mixer_enabled=source_recipe.mixer_enabled, + fountain_enabled=source_recipe.fountain_enabled, + mold_heater_enabled=source_recipe.mold_heater_enabled, + 
vibration_enabled=source_recipe.vibration_enabled, + vib_heater_enabled=source_recipe.vib_heater_enabled, + pedal_control_enabled=source_recipe.pedal_control_enabled, + pedal_on_time=source_recipe.pedal_on_time, + pedal_off_time=source_recipe.pedal_off_time, + created_at=datetime.now(), + ) + + db.add(duplicate_recipe) + await db.commit() + await db.refresh(duplicate_recipe) + + logger.info(f"Duplicated recipe: {source_recipe.name} -> {new_name} (ID: {duplicate_recipe.id})") + + return RecipeResponse.from_orm(duplicate_recipe) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error duplicating recipe {recipe_id}: {e}") + await db.rollback() + raise HTTPException(status_code=500, detail="Failed to duplicate recipe") + + +@router.get("/{recipe_id}/validate") +async def validate_recipe( + recipe_id: int, + db: AsyncSession = Depends(get_db) +): + """Validate a recipe's parameters.""" + try: + query = select(Recipe).where(Recipe.id == recipe_id) + result = await db.execute(query) + recipe = result.scalar_one_or_none() + + if not recipe: + raise HTTPException(status_code=404, detail="Recipe not found") + + # Validate recipe + is_valid = recipe.validate_temperatures() + + validation_result = { + "recipe_id": recipe_id, + "is_valid": is_valid, + "checks": { + "temperature_ranges": True, # Would implement detailed checks + "cooling_less_than_heating": recipe.cooling_goal < recipe.heating_goal, + "pouring_within_range": ( + recipe.pouring_goal is None or + (recipe.cooling_goal <= recipe.pouring_goal <= recipe.heating_goal) + ), + } + } + + if not is_valid: + validation_result["errors"] = [ + "Temperature parameters are invalid" + ] + + return validation_result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error validating recipe {recipe_id}: {e}") + raise HTTPException(status_code=500, detail="Failed to validate recipe") + + +@router.get("/summary/active", response_model=List[RecipeSummary]) +async def 
get_active_recipes_summary( + db: AsyncSession = Depends(get_db) +): + """Get summary of all active recipes.""" + try: + query = select(Recipe).where(Recipe.is_active == True).order_by(Recipe.name) + result = await db.execute(query) + recipes = result.scalars().all() + + return [RecipeSummary.from_orm(recipe) for recipe in recipes] + + except Exception as e: + logger.error(f"Error getting active recipes summary: {e}") + raise HTTPException(status_code=500, detail="Failed to get active recipes") \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/web/routers/system.py b/python_rewrite/src/tempering_machine/services/web/routers/system.py new file mode 100644 index 0000000..fd231ba --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/web/routers/system.py @@ -0,0 +1,159 @@ +""" +System management API endpoints. +""" + +import logging +from typing import List, Dict, Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from ...safety.safety_monitor import safety_monitor +from ...safety.error_handler import error_handler + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +class AlarmResponse(BaseModel): + """Alarm response model.""" + id: str + title: str + message: str + category: str + priority: str + state: str + timestamp: str + occurrence_count: int = 1 + + +class SystemInfoResponse(BaseModel): + """System information response.""" + name: str + version: str + environment: str + uptime_seconds: float + active_alarms: int + error_count: int + + +@router.get("/info", response_model=SystemInfoResponse) +async def get_system_info(): + """Get system information.""" + try: + alarm_summary = safety_monitor.get_alarm_summary() + error_stats = error_handler.get_error_statistics() + + return SystemInfoResponse( + name="Chocolate Tempering Machine", + version="1.0.0", + environment="development", + uptime_seconds=0.0, # Would calculate actual uptime + 
active_alarms=alarm_summary["total_active_alarms"], + error_count=error_stats["active_errors"] + ) + + except Exception as e: + logger.error(f"Failed to get system info: {e}") + raise HTTPException(status_code=500, detail="Failed to get system information") + + +@router.get("/alarms", response_model=List[AlarmResponse]) +async def get_alarms(priority: Optional[str] = None): + """Get active alarms.""" + try: + # Filter by priority if specified + priority_filter = None + if priority: + from ...safety.safety_monitor import AlarmPriority + try: + priority_filter = AlarmPriority(priority.upper()) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid priority: {priority}") + + alarms = safety_monitor.get_active_alarms(priority_filter) + + return [ + AlarmResponse( + id=alarm.id, + title=alarm.title, + message=alarm.message, + category=alarm.category.value, + priority=alarm.priority.value, + state=alarm.state.value, + timestamp=alarm.timestamp.isoformat(), + occurrence_count=alarm.occurrence_count + ) + for alarm in alarms + ] + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to get alarms: {e}") + raise HTTPException(status_code=500, detail="Failed to retrieve alarms") + + +@router.post("/alarms/{alarm_id}/acknowledge") +async def acknowledge_alarm(alarm_id: str, user: str = "system"): + """Acknowledge an alarm.""" + try: + success = await safety_monitor.acknowledge_alarm(alarm_id, user) + + return { + "success": success, + "message": f"Alarm {alarm_id} acknowledged" if success else f"Failed to acknowledge alarm {alarm_id}" + } + + except Exception as e: + logger.error(f"Failed to acknowledge alarm {alarm_id}: {e}") + raise HTTPException(status_code=500, detail="Failed to acknowledge alarm") + + +@router.get("/errors") +async def get_active_errors(): + """Get active system errors.""" + try: + errors = error_handler.get_active_errors() + + return { + "active_errors": [ + { + "error_id": error.error_id, + "component": 
error.component, + "function_name": error.function_name, + "timestamp": error.timestamp.isoformat(), + "exception": str(error.exception), + "retry_count": error.retry_count, + } + for error in errors + ], + "statistics": error_handler.get_error_statistics() + } + + except Exception as e: + logger.error(f"Failed to get active errors: {e}") + raise HTTPException(status_code=500, detail="Failed to retrieve active errors") + + +@router.get("/configuration") +async def get_system_configuration(): + """Get system configuration.""" + try: + from ....shared.config import settings + + return { + "environment": settings.environment, + "debug": settings.debug, + "log_level": settings.log_level.value, + "app_name": settings.app_name, + "app_version": settings.app_version, + "api_version": settings.web.api_version, + "host": settings.web.host, + "port": settings.web.port, + } + + except Exception as e: + logger.error(f"Failed to get system configuration: {e}") + raise HTTPException(status_code=500, detail="Failed to retrieve system configuration") \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/services/web/routers/users.py b/python_rewrite/src/tempering_machine/services/web/routers/users.py new file mode 100644 index 0000000..bf7e40f --- /dev/null +++ b/python_rewrite/src/tempering_machine/services/web/routers/users.py @@ -0,0 +1,40 @@ +""" +User management API endpoints. 
+""" + +import logging +from typing import List, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +router = APIRouter() + +# Placeholder schemas since user models aren't fully implemented +class UserResponse(BaseModel): + id: int + username: str + email: Optional[str] + full_name: Optional[str] + role: str + is_active: bool + +class UserCreate(BaseModel): + username: str + email: Optional[str] + full_name: Optional[str] + role: str = "operator" + +@router.get("/", response_model=List[UserResponse]) +async def list_users(): + """Get list of users.""" + # Placeholder implementation + return [] + +@router.post("/", response_model=UserResponse, status_code=201) +async def create_user(user_data: UserCreate): + """Create a new user.""" + # Placeholder implementation + raise HTTPException(status_code=501, detail="User management not yet implemented") \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/shared/schemas/__init__.py b/python_rewrite/src/tempering_machine/shared/schemas/__init__.py index e69de29..79e0b85 100644 --- a/python_rewrite/src/tempering_machine/shared/schemas/__init__.py +++ b/python_rewrite/src/tempering_machine/shared/schemas/__init__.py @@ -0,0 +1,39 @@ +"""Pydantic schemas for API request/response validation. 
+""" + +from .recipe import RecipeCreate, RecipeUpdate, RecipeResponse, RecipeList +from .process import ProcessStart, ProcessStatus, ProcessResponse, ProcessLogResponse +from .hardware import HardwareStatus, TemperatureReading, MotorStatus, SafetyStatus +from .user import UserCreate, UserUpdate, UserResponse, UserLogin +from .system import SystemHealth, AlarmResponse, ErrorLogResponse + +__all__ = [ + # Recipe schemas + "RecipeCreate", + "RecipeUpdate", + "RecipeResponse", + "RecipeList", + + # Process schemas + "ProcessStart", + "ProcessStatus", + "ProcessResponse", + "ProcessLogResponse", + + # Hardware schemas + "HardwareStatus", + "TemperatureReading", + "MotorStatus", + "SafetyStatus", + + # User schemas + "UserCreate", + "UserUpdate", + "UserResponse", + "UserLogin", + + # System schemas + "SystemHealth", + "AlarmResponse", + "ErrorLogResponse", +] \ No newline at end of file diff --git a/python_rewrite/src/tempering_machine/shared/schemas/recipe.py b/python_rewrite/src/tempering_machine/shared/schemas/recipe.py new file mode 100644 index 0000000..64aa28d --- /dev/null +++ b/python_rewrite/src/tempering_machine/shared/schemas/recipe.py @@ -0,0 +1,127 @@ +""" +Pydantic schemas for recipe-related API operations. 
+""" + +from typing import Optional, List +from datetime import datetime +from pydantic import BaseModel, Field, validator + +from ..models.recipe import RecipePhase + + +class RecipeBase(BaseModel): + """Base recipe schema with common fields.""" + name: str = Field(..., min_length=1, max_length=100, description="Recipe name") + description: Optional[str] = Field(None, max_length=500, description="Recipe description") + + # Temperature goals + heating_goal: float = Field(..., ge=40.0, le=60.0, description="Target heating temperature (°C)") + cooling_goal: float = Field(..., ge=20.0, le=40.0, description="Target cooling temperature (°C)") + pouring_goal: Optional[float] = Field(None, ge=20.0, le=60.0, description="Target pouring temperature (°C)") + + # Operating temperatures + tank_temp: float = Field(45.0, ge=20.0, le=80.0, description="Tank operating temperature (°C)") + fountain_temp: float = Field(32.0, ge=20.0, le=60.0, description="Fountain operating temperature (°C)") + + # Motor settings + mixer_enabled: bool = Field(True, description="Enable mixer motor") + fountain_enabled: bool = Field(True, description="Enable fountain motor") + mold_heater_enabled: bool = Field(False, description="Enable mold heater") + vibration_enabled: bool = Field(False, description="Enable vibration motor") + vib_heater_enabled: bool = Field(False, description="Enable vibration heater") + + # Pedal control + pedal_control_enabled: bool = Field(True, description="Enable pedal control") + pedal_on_time: float = Field(2.0, ge=0.1, le=10.0, description="Pedal on duration (seconds)") + pedal_off_time: float = Field(3.0, ge=0.1, le=10.0, description="Pedal off duration (seconds)") + + @validator('cooling_goal') + def validate_cooling_goal(cls, v, values): + """Ensure cooling goal is less than heating goal.""" + heating_goal = values.get('heating_goal') + if heating_goal is not None and v >= heating_goal: + raise ValueError('Cooling goal must be less than heating goal') + return v + + 
    # Pydantic v1 validator; cooling_goal/heating_goal are present in
    # `values` only if they passed their own validation first.
    @validator('pouring_goal')
    def validate_pouring_goal(cls, v, values):
        """Ensure pouring goal is within cooling and heating range."""
        # pouring_goal is optional -- skip the cross-checks when unset.
        if v is None:
            return v

        cooling_goal = values.get('cooling_goal')
        heating_goal = values.get('heating_goal')

        if cooling_goal is not None and v < cooling_goal:
            raise ValueError('Pouring goal must be greater than or equal to cooling goal')
        if heating_goal is not None and v > heating_goal:
            raise ValueError('Pouring goal must be less than or equal to heating goal')

        return v


class RecipeCreate(RecipeBase):
    """Schema for creating a new recipe."""
    pass


class RecipeUpdate(BaseModel):
    """Schema for updating an existing recipe.

    All fields are optional so clients can send partial updates
    (the router applies them with dict(exclude_unset=True)).
    NOTE(review): this model has no cross-field validators, so a partial
    update can pass schema validation while violating cooling < heating;
    the router re-checks via Recipe.validate_temperatures() -- confirm.
    """
    name: Optional[str] = Field(None, min_length=1, max_length=100)
    description: Optional[str] = Field(None, max_length=500)

    heating_goal: Optional[float] = Field(None, ge=40.0, le=60.0)
    cooling_goal: Optional[float] = Field(None, ge=20.0, le=40.0)
    pouring_goal: Optional[float] = Field(None, ge=20.0, le=60.0)

    tank_temp: Optional[float] = Field(None, ge=20.0, le=80.0)
    fountain_temp: Optional[float] = Field(None, ge=20.0, le=60.0)

    mixer_enabled: Optional[bool] = None
    fountain_enabled: Optional[bool] = None
    mold_heater_enabled: Optional[bool] = None
    vibration_enabled: Optional[bool] = None
    vib_heater_enabled: Optional[bool] = None

    pedal_control_enabled: Optional[bool] = None
    pedal_on_time: Optional[float] = Field(None, ge=0.1, le=10.0)
    pedal_off_time: Optional[float] = Field(None, ge=0.1, le=10.0)

    is_active: Optional[bool] = None


class RecipeResponse(RecipeBase):
    """Schema for recipe API responses (includes DB-managed metadata)."""
    id: int
    created_at: datetime
    updated_at: datetime
    created_by: Optional[str] = None
    version: int
    is_active: bool
    usage_count: int
    last_used: Optional[datetime] = None

    class Config:
        # Allows building this model from the SQLAlchemy Recipe object.
        from_attributes = True


class RecipeList(BaseModel):
    """Schema for recipe list responses."""
    recipes: List[RecipeResponse]
    # Pagination metadata: total rows matching the filters, plus the
    # 1-based page number and page size used for this response.
    total: int
    page: int = 1
    per_page: int = 50


class RecipeSummary(BaseModel):
    """Schema for recipe summary information (lightweight list views)."""
    id: int
    name: str
    heating_goal: float
    cooling_goal: float
    is_active: bool
    usage_count: int
    last_used: Optional[datetime] = None

    class Config:
        # Allows building this model from the SQLAlchemy Recipe object.
        from_attributes = True