Add models for process execution, recipes, system configuration, user management, and error logging

- Implemented ProcessSession, ProcessLog, and TemperatureReading models for tracking tempering processes.
- Created Recipe and RecipePhase models for managing chocolate tempering recipes.
- Developed SystemConfiguration, ErrorLog, and Backup models for system settings and error tracking.
- Introduced User and UserRole models for user management and authentication.
- Added basic structure for schemas and tests.
This commit is contained in:
2025-08-06 22:04:56 +02:00
parent 83b6a25fd5
commit 9cdd074a39
29 changed files with 3201 additions and 0 deletions

View File

@@ -0,0 +1,553 @@
"""
Hardware manager for chocolate tempering machine control.
Provides high-level interface to all hardware components via Modbus.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from .modbus_client import modbus_client, ModbusResponse
from ...shared.config import settings
from ...shared.database import get_sync_db
from ...shared.models.machine import HardwareMapping
logger = logging.getLogger(__name__)
class ComponentStatus(str, Enum):
"""Hardware component operational status."""
ONLINE = "online"
OFFLINE = "offline"
ERROR = "error"
MAINTENANCE = "maintenance"
UNKNOWN = "unknown"
class MotorState(str, Enum):
"""Motor operational states."""
STOPPED = "stopped"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
ERROR = "error"
@dataclass
class TemperatureReading:
"""Temperature sensor reading with metadata."""
value: float
timestamp: datetime
sensor_name: str
units: str = "°C"
is_valid: bool = True
error_message: Optional[str] = None
@dataclass
class MotorStatus:
"""Motor status with control information."""
name: str
state: MotorState = MotorState.STOPPED
is_enabled: bool = False
current: Optional[float] = None
speed_percent: Optional[float] = None
runtime_hours: float = 0.0
last_started: Optional[datetime] = None
error_count: int = 0
last_error: Optional[str] = None
@dataclass
class SafetyStatus:
"""Safety system status."""
emergency_stop_active: bool = False
cover_sensor_closed: bool = True
temperature_alarms: List[str] = field(default_factory=list)
current_alarms: List[str] = field(default_factory=list)
communication_errors: int = 0
last_safety_check: Optional[datetime] = None
@dataclass
class HardwareStatus:
"""Complete hardware system status."""
temperatures: Dict[str, TemperatureReading] = field(default_factory=dict)
motors: Dict[str, MotorStatus] = field(default_factory=dict)
safety: SafetyStatus = field(default_factory=SafetyStatus)
communication_health: float = 100.0 # Percentage
last_update: Optional[datetime] = None
system_status: ComponentStatus = ComponentStatus.UNKNOWN
class HardwareManager:
"""
High-level hardware manager for chocolate tempering machine.
Manages all hardware components through Modbus communication.
"""
def __init__(self):
self.hardware_mappings: Dict[str, HardwareMapping] = {}
self.component_cache: Dict[str, Any] = {}
self.last_read_time: Dict[str, datetime] = {}
self.read_intervals: Dict[str, float] = {}
self.error_counts: Dict[str, int] = {}
# Hardware status
self.current_status = HardwareStatus()
# Control state
self._running = False
self._update_task: Optional[asyncio.Task] = None
# Known hardware components (from original system analysis)
self.temperature_sensors = {
"tank_bottom": 8, # Modbus address 8
"tank_wall": 9, # Modbus address 9
"pump": 10, # Modbus address 10
"fountain": 11, # Modbus address 11
}
self.digital_outputs = {
"mixer_motor": {"address": 0, "bit": 0},
"fountain_motor": {"address": 0, "bit": 1},
"tank_heater": {"address": 0, "bit": 2},
"pump_heater": {"address": 0, "bit": 3},
"cooling_valve": {"address": 0, "bit": 4},
"vibration_motor": {"address": 0, "bit": 5},
"mold_heater": {"address": 0, "bit": 6},
}
self.digital_inputs = {
"emergency_stop": {"address": 1, "bit": 0},
"cover_sensor": {"address": 1, "bit": 1},
"pedal_switch": {"address": 1, "bit": 2},
"level_sensor": {"address": 1, "bit": 3},
}
self.analog_inputs = {
"grid_voltage": 12,
"grid_frequency": 13,
"neutral_current": 14,
"motor1_current": 15,
"motor2_current": 16,
}
async def initialize(self) -> bool:
"""Initialize hardware manager and establish communications."""
try:
logger.info("Initializing hardware manager")
# Connect to Modbus
if not await modbus_client.connect():
logger.error("Failed to connect to Modbus RTU")
return False
# Load hardware mappings from database
await self._load_hardware_mappings()
# Initialize component status
await self._initialize_components()
# Start monitoring task
await self.start_monitoring()
logger.info("Hardware manager initialized successfully")
return True
except Exception as e:
logger.error(f"Hardware manager initialization failed: {e}")
return False
async def start_monitoring(self) -> None:
"""Start hardware monitoring background task."""
if self._update_task and not self._update_task.done():
return
self._running = True
self._update_task = asyncio.create_task(self._monitoring_loop())
logger.info("Hardware monitoring started")
async def stop_monitoring(self) -> None:
"""Stop hardware monitoring background task."""
self._running = False
if self._update_task:
self._update_task.cancel()
try:
await self._update_task
except asyncio.CancelledError:
pass
logger.info("Hardware monitoring stopped")
async def shutdown(self) -> None:
"""Shutdown hardware manager and close connections."""
logger.info("Shutting down hardware manager")
# Stop monitoring
await self.stop_monitoring()
# Turn off all motors and heaters for safety
await self.emergency_stop()
# Disconnect Modbus
await modbus_client.disconnect()
logger.info("Hardware manager shutdown complete")
async def _monitoring_loop(self) -> None:
"""Main monitoring loop for hardware status updates."""
while self._running:
try:
# Update temperatures
await self._update_temperatures()
# Update motor states
await self._update_motor_states()
# Update safety status
await self._update_safety_status()
# Update electrical parameters
await self._update_electrical_status()
# Update overall system status
self._update_system_status()
# Wait for next update cycle
await asyncio.sleep(settings.process.temperature_read_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in hardware monitoring loop: {e}")
await asyncio.sleep(1.0) # Short delay before retry
async def _update_temperatures(self) -> None:
"""Update temperature sensor readings."""
for sensor_name, address in self.temperature_sensors.items():
try:
response = await modbus_client.read_input_registers(address, 1)
if response.success and response.value:
# Convert raw reading to temperature (assuming 0.1°C resolution)
temperature = response.value[0] * 0.1
reading = TemperatureReading(
value=temperature,
timestamp=datetime.now(),
sensor_name=sensor_name,
is_valid=True
)
# Validate temperature range
if not (settings.temperature.absolute_min_temp <= temperature <= settings.temperature.absolute_max_temp):
reading.is_valid = False
reading.error_message = f"Temperature {temperature}°C outside valid range"
logger.warning(reading.error_message)
self.current_status.temperatures[sensor_name] = reading
self.error_counts[sensor_name] = 0
else:
self.error_counts[sensor_name] = self.error_counts.get(sensor_name, 0) + 1
logger.error(f"Failed to read temperature sensor {sensor_name}: {response.error}")
except Exception as e:
self.error_counts[sensor_name] = self.error_counts.get(sensor_name, 0) + 1
logger.error(f"Exception reading temperature sensor {sensor_name}: {e}")
async def _update_motor_states(self) -> None:
"""Update motor status information."""
try:
# Read digital output states to get motor enable status
response = await modbus_client.read_coils(0, 8) # Read first 8 outputs
if response.success and response.value:
motor_states = response.value
for motor_name, config in self.digital_outputs.items():
if "motor" in motor_name:
bit_index = config["bit"]
is_enabled = bool(motor_states[bit_index]) if bit_index < len(motor_states) else False
# Update motor status
if motor_name not in self.current_status.motors:
self.current_status.motors[motor_name] = MotorStatus(name=motor_name)
motor_status = self.current_status.motors[motor_name]
motor_status.is_enabled = is_enabled
motor_status.state = MotorState.RUNNING if is_enabled else MotorState.STOPPED
except Exception as e:
logger.error(f"Failed to update motor states: {e}")
async def _update_safety_status(self) -> None:
"""Update safety system status."""
try:
# Read digital inputs for safety sensors
response = await modbus_client.read_discrete_inputs(1, 4)
if response.success and response.value:
inputs = response.value
self.current_status.safety.emergency_stop_active = bool(inputs[0]) if len(inputs) > 0 else False
self.current_status.safety.cover_sensor_closed = bool(inputs[1]) if len(inputs) > 1 else True
self.current_status.safety.last_safety_check = datetime.now()
except Exception as e:
logger.error(f"Failed to update safety status: {e}")
async def _update_electrical_status(self) -> None:
"""Update electrical parameters (voltage, current, frequency)."""
try:
# Read electrical parameters
for param_name, address in self.analog_inputs.items():
response = await modbus_client.read_input_registers(address, 1)
if response.success and response.value:
raw_value = response.value[0]
# Convert based on parameter type
if "voltage" in param_name:
value = raw_value * 0.1 # 0.1V resolution
elif "current" in param_name:
value = raw_value * 0.01 # 0.01A resolution
elif "frequency" in param_name:
value = raw_value * 0.01 # 0.01Hz resolution
else:
value = raw_value
# Check for alarms
if "current" in param_name:
limit_map = {
"neutral_current": settings.safety.max_neutral_current,
"motor1_current": settings.safety.max_motor1_current,
"motor2_current": settings.safety.max_motor2_current,
}
if param_name in limit_map and value > limit_map[param_name]:
alarm_msg = f"{param_name} over limit: {value}A > {limit_map[param_name]}A"
if alarm_msg not in self.current_status.safety.current_alarms:
self.current_status.safety.current_alarms.append(alarm_msg)
logger.warning(alarm_msg)
except Exception as e:
logger.error(f"Failed to update electrical status: {e}")
def _update_system_status(self) -> None:
"""Update overall system status based on component health."""
self.current_status.last_update = datetime.now()
# Calculate communication health
total_errors = sum(self.error_counts.values())
total_reads = len(self.error_counts) * 10 # Assume 10 reads per component
self.current_status.communication_health = max(0, (total_reads - total_errors) / max(1, total_reads) * 100)
# Determine overall system status
if self.current_status.safety.emergency_stop_active:
self.current_status.system_status = ComponentStatus.ERROR
elif total_errors > 10:
self.current_status.system_status = ComponentStatus.ERROR
elif self.current_status.communication_health < 80:
self.current_status.system_status = ComponentStatus.OFFLINE
else:
self.current_status.system_status = ComponentStatus.ONLINE
# Temperature Control Methods
async def get_temperature(self, sensor_name: str) -> Optional[TemperatureReading]:
"""Get current temperature reading for a specific sensor."""
return self.current_status.temperatures.get(sensor_name)
async def get_all_temperatures(self) -> Dict[str, TemperatureReading]:
"""Get all current temperature readings."""
return self.current_status.temperatures.copy()
async def get_average_tank_temperature(self) -> Optional[float]:
"""Get average temperature of tank sensors."""
tank_sensors = ["tank_bottom", "tank_wall"]
valid_temps = []
for sensor in tank_sensors:
reading = await self.get_temperature(sensor)
if reading and reading.is_valid:
valid_temps.append(reading.value)
return sum(valid_temps) / len(valid_temps) if valid_temps else None
# Motor Control Methods
async def set_motor_state(self, motor_name: str, enabled: bool) -> bool:
"""Enable or disable a motor."""
if motor_name not in self.digital_outputs:
logger.error(f"Unknown motor: {motor_name}")
return False
config = self.digital_outputs[motor_name]
address = config["address"]
bit = config["bit"]
try:
# Read current state
response = await modbus_client.read_coils(address, 8)
if not response.success:
logger.error(f"Failed to read current motor states: {response.error}")
return False
# Modify the specific bit
current_states = response.value[:]
current_states[bit] = enabled
# Write back the modified state
write_response = await modbus_client.write_multiple_coils(address, current_states)
if write_response.success:
logger.info(f"Motor {motor_name} {'enabled' if enabled else 'disabled'}")
return True
else:
logger.error(f"Failed to set motor {motor_name} state: {write_response.error}")
return False
except Exception as e:
logger.error(f"Exception setting motor {motor_name} state: {e}")
return False
async def enable_motor(self, motor_name: str) -> bool:
"""Enable a specific motor."""
return await self.set_motor_state(motor_name, True)
async def disable_motor(self, motor_name: str) -> bool:
"""Disable a specific motor."""
return await self.set_motor_state(motor_name, False)
async def disable_all_motors(self) -> bool:
"""Disable all motors for safety."""
success = True
for motor_name in self.digital_outputs:
if "motor" in motor_name:
result = await self.disable_motor(motor_name)
success = success and result
return success
# Heater Control Methods
async def set_heater_state(self, heater_name: str, enabled: bool) -> bool:
"""Enable or disable a heater."""
heater_map = {
"tank_heater": "tank_heater",
"pump_heater": "pump_heater",
"mold_heater": "mold_heater",
}
if heater_name not in heater_map:
logger.error(f"Unknown heater: {heater_name}")
return False
output_name = heater_map[heater_name]
return await self.set_motor_state(output_name, enabled) # Same method as motors
async def enable_heater(self, heater_name: str) -> bool:
"""Enable a specific heater."""
return await self.set_heater_state(heater_name, True)
async def disable_heater(self, heater_name: str) -> bool:
"""Disable a specific heater."""
return await self.set_heater_state(heater_name, False)
async def disable_all_heaters(self) -> bool:
"""Disable all heaters for safety."""
heaters = ["tank_heater", "pump_heater", "mold_heater"]
success = True
for heater in heaters:
result = await self.disable_heater(heater)
success = success and result
return success
# Safety Methods
async def emergency_stop(self) -> bool:
"""Execute emergency stop - disable all motors and heaters."""
logger.warning("EMERGENCY STOP ACTIVATED")
motor_success = await self.disable_all_motors()
heater_success = await self.disable_all_heaters()
success = motor_success and heater_success
if success:
logger.info("Emergency stop completed successfully")
else:
logger.error("Emergency stop failed to complete")
return success
async def is_safe_to_operate(self) -> Tuple[bool, List[str]]:
"""Check if system is safe to operate."""
issues = []
# Check emergency stop
if self.current_status.safety.emergency_stop_active:
issues.append("Emergency stop is active")
# Check cover sensor
if not self.current_status.safety.cover_sensor_closed:
issues.append("Safety cover is open")
# Check temperature limits
for sensor_name, reading in self.current_status.temperatures.items():
if not reading.is_valid:
issues.append(f"Temperature sensor {sensor_name} error: {reading.error_message}")
# Check communication health
if self.current_status.communication_health < 50:
issues.append("Poor communication with hardware")
# Check for current alarms
if self.current_status.safety.current_alarms:
issues.extend(self.current_status.safety.current_alarms)
return len(issues) == 0, issues
# Status and Information Methods
def get_hardware_status(self) -> HardwareStatus:
"""Get complete hardware status."""
return self.current_status
def get_communication_statistics(self) -> Dict[str, Any]:
"""Get Modbus communication statistics."""
return modbus_client.get_statistics()
async def _load_hardware_mappings(self) -> None:
"""Load hardware mappings from database."""
try:
# This would load from database in production
# For now, we'll use the hardcoded mappings
logger.info("Hardware mappings loaded from configuration")
except Exception as e:
logger.error(f"Failed to load hardware mappings: {e}")
async def _initialize_components(self) -> None:
"""Initialize all hardware components."""
# Initialize motor status objects
for motor_name in self.digital_outputs:
if "motor" in motor_name:
self.current_status.motors[motor_name] = MotorStatus(name=motor_name)
# Initialize error counters
for sensor_name in self.temperature_sensors:
self.error_counts[sensor_name] = 0
# Global hardware manager instance
hardware_manager = HardwareManager()

View File

@@ -0,0 +1,393 @@
"""
Modbus RTU client for chocolate tempering machine hardware communication.
Provides async interface to industrial hardware via RS-485/serial.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Union, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from pymodbus.client import AsyncModbusSerialClient
from pymodbus.exceptions import ModbusException, ConnectionException
from pymodbus.pdu import ExceptionResponse
from ...shared.config import settings
logger = logging.getLogger(__name__)
class ModbusFunction(Enum):
"""Supported Modbus function codes."""
READ_COILS = 0x01
READ_DISCRETE_INPUTS = 0x02
READ_HOLDING_REGISTERS = 0x03
READ_INPUT_REGISTERS = 0x04
WRITE_SINGLE_COIL = 0x05
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_COILS = 0x0F
WRITE_MULTIPLE_REGISTERS = 0x10
@dataclass
class ModbusRequest:
"""Modbus operation request."""
function: ModbusFunction
address: int
count: Optional[int] = None
value: Optional[Union[int, List[int], bool, List[bool]]] = None
slave_id: int = 1
retry_count: int = 0
max_retries: int = 3
timeout: float = 2.0
priority: int = 1 # Lower number = higher priority
@dataclass
class ModbusResponse:
"""Modbus operation response."""
success: bool
value: Optional[Union[int, List[int], bool, List[bool]]] = None
error: Optional[str] = None
timestamp: Optional[datetime] = None
duration_ms: Optional[float] = None
class ModbusClientError(Exception):
"""Custom exception for Modbus client errors."""
pass
class AsyncModbusClient:
"""
Asynchronous Modbus RTU client with connection management and error recovery.
"""
def __init__(self):
self.client: Optional[AsyncModbusSerialClient] = None
self.is_connected = False
self.connection_lock = asyncio.Lock()
self.request_queue: asyncio.Queue = asyncio.Queue()
self.response_cache: Dict[str, ModbusResponse] = {}
self.last_successful_read = datetime.now()
self.consecutive_errors = 0
self.max_consecutive_errors = 10
# Configuration
self.port = settings.serial.port
self.baudrate = settings.serial.baudrate
self.timeout = settings.serial.timeout
self.slave_address = settings.modbus.slave_address
self.max_read_registers = settings.modbus.max_read_registers
self.max_write_registers = settings.modbus.max_write_registers
# Statistics
self.stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"connection_errors": 0,
"timeout_errors": 0,
"last_error": None,
"uptime_start": datetime.now()
}
async def connect(self) -> bool:
"""Establish Modbus RTU connection."""
async with self.connection_lock:
try:
if self.client:
await self.disconnect()
logger.info(f"Connecting to Modbus RTU on {self.port} at {self.baudrate} baud")
self.client = AsyncModbusSerialClient(
port=self.port,
baudrate=self.baudrate,
bytesize=settings.serial.bytesize,
parity=settings.serial.parity,
stopbits=settings.serial.stopbits,
timeout=self.timeout,
)
connected = await self.client.connect()
if connected:
self.is_connected = True
self.consecutive_errors = 0
logger.info("Modbus RTU connection established successfully")
return True
else:
self.is_connected = False
self.stats["connection_errors"] += 1
logger.error("Failed to establish Modbus RTU connection")
return False
except Exception as e:
self.is_connected = False
self.stats["connection_errors"] += 1
logger.error(f"Modbus connection error: {e}")
return False
async def disconnect(self) -> None:
"""Close Modbus connection."""
async with self.connection_lock:
if self.client:
try:
self.client.close()
logger.info("Modbus RTU connection closed")
except Exception as e:
logger.error(f"Error closing Modbus connection: {e}")
finally:
self.client = None
self.is_connected = False
async def ensure_connected(self) -> bool:
"""Ensure Modbus connection is active, reconnect if necessary."""
if not self.is_connected or not self.client:
return await self.connect()
# Test connection with a simple read
try:
# Try to read a single register to test connection
result = await self.client.read_holding_registers(0, 1, slave=self.slave_address)
if isinstance(result, ExceptionResponse):
# Connection might be broken, try to reconnect
logger.warning("Connection test failed, attempting reconnection")
return await self.connect()
return True
except Exception as e:
logger.warning(f"Connection test error: {e}, attempting reconnection")
return await self.connect()
async def read_coils(self, address: int, count: int, slave_id: Optional[int] = None) -> ModbusResponse:
"""Read coil states (digital outputs)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.READ_COILS,
address=address,
count=count,
slave_id=slave_id or self.slave_address
))
async def read_discrete_inputs(self, address: int, count: int, slave_id: Optional[int] = None) -> ModbusResponse:
"""Read discrete input states (digital inputs)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.READ_DISCRETE_INPUTS,
address=address,
count=count,
slave_id=slave_id or self.slave_address
))
async def read_holding_registers(self, address: int, count: int, slave_id: Optional[int] = None) -> ModbusResponse:
"""Read holding register values (analog outputs/settings)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.READ_HOLDING_REGISTERS,
address=address,
count=count,
slave_id=slave_id or self.slave_address
))
async def read_input_registers(self, address: int, count: int, slave_id: Optional[int] = None) -> ModbusResponse:
"""Read input register values (analog inputs/sensors)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.READ_INPUT_REGISTERS,
address=address,
count=count,
slave_id=slave_id or self.slave_address
))
async def write_coil(self, address: int, value: bool, slave_id: Optional[int] = None) -> ModbusResponse:
"""Write single coil state (digital output)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.WRITE_SINGLE_COIL,
address=address,
value=value,
slave_id=slave_id or self.slave_address
))
async def write_register(self, address: int, value: int, slave_id: Optional[int] = None) -> ModbusResponse:
"""Write single register value (analog output/setting)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.WRITE_SINGLE_REGISTER,
address=address,
value=value,
slave_id=slave_id or self.slave_address
))
async def write_multiple_coils(self, address: int, values: List[bool], slave_id: Optional[int] = None) -> ModbusResponse:
"""Write multiple coil states (digital outputs)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.WRITE_MULTIPLE_COILS,
address=address,
value=values,
slave_id=slave_id or self.slave_address
))
async def write_multiple_registers(self, address: int, values: List[int], slave_id: Optional[int] = None) -> ModbusResponse:
"""Write multiple register values (analog outputs/settings)."""
return await self._execute_request(ModbusRequest(
function=ModbusFunction.WRITE_MULTIPLE_REGISTERS,
address=address,
value=values,
slave_id=slave_id or self.slave_address
))
async def _execute_request(self, request: ModbusRequest) -> ModbusResponse:
"""Execute a Modbus request with error handling and retries."""
start_time = datetime.now()
for attempt in range(request.max_retries + 1):
try:
# Ensure connection is active
if not await self.ensure_connected():
raise ModbusClientError("Failed to establish connection")
# Execute the request based on function code
result = await self._perform_operation(request)
# Process successful response
if not isinstance(result, ExceptionResponse):
self.consecutive_errors = 0
self.last_successful_read = datetime.now()
self.stats["successful_requests"] += 1
response = ModbusResponse(
success=True,
value=self._extract_value(result, request.function),
timestamp=datetime.now(),
duration_ms=(datetime.now() - start_time).total_seconds() * 1000
)
logger.debug(f"Modbus request successful: {request.function.name} addr={request.address}")
return response
else:
raise ModbusException(f"Modbus exception response: {result}")
except Exception as e:
self.consecutive_errors += 1
self.stats["failed_requests"] += 1
error_msg = f"Modbus request failed (attempt {attempt + 1}/{request.max_retries + 1}): {e}"
logger.error(error_msg)
# Check if we should trigger connection recovery
if self.consecutive_errors >= self.max_consecutive_errors:
logger.critical("Too many consecutive errors, forcing reconnection")
await self.disconnect()
# If this was the last attempt, return error response
if attempt >= request.max_retries:
self.stats["last_error"] = str(e)
return ModbusResponse(
success=False,
error=error_msg,
timestamp=datetime.now(),
duration_ms=(datetime.now() - start_time).total_seconds() * 1000
)
# Wait before retry with exponential backoff
retry_delay = settings.serial.retry_delay * (2 ** attempt)
await asyncio.sleep(retry_delay)
# Should never reach here
return ModbusResponse(success=False, error="Unexpected error in request execution")
async def _perform_operation(self, request: ModbusRequest):
"""Perform the actual Modbus operation."""
if not self.client:
raise ModbusClientError("Modbus client not initialized")
slave = request.slave_id
if request.function == ModbusFunction.READ_COILS:
return await self.client.read_coils(request.address, request.count, slave=slave)
elif request.function == ModbusFunction.READ_DISCRETE_INPUTS:
return await self.client.read_discrete_inputs(request.address, request.count, slave=slave)
elif request.function == ModbusFunction.READ_HOLDING_REGISTERS:
return await self.client.read_holding_registers(request.address, request.count, slave=slave)
elif request.function == ModbusFunction.READ_INPUT_REGISTERS:
return await self.client.read_input_registers(request.address, request.count, slave=slave)
elif request.function == ModbusFunction.WRITE_SINGLE_COIL:
return await self.client.write_coil(request.address, request.value, slave=slave)
elif request.function == ModbusFunction.WRITE_SINGLE_REGISTER:
return await self.client.write_register(request.address, request.value, slave=slave)
elif request.function == ModbusFunction.WRITE_MULTIPLE_COILS:
return await self.client.write_coils(request.address, request.value, slave=slave)
elif request.function == ModbusFunction.WRITE_MULTIPLE_REGISTERS:
return await self.client.write_registers(request.address, request.value, slave=slave)
else:
raise ModbusClientError(f"Unsupported Modbus function: {request.function}")
def _extract_value(self, result, function: ModbusFunction) -> Union[int, List[int], bool, List[bool]]:
"""Extract value from Modbus response."""
if function in [ModbusFunction.READ_COILS, ModbusFunction.READ_DISCRETE_INPUTS]:
# Boolean values
if hasattr(result, 'bits'):
return result.bits
return []
elif function in [ModbusFunction.READ_HOLDING_REGISTERS, ModbusFunction.READ_INPUT_REGISTERS]:
# Register values
if hasattr(result, 'registers'):
return result.registers
return []
elif function in [ModbusFunction.WRITE_SINGLE_COIL, ModbusFunction.WRITE_SINGLE_REGISTER,
ModbusFunction.WRITE_MULTIPLE_COILS, ModbusFunction.WRITE_MULTIPLE_REGISTERS]:
# Write operations - return success status
return True
return None
def get_statistics(self) -> Dict[str, Any]:
"""Get client operation statistics."""
uptime = datetime.now() - self.stats["uptime_start"]
return {
**self.stats,
"is_connected": self.is_connected,
"consecutive_errors": self.consecutive_errors,
"uptime_hours": uptime.total_seconds() / 3600,
"success_rate": (
self.stats["successful_requests"] / max(1, self.stats["total_requests"]) * 100
if self.stats["total_requests"] > 0 else 0
),
"last_successful_read": self.last_successful_read.isoformat(),
}
def reset_statistics(self) -> None:
"""Reset operation statistics."""
self.stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"connection_errors": 0,
"timeout_errors": 0,
"last_error": None,
"uptime_start": datetime.now()
}
async def health_check(self) -> bool:
"""Perform a health check on the Modbus connection."""
try:
# Try to read a register to test communication
response = await self.read_holding_registers(0, 1)
return response.success
except Exception as e:
logger.error(f"Health check failed: {e}")
return False
# Global Modbus client instance
modbus_client = AsyncModbusClient()

View File

@@ -0,0 +1,254 @@
"""
Pydantic configuration settings for the chocolate tempering machine control system.
Provides type-safe configuration management with validation.
"""
import logging
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field, validator, root_validator
from pydantic_settings import BaseSettings
class LogLevel(str, Enum):
"""Logging levels."""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
class SerialConfig(BaseModel):
"""Serial communication configuration."""
port: str = Field(default="/dev/ttyUSB0", description="Serial port device path")
baudrate: int = Field(default=9600, ge=1200, le=115200, description="Serial baudrate")
timeout: float = Field(default=1.0, ge=0.1, le=10.0, description="Communication timeout in seconds")
bytesize: int = Field(default=8, ge=5, le=8, description="Data bits")
parity: str = Field(default="N", regex="^[NEO]$", description="Parity: N/E/O")
stopbits: float = Field(default=1, description="Stop bits")
max_retries: int = Field(default=3, ge=0, le=10, description="Maximum retry attempts")
retry_delay: float = Field(default=0.1, ge=0.01, le=1.0, description="Delay between retries")
class ModbusConfig(BaseModel):
"""Modbus RTU configuration."""
slave_address: int = Field(default=1, ge=1, le=247, description="Modbus slave address")
read_timeout: float = Field(default=2.0, ge=0.5, le=10.0, description="Read operation timeout")
write_timeout: float = Field(default=2.0, ge=0.5, le=10.0, description="Write operation timeout")
max_read_registers: int = Field(default=125, ge=1, le=125, description="Maximum registers per read")
max_write_registers: int = Field(default=100, ge=1, le=100, description="Maximum registers per write")
connection_check_interval: float = Field(default=10.0, ge=1.0, le=300.0, description="Connection health check interval")
class TemperatureConfig(BaseModel):
"""Temperature control configuration."""
# Safety limits
absolute_max_temp: float = Field(default=80.0, ge=20.0, le=100.0, description="Absolute maximum temperature (°C)")
absolute_min_temp: float = Field(default=10.0, ge=0.0, le=30.0, description="Absolute minimum temperature (°C)")
# Process parameters
tank_max_heat: float = Field(default=60.0, ge=20.0, le=80.0, description="Tank maximum heating temperature (°C)")
pump_max_heat: float = Field(default=55.0, ge=20.0, le=80.0, description="Pump maximum heating temperature (°C)")
pump_min_heat: float = Field(default=25.0, ge=10.0, le=40.0, description="Pump minimum heating temperature (°C)")
# Temperature tolerances
temperature_tolerance: float = Field(default=0.5, ge=0.1, le=2.0, description="Temperature control tolerance (°C)")
sensor_accuracy: float = Field(default=0.1, ge=0.05, le=1.0, description="Temperature sensor accuracy (°C)")
# PID control defaults
default_kp: float = Field(default=1.0, ge=0.0, le=10.0, description="Default proportional gain")
default_ki: float = Field(default=0.1, ge=0.0, le=1.0, description="Default integral gain")
default_kd: float = Field(default=0.01, ge=0.0, le=1.0, description="Default derivative gain")
default_kl: float = Field(default=100.0, ge=0.0, le=1000.0, description="Default output limit")
@validator('tank_max_heat')
def validate_tank_max_heat(cls, v, values):
if 'absolute_max_temp' in values and v > values['absolute_max_temp']:
raise ValueError('Tank max heat cannot exceed absolute max temperature')
return v
@validator('pump_max_heat')
def validate_pump_max_heat(cls, v, values):
if 'absolute_max_temp' in values and v > values['absolute_max_temp']:
raise ValueError('Pump max heat cannot exceed absolute max temperature')
return v
class SafetyConfig(BaseModel):
"""Safety monitoring configuration."""
# Electrical safety limits
grid_voltage_nominal: float = Field(default=230.0, ge=100.0, le=400.0, description="Nominal grid voltage (V)")
grid_voltage_tolerance: float = Field(default=0.1, ge=0.05, le=0.2, description="Grid voltage tolerance (±%)")
grid_frequency_nominal: float = Field(default=50.0, ge=45.0, le=65.0, description="Nominal grid frequency (Hz)")
grid_frequency_tolerance: float = Field(default=0.02, ge=0.01, le=0.05, description="Grid frequency tolerance (±%)")
# Current limits
max_neutral_current: float = Field(default=16.0, ge=1.0, le=63.0, description="Maximum neutral current (A)")
max_motor1_current: float = Field(default=10.0, ge=1.0, le=32.0, description="Maximum motor 1 current (A)")
max_motor2_current: float = Field(default=10.0, ge=1.0, le=32.0, description="Maximum motor 2 current (A)")
# Error handling
error_check_interval: float = Field(default=1.0, ge=0.1, le=10.0, description="Error monitoring interval (s)")
auto_recovery_attempts: int = Field(default=3, ge=0, le=10, description="Automatic recovery attempts")
auto_recovery_delay: float = Field(default=5.0, ge=1.0, le=60.0, description="Auto recovery delay (s)")
# Communication timeouts
communication_timeout: float = Field(default=3.0, ge=1.0, le=30.0, description="Communication timeout (s)")
heartbeat_interval: float = Field(default=1.0, ge=0.1, le=10.0, description="Heartbeat interval (s)")
class ProcessConfig(BaseModel):
"""Process timing and control configuration."""
# Phase delays (seconds)
heating_delay: float = Field(default=60.0, ge=0.0, le=300.0, description="Heating phase delay (s)")
cooling_delay: float = Field(default=120.0, ge=0.0, le=600.0, description="Cooling phase delay (s)")
pouring_delay: float = Field(default=30.0, ge=0.0, le=120.0, description="Pouring phase delay (s)")
pump_delay: float = Field(default=10.0, ge=0.0, le=60.0, description="Pump startup delay (s)")
mixer_delay: float = Field(default=5.0, ge=0.0, le=30.0, description="Mixer startup delay (s)")
# Control loop intervals
process_control_interval: float = Field(default=0.5, ge=0.1, le=2.0, description="Process control loop interval (s)")
temperature_read_interval: float = Field(default=1.0, ge=0.1, le=5.0, description="Temperature reading interval (s)")
status_update_interval: float = Field(default=2.0, ge=0.5, le=10.0, description="Status update interval (s)")
# Recipe validation
min_heating_goal: float = Field(default=40.0, ge=20.0, le=80.0, description="Minimum heating goal temperature (°C)")
max_heating_goal: float = Field(default=60.0, ge=40.0, le=80.0, description="Maximum heating goal temperature (°C)")
min_cooling_goal: float = Field(default=20.0, ge=10.0, le=40.0, description="Minimum cooling goal temperature (°C)")
max_cooling_goal: float = Field(default=40.0, ge=20.0, le=60.0, description="Maximum cooling goal temperature (°C)")
@root_validator
def validate_temperature_goals(cls, values):
min_heating = values.get('min_heating_goal', 40.0)
max_cooling = values.get('max_cooling_goal', 40.0)
if max_cooling >= min_heating:
raise ValueError('Maximum cooling goal must be less than minimum heating goal')
return values
class DatabaseConfig(BaseModel):
"""Database configuration."""
url: str = Field(default="sqlite:///tempering_machine.db", description="Database connection URL")
echo: bool = Field(default=False, description="Enable SQL query logging")
pool_pre_ping: bool = Field(default=True, description="Validate connections before use")
pool_recycle: int = Field(default=3600, ge=60, le=86400, description="Connection recycle time (s)")
max_overflow: int = Field(default=10, ge=0, le=100, description="Maximum connection overflow")
pool_size: int = Field(default=5, ge=1, le=50, description="Connection pool size")
class RedisConfig(BaseModel):
"""Redis configuration for message queue."""
host: str = Field(default="localhost", description="Redis host")
port: int = Field(default=6379, ge=1, le=65535, description="Redis port")
db: int = Field(default=0, ge=0, le=15, description="Redis database number")
password: Optional[str] = Field(default=None, description="Redis password")
socket_timeout: float = Field(default=5.0, ge=1.0, le=30.0, description="Socket timeout (s)")
connection_pool_max_connections: int = Field(default=10, ge=1, le=100, description="Max pool connections")
class WebConfig(BaseModel):
"""Web service configuration."""
host: str = Field(default="0.0.0.0", description="Server bind address")
port: int = Field(default=8000, ge=1024, le=65535, description="Server port")
workers: int = Field(default=1, ge=1, le=10, description="Number of worker processes")
reload: bool = Field(default=False, description="Enable auto-reload in development")
access_log: bool = Field(default=True, description="Enable access logging")
cors_origins: list[str] = Field(default=["http://localhost:3000"], description="CORS allowed origins")
api_title: str = Field(default="Chocolate Tempering Machine API", description="API title")
api_version: str = Field(default="1.0.0", description="API version")
class Settings(BaseSettings):
"""Main application settings."""
# Environment
environment: str = Field(default="development", regex="^(development|production|testing)$")
debug: bool = Field(default=False, description="Enable debug mode")
log_level: LogLevel = Field(default=LogLevel.INFO, description="Logging level")
# Component configurations
serial: SerialConfig = Field(default_factory=SerialConfig)
modbus: ModbusConfig = Field(default_factory=ModbusConfig)
temperature: TemperatureConfig = Field(default_factory=TemperatureConfig)
safety: SafetyConfig = Field(default_factory=SafetyConfig)
process: ProcessConfig = Field(default_factory=ProcessConfig)
database: DatabaseConfig = Field(default_factory=DatabaseConfig)
redis: RedisConfig = Field(default_factory=RedisConfig)
web: WebConfig = Field(default_factory=WebConfig)
# File paths
data_directory: Path = Field(default=Path("data"), description="Data storage directory")
log_directory: Path = Field(default=Path("logs"), description="Log file directory")
config_directory: Path = Field(default=Path("config"), description="Configuration directory")
backup_directory: Path = Field(default=Path("backups"), description="Backup directory")
# Application metadata
app_name: str = Field(default="Chocolate Tempering Machine", description="Application name")
app_version: str = Field(default="1.0.0", description="Application version")
class Config:
env_file = ".env"
env_prefix = "TEMPERING_"
env_nested_delimiter = "__"
case_sensitive = False
@validator('data_directory', 'log_directory', 'config_directory', 'backup_directory')
def ensure_directory_exists(cls, v):
"""Ensure directories exist."""
if isinstance(v, str):
v = Path(v)
v.mkdir(parents=True, exist_ok=True)
return v
def get_logging_config(self) -> Dict[str, Any]:
"""Get logging configuration dictionary."""
return {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
},
"detailed": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(module)s - %(funcName)s:%(lineno)d - %(message)s",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": self.log_level.value,
"formatter": "default",
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": self.log_level.value,
"formatter": "detailed",
"filename": str(self.log_directory / "tempering_machine.log"),
"maxBytes": 10485760, # 10MB
"backupCount": 10,
},
},
"loggers": {
"tempering_machine": {
"level": self.log_level.value,
"handlers": ["console", "file"],
"propagate": False,
},
"uvicorn": {
"level": "INFO",
"handlers": ["console"],
"propagate": False,
},
},
"root": {
"level": self.log_level.value,
"handlers": ["console"],
},
}
# Global settings instance
settings = Settings()

View File

@@ -0,0 +1,209 @@
"""
SQLAlchemy database setup and session management.
Provides async database connectivity for the tempering machine control system.
"""
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.pool import StaticPool
from .config import settings
# Database metadata and base model
metadata = MetaData(
naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
)
Base = declarative_base(metadata=metadata)
class DatabaseManager:
"""Database connection and session manager."""
def __init__(self):
self.engine = None
self.async_engine = None
self.session_factory = None
self.async_session_factory = None
def init_sync_db(self) -> None:
"""Initialize synchronous database connection."""
database_url = settings.database.url
# Handle SQLite-specific configuration
if database_url.startswith("sqlite"):
connect_args = {
"check_same_thread": False,
"timeout": 20,
"isolation_level": None, # Autocommit mode
}
self.engine = create_engine(
database_url,
echo=settings.database.echo,
connect_args=connect_args,
poolclass=StaticPool,
pool_pre_ping=settings.database.pool_pre_ping,
pool_recycle=settings.database.pool_recycle,
)
else:
# PostgreSQL or other databases
self.engine = create_engine(
database_url,
echo=settings.database.echo,
pool_size=settings.database.pool_size,
max_overflow=settings.database.max_overflow,
pool_pre_ping=settings.database.pool_pre_ping,
pool_recycle=settings.database.pool_recycle,
)
self.session_factory = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine,
)
# Enable WAL mode for SQLite for better concurrent access
if database_url.startswith("sqlite"):
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
# Enable WAL mode for better concurrency
cursor.execute("PRAGMA journal_mode=WAL")
# Set synchronous mode for better performance
cursor.execute("PRAGMA synchronous=NORMAL")
# Enable foreign key constraints
cursor.execute("PRAGMA foreign_keys=ON")
# Set busy timeout
cursor.execute("PRAGMA busy_timeout=30000")
cursor.close()
def init_async_db(self) -> None:
"""Initialize asynchronous database connection."""
database_url = settings.database.url
# Convert sync SQLite URL to async
if database_url.startswith("sqlite"):
async_database_url = database_url.replace("sqlite://", "sqlite+aiosqlite://")
connect_args = {
"check_same_thread": False,
"timeout": 20,
}
self.async_engine = create_async_engine(
async_database_url,
echo=settings.database.echo,
connect_args=connect_args,
poolclass=StaticPool,
pool_pre_ping=settings.database.pool_pre_ping,
pool_recycle=settings.database.pool_recycle,
)
else:
# Convert PostgreSQL URL to async
if database_url.startswith("postgresql://"):
async_database_url = database_url.replace("postgresql://", "postgresql+asyncpg://")
else:
async_database_url = database_url
self.async_engine = create_async_engine(
async_database_url,
echo=settings.database.echo,
pool_size=settings.database.pool_size,
max_overflow=settings.database.max_overflow,
pool_pre_ping=settings.database.pool_pre_ping,
pool_recycle=settings.database.pool_recycle,
)
self.async_session_factory = async_sessionmaker(
self.async_engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
)
def create_tables(self) -> None:
"""Create all database tables."""
if not self.engine:
raise RuntimeError("Database engine not initialized")
Base.metadata.create_all(bind=self.engine)
async def create_tables_async(self) -> None:
"""Create all database tables asynchronously."""
if not self.async_engine:
raise RuntimeError("Async database engine not initialized")
async with self.async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
def get_session(self):
"""Get a synchronous database session."""
if not self.session_factory:
raise RuntimeError("Database session factory not initialized")
return self.session_factory()
@asynccontextmanager
async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
"""Get an asynchronous database session."""
if not self.async_session_factory:
raise RuntimeError("Async database session factory not initialized")
async with self.async_session_factory() as session:
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def close_async(self) -> None:
"""Close async database connections."""
if self.async_engine:
await self.async_engine.dispose()
def close_sync(self) -> None:
"""Close synchronous database connections."""
if self.engine:
self.engine.dispose()
# Global database manager instance
db_manager = DatabaseManager()
def init_database() -> None:
"""Initialize database connections."""
db_manager.init_sync_db()
db_manager.init_async_db()
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency for FastAPI to get database session."""
async with db_manager.get_async_session() as session:
yield session
def get_sync_db():
"""Get synchronous database session for non-async contexts."""
return db_manager.get_session()
async def create_tables():
"""Create all database tables."""
await db_manager.create_tables_async()
async def close_database():
"""Close database connections."""
await db_manager.close_async()
db_manager.close_sync()

