esp/api/sse_states.php

189 lines
7.8 KiB
PHP

<?php
// SSE endpoint that relays MQTT-derived device state, read from Redis, to the browser.
// NON_JSON is defined here only if nothing defined it earlier; presumably shared
// bootstrap code reads it to skip JSON response handling — confirm against the includes.
if (!defined('NON_JSON')) define('NON_JSON', true);
// Log immediately so the error log shows that the script was reached.
error_log("=== SSE Script Started at " . date('Y-m-d H:i:s') . " ===");
// REQUEST_URI is missing outside a web request (e.g. CLI); fall back instead of raising a warning.
error_log("Request URI: " . ($_SERVER['REQUEST_URI'] ?? '(unknown)'));
// The stream stays open far longer than a normal request, so lift the execution
// time limit when the function is available (it can be disabled by configuration).
if (function_exists('set_time_limit')) {
    @set_time_limit(0);
}
// SSE headers - queued before any body output.
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache, no-transform, no-store');
header('Connection: keep-alive');
header('X-Accel-Buffering: no');
// NOTE(review): the wildcard origin lets any site read this stream — confirm that is intended.
header('Access-Control-Allow-Origin: *');
// Remove every active output buffer. ob_clean() only emptied the innermost buffer
// and left it active, so later flush() calls could not reach the client.
// The loop stops if a buffer refuses to be removed, to avoid spinning forever.
while (ob_get_level() > 0 && @ob_end_clean()) {
    // keep unwinding nested buffers
}
// Compression buffers output until it has enough data, which stalls an event stream.
@ini_set('zlib.output_compression', '0');
// Kept from the original; this setting may not be changeable at runtime, hence the @.
@ini_set('output_buffering', 'off');
@ob_implicit_flush(1);
// Funciones SSE
/**
 * Emit one Server-Sent Event and push it to the client immediately.
 *
 * The event is written as an "event:" line followed by a single "data:" line
 * holding the JSON encoding of $data, terminated by a blank line.
 *
 * @param string $event Event name written on the "event:" line.
 * @param mixed  $data  Value serialised as JSON on the "data:" line.
 * @return void
 */
function sse_send($event, $data) {
    $json = json_encode($data, JSON_UNESCAPED_UNICODE);
    if ($json === false) {
        // Encoding fails on e.g. invalid UTF-8 inside a device payload. Retry while
        // substituting the unencodable values so the client still receives valid
        // JSON instead of an empty "data:" line.
        $json = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_PARTIAL_OUTPUT_ON_ERROR);
    }
    if ($json === false) {
        $json = 'null';
    }
    echo "event: {$event}\n";
    echo 'data: ' . $json . "\n\n";
    // flush() does not empty PHP's own output buffers; push those first when one is active.
    if (ob_get_level() > 0) {
        @ob_flush();
    }
    flush();
}
/**
 * Map a raw device status payload onto a canonical state label.
 *
 * The input is cast to string, trimmed and lower-cased before comparison.
 *
 * @param mixed $message Raw status value.
 * @return string 'online' or 'offline' for the recognised spellings, 'desconocido'
 *                for an empty value, otherwise the cleaned-up input itself.
 */
