Add API endpoints for health checks, process control, recipe management, system management, and user management

- Implement health check endpoints for system monitoring including basic, detailed, readiness, liveness, and metrics.
- Create process control endpoints to start, stop, pause, resume, and manage processes.
- Add recipe management endpoints for listing, creating, retrieving, updating, deleting, duplicating, and validating recipes.
- Introduce system management endpoints for retrieving system information, active alarms, and configuration.
- Establish user management endpoints for listing and creating users with placeholder implementations.
- Define Pydantic schemas for API request/response validation related to recipes, processes, and users.
This commit is contained in:
2025-08-06 22:15:54 +02:00
parent 9cdd074a39
commit c3bc2e453b
19 changed files with 4949 additions and 0 deletions

114
python_rewrite/.env.example Normal file
View File

@@ -0,0 +1,114 @@
# Environment Configuration for Chocolate Tempering Machine
# Copy this file to .env and adjust values for your environment
# Application Settings
TEMPERING_ENVIRONMENT=development
TEMPERING_DEBUG=true
TEMPERING_LOG_LEVEL=DEBUG
TEMPERING_APP_NAME=Chocolate Tempering Machine
TEMPERING_APP_VERSION=1.0.0
# Database Configuration
TEMPERING_DATABASE_URL=sqlite:///data/tempering_machine.db
# For PostgreSQL: postgresql+asyncpg://user:password@localhost:5432/tempering_machine
TEMPERING_DATABASE__ECHO=false
TEMPERING_DATABASE__POOL_SIZE=5
TEMPERING_DATABASE__MAX_OVERFLOW=10
# Serial/Modbus Hardware Configuration
TEMPERING_SERIAL__PORT=/dev/ttyUSB0
TEMPERING_SERIAL__BAUDRATE=9600
TEMPERING_SERIAL__TIMEOUT=2.0
TEMPERING_SERIAL__BYTESIZE=8
TEMPERING_SERIAL__PARITY=N
TEMPERING_SERIAL__STOPBITS=1
TEMPERING_SERIAL__MAX_RETRIES=3
TEMPERING_SERIAL__RETRY_DELAY=0.1
# Modbus Configuration
TEMPERING_MODBUS__SLAVE_ADDRESS=1
TEMPERING_MODBUS__READ_TIMEOUT=2.0
TEMPERING_MODBUS__WRITE_TIMEOUT=2.0
TEMPERING_MODBUS__MAX_READ_REGISTERS=125
TEMPERING_MODBUS__MAX_WRITE_REGISTERS=100
TEMPERING_MODBUS__CONNECTION_CHECK_INTERVAL=10.0
# Temperature Control Configuration
TEMPERING_TEMPERATURE__ABSOLUTE_MAX_TEMP=80.0
TEMPERING_TEMPERATURE__ABSOLUTE_MIN_TEMP=10.0
TEMPERING_TEMPERATURE__TANK_MAX_HEAT=60.0
TEMPERING_TEMPERATURE__PUMP_MAX_HEAT=55.0
TEMPERING_TEMPERATURE__PUMP_MIN_HEAT=25.0
TEMPERING_TEMPERATURE__TEMPERATURE_TOLERANCE=0.5
TEMPERING_TEMPERATURE__SENSOR_ACCURACY=0.1
# PID Control Defaults
TEMPERING_TEMPERATURE__DEFAULT_KP=1.0
TEMPERING_TEMPERATURE__DEFAULT_KI=0.1
TEMPERING_TEMPERATURE__DEFAULT_KD=0.01
TEMPERING_TEMPERATURE__DEFAULT_KL=100.0
# Safety Configuration
TEMPERING_SAFETY__GRID_VOLTAGE_NOMINAL=230.0
TEMPERING_SAFETY__GRID_VOLTAGE_TOLERANCE=0.1
TEMPERING_SAFETY__GRID_FREQUENCY_NOMINAL=50.0
TEMPERING_SAFETY__GRID_FREQUENCY_TOLERANCE=0.02
TEMPERING_SAFETY__MAX_NEUTRAL_CURRENT=16.0
TEMPERING_SAFETY__MAX_MOTOR1_CURRENT=10.0
TEMPERING_SAFETY__MAX_MOTOR2_CURRENT=10.0
TEMPERING_SAFETY__ERROR_CHECK_INTERVAL=1.0
TEMPERING_SAFETY__AUTO_RECOVERY_ATTEMPTS=3
TEMPERING_SAFETY__AUTO_RECOVERY_DELAY=5.0
TEMPERING_SAFETY__COMMUNICATION_TIMEOUT=3.0
TEMPERING_SAFETY__HEARTBEAT_INTERVAL=1.0
# Process Control Configuration
TEMPERING_PROCESS__HEATING_DELAY=60.0
TEMPERING_PROCESS__COOLING_DELAY=120.0
TEMPERING_PROCESS__POURING_DELAY=30.0
TEMPERING_PROCESS__PUMP_DELAY=10.0
TEMPERING_PROCESS__MIXER_DELAY=5.0
TEMPERING_PROCESS__PROCESS_CONTROL_INTERVAL=0.5
TEMPERING_PROCESS__TEMPERATURE_READ_INTERVAL=1.0
TEMPERING_PROCESS__STATUS_UPDATE_INTERVAL=2.0
TEMPERING_PROCESS__MIN_HEATING_GOAL=40.0
TEMPERING_PROCESS__MAX_HEATING_GOAL=60.0
TEMPERING_PROCESS__MIN_COOLING_GOAL=20.0
TEMPERING_PROCESS__MAX_COOLING_GOAL=40.0
# Redis Configuration (for message queuing)
TEMPERING_REDIS__HOST=localhost
TEMPERING_REDIS__PORT=6379
TEMPERING_REDIS__DB=0
TEMPERING_REDIS__PASSWORD=
TEMPERING_REDIS__SOCKET_TIMEOUT=5.0
TEMPERING_REDIS__CONNECTION_POOL_MAX_CONNECTIONS=10
# Web Service Configuration
TEMPERING_WEB__HOST=0.0.0.0
TEMPERING_WEB__PORT=8000
TEMPERING_WEB__WORKERS=1
TEMPERING_WEB__RELOAD=true
TEMPERING_WEB__ACCESS_LOG=true
TEMPERING_WEB__API_TITLE=Chocolate Tempering Machine API
TEMPERING_WEB__API_VERSION=1.0.0
TEMPERING_WEB__CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"]
# File Paths
TEMPERING_DATA_DIRECTORY=data
TEMPERING_LOG_DIRECTORY=logs
TEMPERING_CONFIG_DIRECTORY=config
TEMPERING_BACKUP_DIRECTORY=backups
# Development/Testing Settings
# Uncomment for development
# TEMPERING_WEB__RELOAD=true
# TEMPERING_DATABASE__ECHO=true
# Production Settings
# Uncomment for production
# TEMPERING_ENVIRONMENT=production
# TEMPERING_DEBUG=false
# TEMPERING_LOG_LEVEL=INFO
# TEMPERING_WEB__RELOAD=false
# TEMPERING_WEB__WORKERS=4

81
python_rewrite/Dockerfile Normal file
View File

@@ -0,0 +1,81 @@
# Multi-stage Docker build for chocolate tempering machine control system
# Build stage
FROM python:3.11-slim as builder
# Set build arguments
ARG BUILD_ENV=production
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Set work directory
WORKDIR /app
# Copy requirements
COPY requirements.txt requirements-dev.txt ./
# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# If development build, install dev dependencies
RUN if [ "$BUILD_ENV" = "development" ]; then \
pip install --no-cache-dir -r requirements-dev.txt; \
fi
# Production stage
FROM python:3.11-slim as production
# Set environment variables
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Create non-root user
RUN groupadd -r tempering && useradd -r -g tempering tempering
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Set work directory
WORKDIR /app
# Copy Python dependencies from builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY src/ ./src/
COPY alembic.ini ./
COPY alembic/ ./alembic/
# Create necessary directories
RUN mkdir -p /app/data /app/logs /app/config /app/backups && \
chown -R tempering:tempering /app
# Copy startup script
COPY docker/entrypoint.sh ./entrypoint.sh
RUN chmod +x ./entrypoint.sh
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=60s --retries=3 \
CMD curl -f http://localhost:8000/health/live || exit 1
# Switch to non-root user
USER tempering
# Expose port
EXPOSE 8000
# Set entrypoint
ENTRYPOINT ["./entrypoint.sh"]
# Default command
CMD ["web"]

View File

@@ -0,0 +1,152 @@
version: '3.8'
services:
# Main application
tempering-machine:
build:
context: .
dockerfile: Dockerfile
args:
BUILD_ENV: production
container_name: tempering-machine
restart: unless-stopped
ports:
- "8000:8000"
environment:
# Database configuration
- TEMPERING_DATABASE_URL=sqlite:///data/tempering_machine.db
# Serial/Modbus configuration
- TEMPERING_SERIAL__PORT=/dev/ttyUSB0
- TEMPERING_SERIAL__BAUDRATE=9600
- TEMPERING_SERIAL__TIMEOUT=2.0
# Safety configuration
- TEMPERING_SAFETY__MAX_TANK_TEMPERATURE=80.0
- TEMPERING_SAFETY__MAX_NEUTRAL_CURRENT=16.0
- TEMPERING_SAFETY__COMMUNICATION_TIMEOUT=5.0
# Web configuration
- TEMPERING_WEB__HOST=0.0.0.0
- TEMPERING_WEB__PORT=8000
- TEMPERING_WEB__CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"]
# Application settings
- TEMPERING_ENVIRONMENT=production
- TEMPERING_LOG_LEVEL=INFO
- TEMPERING_DEBUG=false
volumes:
- ./data:/app/data
- ./logs:/app/logs
- ./backups:/app/backups
- ./config:/app/config
# Mount serial device (adjust as needed)
- /dev/ttyUSB0:/dev/ttyUSB0
devices:
- /dev/ttyUSB0:/dev/ttyUSB0
networks:
- tempering-network
depends_on:
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/live"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
# Redis for message queuing and caching
redis:
image: redis:7-alpine
container_name: tempering-redis
restart: unless-stopped
ports:
- "6379:6379"
volumes:
- redis-data:/data
networks:
- tempering-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 3
# Optional: PostgreSQL database (if not using SQLite)
postgres:
image: postgres:15-alpine
container_name: tempering-postgres
restart: unless-stopped
ports:
- "5432:5432"
environment:
- POSTGRES_DB=tempering_machine
- POSTGRES_USER=tempering
- POSTGRES_PASSWORD=tempering123
volumes:
- postgres-data:/var/lib/postgresql/data
- ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- tempering-network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U tempering"]
interval: 30s
timeout: 10s
retries: 3
profiles:
- postgres
# Optional: Grafana for monitoring dashboards
grafana:
image: grafana/grafana:latest
container_name: tempering-grafana
restart: unless-stopped
ports:
- "3001:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./docker/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./docker/grafana/datasources:/etc/grafana/provisioning/datasources
networks:
- tempering-network
profiles:
- monitoring
# Optional: Prometheus for metrics collection
prometheus:
image: prom/prometheus:latest
container_name: tempering-prometheus
restart: unless-stopped
ports:
- "9090:9090"
volumes:
- prometheus-data:/prometheus
- ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- tempering-network
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=200h'
- '--web.enable-lifecycle'
profiles:
- monitoring
networks:
tempering-network:
driver: bridge
volumes:
redis-data:
driver: local
postgres-data:
driver: local
grafana-data:
driver: local
prometheus-data:
driver: local

View File

@@ -0,0 +1,134 @@
#!/bin/bash
set -e
# Entrypoint script for chocolate tempering machine container
# Function to wait for service
wait_for_service() {
local host=$1
local port=$2
local service_name=$3
echo "Waiting for $service_name..."
while ! nc -z $host $port; do
sleep 1
done
echo "$service_name is ready!"
}
# Function to run database migrations
run_migrations() {
echo "Running database migrations..."
alembic upgrade head
echo "Database migrations completed"
}
# Function to create initial data
create_initial_data() {
echo "Creating initial data..."
python -c "
import asyncio
from src.tempering_machine.shared.database import init_database, create_tables
async def setup():
init_database()
await create_tables()
print('Database tables created')
asyncio.run(setup())
"
}
# Main execution
case "$1" in
web)
echo "Starting chocolate tempering machine web service..."
# Wait for Redis if configured
if [ ! -z "$TEMPERING_REDIS__HOST" ]; then
wait_for_service ${TEMPERING_REDIS__HOST:-redis} ${TEMPERING_REDIS__PORT:-6379} "Redis"
fi
# Wait for PostgreSQL if configured
if [[ "$TEMPERING_DATABASE_URL" == postgresql* ]]; then
DB_HOST=$(echo $TEMPERING_DATABASE_URL | sed -n 's/.*@\([^:]*\).*/\1/p')
DB_PORT=$(echo $TEMPERING_DATABASE_URL | sed -n 's/.*:\([0-9]*\).*/\1/p')
wait_for_service ${DB_HOST:-postgres} ${DB_PORT:-5432} "PostgreSQL"
fi
# Run migrations
run_migrations
# Create initial data
create_initial_data
# Start web service
echo "Starting FastAPI web service..."
exec python -m uvicorn tempering_machine.services.web.main:app \
--host ${TEMPERING_WEB__HOST:-0.0.0.0} \
--port ${TEMPERING_WEB__PORT:-8000} \
--workers ${TEMPERING_WEB__WORKERS:-1} \
--access-log
;;
worker)
echo "Starting background worker..."
# Wait for Redis
wait_for_service ${TEMPERING_REDIS__HOST:-redis} ${TEMPERING_REDIS__PORT:-6379} "Redis"
# Start Celery worker
exec celery -A tempering_machine.services.worker worker \
--loglevel=${TEMPERING_LOG_LEVEL:-info} \
--concurrency=2
;;
migrate)
echo "Running database migrations only..."
run_migrations
;;
shell)
echo "Starting Python shell..."
exec python -c "
import asyncio
from src.tempering_machine.shared.database import init_database
from src.tempering_machine.shared.config import settings
print('Chocolate Tempering Machine - Python Shell')
print(f'Environment: {settings.environment}')
print(f'Database: {settings.database.url}')
print('')
init_database()
print('Database initialized. You can now import and use the application modules.')
print('')
# Import commonly used modules
from tempering_machine.shared.models import *
from tempering_machine.services.hardware.hardware_manager import hardware_manager
from tempering_machine.services.recipe.recipe_controller import recipe_controller
from tempering_machine.services.safety.safety_monitor import safety_monitor
import IPython
IPython.start_ipython(argv=[])
"
;;
test)
echo "Running tests..."
exec python -m pytest tests/ -v
;;
*)
echo "Usage: $0 {web|worker|migrate|shell|test}"
echo ""
echo "Commands:"
echo " web - Start FastAPI web service (default)"
echo " worker - Start Celery background worker"
echo " migrate - Run database migrations only"
echo " shell - Start interactive Python shell"
echo " test - Run test suite"
exit 1
;;
esac

View File