View File

@@ -0,0 +1,32 @@
"""Database models for the chocolate tempering machine control system.
"""
from .recipe import Recipe, RecipePhase
from .machine import MachineConfiguration, HardwareMapping
from .process import ProcessSession, ProcessLog, TemperatureReading
from .user import User, UserRole
from .system import ErrorLog, SystemConfiguration, Backup
__all__ = [
# Recipe models
"Recipe",
"RecipePhase",
# Machine models
"MachineConfiguration",
"HardwareMapping",
# Process models
"ProcessSession",
"ProcessLog",
"TemperatureReading",
# User models
"User",
"UserRole",
# System models
"ErrorLog",
"SystemConfiguration",
"Backup",
]

View File

@@ -0,0 +1,207 @@
"""
Machine configuration and hardware mapping models.
"""
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, Boolean, JSON
from sqlalchemy.orm import relationship
from ..database import Base
class MachineConfiguration(Base):
"""
Machine configuration parameters and limits.
Based on the original Machine.csv structure.
"""
__tablename__ = "machine_configurations"
id = Column(Integer, primary_key=True, index=True)
# Temperature limits (°C)
tank_max_heat = Column(Float, nullable=False, default=60.0, comment="Tank maximum heating temperature (°C)")
pump_max_heat = Column(Float, nullable=False, default=55.0, comment="Pump maximum heating temperature (°C)")
pump_min_heat = Column(Float, nullable=False, default=25.0, comment="Pump minimum heating temperature (°C)")
absolute_max_temp = Column(Float, nullable=False, default=80.0, comment="Absolute maximum temperature (°C)")
absolute_min_temp = Column(Float, nullable=False, default=10.0, comment="Absolute minimum temperature (°C)")
# Process delays (seconds)
pump_delay = Column(Float, nullable=False, default=10.0, comment="Pump startup delay (seconds)")
mixer_delay = Column(Float, nullable=False, default=5.0, comment="Mixer startup delay (seconds)")
heating_delay = Column(Float, nullable=False, default=60.0, comment="Heating phase delay (seconds)")
cooling_delay = Column(Float, nullable=False, default=120.0, comment="Cooling phase delay (seconds)")
pouring_delay = Column(Float, nullable=False, default=30.0, comment="Pouring phase delay (seconds)")
# PID control parameters for different zones
pid_parameters = Column(JSON, nullable=True, comment="PID control parameters by zone")
# Safety limits
max_neutral_current = Column(Float, nullable=False, default=16.0, comment="Maximum neutral current (A)")
max_motor1_current = Column(Float, nullable=False, default=10.0, comment="Maximum motor 1 current (A)")
max_motor2_current = Column(Float, nullable=False, default=10.0, comment="Maximum motor 2 current (A)")
grid_voltage_nominal = Column(Float, nullable=False, default=230.0, comment="Nominal grid voltage (V)")
grid_frequency_nominal = Column(Float, nullable=False, default=50.0, comment="Nominal grid frequency (Hz)")
# Configuration metadata
name = Column(String(100), nullable=False, unique=True, index=True, comment="Configuration name")
description = Column(Text, nullable=True, comment="Configuration description")
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
is_active = Column(Boolean, nullable=False, default=True, comment="Configuration is active")
version = Column(Integer, nullable=False, default=1, comment="Configuration version")
def __repr__(self) -> str:
return f"<MachineConfiguration(id={self.id}, name='{self.name}', tank_max_heat={self.tank_max_heat})>"
def validate_parameters(self) -> bool:
"""Validate machine configuration parameters."""
# Temperature validations
if self.tank_max_heat > self.absolute_max_temp:
return False
if self.pump_max_heat > self.absolute_max_temp:
return False
if self.pump_min_heat < self.absolute_min_temp:
return False
if self.pump_min_heat >= self.pump_max_heat:
return False
# Safety limit validations
if self.max_neutral_current <= 0 or self.max_motor1_current <= 0 or self.max_motor2_current <= 0:
return False
# Timing validations
if any(delay < 0 for delay in [self.pump_delay, self.mixer_delay, self.heating_delay, self.cooling_delay, self.pouring_delay]):
return False
return True
@classmethod
def from_csv_row(cls, row: dict) -> "MachineConfiguration":
"""Create MachineConfiguration from CSV row data (migration utility)."""
return cls(
id=int(row.get("ID", 1)),
name=f"Config_{row.get('ID', 1)}",
tank_max_heat=float(row.get("TankMaxHeat", 60.0)),
pump_max_heat=float(row.get("PumbMaxHeat", 55.0)), # Note: original CSV has typo "Pumb"
pump_min_heat=float(row.get("PumbMinHeat", 25.0)),
absolute_max_temp=float(row.get("AbsMaxTemp", 80.0)),
absolute_min_temp=float(row.get("AbsMinTemp", 10.0)),
pump_delay=float(row.get("PumbDelay", 10.0)),
mixer_delay=float(row.get("MixerDelay", 5.0)),
heating_delay=float(row.get("HeatingDelay", 60.0)),
cooling_delay=float(row.get("CoolingDelay", 120.0)),
pouring_delay=float(row.get("PouringDelay", 30.0)),
)
class HardwareType(str, Enum):
"""Types of hardware components."""
TEMPERATURE_SENSOR = "temperature_sensor"
DIGITAL_INPUT = "digital_input"
DIGITAL_OUTPUT = "digital_output"
ANALOG_INPUT = "analog_input"
ANALOG_OUTPUT = "analog_output"
MOTOR_CONTROL = "motor_control"
SAFETY_RELAY = "safety_relay"
class HardwareMapping(Base):
"""
Hardware component to Modbus address mapping.
Based on the original Mapping.csv structure.
"""
__tablename__ = "hardware_mappings"
id = Column(Integer, primary_key=True, index=True)
# Hardware identification
component_name = Column(String(50), nullable=False, unique=True, index=True, comment="Hardware component name")
component_type = Column(String(20), nullable=False, comment="Hardware component type")
description = Column(Text, nullable=True, comment="Component description")
# Modbus addressing
modbus_address = Column(Integer, nullable=False, comment="Modbus register address")
bit_position = Column(Integer, nullable=True, comment="Bit position for digital I/O (0-15)")
data_type = Column(String(20), nullable=False, default="uint16", comment="Data type (uint16, float32, bool, etc.)")
# Scaling and conversion
scale_factor = Column(Float, nullable=False, default=1.0, comment="Scaling factor for value conversion")
offset = Column(Float, nullable=False, default=0.0, comment="Offset for value conversion")
units = Column(String(20), nullable=True, comment="Engineering units (°C, A, V, etc.)")
# Operational parameters
read_frequency = Column(Float, nullable=False, default=1.0, comment="Read frequency (Hz)")
is_readable = Column(Boolean, nullable=False, default=True, comment="Component can be read")
is_writable = Column(Boolean, nullable=False, default=False, comment="Component can be written")
# Safety and limits
min_value = Column(Float, nullable=True, comment="Minimum safe value")
max_value = Column(Float, nullable=True, comment="Maximum safe value")
alarm_low = Column(Float, nullable=True, comment="Low alarm threshold")
alarm_high = Column(Float, nullable=True, comment="High alarm threshold")
# Configuration metadata
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
is_active = Column(Boolean, nullable=False, default=True, comment="Mapping is active")
def __repr__(self) -> str:
return f"<HardwareMapping(name='{self.component_name}', address={self.modbus_address}, type='{self.component_type}')>"
def convert_raw_value(self, raw_value: int) -> float:
"""Convert raw Modbus value to engineering units."""
if self.component_type == "digital_input" or self.component_type == "digital_output":
# Extract bit value if bit_position is specified
if self.bit_position is not None:
return float((raw_value >> self.bit_position) & 1)
return float(raw_value)
# Apply scaling and offset for analog values
return (raw_value * self.scale_factor) + self.offset
def convert_to_raw_value(self, engineering_value: float) -> int:
"""Convert engineering value to raw Modbus value."""
if self.component_type == "digital_input" or self.component_type == "digital_output":
return int(engineering_value)
# Remove offset and apply inverse scaling
raw_value = (engineering_value - self.offset) / self.scale_factor
return int(round(raw_value))
def is_within_limits(self, value: float) -> bool:
"""Check if value is within safe operating limits."""
if self.min_value is not None and value < self.min_value:
return False
if self.max_value is not None and value > self.max_value:
return False
return True
def is_alarm_condition(self, value: float) -> tuple[bool, Optional[str]]:
"""Check for alarm conditions."""
if self.alarm_low is not None and value <= self.alarm_low:
return True, "LOW_ALARM"
if self.alarm_high is not None and value >= self.alarm_high:
return True, "HIGH_ALARM"
return False, None
@classmethod
def from_csv_row(cls, row: dict) -> "HardwareMapping":
"""Create HardwareMapping from CSV row data (migration utility)."""
return cls(
component_name=row.get("Name", "Unknown"),
component_type=row.get("Type", "unknown"),
description=row.get("Description", ""),
modbus_address=int(row.get("Address", 0)),
bit_position=int(row.get("Bit", 0)) if row.get("Bit") else None,
data_type=row.get("DataType", "uint16"),
scale_factor=float(row.get("Scale", 1.0)),
offset=float(row.get("Offset", 0.0)),
units=row.get("Units", ""),
min_value=float(row.get("MinValue")) if row.get("MinValue") else None,
max_value=float(row.get("MaxValue")) if row.get("MaxValue") else None,
alarm_low=float(row.get("AlarmLow")) if row.get("AlarmLow") else None,
alarm_high=float(row.get("AlarmHigh")) if row.get("AlarmHigh") else None,
)