function normalize_state($message) {
    $state = strtolower(trim((string)$message));

    $onlineSpellings = ['online', 'connected', 'available', '1', 'true', 'on'];
    if (in_array($state, $onlineSpellings, true)) {
        return 'online';
    }

    $offlineSpellings = ['offline', 'disconnected', 'unavailable', '0', 'false', 'off'];
    if (in_array($state, $offlineSpellings, true)) {
        return 'offline';
    }

    // '0' was already mapped to 'offline' above, so only the empty string is left to default.
    return $state === '' ? 'desconocido' : $state;
}
// Initial padding: one SSE comment line (it starts with ':') carrying 2048 spaces,
// presumably to get past proxy/browser buffering thresholds so the first real
// event is delivered right away.
echo ':' . str_repeat(' ', 2048) . "\n\n";
// flush() does not empty PHP's own output buffers; push those first when one is active.
if (ob_get_level() > 0) {
    @ob_flush();
}
flush();
// Tell the client that the stream is open.
sse_send('connected', ['message' => 'SSE connected', 'time' => date('H:i:s')]);
// Synthetic pin event for front-end debugging. It used to be sent to every client on
// every (re)connect, injecting a fake 'TEST' value for a hard-coded device into the
// live stream. It is now opt-in: define SSE_DEBUG_TEST_EVENT as true to emit it.
if (defined('SSE_DEBUG_TEST_EVENT') && SSE_DEBUG_TEST_EVENT) {
    sse_send('pin', [
        'chipid' => '628347',
        'alias' => 'D5',
        'valor' => 'TEST',
    ]);
}
// Connect to Redis. On failure the client receives an 'error' event and the script ends.
try {
    error_log("=== Intentando conectar a Redis ===");
    $redis = new Redis();
    // Depending on the extension version, connect() may report failure by returning
    // false instead of throwing. Treat that as an error here so success is not
    // logged and announced for a connection that was never established.
    if (!$redis->connect('127.0.0.1', 6379, 2)) {
        throw new RuntimeException('connect() returned false');
    }
    // -1 disables the read timeout on this connection.
    $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
    error_log("=== Redis conectado exitosamente ===");
    sse_send('redis_connected', ['message' => 'Redis connected', 'time' => date('H:i:s')]);
} catch (Throwable $e) {
    // Throwable also covers a missing Redis extension (class not found).
    error_log("=== Error conectando a Redis: " . $e->getMessage() . " ===");
    sse_send('error', ['message' => 'Redis connection error: ' . $e->getMessage()]);
    exit;
}
/**
 * Turn one MQTT message into the SSE event that its topic calls for.
 *
 * Topics handled (any other topic is ignored):
 *   dispositivo/<chipid>/comando          -> 'pin' event; alias and valor come from the JSON payload
 *   dispositivo/<chipid>/valores/<alias>  -> 'pin' event; valor comes from the JSON payload, else the raw payload
 *   dispositivo/<chipid>/state            -> 'state' event with the normalised state
 *   dispositivo/<chipid>/ip               -> 'ip' event with the trimmed payload
 *
 * @param string $topic   Full MQTT topic.
 * @param string $payload Message payload.
 * @return void
 */
function sse_dispatch_message($topic, $payload) {
    if (preg_match('/^dispositivo\/([^\/]+)\/comando$/', $topic, $matches)) {
        $chipid = $matches[1];
        $decoded = json_decode($payload, true);
        if (json_last_error() !== JSON_ERROR_NONE || !is_array($decoded)) {
            error_log("SSE-Redis: ERROR PARSEANDO JSON COMANDO - payload={$payload}");
            return;
        }
        // Only scalar fields can be turned into strings safely.
        $alias = (isset($decoded['alias']) && is_scalar($decoded['alias'])) ? (string)$decoded['alias'] : null;
        $valor = (isset($decoded['valor']) && is_scalar($decoded['valor'])) ? (string)$decoded['valor'] : null;
        // Explicit checks instead of truthiness, so an alias of "0" is not discarded.
        if ($alias === null || $alias === '' || $valor === null) {
            error_log("SSE-Redis: Ignorando comando SIN ALIAS/VALOR - alias={$alias}, valor={$valor}");
            return;
        }
        error_log("SSE-Redis: PROCESANDO NUEVO MENSAJE - chipid={$chipid}, alias={$alias}, valor={$valor}");
        sse_send('pin', [
            'chipid' => $chipid,
            'alias' => $alias,
            'valor' => $valor,
        ]);
        error_log("SSE-Redis: EVENTO ENVIADO AL FRONTEND");
        return;
    }

    if (preg_match('/^dispositivo\/([^\/]+)\/valores\/([^\/]+)$/', $topic, $matches)) {
        // Use the JSON "valor" field when the payload has one, otherwise the raw payload.
        $valor = $payload;
        $decoded = json_decode($payload, true);
        if (
            json_last_error() === JSON_ERROR_NONE
            && is_array($decoded)
            && isset($decoded['valor'])
            && is_scalar($decoded['valor'])
        ) {
            $valor = (string)$decoded['valor'];
        }
        sse_send('pin', [
            'chipid' => $matches[1],
            'alias' => $matches[2],
            'valor' => $valor,
        ]);
        return;
    }

    if (preg_match('/^dispositivo\/([^\/]+)\/state$/', $topic, $matches)) {
        sse_send('state', [
            'chipid' => $matches[1],
            'estado' => normalize_state($payload),
        ]);
        return;
    }

    if (preg_match('/^dispositivo\/([^\/]+)\/ip$/', $topic, $matches)) {
        sse_send('ip', [
            'chipid' => $matches[1],
            'ip' => trim($payload),
        ]);
    }
}