@@ -0,0 +1,379 @@
"""
Data logging service for chocolate tempering machine.
Handles high-frequency data collection, storage, and export.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from pathlib import Path
import json
import csv
from ...shared.config import settings
from ...shared.database import db_manager
from ...shared.models.process import ProcessSession, TemperatureReading, ProcessLog
from ..hardware.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
@dataclass
class DataPoint:
"""Single data point for logging."""
timestamp: datetime
session_id: Optional[str]
sensor_name: str
value: float
units: str = "°C"
quality: str = "good" # good, poor, bad
additional_data: Dict[str, Any] = field(default_factory=dict)
class DataLogger:
"""
High-frequency data logger for process monitoring and analysis.
Collects temperature, motor, and process data at regular intervals.
"""
def __init__(self):
self.is_logging = False
self.logging_task: Optional[asyncio.Task] = None
self.data_buffer: List[DataPoint] = []
self.buffer_size = 1000
self.log_interval = 1.0 # seconds
self.current_session_id: Optional[str] = None
# Statistics
self.stats = {
"total_points_logged": 0,
"buffer_flushes": 0,
"logging_errors": 0,
"last_log_time": None,
"start_time": datetime.now(),
}
async def start_logging(self, session_id: Optional[str] = None) -> None:
"""Start data logging."""
if self.is_logging:
return
self.current_session_id = session_id
self.is_logging = True
self.logging_task = asyncio.create_task(self._logging_loop())
logger.info(f"Data logging started for session: {session_id}")
async def stop_logging(self) -> None:
"""Stop data logging and flush remaining data."""
if not self.is_logging:
return
self.is_logging = False
if self.logging_task:
self.logging_task.cancel()
try:
await self.logging_task
except asyncio.CancelledError:
pass
# Flush remaining data
await self._flush_buffer()
logger.info("Data logging stopped")
async def _logging_loop(self) -> None:
"""Main data logging loop."""
while self.is_logging:
try:
await self._collect_data_points()
# Flush buffer if full
if len(self.data_buffer) >= self.buffer_size:
await self._flush_buffer()
# Update statistics
self.stats["last_log_time"] = datetime.now()
# Wait for next collection cycle
await asyncio.sleep(self.log_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in data logging loop: {e}")
self.stats["logging_errors"] += 1
await asyncio.sleep(1.0) # Error recovery delay
async def _collect_data_points(self) -> None:
"""Collect current data points from hardware."""
try:
timestamp = datetime.now()
# Collect temperature data
temperatures = await hardware_manager.get_all_temperatures()
for sensor_name, reading in temperatures.items():
quality = "good" if reading.is_valid else "bad"
data_point = DataPoint(
timestamp=timestamp,
session_id=self.current_session_id,
sensor_name=sensor_name,
value=reading.value if reading.is_valid else 0.0,
units=reading.units,
quality=quality,
additional_data={
"error_message": reading.error_message if not reading.is_valid else None
}
)
self.data_buffer.append(data_point)
self.stats["total_points_logged"] += 1
# Collect motor current data (if available)
hw_status = hardware_manager.get_hardware_status()
for motor_name, motor in hw_status.motors.items():
if motor.current is not None:
data_point = DataPoint(
timestamp=timestamp,
session_id=self.current_session_id,
sensor_name=f"{motor_name}_current",
value=motor.current,
units="A",
quality="good",
additional_data={
"motor_state": motor.state.value,
"is_enabled": motor.is_enabled
}
)
self.data_buffer.append(data_point)
self.stats["total_points_logged"] += 1
except Exception as e:
logger.error(f"Error collecting data points: {e}")
raise
async def _flush_buffer(self) -> None:
"""Flush data buffer to database."""
if not self.data_buffer:
return
try:
async with db_manager.get_async_session() as session:
# Convert data points to temperature readings
temperature_readings = []
for data_point in self.data_buffer:
if data_point.units == "°C": # Temperature readings
# Map sensor names to columns
temp_reading = TemperatureReading(
session_id=data_point.session_id,
timestamp=data_point.timestamp,
**{self._map_sensor_to_column(data_point.sensor_name): data_point.value}
)
temperature_readings.append(temp_reading)
# Batch insert temperature readings
if temperature_readings:
session.add_all(temperature_readings)
await session.commit()
# Clear buffer
self.data_buffer.clear()
self.stats["buffer_flushes"] += 1
logger.debug(f"Flushed {len(temperature_readings)} temperature readings to database")
except Exception as e:
logger.error(f"Error flushing data buffer: {e}")
# Don't clear buffer on error to retry later
raise
def _map_sensor_to_column(self, sensor_name: str) -> str:
"""Map sensor names to database column names."""
mapping = {
"tank_bottom": "tank_bottom",
"tank_wall": "tank_wall",
"pump": "pump",
"fountain": "fountain",
}
return mapping.get(sensor_name, "tank_bottom") # Default fallback
async def export_data(
self,
session_id: str,
format: str = "csv",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> str:
"""Export logged data for a session."""
try:
async with db_manager.get_async_session() as session:
# Query temperature readings
query = session.query(TemperatureReading).filter(
TemperatureReading.session_id == session_id
)
if start_time:
query = query.filter(TemperatureReading.timestamp >= start_time)
if end_time:
query = query.filter(TemperatureReading.timestamp <= end_time)
query = query.order_by(TemperatureReading.timestamp)
readings = await query.all()
# Export to specified format
if format.lower() == "csv":
return await self._export_to_csv(readings, session_id)
elif format.lower() == "json":
return await self._export_to_json(readings, session_id)
else:
raise ValueError(f"Unsupported export format: {format}")
except Exception as e:
logger.error(f"Error exporting data: {e}")
raise
async def _export_to_csv(self, readings: List[TemperatureReading], session_id: str) -> str:
"""Export readings to CSV format."""
try:
export_dir = settings.data_directory / "exports"
export_dir.mkdir(exist_ok=True)
filename = f"temperature_data_{session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
filepath = export_dir / filename
with open(filepath, 'w', newline='') as csvfile:
fieldnames = [
'timestamp', 'phase', 'tank_bottom', 'tank_wall',
'pump', 'fountain', 'target_temperature', 'temperature_error'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for reading in readings:
writer.writerow({
'timestamp': reading.timestamp.isoformat(),
'phase': reading.phase.value if reading.phase else None,
'tank_bottom': reading.tank_bottom,
'tank_wall': reading.tank_wall,
'pump': reading.pump,
'fountain': reading.fountain,
'target_temperature': reading.target_temperature,
'temperature_error': reading.temperature_error,
})
logger.info(f"Exported {len(readings)} readings to {filepath}")
return str(filepath)
except Exception as e:
logger.error(f"Error exporting to CSV: {e}")
raise
async def _export_to_json(self, readings: List[TemperatureReading], session_id: str) -> str:
"""Export readings to JSON format."""
try:
export_dir = settings.data_directory / "exports"
export_dir.mkdir(exist_ok=True)
filename = f"temperature_data_{session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
filepath = export_dir / filename
data = {
"session_id": session_id,
"export_time": datetime.now().isoformat(),
"reading_count": len(readings),
"readings": [
{
"timestamp": reading.timestamp.isoformat(),
"phase": reading.phase.value if reading.phase else None,
"temperatures": {
"tank_bottom": reading.tank_bottom,
"tank_wall": reading.tank_wall,
"pump": reading.pump,
"fountain": reading.fountain,
},
"target_temperature": reading.target_temperature,
"temperature_error": reading.temperature_error,
}
for reading in readings
]
}
with open(filepath, 'w') as jsonfile:
json.dump(data, jsonfile, indent=2)
logger.info(f"Exported {len(readings)} readings to {filepath}")
return str(filepath)
except Exception as e:
logger.error(f"Error exporting to JSON: {e}")
raise
async def get_data_summary(self, session_id: str) -> Dict[str, Any]:
"""Get data summary for a session."""
try:
async with db_manager.get_async_session() as session:
# Query session data
query = session.query(TemperatureReading).filter(
TemperatureReading.session_id == session_id
)
readings = await query.all()
if not readings:
return {"session_id": session_id, "reading_count": 0}
# Calculate statistics
timestamps = [r.timestamp for r in readings]
fountain_temps = [r.fountain for r in readings if r.fountain is not None]
summary = {
"session_id": session_id,
"reading_count": len(readings),
"start_time": min(timestamps).isoformat(),
"end_time": max(timestamps).isoformat(),
"duration_seconds": (max(timestamps) - min(timestamps)).total_seconds(),
}
if fountain_temps:
summary.update({
"fountain_temp_min": min(fountain_temps),
"fountain_temp_max": max(fountain_temps),
"fountain_temp_avg": sum(fountain_temps) / len(fountain_temps),
})
return summary
except Exception as e:
logger.error(f"Error getting data summary: {e}")
raise
def get_logging_statistics(self) -> Dict[str, Any]:
"""Get logging statistics."""
uptime = datetime.now() - self.stats["start_time"]
return {
**self.stats,
"is_logging": self.is_logging,
"current_session_id": self.current_session_id,
"buffer_size": len(self.data_buffer),
"buffer_capacity": self.buffer_size,
"log_interval": self.log_interval,
"uptime_seconds": uptime.total_seconds(),
"points_per_second": (
self.stats["total_points_logged"] / max(1, uptime.total_seconds())
),
}
# Global data logger instance
data_logger = DataLogger()

View File

@@ -0,0 +1,532 @@
"""
Recipe controller for managing chocolate tempering process execution.
Orchestrates recipe execution, state management, and process monitoring.
"""
import asyncio
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from uuid import uuid4
from ...shared.database import get_db, db_manager
from ...shared.models.recipe import Recipe
from ...shared.models.process import ProcessSession, ProcessStatus, ProcessLog, LogLevel
from .state_machine import TemperingStateMachine, ProcessContext, ProcessMetrics
from ..hardware.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
class RecipeExecutionError(Exception):
"""Exception raised during recipe execution."""
pass
class RecipeController:
"""
Recipe controller manages the complete lifecycle of tempering process execution.
Handles recipe loading, process control, monitoring, and data logging.
"""
def __init__(self):
self.current_session: Optional[ProcessSession] = None
self.state_machine: Optional[TemperingStateMachine] = None
self.recipe: Optional[Recipe] = None
self.session_id: Optional[str] = None
self.monitoring_task: Optional[asyncio.Task] = None
self._is_running = False
async def start_recipe(self, recipe_id: int, user_id: Optional[str] = None) -> str:
"""
Start executing a recipe.
Returns session ID for tracking.
"""
try:
# Check if another recipe is already running
if self._is_running:
raise RecipeExecutionError("Another recipe is already running")
logger.info(f"Starting recipe execution: recipe_id={recipe_id}, user={user_id}")
# Load recipe from database
async with db_manager.get_async_session() as session:
self.recipe = await session.get(Recipe, recipe_id)
if not self.recipe:
raise RecipeExecutionError(f"Recipe {recipe_id} not found")
if not self.recipe.is_active:
raise RecipeExecutionError(f"Recipe {recipe_id} is not active")
# Validate recipe parameters
if not self.recipe.validate_temperatures():
raise RecipeExecutionError(f"Recipe {recipe_id} has invalid temperature parameters")
# Check hardware readiness
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
raise RecipeExecutionError(f"Hardware not ready: {', '.join(issues)}")
# Create process session
self.session_id = str(uuid4())
await self._create_process_session(user_id)
# Create process context
context = ProcessContext(
recipe_id=recipe_id,
session_id=self.session_id,
recipe_parameters=self.recipe.to_dict(),
current_metrics=ProcessMetrics(phase_start_time=datetime.now()),
user_id=user_id,
start_time=datetime.now()
)
# Initialize state machine
self.state_machine = TemperingStateMachine(context)
if not await self.state_machine.initialize():
raise RecipeExecutionError("Failed to initialize state machine")
# Start the process
await self._start_process_execution()
logger.info(f"Recipe execution started successfully: session_id={self.session_id}")
return self.session_id
except Exception as e:
logger.error(f"Failed to start recipe execution: {e}")
await self._handle_startup_error(str(e))
raise
async def stop_recipe(self, user_id: Optional[str] = None, reason: str = "Manual stop") -> bool:
"""
Stop the currently running recipe.
"""
try:
if not self._is_running or not self.state_machine:
logger.warning("No recipe is currently running")
return False
logger.info(f"Stopping recipe execution: session_id={self.session_id}, reason={reason}")
# Stop state machine control loop
await self.state_machine.stop_control_loop()
# Emergency stop hardware for safety
await hardware_manager.emergency_stop()
# Update process session
await self._finalize_process_session(ProcessStatus.ABORTED, reason, user_id)
# Log the stop event
await self._log_process_event(LogLevel.INFO, f"Recipe stopped: {reason}")
# Clean up
await self._cleanup_process()
logger.info("Recipe execution stopped successfully")
return True
except Exception as e:
logger.error(f"Error stopping recipe: {e}")
return False
async def pause_recipe(self, user_id: Optional[str] = None) -> bool:
"""
Pause the currently running recipe.
"""
try:
if not self._is_running or not self.state_machine:
return False
logger.info(f"Pausing recipe execution: session_id={self.session_id}")
# Stop control loop but don't emergency stop
await self.state_machine.stop_control_loop()
# Update session status
async with db_manager.get_async_session() as session:
if self.current_session:
self.current_session.status = ProcessStatus.PAUSED
session.add(self.current_session)
await session.commit()
await self._log_process_event(LogLevel.INFO, f"Recipe paused by {user_id or 'system'}")
logger.info("Recipe execution paused")
return True
except Exception as e:
logger.error(f"Error pausing recipe: {e}")
return False
async def resume_recipe(self, user_id: Optional[str] = None) -> bool:
"""
Resume a paused recipe.
"""
try:
if not self.current_session or self.current_session.status != ProcessStatus.PAUSED:
return False
logger.info(f"Resuming recipe execution: session_id={self.session_id}")
# Check hardware is still safe
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
await self._log_process_event(LogLevel.ERROR, f"Cannot resume - hardware issues: {', '.join(issues)}")
return False
# Resume control loop
await self.state_machine.start_control_loop()
# Update session status
async with db_manager.get_async_session() as session:
self.current_session.status = ProcessStatus.RUNNING
session.add(self.current_session)
await session.commit()
await self._log_process_event(LogLevel.INFO, f"Recipe resumed by {user_id or 'system'}")
logger.info("Recipe execution resumed")
return True
except Exception as e:
logger.error(f"Error resuming recipe: {e}")
return False
async def emergency_stop(self, user_id: Optional[str] = None) -> bool:
"""
Execute emergency stop of the current recipe.
"""
try:
logger.critical(f"EMERGENCY STOP activated: session_id={self.session_id}")
# Immediate hardware emergency stop
await hardware_manager.emergency_stop()
# Stop state machine if running
if self.state_machine:
try:
self.state_machine.emergency_stop()
except:
pass # Force stop regardless of state
await self.state_machine.stop_control_loop()
# Update process session
await self._finalize_process_session(
ProcessStatus.ABORTED,
"Emergency stop activated",
user_id
)
# Log critical event
await self._log_process_event(
LogLevel.CRITICAL,
f"EMERGENCY STOP activated by {user_id or 'system'}"
)
# Clean up
await self._cleanup_process()
logger.critical("Emergency stop completed")
return True
except Exception as e:
logger.critical(f"Error during emergency stop: {e}")
return False
async def get_process_status(self) -> Optional[Dict[str, Any]]:
"""
Get current process status and metrics.
"""
if not self.state_machine or not self.current_session:
return None
try:
# Get state machine status
sm_status = self.state_machine.get_process_status()
# Get hardware status
hw_status = hardware_manager.get_hardware_status()
# Get session information
session_info = {
"session_id": self.current_session.id,
"recipe_name": self.recipe.name if self.recipe else "Unknown",
"status": self.current_session.status.value,
"started_at": self.current_session.started_at.isoformat() if self.current_session.started_at else None,
"duration_seconds": self.current_session.duration,
"started_by": self.current_session.started_by,
}
return {
"session": session_info,
"state_machine": sm_status,
"hardware": {
"temperatures": {k: v.value for k, v in hw_status.temperatures.items()},
"motors": {k: {"state": v.state.value, "enabled": v.is_enabled}
for k, v in hw_status.motors.items()},
"safety": {
"emergency_stop": hw_status.safety.emergency_stop_active,
"cover_closed": hw_status.safety.cover_sensor_closed,
"alarms": hw_status.safety.temperature_alarms + hw_status.safety.current_alarms,
},
"communication_health": hw_status.communication_health,
"system_status": hw_status.system_status.value,
}
}
except Exception as e:
logger.error(f"Error getting process status: {e}")
return None
async def get_active_sessions(self) -> List[Dict[str, Any]]:
"""Get list of all active process sessions."""
try:
async with db_manager.get_async_session() as session:
# This would query active sessions from database
# For now, return current session if active
if self.current_session and self.current_session.status in [
ProcessStatus.RUNNING, ProcessStatus.PAUSED, ProcessStatus.STARTING
]:
return [self.current_session.to_dict()]
return []
except Exception as e:
logger.error(f"Error getting active sessions: {e}")
return []
async def _create_process_session(self, user_id: Optional[str]) -> None:
"""Create a new process session in the database."""
try:
async with db_manager.get_async_session() as session:
self.current_session = ProcessSession(
id=self.session_id,
recipe_id=self.recipe.id,
status=ProcessStatus.STARTING,
started_at=datetime.now(timezone.utc),
started_by=user_id,
process_parameters=self.recipe.to_dict(),
target_heating_temp=self.recipe.heating_goal,
target_cooling_temp=self.recipe.cooling_goal,
)
session.add(self.current_session)
await session.commit()
logger.info(f"Process session created: {self.session_id}")
except Exception as e:
logger.error(f"Failed to create process session: {e}")
raise
async def _start_process_execution(self) -> None:
"""Start the actual process execution."""
try:
# Update session status
async with db_manager.get_async_session() as session:
self.current_session.status = ProcessStatus.RUNNING
session.add(self.current_session)
await session.commit()
# Start state machine
self.state_machine.start_process()
await self.state_machine.start_control_loop()
# Start monitoring task
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
self._is_running = True
# Log start event
await self._log_process_event(LogLevel.INFO, "Recipe execution started")
except Exception as e:
logger.error(f"Failed to start process execution: {e}")
raise
async def _monitoring_loop(self) -> None:
"""Background monitoring loop for process execution."""
while self._is_running:
try:
# Update process session with current metrics
await self._update_session_metrics()
# Check for process completion or error conditions
await self._check_process_completion()
# Sleep until next monitoring cycle
await asyncio.sleep(5.0) # Monitor every 5 seconds
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in monitoring loop: {e}")
await asyncio.sleep(1.0)
async def _update_session_metrics(self) -> None:
"""Update process session with current metrics."""
if not self.current_session or not self.state_machine:
return
try:
# Get current metrics from state machine
context = self.state_machine.context
metrics = context.current_metrics
async with db_manager.get_async_session() as session:
# Update session with current metrics
self.current_session.current_phase = self.state_machine.get_current_phase()
self.current_session.achieved_heating_temp = metrics.current_temperature
self.current_session.error_count = metrics.error_count
self.current_session.warning_count = metrics.warning_count
session.add(self.current_session)
await session.commit()
except Exception as e:
logger.error(f"Error updating session metrics: {e}")
async def _check_process_completion(self) -> None:
"""Check if process has completed and handle completion."""
if not self.state_machine:
return
current_state = self.state_machine.current_state.name.lower()
if current_state == "completed":
await self._handle_process_completion()
elif current_state == "error":
await self._handle_process_error()
elif current_state == "emergency_stopped":
await self._handle_emergency_stop()
async def _handle_process_completion(self) -> None:
"""Handle successful process completion."""
logger.info(f"Process completed successfully: session_id={self.session_id}")
await self._finalize_process_session(
ProcessStatus.COMPLETED,
"Process completed successfully",
None
)
await self._log_process_event(LogLevel.INFO, "Process completed successfully")
await self._cleanup_process()
async def _handle_process_error(self) -> None:
"""Handle process error condition."""
logger.error(f"Process error detected: session_id={self.session_id}")
await self._finalize_process_session(
ProcessStatus.ERROR,
"Process error occurred",
None
)
await self._log_process_event(LogLevel.ERROR, "Process error occurred")
await self._cleanup_process()
async def _handle_emergency_stop(self) -> None:
"""Handle emergency stop condition."""
logger.critical(f"Emergency stop condition: session_id={self.session_id}")
await self._finalize_process_session(
ProcessStatus.ABORTED,
"Emergency stop condition",
None
)
await self._log_process_event(LogLevel.CRITICAL, "Emergency stop condition")
await self._cleanup_process()
async def _finalize_process_session(
self,
status: ProcessStatus,
reason: str,
user_id: Optional[str]
) -> None:
"""Finalize process session with final status."""
try:
async with db_manager.get_async_session() as session:
if self.current_session:
self.current_session.status = status
self.current_session.completed_at = datetime.now(timezone.utc)
self.current_session.stop_reason = reason
self.current_session.stopped_by = user_id
session.add(self.current_session)
await session.commit()
except Exception as e:
logger.error(f"Error finalizing process session: {e}")
async def _log_process_event(self, level: LogLevel, message: str) -> None:
"""Log a process event to the database."""
try:
if not self.current_session:
return
async with db_manager.get_async_session() as session:
log_entry = ProcessLog(
session_id=self.current_session.id,
level=level,
message=message,
component="RecipeController",
phase=self.state_machine.get_current_phase() if self.state_machine else None,
)
session.add(log_entry)
await session.commit()
except Exception as e:
logger.error(f"Error logging process event: {e}")
async def _handle_startup_error(self, error_message: str) -> None:
"""Handle errors during startup."""
try:
if self.current_session:
await self._finalize_process_session(
ProcessStatus.ERROR,
f"Startup error: {error_message}",
None
)
await self._cleanup_process()
except Exception as e:
logger.error(f"Error handling startup error: {e}")
async def _cleanup_process(self) -> None:
"""Clean up process resources."""
try:
self._is_running = False
# Cancel monitoring task
if self.monitoring_task:
self.monitoring_task.cancel()
try:
await self.monitoring_task
except asyncio.CancelledError:
pass
# Clean up state machine
if self.state_machine:
await self.state_machine.stop_control_loop()
self.state_machine = None
# Reset instance variables
self.current_session = None
self.recipe = None
self.session_id = None
self.monitoring_task = None
logger.info("Process cleanup completed")
except Exception as e:
logger.error(f"Error during process cleanup: {e}")
# Global recipe controller instance
recipe_controller = RecipeController()

View File

@@ -0,0 +1,514 @@
"""
State machine implementation for chocolate tempering process control.
Manages the complete tempering process workflow with proper state transitions.
"""
import asyncio
import logging
from typing import Optional, Dict, Any, Callable
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass
from statemachine import StateMachine, State
from statemachine.exceptions import TransitionNotAllowed
from ...shared.models.recipe import RecipePhase
from ...shared.config import settings
from ..hardware.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
@dataclass
class ProcessMetrics:
"""Process execution metrics and measurements."""
phase_start_time: datetime
target_temperature: Optional[float] = None
current_temperature: Optional[float] = None
temperature_error: float = 0.0
stability_duration: float = 0.0 # Time at stable temperature
energy_consumption: float = 0.0
error_count: int = 0
warning_count: int = 0
@dataclass
class ProcessContext:
"""Context information for process execution."""
recipe_id: int
session_id: str
recipe_parameters: Dict[str, Any]
current_metrics: ProcessMetrics
user_id: Optional[str] = None
start_time: Optional[datetime] = None
estimated_completion: Optional[datetime] = None
class TemperingStateMachine(StateMachine):
"""
State machine for chocolate tempering process control.
Implements the complete tempering workflow with safety checks.
"""
# Define states based on recipe phases
idle = State("Idle", initial=True)
preheating = State("Preheating")
heating = State("Heating")
heating_delay = State("HeatingDelay")
cooling = State("Cooling")
cooling_delay = State("CoolingDelay")
pouring = State("Pouring")
completed = State("Completed")
error = State("Error")
emergency_stopped = State("EmergencyStopped")
# Define transitions
start_process = idle.to(preheating)
begin_heating = preheating.to(heating)
start_heating_delay = heating.to(heating_delay)
begin_cooling = heating_delay.to(cooling)
start_cooling_delay = cooling.to(cooling_delay)
begin_pouring = cooling_delay.to(pouring)
finish_process = pouring.to(completed)
# Error and emergency transitions (from any state)
error_occurred = (
preheating.to(error) |
heating.to(error) |
heating_delay.to(error) |
cooling.to(error) |
cooling_delay.to(error) |
pouring.to(error)
)
emergency_stop = (
preheating.to(emergency_stopped) |
heating.to(emergency_stopped) |
heating_delay.to(emergency_stopped) |
cooling.to(emergency_stopped) |
cooling_delay.to(emergency_stopped) |
pouring.to(emergency_stopped) |
error.to(emergency_stopped)
)
# Recovery transitions
reset_from_error = error.to(idle)
reset_from_emergency = emergency_stopped.to(idle)
reset_from_completed = completed.to(idle)
def __init__(self, context: ProcessContext):
self.context = context
self.phase_timers: Dict[str, datetime] = {}
self.stability_checker = TemperatureStabilityChecker()
self.safety_monitor = ProcessSafetyMonitor()
self.control_loop_task: Optional[asyncio.Task] = None
self._running = False
super().__init__()
# Set up phase-specific handlers
self._phase_handlers = {
"preheating": self._handle_preheating_phase,
"heating": self._handle_heating_phase,
"heating_delay": self._handle_heating_delay_phase,
"cooling": self._handle_cooling_phase,
"cooling_delay": self._handle_cooling_delay_phase,
"pouring": self._handle_pouring_phase,
}
async def initialize(self) -> bool:
"""Initialize the state machine and hardware systems."""
try:
logger.info(f"Initializing tempering process for recipe {self.context.recipe_id}")
# Verify hardware is ready
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
logger.error(f"Hardware not safe to operate: {issues}")
return False
# Initialize process metrics
self.context.current_metrics = ProcessMetrics(
phase_start_time=datetime.now(),
target_temperature=self.context.recipe_parameters.get("heating_goal", 46.0)
)
# Start safety monitoring
self.safety_monitor.initialize(self.context)
logger.info("Tempering state machine initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize state machine: {e}")
return False
async def start_control_loop(self) -> None:
"""Start the main process control loop."""
if self.control_loop_task and not self.control_loop_task.done():
return
self._running = True
self.control_loop_task = asyncio.create_task(self._control_loop())
logger.info("Process control loop started")
async def stop_control_loop(self) -> None:
"""Stop the main process control loop."""
self._running = False
if self.control_loop_task:
self.control_loop_task.cancel()
try:
await self.control_loop_task
except asyncio.CancelledError:
pass
logger.info("Process control loop stopped")
async def _control_loop(self) -> None:
"""Main control loop for process execution."""
while self._running:
try:
current_state = self.current_state.name.lower()
# Execute phase-specific handler
if current_state in self._phase_handlers:
await self._phase_handlers[current_state]()
# Check for safety issues
await self._check_safety_conditions()
# Update metrics
await self._update_process_metrics()
# Control loop delay
await asyncio.sleep(settings.process.process_control_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in control loop: {e}")
try:
self.error_occurred()
except TransitionNotAllowed:
pass
await asyncio.sleep(1.0) # Error recovery delay
async def _handle_preheating_phase(self) -> None:
"""Handle preheating phase - warm up hardware."""
# Get current temperatures
tank_temp = await hardware_manager.get_average_tank_temperature()
pump_temp_reading = await hardware_manager.get_temperature("pump")
pump_temp = pump_temp_reading.value if pump_temp_reading else 0.0
# Check if preheating targets are met
tank_target = settings.temperature.tank_max_heat
pump_target = settings.temperature.pump_max_heat
if tank_temp and tank_temp >= tank_target and pump_temp >= pump_target:
logger.info("Preheating complete, transitioning to heating phase")
try:
self.begin_heating()
except TransitionNotAllowed as e:
logger.error(f"Cannot transition to heating: {e}")
else:
# Enable heaters for preheating
await hardware_manager.enable_heater("tank_heater")
await hardware_manager.enable_heater("pump_heater")
# Update metrics
self.context.current_metrics.current_temperature = tank_temp
self.context.current_metrics.target_temperature = tank_target
async def _handle_heating_phase(self) -> None:
"""Handle heating phase - reach heating goal temperature."""
heating_goal = self.context.recipe_parameters.get("heating_goal", 46.0)
fountain_temp_reading = await hardware_manager.get_temperature("fountain")
fountain_temp = fountain_temp_reading.value if fountain_temp_reading else 0.0
# Enable motors as specified in recipe
if self.context.recipe_parameters.get("mixer_enabled", True):
await hardware_manager.enable_motor("mixer_motor")
if self.context.recipe_parameters.get("fountain_enabled", True):
await hardware_manager.enable_motor("fountain_motor")
# Temperature control
temp_error = heating_goal - fountain_temp
tolerance = settings.temperature.temperature_tolerance
if abs(temp_error) <= tolerance:
# Temperature reached, check stability
if self.stability_checker.is_stable(fountain_temp, heating_goal, tolerance):
logger.info(f"Heating goal reached and stable: {fountain_temp}°C")
try:
self.start_heating_delay()
except TransitionNotAllowed as e:
logger.error(f"Cannot transition to heating delay: {e}")
else:
# Adjust heating based on temperature error
if temp_error > 0:
# Need more heat
await hardware_manager.enable_heater("tank_heater")
await hardware_manager.enable_heater("pump_heater")
else:
# Too hot, reduce heating
await hardware_manager.disable_heater("tank_heater")
# Update metrics
self.context.current_metrics.current_temperature = fountain_temp
self.context.current_metrics.target_temperature = heating_goal
self.context.current_metrics.temperature_error = temp_error
async def _handle_heating_delay_phase(self) -> None:
"""Handle heating delay phase - maintain temperature for specified time."""
if not self._check_phase_timer("heating_delay", settings.process.heating_delay):
# Still in delay period, maintain temperature
await self._maintain_temperature(
self.context.recipe_parameters.get("heating_goal", 46.0)
)
else:
logger.info("Heating delay complete, transitioning to cooling")
try:
self.begin_cooling()
except TransitionNotAllowed as e:
logger.error(f"Cannot transition to cooling: {e}")
async def _handle_cooling_phase(self) -> None:
"""Handle cooling phase - cool to cooling goal temperature."""
cooling_goal = self.context.recipe_parameters.get("cooling_goal", 27.0)
fountain_temp_reading = await hardware_manager.get_temperature("fountain")
fountain_temp = fountain_temp_reading.value if fountain_temp_reading else 50.0
# Disable tank heaters for cooling
await hardware_manager.disable_heater("tank_heater")
# Keep pump heater on to maintain pump temperature
await hardware_manager.enable_heater("pump_heater")
# Keep motors running for circulation
if self.context.recipe_parameters.get("mixer_enabled", True):
await hardware_manager.enable_motor("mixer_motor")
if self.context.recipe_parameters.get("fountain_enabled", True):
await hardware_manager.enable_motor("fountain_motor")
# Check if cooling goal is reached
tolerance = settings.temperature.temperature_tolerance
temp_error = fountain_temp - cooling_goal
if temp_error <= tolerance and fountain_temp <= cooling_goal:
if self.stability_checker.is_stable(fountain_temp, cooling_goal, tolerance):
logger.info(f"Cooling goal reached and stable: {fountain_temp}°C")
try:
self.start_cooling_delay()
except TransitionNotAllowed as e:
logger.error(f"Cannot transition to cooling delay: {e}")
# Update metrics
self.context.current_metrics.current_temperature = fountain_temp
self.context.current_metrics.target_temperature = cooling_goal
self.context.current_metrics.temperature_error = temp_error
async def _handle_cooling_delay_phase(self) -> None:
"""Handle cooling delay phase - maintain cooling temperature."""
if not self._check_phase_timer("cooling_delay", settings.process.cooling_delay):
# Still in delay period, maintain temperature
await self._maintain_temperature(
self.context.recipe_parameters.get("cooling_goal", 27.0)
)
else:
logger.info("Cooling delay complete, transitioning to pouring")
try:
self.begin_pouring()
except TransitionNotAllowed as e:
logger.error(f"Cannot transition to pouring: {e}")
async def _handle_pouring_phase(self) -> None:
"""Handle pouring phase - maintain pouring temperature and manage pedal control."""
pouring_goal = self.context.recipe_parameters.get("pouring_goal")
if not pouring_goal:
# Use cooling goal if no specific pouring goal
pouring_goal = self.context.recipe_parameters.get("cooling_goal", 30.0)
# Maintain pouring temperature
await self._maintain_temperature(pouring_goal)
# Handle pedal control if enabled
if self.context.recipe_parameters.get("pedal_control_enabled", True):
await self._manage_pedal_control()
# Keep circulation motors running
if self.context.recipe_parameters.get("fountain_enabled", True):
await hardware_manager.enable_motor("fountain_motor")
# Update metrics
fountain_temp_reading = await hardware_manager.get_temperature("fountain")
fountain_temp = fountain_temp_reading.value if fountain_temp_reading else pouring_goal
self.context.current_metrics.current_temperature = fountain_temp
self.context.current_metrics.target_temperature = pouring_goal
self.context.current_metrics.temperature_error = fountain_temp - pouring_goal
# Pouring phase continues until manually stopped or error occurs
# In a real system, this might be triggered by operator action or timer
async def _maintain_temperature(self, target_temp: float) -> None:
"""Maintain target temperature with PID control."""
fountain_temp_reading = await hardware_manager.get_temperature("fountain")
current_temp = fountain_temp_reading.value if fountain_temp_reading else target_temp
temp_error = current_temp - target_temp
tolerance = settings.temperature.temperature_tolerance
if abs(temp_error) > tolerance:
if temp_error < 0:
# Need heating
await hardware_manager.enable_heater("pump_heater")
if temp_error < -2.0: # Significant heating needed
await hardware_manager.enable_heater("tank_heater")
else:
# Too hot, reduce heating
await hardware_manager.disable_heater("tank_heater")
if temp_error > 2.0: # Significant cooling needed
await hardware_manager.disable_heater("pump_heater")
async def _manage_pedal_control(self) -> None:
"""Manage automatic pedal control for pouring."""
# This would implement automatic pedal cycling based on recipe parameters
# For now, just log the action
on_time = self.context.recipe_parameters.get("pedal_on_time", 2.0)
off_time = self.context.recipe_parameters.get("pedal_off_time", 3.0)
logger.debug(f"Pedal control active: {on_time}s on, {off_time}s off")
# Implementation would control pedal solenoid via hardware manager
def _check_phase_timer(self, phase_name: str, duration_seconds: float) -> bool:
"""Check if a phase timer has completed."""
if phase_name not in self.phase_timers:
self.phase_timers[phase_name] = datetime.now()
return False
elapsed = (datetime.now() - self.phase_timers[phase_name]).total_seconds()
return elapsed >= duration_seconds
async def _check_safety_conditions(self) -> None:
"""Check safety conditions and trigger emergency stop if needed."""
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
logger.error(f"Safety issue detected: {issues}")
try:
self.emergency_stop()
except TransitionNotAllowed:
pass
async def _update_process_metrics(self) -> None:
"""Update process execution metrics."""
self.context.current_metrics.phase_start_time = datetime.now()
# Update stability duration if temperature is stable
if hasattr(self, 'stability_checker'):
current_temp = self.context.current_metrics.current_temperature
target_temp = self.context.current_metrics.target_temperature
if (current_temp is not None and target_temp is not None and
abs(current_temp - target_temp) <= settings.temperature.temperature_tolerance):
# Temperature is stable, update duration
pass
def get_current_phase(self) -> RecipePhase:
"""Get current recipe phase."""
state_to_phase = {
"idle": RecipePhase.COMPLETED,
"preheating": RecipePhase.PREHEATING,
"heating": RecipePhase.HEATING,
"heating_delay": RecipePhase.HEATING_DELAY,
"cooling": RecipePhase.COOLING,
"cooling_delay": RecipePhase.COOLING_DELAY,
"pouring": RecipePhase.POURING,
"completed": RecipePhase.COMPLETED,
"error": RecipePhase.ERROR,
"emergency_stopped": RecipePhase.STOPPED,
}
return state_to_phase.get(self.current_state.name.lower(), RecipePhase.ERROR)
def get_process_status(self) -> Dict[str, Any]:
"""Get comprehensive process status."""
return {
"current_state": self.current_state.name,
"current_phase": self.get_current_phase().value,
"recipe_id": self.context.recipe_id,
"session_id": self.context.session_id,
"start_time": self.context.start_time.isoformat() if self.context.start_time else None,
"current_temperature": self.context.current_metrics.current_temperature,
"target_temperature": self.context.current_metrics.target_temperature,
"temperature_error": self.context.current_metrics.temperature_error,
"error_count": self.context.current_metrics.error_count,
"warning_count": self.context.current_metrics.warning_count,
"is_running": self._running,
"phase_timers": {k: v.isoformat() for k, v in self.phase_timers.items()},
}
class TemperatureStabilityChecker:
"""Utility class to check temperature stability."""
def __init__(self, stability_time: float = 30.0):
self.stability_time = stability_time # seconds
self.temperature_history: Dict[str, list] = {}
def is_stable(self, current_temp: float, target_temp: float, tolerance: float) -> bool:
"""Check if temperature has been stable within tolerance for required time."""
key = f"{target_temp}"
now = datetime.now()
# Initialize history if needed
if key not in self.temperature_history:
self.temperature_history[key] = []
history = self.temperature_history[key]
# Add current reading
history.append((now, current_temp))
# Remove old readings
cutoff_time = now - timedelta(seconds=self.stability_time)
history[:] = [(time, temp) for time, temp in history if time > cutoff_time]
# Check if all recent readings are within tolerance
if len(history) < 3: # Need at least a few readings
return False
return all(abs(temp - target_temp) <= tolerance for _, temp in history)
class ProcessSafetyMonitor:
"""Monitor process safety during execution."""
def __init__(self):
self.context: Optional[ProcessContext] = None
self.last_safety_check = datetime.now()
def initialize(self, context: ProcessContext) -> None:
"""Initialize safety monitor with process context."""
self.context = context
async def check_safety(self) -> tuple[bool, list[str]]:
"""Perform comprehensive safety check."""
issues = []
# Check hardware safety
is_hardware_safe, hardware_issues = await hardware_manager.is_safe_to_operate()
if not is_hardware_safe:
issues.extend(hardware_issues)
# Check temperature limits
for sensor_name, reading in (await hardware_manager.get_all_temperatures()).items():
if not reading.is_valid:
issues.append(f"Temperature sensor {sensor_name}: {reading.error_message}")
# Update last check time
self.last_safety_check = datetime.now()
return len(issues) == 0, issues

View File

@@ -0,0 +1,516 @@
"""
Centralized error handling and recovery system.
Provides structured error management with automatic recovery procedures.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Callable, Type
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import traceback
import inspect
from ...shared.config import settings
from ...shared.database import db_manager
from ...shared.models.system import ErrorLog, ErrorSeverity, ErrorCategory
logger = logging.getLogger(__name__)
class RecoveryAction(str, Enum):
"""Available recovery actions."""
NONE = "none"
RETRY = "retry"
RESET_COMPONENT = "reset_component"
RESTART_SERVICE = "restart_service"
EMERGENCY_STOP = "emergency_stop"
USER_INTERVENTION = "user_intervention"
@dataclass
class ErrorHandler:
"""Error handler configuration."""
exception_types: List[Type[Exception]]
recovery_action: RecoveryAction
max_retries: int = 3
retry_delay: float = 1.0
escalation_threshold: int = 5
auto_recovery: bool = True
requires_user_acknowledgment: bool = False
recovery_function: Optional[Callable[[], bool]] = None
description: str = ""
@dataclass
class ErrorContext:
"""Context information for error occurrence."""
component: str
function_name: str
error_id: str
timestamp: datetime
exception: Exception
stack_trace: str
retry_count: int = 0
recovery_attempts: int = 0
user_notified: bool = False
additional_data: Dict[str, Any] = None
class ErrorHandlingSystem:
"""
Centralized error handling and recovery system.
Manages error classification, recovery procedures, and escalation.
"""
def __init__(self):
self.error_handlers: Dict[str, ErrorHandler] = {}
self.active_errors: Dict[str, ErrorContext] = {}
self.error_statistics: Dict[str, int] = {}
self.recovery_tasks: Dict[str, asyncio.Task] = {}
# Initialize standard error handlers
self._initialize_standard_handlers()
# Error escalation callbacks
self.escalation_callbacks: List[Callable[[ErrorContext], None]] = []
def _initialize_standard_handlers(self) -> None:
"""Initialize standard error handlers for common exceptions."""
# Communication errors
self.register_handler(
"communication_error",
ErrorHandler(
exception_types=[ConnectionError, TimeoutError, OSError],
recovery_action=RecoveryAction.RETRY,
max_retries=3,
retry_delay=2.0,
escalation_threshold=10,
description="Modbus communication errors"
)
)
# Hardware errors
self.register_handler(
"hardware_error",
ErrorHandler(
exception_types=[RuntimeError],
recovery_action=RecoveryAction.RESET_COMPONENT,
max_retries=2,
retry_delay=5.0,
escalation_threshold=3,
requires_user_acknowledgment=True,
description="Hardware component errors"
)
)
# Safety-critical errors
self.register_handler(
"safety_error",
ErrorHandler(
exception_types=[ValueError], # Temperature/safety limit violations
recovery_action=RecoveryAction.EMERGENCY_STOP,
max_retries=0,
auto_recovery=False,
requires_user_acknowledgment=True,
description="Safety-critical errors requiring immediate stop"
)
)
# Database errors
self.register_handler(
"database_error",
ErrorHandler(
exception_types=[Exception], # SQLAlchemy exceptions would go here
recovery_action=RecoveryAction.RETRY,
max_retries=5,
retry_delay=1.0,
escalation_threshold=15,
description="Database connection and query errors"
)
)
# Generic application errors
self.register_handler(
"application_error",
ErrorHandler(
exception_types=[Exception],
recovery_action=RecoveryAction.USER_INTERVENTION,
max_retries=0,
auto_recovery=False,
requires_user_acknowledgment=True,
description="General application errors requiring investigation"
)
)
def register_handler(self, handler_id: str, handler: ErrorHandler) -> None:
"""Register an error handler."""
self.error_handlers[handler_id] = handler
logger.info(f"Registered error handler: {handler_id}")
def get_handler_for_exception(self, exception: Exception) -> Optional[ErrorHandler]:
"""Get the appropriate error handler for an exception."""
exception_type = type(exception)
# Look for exact match first
for handler in self.error_handlers.values():
if exception_type in handler.exception_types:
return handler
# Look for base class match
for handler in self.error_handlers.values():
for handled_type in handler.exception_types:
if issubclass(exception_type, handled_type):
return handler
# Return generic handler if no specific match
return self.error_handlers.get("application_error")
async def handle_error(
self,
exception: Exception,
component: str,
additional_context: Optional[Dict[str, Any]] = None
) -> bool:
"""
Handle an error with appropriate recovery action.
Returns True if error was handled successfully.
"""
try:
# Get caller information
frame = inspect.currentframe().f_back
function_name = frame.f_code.co_name if frame else "unknown"
# Create error context
error_id = f"{component}_{function_name}_{type(exception).__name__}_{hash(str(exception)) % 10000}"
context = ErrorContext(
component=component,
function_name=function_name,
error_id=error_id,
timestamp=datetime.now(),
exception=exception,
stack_trace=traceback.format_exc(),
additional_data=additional_context or {}
)
# Check if this is a recurring error
if error_id in self.active_errors:
existing_context = self.active_errors[error_id]
existing_context.retry_count += 1
existing_context.timestamp = datetime.now()
context = existing_context
else:
self.active_errors[error_id] = context
# Update statistics
self.error_statistics[error_id] = self.error_statistics.get(error_id, 0) + 1
# Get appropriate handler
handler = self.get_handler_for_exception(exception)
if not handler:
logger.error(f"No handler found for exception: {type(exception).__name__}")
return False
# Log the error
await self._log_error(context, handler)
# Execute recovery action
recovery_success = await self._execute_recovery_action(context, handler)
# Check for escalation
if not recovery_success or context.retry_count > handler.escalation_threshold:
await self._escalate_error(context, handler)
return recovery_success
except Exception as e:
logger.critical(f"Error in error handler: {e}")
return False
async def _execute_recovery_action(self, context: ErrorContext, handler: ErrorHandler) -> bool:
"""Execute the recovery action for an error."""
try:
logger.info(f"Executing recovery action {handler.recovery_action.value} for {context.error_id}")
if handler.recovery_action == RecoveryAction.NONE:
return True
elif handler.recovery_action == RecoveryAction.RETRY:
if context.retry_count < handler.max_retries:
await asyncio.sleep(handler.retry_delay)
return True
else:
logger.warning(f"Max retries exceeded for {context.error_id}")
return False
elif handler.recovery_action == RecoveryAction.RESET_COMPONENT:
return await self._reset_component(context.component)
elif handler.recovery_action == RecoveryAction.RESTART_SERVICE:
return await self._restart_service(context.component)
elif handler.recovery_action == RecoveryAction.EMERGENCY_STOP:
return await self._execute_emergency_stop(context)
elif handler.recovery_action == RecoveryAction.USER_INTERVENTION:
await self._request_user_intervention(context, handler)
return False # Requires manual intervention
else:
logger.error(f"Unknown recovery action: {handler.recovery_action}")
return False
except Exception as e:
logger.error(f"Error executing recovery action: {e}")
return False
async def _reset_component(self, component: str) -> bool:
"""Reset a specific component."""
try:
logger.info(f"Resetting component: {component}")
# Import here to avoid circular imports
from ..hardware.hardware_manager import hardware_manager
from ..recipe.recipe_controller import recipe_controller
if component.lower() == "hardware":
# Reinitialize hardware manager
await hardware_manager.shutdown()
return await hardware_manager.initialize()
elif component.lower() == "recipe":
# Stop current recipe if running
await recipe_controller.stop_recipe(reason="Component reset")
return True
else:
logger.warning(f"Unknown component for reset: {component}")
return False
except Exception as e:
logger.error(f"Error resetting component {component}: {e}")
return False
async def _restart_service(self, service: str) -> bool:
"""Restart a specific service."""
try:
logger.info(f"Restarting service: {service}")
# This would restart the specific service
# Implementation depends on service architecture
return True
except Exception as e:
logger.error(f"Error restarting service {service}: {e}")
return False
async def _execute_emergency_stop(self, context: ErrorContext) -> bool:
"""Execute emergency stop procedure."""
try:
logger.critical(f"EMERGENCY STOP triggered by error: {context.error_id}")
# Import here to avoid circular imports
from ..hardware.hardware_manager import hardware_manager
from ..recipe.recipe_controller import recipe_controller
# Stop recipe controller
await recipe_controller.emergency_stop()
# Emergency stop hardware
await hardware_manager.emergency_stop()
return True
except Exception as e:
logger.critical(f"Error during emergency stop: {e}")
return False
async def _request_user_intervention(self, context: ErrorContext, handler: ErrorHandler) -> None:
"""Request user intervention for error resolution."""
try:
logger.warning(f"User intervention required for: {context.error_id}")
# Mark as requiring user attention
context.user_notified = True
# Create high-priority alarm through safety monitor
from .safety_monitor import safety_monitor
await safety_monitor._create_alarm(
f"USER_INTERVENTION_{context.error_id}",
"User intervention required",
f"Error in {context.component}: {str(context.exception)}",
ErrorCategory.SYSTEM,
"HIGH" # This should be imported from safety_monitor but avoiding circular import
)
except Exception as e:
logger.error(f"Error requesting user intervention: {e}")
async def _escalate_error(self, context: ErrorContext, handler: ErrorHandler) -> None:
"""Escalate error to higher level handling."""
try:
logger.critical(f"Escalating error: {context.error_id}")
# Notify escalation callbacks
for callback in self.escalation_callbacks:
try:
callback(context)
except Exception as e:
logger.error(f"Error in escalation callback: {e}")
# For critical escalations, trigger emergency stop
if self.error_statistics[context.error_id] > handler.escalation_threshold * 2:
logger.critical(f"Critical escalation threshold reached for {context.error_id}")
await self._execute_emergency_stop(context)
except Exception as e:
logger.error(f"Error during error escalation: {e}")
async def _log_error(self, context: ErrorContext, handler: ErrorHandler) -> None:
"""Log error to database and system logs."""
try:
# Determine severity based on recovery action and retry count
if handler.recovery_action == RecoveryAction.EMERGENCY_STOP:
severity = ErrorSeverity.CRITICAL
elif context.retry_count > handler.escalation_threshold:
severity = ErrorSeverity.HIGH
elif handler.requires_user_acknowledgment:
severity = ErrorSeverity.HIGH
else:
severity = ErrorSeverity.MEDIUM
# Determine category based on component
category_map = {
"hardware": ErrorCategory.HARDWARE,
"communication": ErrorCategory.COMMUNICATION,
"modbus": ErrorCategory.COMMUNICATION,
"recipe": ErrorCategory.PROCESS,
"safety": ErrorCategory.SAFETY,
"database": ErrorCategory.SYSTEM,
}
category = category_map.get(context.component.lower(), ErrorCategory.SYSTEM)
# Log to system logger
log_method = getattr(logger, severity.value.lower(), logger.error)
log_method(f"[{context.component}] {context.function_name}: {str(context.exception)}")
# Log to database
async with db_manager.get_async_session() as session:
error_log = ErrorLog(
error_code=context.error_id,
category=category,
severity=severity,
title=f"Error in {context.component}",
message=str(context.exception),
component=context.component,
stack_trace=context.stack_trace,
additional_data=context.additional_data,
occurrence_count=self.error_statistics[context.error_id],
first_occurrence=context.timestamp if self.error_statistics[context.error_id] == 1 else None,
last_occurrence=context.timestamp,
)
session.add(error_log)
await session.commit()
except Exception as e:
logger.error(f"Error logging error to database: {e}")
async def acknowledge_error(self, error_id: str, user: str) -> bool:
"""Acknowledge an error that requires user intervention."""
try:
if error_id in self.active_errors:
context = self.active_errors[error_id]
# Remove from active errors
del self.active_errors[error_id]
# Cancel any recovery tasks
if error_id in self.recovery_tasks:
self.recovery_tasks[error_id].cancel()
del self.recovery_tasks[error_id]
logger.info(f"Error acknowledged by {user}: {error_id}")
return True
except Exception as e:
logger.error(f"Error acknowledging error: {e}")
return False
def get_active_errors(self) -> List[ErrorContext]:
"""Get list of active errors."""
return list(self.active_errors.values())
def get_error_statistics(self) -> Dict[str, Any]:
"""Get error handling statistics."""
return {
"active_errors": len(self.active_errors),
"total_error_types": len(self.error_statistics),
"error_counts": self.error_statistics.copy(),
"recovery_tasks": len(self.recovery_tasks),
"registered_handlers": len(self.error_handlers),
}
def add_escalation_callback(self, callback: Callable[[ErrorContext], None]) -> None:
"""Add callback function for error escalation."""
self.escalation_callbacks.append(callback)
async def clear_resolved_errors(self, max_age_hours: int = 24) -> int:
"""Clear resolved errors older than specified age."""
try:
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
cleared_count = 0
errors_to_remove = []
for error_id, context in self.active_errors.items():
if context.timestamp < cutoff_time:
errors_to_remove.append(error_id)
for error_id in errors_to_remove:
del self.active_errors[error_id]
cleared_count += 1
logger.info(f"Cleared {cleared_count} resolved errors")
return cleared_count
except Exception as e:
logger.error(f"Error clearing resolved errors: {e}")
return 0
# Global error handling system
error_handler = ErrorHandlingSystem()
# Decorator for automatic error handling
def handle_errors(component: str = "unknown", additional_context: Optional[Dict[str, Any]] = None):
"""Decorator to automatically handle errors in functions."""
def decorator(func):
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
success = await error_handler.handle_error(e, component, additional_context)
if not success:
raise
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# For sync functions, we can't await the error handler
# So we log and re-raise
logger.error(f"Error in {component}.{func.__name__}: {e}")
raise
return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
return decorator