View File

@@ -0,0 +1,232 @@
"""
Process execution and monitoring models.
"""
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4, UUID
from sqlalchemy import Column, DateTime, Enum as SQLEnum, Float, ForeignKey, Integer, String, Text, Boolean, JSON
from sqlalchemy.dialects.postgresql import UUID as PostgresUUID
from sqlalchemy.orm import relationship
from sqlalchemy.types import TypeDecorator, CHAR
from ..database import Base
from .recipe import RecipePhase
class GUID(TypeDecorator):
"""Platform-independent GUID type."""
impl = CHAR
cache_ok = True
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql':
return dialect.type_descriptor(PostgresUUID())
else:
return dialect.type_descriptor(CHAR(36))
def process_bind_param(self, value, dialect):
if value is None:
return value
elif dialect.name == 'postgresql':
return str(value)
else:
if not isinstance(value, UUID):
return str(UUID(value))
else:
return str(value)
def process_result_value(self, value, dialect):
if value is None:
return value
else:
if not isinstance(value, UUID):
return UUID(value)
return value
class ProcessStatus(str, Enum):
"""Process execution status."""
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
PAUSED = "paused"
STOPPING = "stopping"
COMPLETED = "completed"
ABORTED = "aborted"
ERROR = "error"
class ProcessSession(Base):
"""
Process execution session tracking.
Records complete tempering process runs.
"""
__tablename__ = "process_sessions"
# Primary identification
id = Column(GUID(), primary_key=True, default=uuid4, unique=True, nullable=False)
session_name = Column(String(100), nullable=True, comment="Human-readable session name")
# Recipe association
recipe_id = Column(Integer, ForeignKey("recipes.id"), nullable=False, index=True)
recipe = relationship("Recipe", back_populates="process_sessions")
# Process state
status = Column(SQLEnum(ProcessStatus), nullable=False, default=ProcessStatus.IDLE, index=True)
current_phase = Column(SQLEnum(RecipePhase), nullable=True, index=True)
phase_start_time = Column(DateTime(timezone=True), nullable=True)
# Timing information
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
paused_duration = Column(Integer, nullable=False, default=0, comment="Total paused time in seconds")
# Process parameters (snapshot of recipe at execution time)
process_parameters = Column(JSON, nullable=True, comment="Recipe parameters when process started")
# Results and metrics
target_heating_temp = Column(Float, nullable=True, comment="Target heating temperature (°C)")
target_cooling_temp = Column(Float, nullable=True, comment="Target cooling temperature (°C)")
achieved_heating_temp = Column(Float, nullable=True, comment="Actual achieved heating temperature (°C)")
achieved_cooling_temp = Column(Float, nullable=True, comment="Actual achieved cooling temperature (°C)")
# Quality metrics
temperature_accuracy = Column(Float, nullable=True, comment="Average temperature accuracy (°C)")
cycle_efficiency = Column(Float, nullable=True, comment="Process efficiency percentage")
energy_consumption = Column(Float, nullable=True, comment="Total energy consumption (kWh)")
# Error and stop information
stop_reason = Column(String(200), nullable=True, comment="Reason for process stop/abort")
error_count = Column(Integer, nullable=False, default=0, comment="Number of errors during process")
warning_count = Column(Integer, nullable=False, default=0, comment="Number of warnings during process")
# User information
started_by = Column(String(50), nullable=True, comment="User who started the process")
stopped_by = Column(String(50), nullable=True, comment="User who stopped the process")
# Relationships
temperature_readings = relationship("TemperatureReading", back_populates="session")
process_logs = relationship("ProcessLog", back_populates="session")
def __repr__(self) -> str:
return f"<ProcessSession(id={self.id}, recipe_id={self.recipe_id}, status='{self.status}', phase='{self.current_phase}')>"
@property
def duration(self) -> Optional[int]:
"""Get process duration in seconds."""
if self.started_at is None:
return None
end_time = self.completed_at or datetime.now(timezone.utc)
duration = (end_time - self.started_at).total_seconds()
return int(duration - self.paused_duration)
@property
def is_active(self) -> bool:
"""Check if process is currently active."""
return self.status in [ProcessStatus.STARTING, ProcessStatus.RUNNING, ProcessStatus.PAUSED]
def get_phase_duration(self) -> Optional[int]:
"""Get current phase duration in seconds."""
if self.phase_start_time is None:
return None
return int((datetime.now(timezone.utc) - self.phase_start_time).total_seconds())
class LogLevel(str, Enum):
"""Log message severity levels."""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class ProcessLog(Base):
"""
Process execution log entries.
Records all significant events during process execution.
"""
__tablename__ = "process_logs"
id = Column(Integer, primary_key=True, index=True)
# Session association
session_id = Column(GUID(), ForeignKey("process_sessions.id"), nullable=False, index=True)
session = relationship("ProcessSession", back_populates="process_logs")
# Log entry details
timestamp = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), index=True)
level = Column(SQLEnum(LogLevel), nullable=False, default=LogLevel.INFO, index=True)
message = Column(Text, nullable=False, comment="Log message")
component = Column(String(50), nullable=True, comment="System component that generated the log")
# Process context
phase = Column(SQLEnum(RecipePhase), nullable=True, comment="Recipe phase when log was generated")
temperature_tank = Column(Float, nullable=True, comment="Tank temperature at time of log (°C)")
temperature_fountain = Column(Float, nullable=True, comment="Fountain temperature at time of log (°C)")
# Additional data
additional_data = Column(JSON, nullable=True, comment="Additional structured data")
def __repr__(self) -> str:
return f"<ProcessLog(id={self.id}, session_id={self.session_id}, level='{self.level}', timestamp={self.timestamp})>"
class TemperatureReading(Base):
"""
Temperature sensor readings during process execution.
High-frequency data collection for monitoring and analysis.
"""
__tablename__ = "temperature_readings"
id = Column(Integer, primary_key=True, index=True)
# Session association
session_id = Column(GUID(), ForeignKey("process_sessions.id"), nullable=False, index=True)
session = relationship("ProcessSession", back_populates="temperature_readings")
# Timing
timestamp = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), index=True)
phase = Column(SQLEnum(RecipePhase), nullable=True, index=True)
# Temperature readings (°C)
tank_bottom = Column(Float, nullable=True, comment="Tank bottom temperature sensor (°C)")
tank_wall = Column(Float, nullable=True, comment="Tank wall temperature sensor (°C)")
pump = Column(Float, nullable=True, comment="Pump temperature sensor (°C)")
fountain = Column(Float, nullable=True, comment="Fountain temperature sensor (°C)")
# Ambient conditions
ambient_temp = Column(Float, nullable=True, comment="Ambient temperature (°C)")
humidity = Column(Float, nullable=True, comment="Ambient humidity (%)")
# Process targets for comparison
target_temperature = Column(Float, nullable=True, comment="Target temperature for current phase (°C)")
temperature_error = Column(Float, nullable=True, comment="Temperature error from target (°C)")
# Control outputs
heating_output = Column(Float, nullable=True, comment="Heating control output (%)")
cooling_output = Column(Float, nullable=True, comment="Cooling control output (%)")
def __repr__(self) -> str:
return f"<TemperatureReading(id={self.id}, session_id={self.session_id}, timestamp={self.timestamp})>"
@property
def average_tank_temp(self) -> Optional[float]:
"""Calculate average tank temperature from available sensors."""
temps = [t for t in [self.tank_bottom, self.tank_wall] if t is not None]
return sum(temps) / len(temps) if temps else None
def get_temperature_by_zone(self, zone: str) -> Optional[float]:
"""Get temperature reading for a specific zone."""
zone_map = {
"tank_bottom": self.tank_bottom,
"tank_wall": self.tank_wall,
"pump": self.pump,
"fountain": self.fountain,
"ambient": self.ambient_temp,
}
return zone_map.get(zone.lower())

