/dev/null 2>&1 & * - Windows: start /B php rule_evaluator.php */ // Configuración set_time_limit(0); ini_set('display_errors', 1); error_reporting(E_ALL); // Cargar dependencias require_once __DIR__ . '/../vendor/autoload.php'; // Cargar variables de entorno if (class_exists('Dotenv\\Dotenv')) { $dotenv = Dotenv\Dotenv::createImmutable(__DIR__ . '/..'); $dotenv->safeLoad(); } use PhpMqtt\Client\MqttClient; use PhpMqtt\Client\ConnectionSettings; // Archivos de datos $reglas_file = __DIR__ . '/../data/reglas.json'; $puertos_file = __DIR__ . '/../data/puertos.json'; $log_file = __DIR__ . '/../data/rule_executions.log'; // Estado actual de todos los pines (chipid => [pin => valor]) $pinStates = []; // Última ejecución de cada regla (para evitar spam) $lastExecution = []; // Cliente MQTT $mqtt = null; /** * Cargar reglas desde archivo */ function loadRules() { global $reglas_file; if (!file_exists($reglas_file)) return []; $content = @file_get_contents($reglas_file); if ($content === false) return []; $data = json_decode($content, true); return is_array($data) ? $data : []; } /** * Cargar puertos desde archivo */ function loadPorts() { global $puertos_file; if (!file_exists($puertos_file)) return []; $content = @file_get_contents($puertos_file); if ($content === false) return []; $data = json_decode($content, true); return is_array($data) ? $data : []; } /** * Logging de ejecuciones */ function logExecution($chipid, $ruleId, $ruleName, $message) { global $log_file; $timestamp = date('Y-m-d H:i:s'); $line = "[$timestamp] [$chipid] [$ruleId] $ruleName: $message\n"; @file_put_contents($log_file, $line, FILE_APPEND); echo $line; // También a stdout } /** * Evaluar condiciones de una regla */ function evaluateConditions($conditions, $chipid) { global $pinStates; if (empty($conditions)) return true; // Sin condiciones = siempre true foreach ($conditions as $cond) { $pin = $cond['pin'] ?? ''; $operator = $cond['operator'] ?? '=='; $expectedValue = strtoupper($cond['value'] ?? 'ON'); $currentValue = strtoupper($pinStates[$chipid][$pin] ?? 'OFF'); $result = false; if ($operator === '==') { $result = ($currentValue === $expectedValue); } elseif ($operator === '!=') { $result = ($currentValue !== $expectedValue); } // Si alguna condición falla, retornar false (lógica AND) if (!$result) { return false; } } return true; // Todas las condiciones pasaron } /** * Ejecutar acciones de una regla */ function executeActions($actions, $chipid) { global $mqtt; foreach ($actions as $action) { $type = $action['type'] ?? 'set'; if ($type === 'set') { $pin = $action['pin'] ?? ''; $value = strtoupper($action['value'] ?? 'ON'); if (empty($pin)) continue; // Publicar comando MQTT $topic = "dispositivo/$chipid/comando"; $payload = json_encode(['alias' => $pin, 'valor' => $value]); try { if ($mqtt && $mqtt->isConnected()) { $mqtt->publish($topic, $payload, 0, true); logExecution($chipid, '', '', "Acción ejecutada: $pin → $value"); } } catch (Exception $e) { logExecution($chipid, '', '', "Error al publicar: " . $e->getMessage()); } } elseif ($type === 'delay') { $seconds = intval($action['seconds'] ?? 1); logExecution($chipid, '', '', "Esperando $seconds segundos..."); sleep($seconds); } } } /** * Evaluar reglas de un dispositivo */ function evaluateRules($chipid, $changedPin = null) { global $lastExecution; $allRules = loadRules(); $deviceRules = $allRules[$chipid] ?? []; if (empty($deviceRules)) return; foreach ($deviceRules as $rule) { // Solo evaluar reglas activas if (!($rule['enabled'] ?? true)) continue; $ruleId = $rule['id'] ?? ''; $ruleName = $rule['name'] ?? 'Sin nombre'; $trigger = $rule['trigger'] ?? []; $conditions = $rule['conditions'] ?? []; $actions = $rule['actions'] ?? []; // Verificar si el trigger coincide $triggerType = $trigger['type'] ?? 'input_change'; if ($triggerType === 'input_change') { $triggerPin = $trigger['pin'] ?? ''; $triggerValue = strtoupper($trigger['value'] ?? 'ON'); // Solo evaluar si el pin que cambió es el trigger if ($changedPin !== $triggerPin) continue; // Verificar valor del trigger global $pinStates; $currentValue = strtoupper($pinStates[$chipid][$triggerPin] ?? 'OFF'); if ($currentValue !== $triggerValue) continue; // Evitar ejecutar la misma regla múltiples veces en poco tiempo $lastKey = "$chipid:$ruleId"; $now = time(); if (isset($lastExecution[$lastKey]) && ($now - $lastExecution[$lastKey]) < 2) { continue; // Cooldown de 2 segundos } // Evaluar condiciones adicionales if (!evaluateConditions($conditions, $chipid)) { logExecution($chipid, $ruleId, $ruleName, "Condiciones no cumplidas"); continue; } // Todas las condiciones se cumplen, ejecutar acciones logExecution($chipid, $ruleId, $ruleName, "✅ Regla activada"); executeActions($actions, $chipid); $lastExecution[$lastKey] = $now; } // TODO: Implementar triggers de tipo 'timer' y 'manual' } } /** * Conectar a MQTT */ function connectMQTT() { $host = getenv('MQTT_HOST') ?: 'mqtt.penki.com.ar'; $port = intval(getenv('MQTT_PORT') ?: 8883); $username = getenv('MQTT_USERNAME') ?: 'penki'; $password = getenv('MQTT_PASSWORD') ?: 'Penki615'; $useTLS = filter_var(getenv('MQTT_TLS') ?: 'true', FILTER_VALIDATE_BOOLEAN); $clientId = 'rule_evaluator_' . uniqid(); try { $connectionSettings = (new ConnectionSettings()) ->setUsername($username) ->setPassword($password) ->setUseTls($useTLS) ->setTlsSelfSignedAllowed(true) ->setKeepAliveInterval(60) ->setConnectTimeout(10); $mqtt = new MqttClient($host, $port, $clientId); $mqtt->connect($connectionSettings, true); echo "✅ Conectado a MQTT broker: $host:$port\n"; return $mqtt; } catch (Exception $e) { echo "❌ Error conectando a MQTT: " . $e->getMessage() . "\n"; return null; } } /** * Escuchar SSE y evaluar reglas */ function listenSSE() { global $pinStates, $mqtt; // Usar localhost si está en el mismo servidor, o la URL completa $sseUrl = getenv('SSE_URL') ?: 'http://localhost/api/index.php?r=sse_states'; // Usar cURL para mantener conexión SSE abierta $ch = curl_init($sseUrl); curl_setopt($ch, CURLOPT_RETURNTRANSFER, false); curl_setopt($ch, CURLOPT_HEADER, false); curl_setopt($ch, CURLOPT_TIMEOUT, 0); curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10); // Buffer para acumular datos $buffer = ''; curl_setopt($ch, CURLOPT_WRITEFUNCTION, function($curl, $data) use (&$buffer) { global $pinStates, $mqtt; $buffer .= $data; $lines = explode("\n", $buffer); // Procesar líneas completas for ($i = 0; $i < count($lines) - 1; $i++) { $line = trim($lines[$i]); // Parsear eventos SSE if (strpos($line, 'event:') === 0) { $eventType = trim(substr($line, 6)); } elseif (strpos($line, 'data:') === 0) { $jsonData = trim(substr($line, 5)); try { $data = json_decode($jsonData, true); if (isset($data['chipid']) && isset($data['alias']) && isset($data['valor'])) { $chipid = $data['chipid']; $pin = $data['alias']; $value = strtoupper($data['valor']); // Actualizar estado if (!isset($pinStates[$chipid])) { $pinStates[$chipid] = []; } $oldValue = $pinStates[$chipid][$pin] ?? null; $pinStates[$chipid][$pin] = $value; // Solo evaluar si el valor cambió if ($oldValue !== $value) { echo "[" . date('H:i:s') . "] 📍 $chipid/$pin: $oldValue → $value\n"; // Evaluar reglas para este dispositivo evaluateRules($chipid, $pin); } } } catch (Exception $e) { echo "Error parseando SSE: " . $e->getMessage() . "\n"; } } } // Mantener última línea incompleta en buffer $buffer = $lines[count($lines) - 1]; return strlen($data); }); echo "🎧 Escuchando eventos SSE desde: $sseUrl\n"; echo "⚙️ Evaluador de reglas iniciado...\n\n"; curl_exec($ch); if (curl_errno($ch)) { echo "❌ Error en SSE: " . curl_error($ch) . "\n"; } curl_close($ch); } /** * Main loop */ function main() { global $mqtt; echo "🚀 Iniciando Evaluador de Reglas ESP Admin\n"; echo "==========================================\n\n"; // Conectar a MQTT $mqtt = connectMQTT(); if (!$mqtt) { echo "❌ No se pudo conectar a MQTT. Abortando.\n"; exit(1); } // Cargar reglas iniciales $allRules = loadRules(); $totalRules = 0; foreach ($allRules as $chipid => $rules) { $totalRules += count($rules); } echo "📋 Reglas cargadas: $totalRules reglas en " . count($allRules) . " dispositivos\n\n"; // Iniciar escucha de SSE while (true) { try { listenSSE(); } catch (Exception $e) { echo "❌ Error en loop principal: " . $e->getMessage() . "\n"; echo "⏳ Reintentando en 5 segundos...\n\n"; sleep(5); // Reconectar MQTT si es necesario if (!$mqtt || !$mqtt->isConnected()) { $mqtt = connectMQTT(); } } } } // Manejo de señales para shutdown graceful if (function_exists('pcntl_signal')) { pcntl_signal(SIGTERM, function() { global $mqtt; echo "\n🛑 Señal de terminación recibida. Cerrando...\n"; if ($mqtt && $mqtt->isConnected()) { $mqtt->disconnect(); } exit(0); }); pcntl_signal(SIGINT, function() { global $mqtt; echo "\n🛑 Interrupción recibida (Ctrl+C). Cerrando...\n"; if ($mqtt && $mqtt->isConnected()) { $mqtt->disconnect(); } exit(0); }); } // Ejecutar main();