View File

@@ -0,0 +1,635 @@
"""
Safety monitoring service for chocolate tempering machine.
Provides real-time safety monitoring with automatic responses and alarm management.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from ...shared.config import settings
from ...shared.database import db_manager
from ...shared.models.system import ErrorLog, ErrorSeverity, ErrorCategory
from ..hardware.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
class AlarmPriority(str, Enum):
"""Alarm priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlarmState(str, Enum):
"""Alarm states."""
ACTIVE = "active"
ACKNOWLEDGED = "acknowledged"
CLEARED = "cleared"
SUPPRESSED = "suppressed"
@dataclass
class SafetyAlarm:
"""Safety alarm information."""
id: str
title: str
message: str
category: ErrorCategory
priority: AlarmPriority
state: AlarmState = AlarmState.ACTIVE
timestamp: datetime = field(default_factory=datetime.now)
acknowledgement_time: Optional[datetime] = None
acknowledgement_user: Optional[str] = None
auto_clear: bool = False
clear_condition: Optional[Callable[[], bool]] = None
suppression_time: Optional[datetime] = None
occurrence_count: int = 1
first_occurrence: datetime = field(default_factory=datetime.now)
@dataclass
class SafetyLimits:
"""Safety operating limits configuration."""
# Temperature limits (°C)
max_tank_temperature: float = 80.0
max_fountain_temperature: float = 70.0
min_temperature: float = 5.0
temperature_rate_limit: float = 5.0 # °C/min max rate of change
# Electrical limits
max_voltage_deviation: float = 0.15 # ±15%
max_frequency_deviation: float = 0.02 # ±2%
max_neutral_current: float = 16.0 # A
max_motor_current: float = 10.0 # A
# Process limits
max_process_duration: float = 14400.0 # 4 hours in seconds
max_phase_duration: float = 3600.0 # 1 hour in seconds
# Communication limits
max_communication_errors: int = 10
communication_timeout: float = 5.0 # seconds
class SafetyMonitor:
"""
Comprehensive safety monitoring system for the tempering machine.
Monitors all safety-critical parameters and manages alarm conditions.
"""
def __init__(self):
self.active_alarms: Dict[str, SafetyAlarm] = {}
self.alarm_history: List[SafetyAlarm] = []
self.safety_limits = SafetyLimits()
self.monitoring_task: Optional[asyncio.Task] = None
self.is_running = False
# Safety state tracking
self.last_temperature_readings: Dict[str, Tuple[datetime, float]] = {}
self.consecutive_errors = 0
self.last_safety_check = datetime.now()
self.emergency_stop_active = False
# Alarm callbacks
self.alarm_callbacks: List[Callable[[SafetyAlarm], None]] = []
# Statistics
self.safety_stats = {
"total_alarms": 0,
"critical_alarms": 0,
"emergency_stops": 0,
"uptime_start": datetime.now(),
"last_critical_alarm": None,
}
async def initialize(self) -> bool:
"""Initialize safety monitoring system."""
try:
logger.info("Initializing safety monitoring system")
# Load safety limits from configuration
self._load_safety_limits()
# Initialize hardware monitoring
if not await self._verify_hardware_safety():
logger.error("Hardware safety verification failed")
return False
# Start monitoring
await self.start_monitoring()
logger.info("Safety monitoring system initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize safety monitor: {e}")
return False
async def start_monitoring(self) -> None:
"""Start safety monitoring background task."""
if self.monitoring_task and not self.monitoring_task.done():
return
self.is_running = True
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
logger.info("Safety monitoring started")
async def stop_monitoring(self) -> None:
"""Stop safety monitoring background task."""
self.is_running = False
if self.monitoring_task:
self.monitoring_task.cancel()
try:
await self.monitoring_task
except asyncio.CancelledError:
pass
logger.info("Safety monitoring stopped")
async def shutdown(self) -> None:
"""Shutdown safety monitoring system."""
logger.info("Shutting down safety monitoring system")
await self.stop_monitoring()
# Log shutdown event
await self._create_alarm(
"SAFETY_SHUTDOWN",
"Safety monitoring shutdown",
"Safety monitoring system shutdown initiated",
ErrorCategory.SYSTEM,
AlarmPriority.MEDIUM
)
async def _monitoring_loop(self) -> None:
"""Main safety monitoring loop."""
while self.is_running:
try:
# Perform comprehensive safety checks
await self._check_temperature_safety()
await self._check_electrical_safety()
await self._check_communication_safety()
await self._check_process_safety()
await self._check_hardware_safety()
# Update alarm states
await self._update_alarm_states()
# Check for emergency conditions
await self._check_emergency_conditions()
# Update statistics
self._update_safety_statistics()
# Wait for next monitoring cycle
await asyncio.sleep(settings.safety.error_check_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in safety monitoring loop: {e}")
self.consecutive_errors += 1
await asyncio.sleep(0.5) # Short delay before retry
async def _check_temperature_safety(self) -> None:
"""Check temperature-related safety conditions."""
try:
temperatures = await hardware_manager.get_all_temperatures()
for sensor_name, reading in temperatures.items():
if not reading.is_valid:
await self._create_alarm(
f"TEMP_SENSOR_{sensor_name.upper()}",
f"Temperature sensor {sensor_name} error",
f"Temperature sensor {sensor_name}: {reading.error_message}",
ErrorCategory.TEMPERATURE,
AlarmPriority.HIGH
)
continue
temp = reading.value
# Check absolute temperature limits
if temp > self.safety_limits.max_tank_temperature:
await self._create_alarm(
f"TEMP_HIGH_{sensor_name.upper()}",
f"High temperature alarm - {sensor_name}",
f"Temperature {temp:.1f}°C exceeds maximum {self.safety_limits.max_tank_temperature}°C",
ErrorCategory.TEMPERATURE,
AlarmPriority.CRITICAL,
auto_clear=True,
clear_condition=lambda t=temp, limit=self.safety_limits.max_tank_temperature: t <= limit - 2.0
)
if temp < self.safety_limits.min_temperature:
await self._create_alarm(
f"TEMP_LOW_{sensor_name.upper()}",
f"Low temperature alarm - {sensor_name}",
f"Temperature {temp:.1f}°C below minimum {self.safety_limits.min_temperature}°C",
ErrorCategory.TEMPERATURE,
AlarmPriority.MEDIUM
)
# Check rate of temperature change
await self._check_temperature_rate(sensor_name, temp)
# Update temperature history
self.last_temperature_readings[sensor_name] = (datetime.now(), temp)
except Exception as e:
logger.error(f"Error checking temperature safety: {e}")
await self._create_alarm(
"TEMP_CHECK_ERROR",
"Temperature safety check error",
f"Error performing temperature safety checks: {e}",
ErrorCategory.SYSTEM,
AlarmPriority.HIGH
)
async def _check_temperature_rate(self, sensor_name: str, current_temp: float) -> None:
"""Check temperature rate of change."""
if sensor_name not in self.last_temperature_readings:
return
last_time, last_temp = self.last_temperature_readings[sensor_name]
time_diff = (datetime.now() - last_time).total_seconds() / 60.0 # minutes
if time_diff > 0.1: # Minimum time difference
rate = abs(current_temp - last_temp) / time_diff # °C/min
if rate > self.safety_limits.temperature_rate_limit:
await self._create_alarm(
f"TEMP_RATE_{sensor_name.upper()}",
f"Temperature rate alarm - {sensor_name}",
f"Temperature changing too rapidly: {rate:.1f}°C/min > {self.safety_limits.temperature_rate_limit}°C/min",
ErrorCategory.TEMPERATURE,
AlarmPriority.HIGH
)
async def _check_electrical_safety(self) -> None:
"""Check electrical safety parameters."""
try:
hw_status = hardware_manager.get_hardware_status()
# This would read actual electrical parameters from hardware
# For now, we'll simulate based on hardware communication health
if hw_status.communication_health < 90:
await self._create_alarm(
"ELECTRICAL_COMM",
"Electrical communication issue",
f"Communication health degraded: {hw_status.communication_health:.1f}%",
ErrorCategory.COMMUNICATION,
AlarmPriority.MEDIUM
)
# Check for current alarms from hardware
for alarm_msg in hw_status.safety.current_alarms:
alarm_id = f"CURRENT_{hash(alarm_msg) % 10000}"
await self._create_alarm(
alarm_id,
"Electrical current alarm",
alarm_msg,
ErrorCategory.POWER,
AlarmPriority.HIGH
)
except Exception as e:
logger.error(f"Error checking electrical safety: {e}")
async def _check_communication_safety(self) -> None:
"""Check communication system safety."""
try:
comm_stats = hardware_manager.get_communication_statistics()
# Check communication error rate
if comm_stats.get("failed_requests", 0) > self.safety_limits.max_communication_errors:
await self._create_alarm(
"COMM_ERRORS",
"High communication error rate",
f"Communication errors: {comm_stats.get('failed_requests', 0)}",
ErrorCategory.COMMUNICATION,
AlarmPriority.HIGH
)
# Check communication timeout
last_successful = datetime.fromisoformat(comm_stats.get("last_successful_read", datetime.now().isoformat()))
time_since_success = (datetime.now() - last_successful).total_seconds()
if time_since_success > self.safety_limits.communication_timeout:
await self._create_alarm(
"COMM_TIMEOUT",
"Communication timeout",
f"No successful communication for {time_since_success:.1f}s",
ErrorCategory.COMMUNICATION,
AlarmPriority.CRITICAL
)
except Exception as e:
logger.error(f"Error checking communication safety: {e}")
async def _check_process_safety(self) -> None:
"""Check process execution safety."""
try:
# This would check with recipe controller for process safety
# For now, we'll implement basic checks
# Check if emergency stop is active
hw_status = hardware_manager.get_hardware_status()
if hw_status.safety.emergency_stop_active and not self.emergency_stop_active:
await self._create_alarm(
"EMERGENCY_STOP",
"Emergency stop activated",
"Hardware emergency stop has been activated",
ErrorCategory.SAFETY,
AlarmPriority.CRITICAL
)
self.emergency_stop_active = True
elif not hw_status.safety.emergency_stop_active and self.emergency_stop_active:
# Emergency stop cleared
self.emergency_stop_active = False
await self._clear_alarm("EMERGENCY_STOP")
# Check cover sensor
if not hw_status.safety.cover_sensor_closed:
await self._create_alarm(
"COVER_OPEN",
"Safety cover open",
"Safety cover is open - operation not safe",
ErrorCategory.SAFETY,
AlarmPriority.HIGH
)
else:
await self._clear_alarm("COVER_OPEN")
except Exception as e:
logger.error(f"Error checking process safety: {e}")
async def _check_hardware_safety(self) -> None:
"""Check hardware component safety."""
try:
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
for i, issue in enumerate(issues):
await self._create_alarm(
f"HARDWARE_ISSUE_{i}",
"Hardware safety issue",
issue,
ErrorCategory.HARDWARE,
AlarmPriority.HIGH
)
except Exception as e:
logger.error(f"Error checking hardware safety: {e}")
async def _check_emergency_conditions(self) -> None:
"""Check for conditions requiring emergency stop."""
emergency_conditions = []
# Check for critical alarms
for alarm in self.active_alarms.values():
if alarm.priority == AlarmPriority.CRITICAL and alarm.state == AlarmState.ACTIVE:
emergency_conditions.append(alarm.title)
# If we have critical conditions, trigger emergency stop
if emergency_conditions and not self.emergency_stop_active:
logger.critical(f"Emergency conditions detected: {', '.join(emergency_conditions)}")
# Trigger emergency stop through hardware manager
await hardware_manager.emergency_stop()
# Create emergency stop alarm
await self._create_alarm(
"AUTO_EMERGENCY_STOP",
"Automatic emergency stop",
f"Emergency stop triggered by: {', '.join(emergency_conditions)}",
ErrorCategory.SAFETY,
AlarmPriority.CRITICAL
)
self.safety_stats["emergency_stops"] += 1
async def _create_alarm(
self,
alarm_id: str,
title: str,
message: str,
category: ErrorCategory,
priority: AlarmPriority,
auto_clear: bool = False,
clear_condition: Optional[Callable[[], bool]] = None
) -> None:
"""Create or update a safety alarm."""
try:
# Check if alarm already exists
if alarm_id in self.active_alarms:
# Update existing alarm
alarm = self.active_alarms[alarm_id]
alarm.occurrence_count += 1
alarm.timestamp = datetime.now()
logger.warning(f"Alarm updated: {alarm_id} (count: {alarm.occurrence_count})")
else:
# Create new alarm
alarm = SafetyAlarm(
id=alarm_id,
title=title,
message=message,
category=category,
priority=priority,
auto_clear=auto_clear,
clear_condition=clear_condition
)
self.active_alarms[alarm_id] = alarm
self.alarm_history.append(alarm)
self.safety_stats["total_alarms"] += 1
if priority == AlarmPriority.CRITICAL:
self.safety_stats["critical_alarms"] += 1
self.safety_stats["last_critical_alarm"] = datetime.now()
logger.warning(f"Alarm created: {alarm_id} - {title}")
# Log to database
await self._log_alarm_to_database(alarm)
# Notify callbacks
for callback in self.alarm_callbacks:
try:
callback(alarm)
except Exception as e:
logger.error(f"Error in alarm callback: {e}")
except Exception as e:
logger.error(f"Error creating alarm: {e}")
async def _clear_alarm(self, alarm_id: str, user: Optional[str] = None) -> bool:
"""Clear an active alarm."""
try:
if alarm_id in self.active_alarms:
alarm = self.active_alarms[alarm_id]
alarm.state = AlarmState.CLEARED
alarm.acknowledgement_time = datetime.now()
alarm.acknowledgement_user = user or "SYSTEM"
# Remove from active alarms
del self.active_alarms[alarm_id]
logger.info(f"Alarm cleared: {alarm_id} by {user or 'SYSTEM'}")
return True
except Exception as e:
logger.error(f"Error clearing alarm: {e}")
return False
async def _update_alarm_states(self) -> None:
"""Update alarm states and handle auto-clearing."""
alarms_to_clear = []
for alarm_id, alarm in self.active_alarms.items():
# Check auto-clear condition
if alarm.auto_clear and alarm.clear_condition:
try:
if alarm.clear_condition():
alarms_to_clear.append(alarm_id)
except Exception as e:
logger.error(f"Error checking clear condition for {alarm_id}: {e}")
# Clear alarms that meet their clear conditions
for alarm_id in alarms_to_clear:
await self._clear_alarm(alarm_id, "AUTO")
async def acknowledge_alarm(self, alarm_id: str, user: str) -> bool:
"""Acknowledge an alarm."""
try:
if alarm_id in self.active_alarms:
alarm = self.active_alarms[alarm_id]
alarm.state = AlarmState.ACKNOWLEDGED
alarm.acknowledgement_time = datetime.now()
alarm.acknowledgement_user = user
logger.info(f"Alarm acknowledged: {alarm_id} by {user}")
return True
except Exception as e:
logger.error(f"Error acknowledging alarm: {e}")
return False
async def suppress_alarm(self, alarm_id: str, duration_minutes: int, user: str) -> bool:
"""Suppress an alarm for a specified duration."""
try:
if alarm_id in self.active_alarms:
alarm = self.active_alarms[alarm_id]
alarm.state = AlarmState.SUPPRESSED
alarm.suppression_time = datetime.now() + timedelta(minutes=duration_minutes)
logger.info(f"Alarm suppressed: {alarm_id} for {duration_minutes}min by {user}")
return True
except Exception as e:
logger.error(f"Error suppressing alarm: {e}")
return False
def get_active_alarms(self, priority_filter: Optional[AlarmPriority] = None) -> List[SafetyAlarm]:
"""Get list of active alarms, optionally filtered by priority."""
alarms = list(self.active_alarms.values())
if priority_filter:
alarms = [alarm for alarm in alarms if alarm.priority == priority_filter]
# Sort by priority and timestamp
priority_order = {
AlarmPriority.CRITICAL: 0,
AlarmPriority.HIGH: 1,
AlarmPriority.MEDIUM: 2,
AlarmPriority.LOW: 3
}
return sorted(alarms, key=lambda a: (priority_order[a.priority], a.timestamp))
def get_alarm_summary(self) -> Dict[str, Any]:
"""Get summary of alarm status."""
active_by_priority = {}
for priority in AlarmPriority:
active_by_priority[priority.value] = len([
a for a in self.active_alarms.values() if a.priority == priority
])
return {
"total_active_alarms": len(self.active_alarms),
"active_by_priority": active_by_priority,
"total_alarms_today": len([
a for a in self.alarm_history
if a.timestamp.date() == datetime.now().date()
]),
"emergency_stop_active": self.emergency_stop_active,
"last_safety_check": self.last_safety_check.isoformat(),
"consecutive_errors": self.consecutive_errors,
"statistics": self.safety_stats.copy()
}
def add_alarm_callback(self, callback: Callable[[SafetyAlarm], None]) -> None:
"""Add callback function to be called when alarms are created."""
self.alarm_callbacks.append(callback)
async def _verify_hardware_safety(self) -> bool:
"""Verify hardware is in a safe state for operation."""
is_safe, issues = await hardware_manager.is_safe_to_operate()
if not is_safe:
for issue in issues:
logger.error(f"Hardware safety issue: {issue}")
return is_safe
def _load_safety_limits(self) -> None:
"""Load safety limits from configuration."""
self.safety_limits = SafetyLimits(
max_tank_temperature=settings.temperature.absolute_max_temp,
min_temperature=settings.temperature.absolute_min_temp,
max_neutral_current=settings.safety.max_neutral_current,
max_motor_current=settings.safety.max_motor1_current,
communication_timeout=settings.safety.communication_timeout,
)
def _update_safety_statistics(self) -> None:
"""Update safety monitoring statistics."""
self.last_safety_check = datetime.now()
self.consecutive_errors = 0 # Reset if we got here successfully
async def _log_alarm_to_database(self, alarm: SafetyAlarm) -> None:
"""Log alarm to database for audit trail."""
try:
async with db_manager.get_async_session() as session:
error_log = ErrorLog(
error_code=alarm.id,
category=alarm.category,
severity=ErrorSeverity(alarm.priority.value.upper()) if alarm.priority.value.upper() in [s.value for s in ErrorSeverity] else ErrorSeverity.MEDIUM,
title=alarm.title,
message=alarm.message,
component="SafetyMonitor",
occurrence_count=alarm.occurrence_count,
first_occurrence=alarm.first_occurrence,
last_occurrence=alarm.timestamp,
)
session.add(error_log)
await session.commit()
except Exception as e:
logger.error(f"Error logging alarm to database: {e}")
# Global safety monitor instance
safety_monitor = SafetyMonitor()

View File

@@ -0,0 +1,256 @@
"""
FastAPI main application for chocolate tempering machine control system.
Provides REST API endpoints for system control and monitoring.
"""
import logging
from contextlib import asynccontextmanager
from typing import Dict, Any
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from ...shared.config import settings
from ...shared.database import init_database, create_tables, close_database
from ..hardware.hardware_manager import hardware_manager
from ..recipe.recipe_controller import recipe_controller
from ..safety.safety_monitor import safety_monitor
from ..safety.error_handler import error_handler
from .routers import recipes, process, hardware, users, system, health
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager for startup and shutdown tasks."""
# Startup
logger.info("Starting chocolate tempering machine control system")
try:
# Initialize database
init_database()
await create_tables()
logger.info("Database initialized")
# Initialize hardware manager
if await hardware_manager.initialize():
logger.info("Hardware manager initialized")
else:
logger.error("Hardware manager initialization failed")
# Initialize safety monitor
if await safety_monitor.initialize():
logger.info("Safety monitor initialized")
else:
logger.error("Safety monitor initialization failed")
logger.info("System startup completed successfully")
except Exception as e:
logger.critical(f"System startup failed: {e}")
raise
yield
# Shutdown
logger.info("Shutting down chocolate tempering machine control system")
try:
# Stop any running recipes
await recipe_controller.emergency_stop(reason="System shutdown")
# Shutdown safety monitor
await safety_monitor.shutdown()
# Shutdown hardware manager
await hardware_manager.shutdown()
# Close database connections
await close_database()
logger.info("System shutdown completed")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
# Create FastAPI application
app = FastAPI(
title=settings.web.api_title,
version=settings.web.api_version,
description="Industrial chocolate tempering machine control system API",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan,
)
# Add middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.web.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["*"] # Configure appropriately for production
)
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Global exception handler for unhandled errors."""
logger.error(f"Unhandled exception in {request.url}: {exc}")
# Handle error through error handling system
await error_handler.handle_error(exc, "web_api", {
"url": str(request.url),
"method": request.method,
"client": request.client.host if request.client else "unknown"
})
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"message": "An unexpected error occurred",
"request_id": getattr(request.state, "request_id", "unknown")
}
)
# HTTP exception handler
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Handler for HTTP exceptions."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"status_code": exc.status_code
}
)
# Request ID middleware
@app.middleware("http")
async def add_request_id(request: Request, call_next):
"""Add request ID for tracing."""
import uuid
request_id = str(uuid.uuid4())
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
# Include routers
app.include_router(
health.router,
prefix="/health",
tags=["health"]
)
app.include_router(
recipes.router,
prefix="/api/v1/recipes",
tags=["recipes"]
)
app.include_router(
process.router,
prefix="/api/v1/process",
tags=["process"]
)
app.include_router(
hardware.router,
prefix="/api/v1/hardware",
tags=["hardware"]
)
app.include_router(
users.router,
prefix="/api/v1/users",
tags=["users"]
)
app.include_router(
system.router,
prefix="/api/v1/system",
tags=["system"]
)
# Root endpoint
@app.get("/", response_model=Dict[str, Any])
async def root():
"""Root endpoint with system information."""
return {
"name": settings.app_name,
"version": settings.app_version,
"environment": settings.environment,
"api_version": "v1",
"docs_url": "/docs",
"health_url": "/health"
}
# API info endpoint
@app.get("/api/v1/info", response_model=Dict[str, Any])
async def api_info():
"""API information endpoint."""
return {
"api_title": settings.web.api_title,
"api_version": settings.web.api_version,
"environment": settings.environment,
"features": {
"recipe_management": True,
"process_control": True,
"hardware_monitoring": True,
"safety_monitoring": True,
"user_management": True,
"real_time_monitoring": True,
},
"endpoints": {
"recipes": "/api/v1/recipes",
"process": "/api/v1/process",
"hardware": "/api/v1/hardware",
"users": "/api/v1/users",
"system": "/api/v1/system",
"health": "/health",
}
}
def create_app() -> FastAPI:
"""Factory function to create FastAPI app."""
return app
def run_server():
"""Run the development server."""
uvicorn.run(
"tempering_machine.services.web.main:app",
host=settings.web.host,
port=settings.web.port,
reload=settings.web.reload,
workers=settings.web.workers if not settings.web.reload else 1,
access_log=settings.web.access_log,
log_level=settings.log_level.lower(),
)
if __name__ == "__main__":
run_server()