// Last payload forwarded for each topic. Comparing per topic (instead of keeping a
// global set of topic+payload hashes) means a value that returns to an earlier state,
// e.g. online -> offline -> online, is forwarded again. Memory stays bounded by the
// number of topics, so no trimming that would re-send live messages is needed.
$lastPayloadByTopic = [];
$lastPing = time();
$lastTopicsCheck = 0;
// Main loop: poll Redis until the client disconnects.
while (!connection_aborted()) {
    try {
        $currentTime = time();
        // Diagnostic logging is limited to once per second; the loop runs about 5 times per second.
        $logTick = ($currentTime - $lastTopicsCheck >= 1);

        // The topic set is read on every pass (no caching).
        $topics = $redis->sMembers('mqtt:topics');
        $topicsReadOk = is_array($topics);
        if (!$topicsReadOk) {
            $topics = [];
        }
        if ($logTick) {
            error_log("SSE-Redis: Topics check - count=" . count($topics) . ", time=" . date('H:i:s'));
        }

        $activeTopics = [];
        foreach ($topics as $topic) {
            // Only device topics are relayed.
            if (strpos($topic, 'dispositivo/') !== 0) {
                continue;
            }
            $activeTopics[$topic] = true;
            // Targeted trace for device 628347.
            if ($logTick && strpos($topic, 'dispositivo/628347') !== false) {
                error_log("SSE-Redis: Encontrado topic 628347 = {$topic}");
            }
            $messageData = $redis->hGet('mqtt:mensajes', $topic);
            if (!$messageData) {
                continue;
            }
            $data = json_decode($messageData, true);
            if (!is_array($data) || !isset($data['payload'])) {
                continue;
            }
            $payload = $data['payload'];
            if (!is_string($payload)) {
                // A payload stored as a number, boolean or nested JSON value is turned
                // back into text so the string handling below works on it.
                if (is_scalar($payload)) {
                    $payload = (string)$payload;
                } else {
                    $encoded = json_encode($payload);
                    $payload = ($encoded === false) ? '' : $encoded;
                }
            }
            // Forward only when the topic's payload differs from the last one sent.
            if (isset($lastPayloadByTopic[$topic]) && $lastPayloadByTopic[$topic] === $payload) {
                continue;
            }
            $lastPayloadByTopic[$topic] = $payload;
            sse_dispatch_message($topic, $payload);
        }

        // Forget topics that left the set, but only after a successful read, so a
        // failed read does not cause every message to be re-sent.
        if ($topicsReadOk) {
            $lastPayloadByTopic = array_intersect_key($lastPayloadByTopic, $activeTopics);
        }

        // Send a ping every 2 seconds; the write is also what lets PHP notice a dropped client.
        if ($currentTime - $lastPing >= 2) {
            sse_send('ping', ['time' => date('H:i:s')]);
            $lastPing = $currentTime;
        }

        // Short pause between polls.
        usleep(200000); // 0.2 seconds

        if ($logTick) {
            error_log("SSE-Redis: Active monitoring - checking for new messages...");
            $lastTopicsCheck = $currentTime;
        }
    } catch (Throwable $e) {
        // Record the failure server-side as well as telling the client, then stop.
        error_log("SSE-Redis: loop error - " . $e->getMessage());
        sse_send('error', ['message' => 'SSE error: ' . $e->getMessage()]);
        break;
    }
}
?>