'SSE connected', 'time' => date('H:i:s')]); // Forzar evento de prueba para pines sse_send('pin', [ 'chipid' => '628347', 'alias' => 'D5', 'valor' => 'TEST' ]); // Conexión a Redis try { error_log("=== Intentando conectar a Redis ==="); $redis = new Redis(); $redis->connect('127.0.0.1', 6379, 2); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); error_log("=== Redis conectado exitosamente ==="); sse_send('redis_connected', ['message' => 'Redis connected', 'time' => date('H:i:s')]); } catch (Throwable $e) { error_log("=== Error conectando a Redis: " . $e->getMessage() . " ==="); sse_send('error', ['message' => 'Redis connection error: ' . $e->getMessage()]); exit; } $processedMessages = []; $lastPing = time(); $lastTopicsCheck = 0; // Loop principal: leer desde Redis while (!connection_aborted()) { try { $currentTime = time(); // Obtener todos los topics (cada vez, no cache) $topics = $redis->sMembers('mqtt:topics'); error_log("SSE-Redis: Topics check - count=" . count($topics) . ", time=" . date('H:i:s')); foreach ($topics as $topic) { // Solo procesar topics de dispositivos if (strpos($topic, 'dispositivo/') !== 0) { continue; } // Monitoreo específico del dispositivo 628347 if (strpos($topic, 'dispositivo/628347') !== false) { error_log("SSE-Redis: Encontrado topic 628347 = {$topic}"); } $messageData = $redis->hGet('mqtt:mensajes', $topic); if ($messageData) { $data = json_decode($messageData, true); if ($data && isset($data['payload'])) { $payload = $data['payload']; $messageId = md5($topic . ':' . $payload); // Procesar solo mensajes nuevos if (!isset($processedMessages[$messageId])) { $processedMessages[$messageId] = true; // Parsear topic y enviar eventos SSE if (preg_match('/^dispositivo\/([^\/]+)\/comando$/', $topic, $matches)) { $chipid = $matches[1]; // Parsear JSON para alias/valor $decoded = json_decode($payload, true); if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) { $alias = isset($decoded['alias']) ? (string)$decoded['alias'] : null; $valor = isset($decoded['valor']) ? (string)$decoded['valor'] : null; if ($alias && $valor !== null) { error_log("SSE-Redis: PROCESANDO NUEVO MENSAJE - chipid={$chipid}, alias={$alias}, valor={$valor}"); sse_send('pin', [ 'chipid' => $chipid, 'alias' => $alias, 'valor' => $valor, ]); error_log("SSE-Redis: EVENTO ENVIADO AL FRONTEND"); } else { error_log("SSE-Redis: Ignorando comando SIN ALIAS/VALOR - alias={$alias}, valor={$valor}"); } } else { error_log("SSE-Redis: ERROR PARSEANDO JSON COMANDO - payload={$payload}"); } } elseif (preg_match('/^dispositivo\/([^\/]+)\/valores\/([^\/]+)$/', $topic, $matches)) { $chipid = $matches[1]; $alias = $matches[2]; // Parsear JSON del payload $decoded = json_decode($payload, true); if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) { $valor = isset($decoded['valor']) ? (string)$decoded['valor'] : $payload; sse_send('pin', [ 'chipid' => $chipid, 'alias' => $alias, 'valor' => $valor, ]); } else { sse_send('pin', [ 'chipid' => $chipid, 'alias' => $alias, 'valor' => $payload, ]); } } elseif (preg_match('/^dispositivo\/([^\/]+)\/state$/', $topic, $matches)) { $chipid = $matches[1]; sse_send('state', [ 'chipid' => $chipid, 'estado' => normalize_state($payload), ]); } elseif (preg_match('/^dispositivo\/([^\/]+)\/ip$/', $topic, $matches)) { $chipid = $matches[1]; sse_send('ip', [ 'chipid' => $chipid, 'ip' => trim($payload), ]); } } } } } // Enviar ping cada 2 segundos if ($currentTime - $lastPing >= 2) { sse_send('ping', ['time' => date('H:i:s')]); $lastPing = $currentTime; } // Limitar memoria de mensajes procesados if (count($processedMessages) > 1000) { $processedMessages = array_slice($processedMessages, -500, null, true); } // Pequeña pausa - reducir para más tiempo real usleep(200000); // 0.2 segundos // Forzar check cada vez if ($currentTime - $lastTopicsCheck >= 1) { error_log("SSE-Redis: Active monitoring - checking for new messages..."); $lastTopicsCheck = $currentTime; } } catch (Throwable $e) { sse_send('error', ['message' => 'SSE error: ' . $e->getMessage()]); break; } } ?>