View File

@@ -0,0 +1,3 @@
"""
API routers for different endpoints.
"""

View File

@@ -0,0 +1,409 @@
"""
Hardware monitoring and control API endpoints.
"""
import logging
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from ...hardware.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
router = APIRouter()
class TemperatureReading(BaseModel):
"""Temperature reading response model."""
value: float
sensor_name: str
timestamp: str
units: str = "°C"
is_valid: bool = True
error_message: Optional[str] = None
class MotorStatus(BaseModel):
"""Motor status response model."""
name: str
state: str
is_enabled: bool
current: Optional[float] = None
runtime_hours: float = 0.0
error_count: int = 0
class SafetyStatus(BaseModel):
"""Safety status response model."""
emergency_stop_active: bool = False
cover_sensor_closed: bool = True
temperature_alarms: List[str] = []
current_alarms: List[str] = []
last_safety_check: Optional[str] = None
class HardwareStatusResponse(BaseModel):
"""Complete hardware status response."""
temperatures: Dict[str, TemperatureReading]
motors: Dict[str, MotorStatus]
safety: SafetyStatus
communication_health: float
system_status: str
last_update: Optional[str] = None
class MotorControlRequest(BaseModel):
"""Motor control request model."""
enabled: bool
class HeaterControlRequest(BaseModel):
"""Heater control request model."""
enabled: bool
@router.get("/status", response_model=HardwareStatusResponse)
async def get_hardware_status():
"""Get complete hardware status."""
try:
hw_status = hardware_manager.get_hardware_status()
# Convert temperature readings
temperatures = {}
for sensor_name, reading in hw_status.temperatures.items():
temperatures[sensor_name] = TemperatureReading(
value=reading.value,
sensor_name=reading.sensor_name,
timestamp=reading.timestamp.isoformat(),
units=reading.units,
is_valid=reading.is_valid,
error_message=reading.error_message
)
# Convert motor statuses
motors = {}
for motor_name, motor in hw_status.motors.items():
motors[motor_name] = MotorStatus(
name=motor.name,
state=motor.state.value,
is_enabled=motor.is_enabled,
current=motor.current,
runtime_hours=motor.runtime_hours,
error_count=motor.error_count
)
# Convert safety status
safety = SafetyStatus(
emergency_stop_active=hw_status.safety.emergency_stop_active,
cover_sensor_closed=hw_status.safety.cover_sensor_closed,
temperature_alarms=hw_status.safety.temperature_alarms,
current_alarms=hw_status.safety.current_alarms,
last_safety_check=hw_status.safety.last_safety_check.isoformat() if hw_status.safety.last_safety_check else None
)
return HardwareStatusResponse(
temperatures=temperatures,
motors=motors,
safety=safety,
communication_health=hw_status.communication_health,
system_status=hw_status.system_status.value,
last_update=hw_status.last_update.isoformat() if hw_status.last_update else None
)
except Exception as e:
logger.error(f"Failed to get hardware status: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve hardware status"
)
@router.get("/temperatures", response_model=Dict[str, TemperatureReading])
async def get_temperatures():
"""Get all temperature readings."""
try:
temperatures = await hardware_manager.get_all_temperatures()
result = {}
for sensor_name, reading in temperatures.items():
result[sensor_name] = TemperatureReading(
value=reading.value,
sensor_name=reading.sensor_name,
timestamp=reading.timestamp.isoformat(),
units=reading.units,
is_valid=reading.is_valid,
error_message=reading.error_message
)
return result
except Exception as e:
logger.error(f"Failed to get temperatures: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve temperature readings"
)
@router.get("/temperatures/{sensor_name}", response_model=TemperatureReading)
async def get_temperature(sensor_name: str):
"""Get temperature reading for a specific sensor."""
try:
reading = await hardware_manager.get_temperature(sensor_name)
if not reading:
raise HTTPException(
status_code=404,
detail=f"Temperature sensor '{sensor_name}' not found"
)
return TemperatureReading(
value=reading.value,
sensor_name=reading.sensor_name,
timestamp=reading.timestamp.isoformat(),
units=reading.units,
is_valid=reading.is_valid,
error_message=reading.error_message
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get temperature for {sensor_name}: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve temperature for sensor '{sensor_name}'"
)
@router.get("/temperatures/tank/average")
async def get_average_tank_temperature():
"""Get average temperature of tank sensors."""
try:
avg_temp = await hardware_manager.get_average_tank_temperature()
if avg_temp is None:
raise HTTPException(
status_code=503,
detail="No valid tank temperature readings available"
)
return {
"average_temperature": avg_temp,
"units": "°C",
"sensors_used": ["tank_bottom", "tank_wall"],
"timestamp": hardware_manager.current_status.last_update.isoformat() if hardware_manager.current_status.last_update else None
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get average tank temperature: {e}")
raise HTTPException(
status_code=500,
detail="Failed to calculate average tank temperature"
)
@router.get("/motors", response_model=Dict[str, MotorStatus])
async def get_motor_status():
"""Get status of all motors."""
try:
hw_status = hardware_manager.get_hardware_status()
motors = {}
for motor_name, motor in hw_status.motors.items():
motors[motor_name] = MotorStatus(
name=motor.name,
state=motor.state.value,
is_enabled=motor.is_enabled,
current=motor.current,
runtime_hours=motor.runtime_hours,
error_count=motor.error_count
)
return motors
except Exception as e:
logger.error(f"Failed to get motor status: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve motor status"
)
@router.post("/motors/{motor_name}/control")
async def control_motor(motor_name: str, request: MotorControlRequest):
"""Enable or disable a specific motor."""
try:
success = await hardware_manager.set_motor_state(motor_name, request.enabled)
if not success:
raise HTTPException(
status_code=400,
detail=f"Failed to control motor '{motor_name}'"
)
action = "enabled" if request.enabled else "disabled"
return {
"success": True,
"message": f"Motor '{motor_name}' {action} successfully",
"motor_name": motor_name,
"enabled": request.enabled
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to control motor {motor_name}: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to control motor '{motor_name}'"
)
@router.post("/motors/disable-all")
async def disable_all_motors():
"""Disable all motors for safety."""
try:
success = await hardware_manager.disable_all_motors()
return {
"success": success,
"message": "All motors disabled" if success else "Failed to disable all motors"
}
except Exception as e:
logger.error(f"Failed to disable all motors: {e}")
raise HTTPException(
status_code=500,
detail="Failed to disable all motors"
)
@router.post("/heaters/{heater_name}/control")
async def control_heater(heater_name: str, request: HeaterControlRequest):
"""Enable or disable a specific heater."""
try:
success = await hardware_manager.set_heater_state(heater_name, request.enabled)
if not success:
raise HTTPException(
status_code=400,
detail=f"Failed to control heater '{heater_name}'"
)
action = "enabled" if request.enabled else "disabled"
return {
"success": True,
"message": f"Heater '{heater_name}' {action} successfully",
"heater_name": heater_name,
"enabled": request.enabled
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to control heater {heater_name}: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to control heater '{heater_name}'"
)
@router.post("/heaters/disable-all")
async def disable_all_heaters():
"""Disable all heaters for safety."""
try:
success = await hardware_manager.disable_all_heaters()
return {
"success": success,
"message": "All heaters disabled" if success else "Failed to disable all heaters"
}
except Exception as e:
logger.error(f"Failed to disable all heaters: {e}")
raise HTTPException(
status_code=500,
detail="Failed to disable all heaters"
)
@router.post("/emergency-stop")
async def emergency_stop():
"""Execute hardware emergency stop."""
try:
success = await hardware_manager.emergency_stop()
return {
"success": success,
"message": "Emergency stop executed" if success else "Emergency stop failed"
}
except Exception as e:
logger.critical(f"Hardware emergency stop failed: {e}")
raise HTTPException(
status_code=500,
detail="Hardware emergency stop failed"
)
@router.get("/safety", response_model=SafetyStatus)
async def get_safety_status():
"""Get hardware safety status."""
try:
hw_status = hardware_manager.get_hardware_status()
return SafetyStatus(
emergency_stop_active=hw_status.safety.emergency_stop_active,
cover_sensor_closed=hw_status.safety.cover_sensor_closed,
temperature_alarms=hw_status.safety.temperature_alarms,
current_alarms=hw_status.safety.current_alarms,
last_safety_check=hw_status.safety.last_safety_check.isoformat() if hw_status.safety.last_safety_check else None
)
except Exception as e:
logger.error(f"Failed to get safety status: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve safety status"
)
@router.get("/safety/check")
async def check_safety():
"""Perform safety check and return results."""
try:
is_safe, issues = await hardware_manager.is_safe_to_operate()
return {
"is_safe": is_safe,
"issues": issues,
"timestamp": hardware_manager.current_status.last_update.isoformat() if hardware_manager.current_status.last_update else None
}
except Exception as e:
logger.error(f"Safety check failed: {e}")
raise HTTPException(
status_code=500,
detail="Safety check failed"
)
@router.get("/communication/statistics")
async def get_communication_statistics():
"""Get Modbus communication statistics."""
try:
stats = hardware_manager.get_communication_statistics()
return stats
except Exception as e:
logger.error(f"Failed to get communication statistics: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve communication statistics"
)

View File

@@ -0,0 +1,259 @@
"""
Health check endpoints for system monitoring.
"""
import logging
from typing import Dict, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from ....shared.config import settings
from ...hardware.hardware_manager import hardware_manager
from ...recipe.recipe_controller import recipe_controller
from ...safety.safety_monitor import safety_monitor
logger = logging.getLogger(__name__)
router = APIRouter()
class HealthStatus(BaseModel):
"""Health status response model."""
status: str # "healthy", "degraded", "unhealthy"
timestamp: datetime
uptime_seconds: float
version: str
environment: str
class DetailedHealthStatus(BaseModel):
"""Detailed health status with component information."""
overall_status: str
timestamp: datetime
uptime_seconds: float
version: str
environment: str
components: Dict[str, Dict[str, Any]]
@router.get("/", response_model=HealthStatus)
async def health_check():
"""Basic health check endpoint."""
try:
# Quick health check
overall_status = "healthy"
# Check hardware communication
hw_status = hardware_manager.get_hardware_status()
if hw_status.communication_health < 80:
overall_status = "degraded"
# Check for critical alarms
alarm_summary = safety_monitor.get_alarm_summary()
if alarm_summary["active_by_priority"].get("critical", 0) > 0:
overall_status = "unhealthy"
return HealthStatus(
status=overall_status,
timestamp=datetime.now(),
uptime_seconds=0.0, # Would calculate actual uptime
version=settings.app_version,
environment=settings.environment
)
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail="Health check failed")
@router.get("/detailed", response_model=DetailedHealthStatus)
async def detailed_health_check():
"""Detailed health check with component status."""
try:
# Overall system status
overall_status = "healthy"
component_statuses = {}
# Hardware status
try:
hw_status = hardware_manager.get_hardware_status()
hw_healthy = hw_status.communication_health >= 80
component_statuses["hardware"] = {
"status": "healthy" if hw_healthy else "unhealthy",
"communication_health": hw_status.communication_health,
"system_status": hw_status.system_status.value,
"last_update": hw_status.last_update.isoformat() if hw_status.last_update else None,
"temperature_sensors": len(hw_status.temperatures),
"motor_count": len(hw_status.motors),
}
if not hw_healthy:
overall_status = "degraded"
except Exception as e:
logger.error(f"Hardware health check failed: {e}")
component_statuses["hardware"] = {
"status": "unhealthy",
"error": str(e)
}
overall_status = "unhealthy"
# Recipe controller status
try:
process_status = await recipe_controller.get_process_status()
component_statuses["recipe_controller"] = {
"status": "healthy",
"has_active_process": process_status is not None,
"process_status": process_status["session"]["status"] if process_status else None,
}
except Exception as e:
logger.error(f"Recipe controller health check failed: {e}")
component_statuses["recipe_controller"] = {
"status": "unhealthy",
"error": str(e)
}
overall_status = "degraded"
# Safety monitor status
try:
alarm_summary = safety_monitor.get_alarm_summary()
critical_alarms = alarm_summary["active_by_priority"].get("critical", 0)
safety_healthy = critical_alarms == 0
component_statuses["safety_monitor"] = {
"status": "healthy" if safety_healthy else "unhealthy",
"active_alarms": alarm_summary["total_active_alarms"],
"critical_alarms": critical_alarms,
"emergency_stop_active": alarm_summary["emergency_stop_active"],
"last_check": alarm_summary["last_safety_check"],
}
if not safety_healthy:
overall_status = "unhealthy"
except Exception as e:
logger.error(f"Safety monitor health check failed: {e}")
component_statuses["safety_monitor"] = {
"status": "unhealthy",
"error": str(e)
}
overall_status = "unhealthy"
# Database status
try:
from ....shared.database import db_manager
# Simple database connectivity test would go here
component_statuses["database"] = {
"status": "healthy",
"connected": True,
}
except Exception as e:
logger.error(f"Database health check failed: {e}")
component_statuses["database"] = {
"status": "unhealthy",
"error": str(e)
}
overall_status = "unhealthy"
return DetailedHealthStatus(
overall_status=overall_status,
timestamp=datetime.now(),
uptime_seconds=0.0, # Would calculate actual uptime
version=settings.app_version,
environment=settings.environment,
components=component_statuses
)
except Exception as e:
logger.error(f"Detailed health check failed: {e}")
raise HTTPException(status_code=503, detail="Detailed health check failed")
@router.get("/ready")
async def readiness_check():
"""Kubernetes readiness probe endpoint."""
try:
# Check if all critical services are ready
hw_status = hardware_manager.get_hardware_status()
if hw_status.communication_health < 50:
raise HTTPException(status_code=503, detail="Hardware communication not ready")
return {"status": "ready", "timestamp": datetime.now()}
except HTTPException:
raise
except Exception as e:
logger.error(f"Readiness check failed: {e}")
raise HTTPException(status_code=503, detail="Service not ready")
@router.get("/live")
async def liveness_check():
"""Kubernetes liveness probe endpoint."""
# Simple liveness check - if this endpoint responds, service is alive
return {"status": "alive", "timestamp": datetime.now()}
@router.get("/metrics")
async def metrics():
"""Prometheus-compatible metrics endpoint."""
try:
# Get hardware stats
hw_status = hardware_manager.get_hardware_status()
comm_stats = hardware_manager.get_communication_statistics()
# Get safety stats
alarm_summary = safety_monitor.get_alarm_summary()
# Format as Prometheus metrics
metrics_text = f"""
# HELP tempering_machine_communication_health_percent Hardware communication health percentage
# TYPE tempering_machine_communication_health_percent gauge
tempering_machine_communication_health_percent {hw_status.communication_health}
# HELP tempering_machine_temperature_celsius Current temperature readings
# TYPE tempering_machine_temperature_celsius gauge
"""
# Add temperature metrics
for sensor_name, reading in hw_status.temperatures.items():
if reading.is_valid:
metrics_text += f'tempering_machine_temperature_celsius{{sensor="{sensor_name}"}} {reading.value}\n'
# Add communication metrics
if comm_stats:
metrics_text += f"""
# HELP tempering_machine_requests_total Total number of communication requests
# TYPE tempering_machine_requests_total counter
tempering_machine_requests_total {comm_stats.get("total_requests", 0)}
# HELP tempering_machine_request_failures_total Total number of failed requests
# TYPE tempering_machine_request_failures_total counter
tempering_machine_request_failures_total {comm_stats.get("failed_requests", 0)}
"""
# Add alarm metrics
metrics_text += f"""
# HELP tempering_machine_active_alarms Total number of active alarms
# TYPE tempering_machine_active_alarms gauge
tempering_machine_active_alarms {alarm_summary["total_active_alarms"]}
# HELP tempering_machine_critical_alarms Total number of critical alarms
# TYPE tempering_machine_critical_alarms gauge
tempering_machine_critical_alarms {alarm_summary["active_by_priority"].get("critical", 0)}
"""
return metrics_text.strip()
except Exception as e:
logger.error(f"Metrics collection failed: {e}")
raise HTTPException(status_code=500, detail="Metrics collection failed")