View File

@@ -0,0 +1,157 @@
"""
Recipe models for chocolate tempering process management.
"""
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4
from sqlalchemy import Column, DateTime, Enum as SQLEnum, Float, ForeignKey, Integer, String, Text, Boolean
from sqlalchemy.orm import relationship
from ..database import Base
class RecipePhase(str, Enum):
"""Chocolate tempering process phases."""
PREHEATING = "preheating"
HEATING = "heating"
HEATING_DELAY = "heating_delay"
COOLING = "cooling"
COOLING_DELAY = "cooling_delay"
POURING = "pouring"
COMPLETED = "completed"
STOPPED = "stopped"
ERROR = "error"
class Recipe(Base):
"""
Recipe configuration for chocolate tempering process.
Based on the original RecipeTable.csv structure.
"""
__tablename__ = "recipes"
# Primary identification
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False, unique=True, index=True)
description = Column(Text, nullable=True)
# Temperature goals (°C)
heating_goal = Column(Float, nullable=False, comment="Target heating temperature (°C)")
cooling_goal = Column(Float, nullable=False, comment="Target cooling temperature (°C)")
pouring_goal = Column(Float, nullable=True, comment="Target pouring temperature (°C)")
# Tank and fountain temperature settings
tank_temp = Column(Float, nullable=False, default=45.0, comment="Tank operating temperature (°C)")
fountain_temp = Column(Float, nullable=False, default=32.0, comment="Fountain operating temperature (°C)")
# Motor control settings (boolean flags)
mixer_enabled = Column(Boolean, nullable=False, default=True, comment="Enable mixer motor")
fountain_enabled = Column(Boolean, nullable=False, default=True, comment="Enable fountain motor")
mold_heater_enabled = Column(Boolean, nullable=False, default=False, comment="Enable mold heater")
vibration_enabled = Column(Boolean, nullable=False, default=False, comment="Enable vibration motor")
vib_heater_enabled = Column(Boolean, nullable=False, default=False, comment="Enable vibration heater")
# Pedal control settings
pedal_control_enabled = Column(Boolean, nullable=False, default=True, comment="Enable pedal control")
pedal_on_time = Column(Float, nullable=False, default=2.0, comment="Pedal on duration (seconds)")
pedal_off_time = Column(Float, nullable=False, default=3.0, comment="Pedal off duration (seconds)")
# Recipe metadata
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
created_by = Column(String(50), nullable=True, comment="User who created the recipe")
version = Column(Integer, nullable=False, default=1, comment="Recipe version number")
is_active = Column(Boolean, nullable=False, default=True, comment="Recipe is active and can be used")
# Usage statistics
usage_count = Column(Integer, nullable=False, default=0, comment="Number of times recipe was used")
last_used = Column(DateTime(timezone=True), nullable=True, comment="Last time recipe was used")
# Relationships
process_sessions = relationship("ProcessSession", back_populates="recipe")
def __repr__(self) -> str:
return f"<Recipe(id={self.id}, name='{self.name}', heating_goal={self.heating_goal}, cooling_goal={self.cooling_goal})>"
def validate_temperatures(self) -> bool:
"""Validate recipe temperature parameters."""
# Cooling goal must be less than heating goal
if self.cooling_goal >= self.heating_goal:
return False
# Temperature ranges validation
if not (20.0 <= self.cooling_goal <= 40.0):
return False
if not (40.0 <= self.heating_goal <= 60.0):
return False
# Optional pouring goal validation
if self.pouring_goal is not None:
if not (self.cooling_goal <= self.pouring_goal <= self.heating_goal):
return False
return True
def get_phase_sequence(self) -> list[RecipePhase]:
"""Get the sequence of phases for this recipe."""
return [
RecipePhase.PREHEATING,
RecipePhase.HEATING,
RecipePhase.HEATING_DELAY,
RecipePhase.COOLING,
RecipePhase.COOLING_DELAY,
RecipePhase.POURING
]
def to_dict(self) -> dict:
"""Convert recipe to dictionary."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"heating_goal": self.heating_goal,
"cooling_goal": self.cooling_goal,
"pouring_goal": self.pouring_goal,
"tank_temp": self.tank_temp,
"fountain_temp": self.fountain_temp,
"mixer_enabled": self.mixer_enabled,
"fountain_enabled": self.fountain_enabled,
"mold_heater_enabled": self.mold_heater_enabled,
"vibration_enabled": self.vibration_enabled,
"vib_heater_enabled": self.vib_heater_enabled,
"pedal_control_enabled": self.pedal_control_enabled,
"pedal_on_time": self.pedal_on_time,
"pedal_off_time": self.pedal_off_time,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"created_by": self.created_by,
"version": self.version,
"is_active": self.is_active,
"usage_count": self.usage_count,
"last_used": self.last_used.isoformat() if self.last_used else None,
}
@classmethod
def from_csv_row(cls, row: dict) -> "Recipe":
"""Create Recipe instance from CSV row data (migration utility)."""
return cls(
id=int(row.get("ID", 0)),
name=row.get("Name", "Unknown Recipe"),
heating_goal=float(row.get("HeatingGoal", 46.0)),
cooling_goal=float(row.get("CoolingGoal", 27.0)),
pouring_goal=float(row.get("PouringGoal", 30.0)) if row.get("PouringGoal") else None,
tank_temp=float(row.get("TankTemp", 45.0)),
fountain_temp=float(row.get("FountainTemp", 32.0)),
mixer_enabled=bool(int(row.get("Mixer", 1))),
fountain_enabled=bool(int(row.get("Fountain", 1))),
mold_heater_enabled=bool(int(row.get("MoldHeater", 0))),
vibration_enabled=bool(int(row.get("Vibration", 0))),
vib_heater_enabled=bool(int(row.get("VibHeater", 0))),
pedal_control_enabled=bool(int(row.get("Pedal", 1))),
pedal_on_time=float(row.get("PedalOnTime", 2.0)),
pedal_off_time=float(row.get("PedalOffTime", 3.0)),
)

View File

@@ -0,0 +1,287 @@
"""
System configuration, error logging, and backup models.
"""
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4
from sqlalchemy import Column, DateTime, Enum as SQLEnum, Float, Integer, String, Text, Boolean, JSON, LargeBinary
from ..database import Base
class ErrorSeverity(str, Enum):
"""Error severity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class ErrorCategory(str, Enum):
"""Error categories for classification."""
HARDWARE = "hardware"
COMMUNICATION = "communication"
TEMPERATURE = "temperature"
SAFETY = "safety"
POWER = "power"
PROCESS = "process"
USER = "user"
SYSTEM = "system"
class ErrorLog(Base):
"""
System error and event logging.
Captures all errors, warnings, and significant events.
"""
__tablename__ = "error_logs"
id = Column(Integer, primary_key=True, index=True)
# Error identification
error_code = Column(String(20), nullable=True, index=True, comment="Standardized error code")
category = Column(SQLEnum(ErrorCategory), nullable=False, default=ErrorCategory.SYSTEM, index=True)
severity = Column(SQLEnum(ErrorSeverity), nullable=False, default=ErrorSeverity.LOW, index=True)
# Error details
title = Column(String(200), nullable=False, comment="Error title/summary")
message = Column(Text, nullable=False, comment="Detailed error message")
component = Column(String(50), nullable=True, comment="System component where error occurred")
# Context information
timestamp = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), index=True)
session_id = Column(String(36), nullable=True, comment="Associated process session ID")
user_id = Column(Integer, nullable=True, comment="User who triggered the error (if applicable)")
# Technical details
stack_trace = Column(Text, nullable=True, comment="Error stack trace for debugging")
additional_data = Column(JSON, nullable=True, comment="Additional diagnostic data")
# Resolution tracking
is_resolved = Column(Boolean, nullable=False, default=False, comment="Error has been resolved")
resolution_notes = Column(Text, nullable=True, comment="Notes on how error was resolved")
resolved_by = Column(String(50), nullable=True, comment="User who resolved the error")
resolved_at = Column(DateTime(timezone=True), nullable=True, comment="When error was resolved")
# Hardware/Process context
temperature_tank = Column(Float, nullable=True, comment="Tank temperature when error occurred (°C)")
temperature_fountain = Column(Float, nullable=True, comment="Fountain temperature when error occurred (°C)")
current_phase = Column(String(20), nullable=True, comment="Process phase when error occurred")
# Recurrence tracking
occurrence_count = Column(Integer, nullable=False, default=1, comment="Number of times this error has occurred")
first_occurrence = Column(DateTime(timezone=True), nullable=True, comment="When this error first occurred")
last_occurrence = Column(DateTime(timezone=True), nullable=True, comment="When this error last occurred")
def __repr__(self) -> str:
return f"<ErrorLog(id={self.id}, category='{self.category}', severity='{self.severity}', title='{self.title}')>"
def is_critical(self) -> bool:
"""Check if error is critical severity."""
return self.severity == ErrorSeverity.CRITICAL
def requires_immediate_attention(self) -> bool:
"""Check if error requires immediate operator attention."""
return self.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL] and not self.is_resolved
def mark_resolved(self, resolved_by: str, notes: Optional[str] = None) -> None:
"""Mark error as resolved."""
self.is_resolved = True
self.resolved_by = resolved_by
self.resolved_at = datetime.now(timezone.utc)
if notes:
self.resolution_notes = notes
class SystemConfiguration(Base):
"""
System-wide configuration parameters.
Stores global settings and operational parameters.
"""
__tablename__ = "system_configurations"
id = Column(Integer, primary_key=True, index=True)
# Configuration identification
key = Column(String(100), nullable=False, unique=True, index=True, comment="Configuration parameter key")
category = Column(String(50), nullable=False, index=True, comment="Configuration category")
description = Column(Text, nullable=True, comment="Parameter description")
# Value storage (polymorphic)
value_string = Column(Text, nullable=True, comment="String value")
value_number = Column(Float, nullable=True, comment="Numeric value")
value_boolean = Column(Boolean, nullable=True, comment="Boolean value")
value_json = Column(JSON, nullable=True, comment="JSON/object value")
# Validation and constraints
data_type = Column(String(20), nullable=False, default="string", comment="Expected data type")
min_value = Column(Float, nullable=True, comment="Minimum allowed value (for numeric types)")
max_value = Column(Float, nullable=True, comment="Maximum allowed value (for numeric types)")
valid_options = Column(JSON, nullable=True, comment="Valid options for enum-type parameters")
# Configuration metadata
is_readonly = Column(Boolean, nullable=False, default=False, comment="Parameter cannot be modified via UI")
requires_restart = Column(Boolean, nullable=False, default=False, comment="Change requires system restart")
is_sensitive = Column(Boolean, nullable=False, default=False, comment="Parameter contains sensitive data")
# Change tracking
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
last_modified_by = Column(String(50), nullable=True, comment="User who last modified this parameter")
# Default and factory values
default_value = Column(Text, nullable=True, comment="Default/factory value")
previous_value = Column(Text, nullable=True, comment="Previous value (for rollback)")
def __repr__(self) -> str:
return f"<SystemConfiguration(key='{self.key}', category='{self.category}', type='{self.data_type}')>"
def get_value(self):
"""Get the configuration value with appropriate type conversion."""
if self.data_type == "string":
return self.value_string
elif self.data_type == "number" or self.data_type == "float":
return self.value_number
elif self.data_type == "integer":
return int(self.value_number) if self.value_number is not None else None
elif self.data_type == "boolean":
return self.value_boolean
elif self.data_type == "json":
return self.value_json
else:
return self.value_string
def set_value(self, value, modified_by: Optional[str] = None):
"""Set the configuration value with type validation."""
# Store previous value for rollback
self.previous_value = str(self.get_value()) if self.get_value() is not None else None
# Set new value based on data type
if self.data_type == "string":
self.value_string = str(value) if value is not None else None
elif self.data_type in ["number", "float", "integer"]:
if value is not None:
self.value_number = float(value)
# Validate range
if self.min_value is not None and self.value_number < self.min_value:
raise ValueError(f"Value {value} is below minimum {self.min_value}")
if self.max_value is not None and self.value_number > self.max_value:
raise ValueError(f"Value {value} is above maximum {self.max_value}")
elif self.data_type == "boolean":
self.value_boolean = bool(value) if value is not None else None
elif self.data_type == "json":
self.value_json = value
# Update metadata
self.updated_at = datetime.now(timezone.utc)
self.last_modified_by = modified_by
def validate_value(self, value) -> bool:
"""Validate a value against this configuration's constraints."""
try:
if self.data_type in ["number", "float", "integer"]:
num_value = float(value)
if self.min_value is not None and num_value < self.min_value:
return False
if self.max_value is not None and num_value > self.max_value:
return False
if self.valid_options and value not in self.valid_options:
return False
return True
except (ValueError, TypeError):
return False
class BackupStatus(str, Enum):
"""Backup operation status."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CORRUPTED = "corrupted"
class Backup(Base):
"""
System backup records.
Tracks configuration and data backups.
"""
__tablename__ = "backups"
id = Column(Integer, primary_key=True, index=True)
# Backup identification
backup_name = Column(String(100), nullable=False, comment="Backup name/identifier")
backup_type = Column(String(20), nullable=False, default="full", comment="Backup type (full, incremental, config)")
description = Column(Text, nullable=True, comment="Backup description")
# Backup metadata
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), index=True)
created_by = Column(String(50), nullable=True, comment="User who initiated the backup")
status = Column(SQLEnum(BackupStatus), nullable=False, default=BackupStatus.PENDING, index=True)
# File information
file_path = Column(String(500), nullable=True, comment="Path to backup file")
file_size = Column(Integer, nullable=True, comment="Backup file size in bytes")
checksum = Column(String(64), nullable=True, comment="MD5/SHA256 checksum for integrity")
# Content information
tables_included = Column(JSON, nullable=True, comment="List of database tables included")
record_count = Column(Integer, nullable=True, comment="Total number of records backed up")
# Compression and encryption
is_compressed = Column(Boolean, nullable=False, default=True, comment="Backup is compressed")
is_encrypted = Column(Boolean, nullable=False, default=False, comment="Backup is encrypted")
compression_ratio = Column(Float, nullable=True, comment="Compression ratio achieved")
# Backup operation metrics
duration_seconds = Column(Integer, nullable=True, comment="Time taken to create backup")
error_message = Column(Text, nullable=True, comment="Error message if backup failed")
# Retention and cleanup
expires_at = Column(DateTime(timezone=True), nullable=True, comment="When backup expires and can be deleted")
is_archived = Column(Boolean, nullable=False, default=False, comment="Backup has been archived")
# Restoration tracking
last_restored = Column(DateTime(timezone=True), nullable=True, comment="When backup was last restored")
restored_by = Column(String(50), nullable=True, comment="User who last restored from this backup")
def __repr__(self) -> str:
return f"<Backup(id={self.id}, name='{self.backup_name}', type='{self.backup_type}', status='{self.status}')>"
def is_expired(self) -> bool:
"""Check if backup has expired."""
if self.expires_at is None:
return False
return datetime.now(timezone.utc) > self.expires_at
def is_restorable(self) -> bool:
"""Check if backup can be restored."""
return (
self.status == BackupStatus.COMPLETED
and not self.is_expired()
and self.file_path is not None
)
def calculate_size_mb(self) -> Optional[float]:
"""Get backup size in MB."""
if self.file_size is None:
return None
return self.file_size / (1024 * 1024)
def mark_completed(self, file_path: str, file_size: int, checksum: str) -> None:
"""Mark backup as completed with file information."""
self.status = BackupStatus.COMPLETED
self.file_path = file_path
self.file_size = file_size
self.checksum = checksum
def mark_failed(self, error_message: str) -> None:
"""Mark backup as failed with error message."""
self.status = BackupStatus.FAILED
self.error_message = error_message

View File

@@ -0,0 +1,164 @@
"""
User management and authentication models.
"""
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import Column, DateTime, Enum as SQLEnum, Integer, String, Boolean, Text
from ..database import Base
class UserRole(str, Enum):
"""User access roles."""
OPERATOR = "operator" # Basic machine operation
SUPERVISOR = "supervisor" # Recipe management, advanced operations
TECHNICIAN = "technician" # Maintenance, diagnostics, configuration
ADMIN = "admin" # Full system access
class User(Base):
"""
User account for system access and audit trail.
Based on the original Users.csv structure.
"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
# User identification
username = Column(String(50), nullable=False, unique=True, index=True, comment="Unique username")
email = Column(String(100), nullable=True, unique=True, index=True, comment="User email address")
full_name = Column(String(100), nullable=True, comment="User full name")
# Authentication
password_hash = Column(String(255), nullable=False, comment="Hashed password")
salt = Column(String(50), nullable=True, comment="Password salt")
# Authorization
role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.OPERATOR, index=True)
permissions = Column(Text, nullable=True, comment="Additional permissions (JSON)")
# Account status
is_active = Column(Boolean, nullable=False, default=True, comment="Account is active")
is_locked = Column(Boolean, nullable=False, default=False, comment="Account is locked")
failed_login_attempts = Column(Integer, nullable=False, default=0, comment="Failed login count")
# Timestamps
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
last_login = Column(DateTime(timezone=True), nullable=True, comment="Last successful login")
last_login_attempt = Column(DateTime(timezone=True), nullable=True, comment="Last login attempt")
password_changed_at = Column(DateTime(timezone=True), nullable=True, comment="Last password change")
# Profile information
phone = Column(String(20), nullable=True, comment="Phone number")
department = Column(String(50), nullable=True, comment="Department or team")
shift = Column(String(20), nullable=True, comment="Work shift")
badge_number = Column(String(20), nullable=True, comment="Employee badge number")
# Preferences
language = Column(String(5), nullable=False, default="en", comment="Preferred language")
timezone = Column(String(50), nullable=False, default="UTC", comment="User timezone")
# Security
require_password_change = Column(Boolean, nullable=False, default=False, comment="Force password change on next login")
session_timeout = Column(Integer, nullable=False, default=3600, comment="Session timeout in seconds")
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', role='{self.role}', active={self.is_active})>"
def has_permission(self, required_role: UserRole) -> bool:
"""Check if user has required permission level."""
role_hierarchy = {
UserRole.OPERATOR: 1,
UserRole.SUPERVISOR: 2,
UserRole.TECHNICIAN: 3,
UserRole.ADMIN: 4
}
return role_hierarchy.get(self.role, 0) >= role_hierarchy.get(required_role, 0)
def can_manage_recipes(self) -> bool:
"""Check if user can create/edit recipes."""
return self.has_permission(UserRole.SUPERVISOR)
def can_access_diagnostics(self) -> bool:
"""Check if user can access diagnostic features."""
return self.has_permission(UserRole.TECHNICIAN)
def can_modify_system_config(self) -> bool:
"""Check if user can modify system configuration."""
return self.has_permission(UserRole.ADMIN)
def is_session_expired(self, last_activity: datetime) -> bool:
"""Check if user session has expired."""
if not self.is_active or self.is_locked:
return True
time_since_activity = (datetime.now(timezone.utc) - last_activity).total_seconds()
return time_since_activity > self.session_timeout
def record_login_attempt(self, success: bool) -> None:
"""Record login attempt and update counters."""
self.last_login_attempt = datetime.now(timezone.utc)
if success:
self.last_login = self.last_login_attempt
self.failed_login_attempts = 0
if self.is_locked:
self.is_locked = False
else:
self.failed_login_attempts += 1
# Lock account after 5 failed attempts
if self.failed_login_attempts >= 5:
self.is_locked = True
def to_dict(self, include_sensitive: bool = False) -> dict:
"""Convert user to dictionary, optionally including sensitive data."""
user_data = {
"id": self.id,
"username": self.username,
"email": self.email,
"full_name": self.full_name,
"role": self.role.value if self.role else None,
"is_active": self.is_active,
"is_locked": self.is_locked,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_login": self.last_login.isoformat() if self.last_login else None,
"phone": self.phone,
"department": self.department,
"shift": self.shift,
"badge_number": self.badge_number,
"language": self.language,
"timezone": self.timezone,
}
if include_sensitive:
user_data.update({
"failed_login_attempts": self.failed_login_attempts,
"last_login_attempt": self.last_login_attempt.isoformat() if self.last_login_attempt else None,
"password_changed_at": self.password_changed_at.isoformat() if self.password_changed_at else None,
"require_password_change": self.require_password_change,
"session_timeout": self.session_timeout,
})
return user_data
@classmethod
def from_csv_row(cls, row: dict) -> "User":
"""Create User from CSV row data (migration utility)."""
return cls(
id=int(row.get("ID", 0)),
username=row.get("Username", "unknown"),
full_name=row.get("Name", ""),
email=row.get("Email", ""),
role=UserRole(row.get("Role", "operator").lower()),
phone=row.get("Phone", ""),
department=row.get("Department", ""),
badge_number=row.get("BadgeNumber", ""),
# Note: password will need to be set separately for security
password_hash="MIGRATION_REQUIRED",
)