View File

@@ -0,0 +1,245 @@
"""
Process control API endpoints.
"""
import logging
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from ...recipe.recipe_controller import recipe_controller
from ....shared.models.recipe import RecipePhase
logger = logging.getLogger(__name__)
router = APIRouter()
class ProcessStartRequest(BaseModel):
"""Request model for starting a process."""
recipe_id: int
user_id: Optional[str] = None
class ProcessStatusResponse(BaseModel):
"""Response model for process status."""
session_id: Optional[str]
recipe_id: Optional[int]
recipe_name: Optional[str]
status: Optional[str]
current_phase: Optional[str]
started_at: Optional[str]
duration_seconds: Optional[int]
started_by: Optional[str]
current_temperature: Optional[float]
target_temperature: Optional[float]
temperature_error: Optional[float]
error_count: int = 0
warning_count: int = 0
is_running: bool = False
class ProcessActionResponse(BaseModel):
"""Response model for process actions."""
success: bool
message: str
session_id: Optional[str] = None
@router.post("/start", response_model=ProcessActionResponse)
async def start_process(request: ProcessStartRequest):
"""Start a new tempering process."""
try:
logger.info(f"Starting process for recipe {request.recipe_id} by user {request.user_id}")
session_id = await recipe_controller.start_recipe(
recipe_id=request.recipe_id,
user_id=request.user_id
)
return ProcessActionResponse(
success=True,
message="Process started successfully",
session_id=session_id
)
except Exception as e:
logger.error(f"Failed to start process: {e}")
raise HTTPException(
status_code=400,
detail=f"Failed to start process: {str(e)}"
)
@router.post("/stop", response_model=ProcessActionResponse)
async def stop_process(
user_id: Optional[str] = None,
reason: str = "Manual stop"
):
"""Stop the currently running process."""
try:
logger.info(f"Stopping process by user {user_id}, reason: {reason}")
success = await recipe_controller.stop_recipe(
user_id=user_id,
reason=reason
)
return ProcessActionResponse(
success=success,
message="Process stopped successfully" if success else "No process was running"
)
except Exception as e:
logger.error(f"Failed to stop process: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to stop process: {str(e)}"
)
@router.post("/pause", response_model=ProcessActionResponse)
async def pause_process(user_id: Optional[str] = None):
"""Pause the currently running process."""
try:
logger.info(f"Pausing process by user {user_id}")
success = await recipe_controller.pause_recipe(user_id=user_id)
return ProcessActionResponse(
success=success,
message="Process paused successfully" if success else "No process is running to pause"
)
except Exception as e:
logger.error(f"Failed to pause process: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to pause process: {str(e)}"
)
@router.post("/resume", response_model=ProcessActionResponse)
async def resume_process(user_id: Optional[str] = None):
"""Resume a paused process."""
try:
logger.info(f"Resuming process by user {user_id}")
success = await recipe_controller.resume_recipe(user_id=user_id)
return ProcessActionResponse(
success=success,
message="Process resumed successfully" if success else "No paused process to resume"
)
except Exception as e:
logger.error(f"Failed to resume process: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to resume process: {str(e)}"
)
@router.post("/emergency-stop", response_model=ProcessActionResponse)
async def emergency_stop(user_id: Optional[str] = None):
"""Execute emergency stop of the current process."""
try:
logger.critical(f"Emergency stop initiated by user {user_id}")
success = await recipe_controller.emergency_stop(user_id=user_id)
return ProcessActionResponse(
success=success,
message="Emergency stop executed" if success else "Emergency stop failed"
)
except Exception as e:
logger.critical(f"Emergency stop failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Emergency stop failed: {str(e)}"
)
@router.get("/status", response_model=Optional[ProcessStatusResponse])
async def get_process_status():
"""Get current process status."""
try:
status = await recipe_controller.get_process_status()
if not status:
return None
session_info = status.get("session", {})
state_machine_info = status.get("state_machine", {})
return ProcessStatusResponse(
session_id=session_info.get("session_id"),
recipe_id=state_machine_info.get("recipe_id"),
recipe_name=session_info.get("recipe_name"),
status=session_info.get("status"),
current_phase=state_machine_info.get("current_phase"),
started_at=session_info.get("started_at"),
duration_seconds=session_info.get("duration_seconds"),
started_by=session_info.get("started_by"),
current_temperature=state_machine_info.get("current_temperature"),
target_temperature=state_machine_info.get("target_temperature"),
temperature_error=state_machine_info.get("temperature_error"),
error_count=state_machine_info.get("error_count", 0),
warning_count=state_machine_info.get("warning_count", 0),
is_running=state_machine_info.get("is_running", False)
)
except Exception as e:
logger.error(f"Failed to get process status: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve process status"
)
@router.get("/phases", response_model=Dict[str, str])
async def get_process_phases():
"""Get available process phases."""
return {phase.value: phase.name for phase in RecipePhase}
@router.get("/active-sessions")
async def get_active_sessions():
"""Get list of active process sessions."""
try:
sessions = await recipe_controller.get_active_sessions()
return {"active_sessions": sessions}
except Exception as e:
logger.error(f"Failed to get active sessions: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve active sessions"
)
@router.get("/history")
async def get_process_history(
limit: int = Query(50, ge=1, le=100, description="Maximum number of sessions to return"),
skip: int = Query(0, ge=0, description="Number of sessions to skip")
):
"""Get process execution history."""
try:
# This would query the database for process session history
# For now, return empty list as placeholder
return {
"sessions": [],
"total": 0,
"page": (skip // limit) + 1,
"per_page": limit
}
except Exception as e:
logger.error(f"Failed to get process history: {e}")
raise HTTPException(
status_code=500,
detail="Failed to retrieve process history"
)

View File

@@ -0,0 +1,355 @@
"""
Recipe management API endpoints.
"""
import logging
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from ....shared.database import get_db
from ....shared.models.recipe import Recipe
from ....shared.schemas.recipe import (
RecipeCreate, RecipeUpdate, RecipeResponse, RecipeList, RecipeSummary
)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/", response_model=RecipeList)
async def list_recipes(
skip: int = Query(0, ge=0, description="Number of recipes to skip"),
limit: int = Query(50, ge=1, le=100, description="Maximum number of recipes to return"),
active_only: bool = Query(False, description="Return only active recipes"),
search: Optional[str] = Query(None, description="Search recipes by name"),
db: AsyncSession = Depends(get_db)
):
"""Get list of recipes with pagination and filtering."""
try:
# Build query
query = select(Recipe)
if active_only:
query = query.where(Recipe.is_active == True)
if search:
query = query.where(Recipe.name.ilike(f"%{search}%"))
# Get total count
count_query = select(func.count(Recipe.id))
if active_only:
count_query = count_query.where(Recipe.is_active == True)
if search:
count_query = count_query.where(Recipe.name.ilike(f"%{search}%"))
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
# Get paginated results
query = query.order_by(Recipe.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(query)
recipes = result.scalars().all()
return RecipeList(
recipes=[RecipeResponse.from_orm(recipe) for recipe in recipes],
total=total,
page=(skip // limit) + 1,
per_page=limit
)
except Exception as e:
logger.error(f"Error listing recipes: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve recipes")
@router.post("/", response_model=RecipeResponse, status_code=201)
async def create_recipe(
recipe_data: RecipeCreate,
db: AsyncSession = Depends(get_db)
):
"""Create a new recipe."""
try:
# Check if recipe name already exists
existing_query = select(Recipe).where(Recipe.name == recipe_data.name)
existing_result = await db.execute(existing_query)
existing_recipe = existing_result.scalar_one_or_none()
if existing_recipe:
raise HTTPException(
status_code=409,
detail=f"Recipe with name '{recipe_data.name}' already exists"
)
# Create new recipe
new_recipe = Recipe(
name=recipe_data.name,
description=recipe_data.description,
heating_goal=recipe_data.heating_goal,
cooling_goal=recipe_data.cooling_goal,
pouring_goal=recipe_data.pouring_goal,
tank_temp=recipe_data.tank_temp,
fountain_temp=recipe_data.fountain_temp,
mixer_enabled=recipe_data.mixer_enabled,
fountain_enabled=recipe_data.fountain_enabled,
mold_heater_enabled=recipe_data.mold_heater_enabled,
vibration_enabled=recipe_data.vibration_enabled,
vib_heater_enabled=recipe_data.vib_heater_enabled,
pedal_control_enabled=recipe_data.pedal_control_enabled,
pedal_on_time=recipe_data.pedal_on_time,
pedal_off_time=recipe_data.pedal_off_time,
created_at=datetime.now(),
)
# Validate recipe parameters
if not new_recipe.validate_temperatures():
raise HTTPException(
status_code=400,
detail="Invalid temperature parameters"
)
db.add(new_recipe)
await db.commit()
await db.refresh(new_recipe)
logger.info(f"Created new recipe: {new_recipe.name} (ID: {new_recipe.id})")
return RecipeResponse.from_orm(new_recipe)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating recipe: {e}")
await db.rollback()
raise HTTPException(status_code=500, detail="Failed to create recipe")
@router.get("/{recipe_id}", response_model=RecipeResponse)
async def get_recipe(
recipe_id: int,
db: AsyncSession = Depends(get_db)
):
"""Get a specific recipe by ID."""
try:
query = select(Recipe).where(Recipe.id == recipe_id)
result = await db.execute(query)
recipe = result.scalar_one_or_none()
if not recipe:
raise HTTPException(status_code=404, detail="Recipe not found")
return RecipeResponse.from_orm(recipe)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error retrieving recipe {recipe_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve recipe")
@router.put("/{recipe_id}", response_model=RecipeResponse)
async def update_recipe(
recipe_id: int,
recipe_update: RecipeUpdate,
db: AsyncSession = Depends(get_db)
):
"""Update an existing recipe."""
try:
# Get existing recipe
query = select(Recipe).where(Recipe.id == recipe_id)
result = await db.execute(query)
recipe = result.scalar_one_or_none()
if not recipe:
raise HTTPException(status_code=404, detail="Recipe not found")
# Update fields that are provided
update_data = recipe_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(recipe, field, value)
# Update metadata
recipe.updated_at = datetime.now()
recipe.version += 1
# Validate updated recipe
if not recipe.validate_temperatures():
raise HTTPException(
status_code=400,
detail="Invalid temperature parameters"
)
await db.commit()
await db.refresh(recipe)
logger.info(f"Updated recipe: {recipe.name} (ID: {recipe.id})")
return RecipeResponse.from_orm(recipe)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating recipe {recipe_id}: {e}")
await db.rollback()
raise HTTPException(status_code=500, detail="Failed to update recipe")
@router.delete("/{recipe_id}", status_code=204)
async def delete_recipe(
recipe_id: int,
db: AsyncSession = Depends(get_db)
):
"""Delete a recipe (soft delete by setting inactive)."""
try:
# Get existing recipe
query = select(Recipe).where(Recipe.id == recipe_id)
result = await db.execute(query)
recipe = result.scalar_one_or_none()
if not recipe:
raise HTTPException(status_code=404, detail="Recipe not found")
# Soft delete by setting inactive
recipe.is_active = False
recipe.updated_at = datetime.now()
await db.commit()
logger.info(f"Deleted recipe: {recipe.name} (ID: {recipe.id})")
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting recipe {recipe_id}: {e}")
await db.rollback()
raise HTTPException(status_code=500, detail="Failed to delete recipe")
@router.post("/{recipe_id}/duplicate", response_model=RecipeResponse, status_code=201)
async def duplicate_recipe(
recipe_id: int,
new_name: str = Query(..., description="Name for the duplicated recipe"),
db: AsyncSession = Depends(get_db)
):
"""Duplicate an existing recipe with a new name."""
try:
# Get source recipe
query = select(Recipe).where(Recipe.id == recipe_id)
result = await db.execute(query)
source_recipe = result.scalar_one_or_none()
if not source_recipe:
raise HTTPException(status_code=404, detail="Source recipe not found")
# Check if new name already exists
name_query = select(Recipe).where(Recipe.name == new_name)
name_result = await db.execute(name_query)
existing_recipe = name_result.scalar_one_or_none()
if existing_recipe:
raise HTTPException(
status_code=409,
detail=f"Recipe with name '{new_name}' already exists"
)
# Create duplicate
duplicate_recipe = Recipe(
name=new_name,
description=f"Copy of {source_recipe.name}",
heating_goal=source_recipe.heating_goal,
cooling_goal=source_recipe.cooling_goal,
pouring_goal=source_recipe.pouring_goal,
tank_temp=source_recipe.tank_temp,
fountain_temp=source_recipe.fountain_temp,
mixer_enabled=source_recipe.mixer_enabled,
fountain_enabled=source_recipe.fountain_enabled,
mold_heater_enabled=source_recipe.mold_heater_enabled,
vibration_enabled=source_recipe.vibration_enabled,
vib_heater_enabled=source_recipe.vib_heater_enabled,
pedal_control_enabled=source_recipe.pedal_control_enabled,
pedal_on_time=source_recipe.pedal_on_time,
pedal_off_time=source_recipe.pedal_off_time,
created_at=datetime.now(),
)
db.add(duplicate_recipe)
await db.commit()
await db.refresh(duplicate_recipe)
logger.info(f"Duplicated recipe: {source_recipe.name} -> {new_name} (ID: {duplicate_recipe.id})")
return RecipeResponse.from_orm(duplicate_recipe)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error duplicating recipe {recipe_id}: {e}")
await db.rollback()
raise HTTPException(status_code=500, detail="Failed to duplicate recipe")
@router.get("/{recipe_id}/validate")
async def validate_recipe(
recipe_id: int,
db: AsyncSession = Depends(get_db)
):
"""Validate a recipe's parameters."""
try:
query = select(Recipe).where(Recipe.id == recipe_id)
result = await db.execute(query)
recipe = result.scalar_one_or_none()
if not recipe:
raise HTTPException(status_code=404, detail="Recipe not found")
# Validate recipe
is_valid = recipe.validate_temperatures()
validation_result = {
"recipe_id": recipe_id,
"is_valid": is_valid,
"checks": {
"temperature_ranges": True, # Would implement detailed checks
"cooling_less_than_heating": recipe.cooling_goal < recipe.heating_goal,
"pouring_within_range": (
recipe.pouring_goal is None or
(recipe.cooling_goal <= recipe.pouring_goal <= recipe.heating_goal)
),
}
}
if not is_valid:
validation_result["errors"] = [
"Temperature parameters are invalid"
]
return validation_result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error validating recipe {recipe_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to validate recipe")
@router.get("/summary/active", response_model=List[RecipeSummary])
async def get_active_recipes_summary(
db: AsyncSession = Depends(get_db)
):
"""Get summary of all active recipes."""
try:
query = select(Recipe).where(Recipe.is_active == True).order_by(Recipe.name)
result = await db.execute(query)
recipes = result.scalars().all()
return [RecipeSummary.from_orm(recipe) for recipe in recipes]
except Exception as e:
logger.error(f"Error getting active recipes summary: {e}")
raise HTTPException(status_code=500, detail="Failed to get active recipes")

View File

@@ -0,0 +1,159 @@
"""
System management API endpoints.
"""
import logging
from typing import List, Dict, Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from ...safety.safety_monitor import safety_monitor
from ...safety.error_handler import error_handler
logger = logging.getLogger(__name__)
router = APIRouter()
class AlarmResponse(BaseModel):
"""Alarm response model."""
id: str
title: str
message: str
category: str
priority: str
state: str
timestamp: str
occurrence_count: int = 1
class SystemInfoResponse(BaseModel):
"""System information response."""
name: str
version: str
environment: str
uptime_seconds: float
active_alarms: int
error_count: int
@router.get("/info", response_model=SystemInfoResponse)
async def get_system_info():
"""Get system information."""
try:
alarm_summary = safety_monitor.get_alarm_summary()
error_stats = error_handler.get_error_statistics()
return SystemInfoResponse(
name="Chocolate Tempering Machine",
version="1.0.0",
environment="development",
uptime_seconds=0.0, # Would calculate actual uptime
active_alarms=alarm_summary["total_active_alarms"],
error_count=error_stats["active_errors"]
)
except Exception as e:
logger.error(f"Failed to get system info: {e}")
raise HTTPException(status_code=500, detail="Failed to get system information")
@router.get("/alarms", response_model=List[AlarmResponse])
async def get_alarms(priority: Optional[str] = None):
"""Get active alarms."""
try:
# Filter by priority if specified
priority_filter = None
if priority:
from ...safety.safety_monitor import AlarmPriority
try:
priority_filter = AlarmPriority(priority.upper())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid priority: {priority}")
alarms = safety_monitor.get_active_alarms(priority_filter)
return [
AlarmResponse(
id=alarm.id,
title=alarm.title,
message=alarm.message,
category=alarm.category.value,
priority=alarm.priority.value,
state=alarm.state.value,
timestamp=alarm.timestamp.isoformat(),
occurrence_count=alarm.occurrence_count
)
for alarm in alarms
]
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get alarms: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve alarms")
@router.post("/alarms/{alarm_id}/acknowledge")
async def acknowledge_alarm(alarm_id: str, user: str = "system"):
"""Acknowledge an alarm."""
try:
success = await safety_monitor.acknowledge_alarm(alarm_id, user)
return {
"success": success,
"message": f"Alarm {alarm_id} acknowledged" if success else f"Failed to acknowledge alarm {alarm_id}"
}
except Exception as e:
logger.error(f"Failed to acknowledge alarm {alarm_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to acknowledge alarm")
@router.get("/errors")
async def get_active_errors():
"""Get active system errors."""
try:
errors = error_handler.get_active_errors()
return {
"active_errors": [
{
"error_id": error.error_id,
"component": error.component,
"function_name": error.function_name,
"timestamp": error.timestamp.isoformat(),
"exception": str(error.exception),
"retry_count": error.retry_count,
}
for error in errors
],
"statistics": error_handler.get_error_statistics()
}
except Exception as e:
logger.error(f"Failed to get active errors: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve active errors")
@router.get("/configuration")
async def get_system_configuration():
"""Get system configuration."""
try:
from ....shared.config import settings
return {
"environment": settings.environment,
"debug": settings.debug,
"log_level": settings.log_level.value,
"app_name": settings.app_name,
"app_version": settings.app_version,
"api_version": settings.web.api_version,
"host": settings.web.host,
"port": settings.web.port,
}
except Exception as e:
logger.error(f"Failed to get system configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve system configuration")

View File

@@ -0,0 +1,40 @@
"""
User management API endpoints.
"""
import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter()
# Placeholder schemas since user models aren't fully implemented
class UserResponse(BaseModel):
id: int
username: str
email: Optional[str]
full_name: Optional[str]
role: str
is_active: bool
class UserCreate(BaseModel):
username: str
email: Optional[str]
full_name: Optional[str]
role: str = "operator"
@router.get("/", response_model=List[UserResponse])
async def list_users():
"""Get list of users."""
# Placeholder implementation
return []
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate):
"""Create a new user."""
# Placeholder implementation
raise HTTPException(status_code=501, detail="User management not yet implemented")

View File

@@ -0,0 +1,39 @@
"""Pydantic schemas for API request/response validation.
"""
from .recipe import RecipeCreate, RecipeUpdate, RecipeResponse, RecipeList
from .process import ProcessStart, ProcessStatus, ProcessResponse, ProcessLogResponse
from .hardware import HardwareStatus, TemperatureReading, MotorStatus, SafetyStatus
from .user import UserCreate, UserUpdate, UserResponse, UserLogin
from .system import SystemHealth, AlarmResponse, ErrorLogResponse
__all__ = [
# Recipe schemas
"RecipeCreate",
"RecipeUpdate",
"RecipeResponse",
"RecipeList",
# Process schemas
"ProcessStart",
"ProcessStatus",
"ProcessResponse",
"ProcessLogResponse",
# Hardware schemas
"HardwareStatus",
"TemperatureReading",
"MotorStatus",
"SafetyStatus",
# User schemas
"UserCreate",
"UserUpdate",
"UserResponse",
"UserLogin",
# System schemas
"SystemHealth",
"AlarmResponse",
"ErrorLogResponse",
]

View File

@@ -0,0 +1,127 @@
"""
Pydantic schemas for recipe-related API operations.
"""
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, Field, validator
from ..models.recipe import RecipePhase
class RecipeBase(BaseModel):
"""Base recipe schema with common fields."""
name: str = Field(..., min_length=1, max_length=100, description="Recipe name")
description: Optional[str] = Field(None, max_length=500, description="Recipe description")
# Temperature goals
heating_goal: float = Field(..., ge=40.0, le=60.0, description="Target heating temperature (°C)")
cooling_goal: float = Field(..., ge=20.0, le=40.0, description="Target cooling temperature (°C)")
pouring_goal: Optional[float] = Field(None, ge=20.0, le=60.0, description="Target pouring temperature (°C)")
# Operating temperatures
tank_temp: float = Field(45.0, ge=20.0, le=80.0, description="Tank operating temperature (°C)")
fountain_temp: float = Field(32.0, ge=20.0, le=60.0, description="Fountain operating temperature (°C)")
# Motor settings
mixer_enabled: bool = Field(True, description="Enable mixer motor")
fountain_enabled: bool = Field(True, description="Enable fountain motor")
mold_heater_enabled: bool = Field(False, description="Enable mold heater")
vibration_enabled: bool = Field(False, description="Enable vibration motor")
vib_heater_enabled: bool = Field(False, description="Enable vibration heater")
# Pedal control
pedal_control_enabled: bool = Field(True, description="Enable pedal control")
pedal_on_time: float = Field(2.0, ge=0.1, le=10.0, description="Pedal on duration (seconds)")
pedal_off_time: float = Field(3.0, ge=0.1, le=10.0, description="Pedal off duration (seconds)")
@validator('cooling_goal')
def validate_cooling_goal(cls, v, values):
"""Ensure cooling goal is less than heating goal."""
heating_goal = values.get('heating_goal')
if heating_goal is not None and v >= heating_goal:
raise ValueError('Cooling goal must be less than heating goal')
return v
@validator('pouring_goal')
def validate_pouring_goal(cls, v, values):
"""Ensure pouring goal is within cooling and heating range."""
if v is None:
return v
cooling_goal = values.get('cooling_goal')
heating_goal = values.get('heating_goal')
if cooling_goal is not None and v < cooling_goal:
raise ValueError('Pouring goal must be greater than or equal to cooling goal')
if heating_goal is not None and v > heating_goal:
raise ValueError('Pouring goal must be less than or equal to heating goal')
return v
class RecipeCreate(RecipeBase):
"""Schema for creating a new recipe."""
pass
class RecipeUpdate(BaseModel):
"""Schema for updating an existing recipe."""
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
heating_goal: Optional[float] = Field(None, ge=40.0, le=60.0)
cooling_goal: Optional[float] = Field(None, ge=20.0, le=40.0)
pouring_goal: Optional[float] = Field(None, ge=20.0, le=60.0)
tank_temp: Optional[float] = Field(None, ge=20.0, le=80.0)
fountain_temp: Optional[float] = Field(None, ge=20.0, le=60.0)
mixer_enabled: Optional[bool] = None
fountain_enabled: Optional[bool] = None
mold_heater_enabled: Optional[bool] = None
vibration_enabled: Optional[bool] = None
vib_heater_enabled: Optional[bool] = None
pedal_control_enabled: Optional[bool] = None
pedal_on_time: Optional[float] = Field(None, ge=0.1, le=10.0)
pedal_off_time: Optional[float] = Field(None, ge=0.1, le=10.0)
is_active: Optional[bool] = None
class RecipeResponse(RecipeBase):
"""Schema for recipe API responses."""
id: int
created_at: datetime
updated_at: datetime
created_by: Optional[str] = None
version: int
is_active: bool
usage_count: int
last_used: Optional[datetime] = None
class Config:
from_attributes = True
class RecipeList(BaseModel):
"""Schema for recipe list responses."""
recipes: List[RecipeResponse]
total: int
page: int = 1
per_page: int = 50
class RecipeSummary(BaseModel):
"""Schema for recipe summary information."""
id: int
name: str
heating_goal: float
cooling_goal: float
is_active: bool
usage_count: int
last_used: Optional[datetime] = None
class Config:
from_attributes = True