189 lines
7.8 KiB
PHP
189 lines
7.8 KiB
PHP
<?php
|
|
// Endpoint SSE que retransmite estados desde MQTT vía Redis
|
|
if (!defined('NON_JSON')) define('NON_JSON', true);
|
|
|
|
// Logging inmediato para ver si se ejecuta el script
|
|
error_log("=== SSE Script Started at " . date('Y-m-d H:i:s') . " ===");
|
|
error_log("Request URI: " . $_SERVER['REQUEST_URI']);
|
|
|
|
// Headers SSE - FORZAR INMEDIATAMENTE
|
|
header('Content-Type: text/event-stream');
|
|
header('Cache-Control: no-cache, no-transform, no-store');
|
|
header('Connection: keep-alive');
|
|
header('X-Accel-Buffering: no');
|
|
header('Access-Control-Allow-Origin: *');
|
|
|
|
// Desactivar cualquier buffering
|
|
if (ob_get_level()) ob_clean();
|
|
@ini_set('output_buffering', 'off');
|
|
@ob_implicit_flush(1);
|
|
|
|
// Funciones SSE
|
|
function sse_send($event, $data) {
|
|
echo "event: {$event}\n";
|
|
echo 'data: ' . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n\n";
|
|
flush();
|
|
}
|
|
|
|
function normalize_state($message) {
|
|
$m = strtolower(trim((string)$message));
|
|
if ($m === 'online' || $m === 'connected' || $m === 'available' || $m === '1' || $m === 'true' || $m === 'on') {
|
|
return 'online';
|
|
}
|
|
if ($m === 'offline' || $m === 'disconnected' || $m === 'unavailable' || $m === '0' || $m === 'false' || $m === 'off') {
|
|
return 'offline';
|
|
}
|
|
return $m ?: 'desconocido';
|
|
}
|
|
|
|
// Padding inicial
|
|
echo ':' . str_repeat(' ', 2048) . "\n\n";
|
|
flush();
|
|
|
|
// Enviar mensaje de conexión
|
|
sse_send('connected', ['message' => 'SSE connected', 'time' => date('H:i:s')]);
|
|
|
|
// Forzar evento de prueba para pines
|
|
sse_send('pin', [
|
|
'chipid' => '628347',
|
|
'alias' => 'D5',
|
|
'valor' => 'TEST'
|
|
]);
|
|
|
|
// Conexión a Redis
|
|
try {
|
|
error_log("=== Intentando conectar a Redis ===");
|
|
$redis = new Redis();
|
|
$redis->connect('127.0.0.1', 6379, 2);
|
|
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
|
error_log("=== Redis conectado exitosamente ===");
|
|
sse_send('redis_connected', ['message' => 'Redis connected', 'time' => date('H:i:s')]);
|
|
} catch (Throwable $e) {
|
|
error_log("=== Error conectando a Redis: " . $e->getMessage() . " ===");
|
|
sse_send('error', ['message' => 'Redis connection error: ' . $e->getMessage()]);
|
|
exit;
|
|
}
|
|
|
|
$processedMessages = [];
|
|
$lastPing = time();
|
|
$lastTopicsCheck = 0;
|
|
|
|
// Loop principal: leer desde Redis
|
|
while (!connection_aborted()) {
|
|
try {
|
|
$currentTime = time();
|
|
|
|
// Obtener todos los topics (cada vez, no cache)
|
|
$topics = $redis->sMembers('mqtt:topics');
|
|
error_log("SSE-Redis: Topics check - count=" . count($topics) . ", time=" . date('H:i:s'));
|
|
|
|
foreach ($topics as $topic) {
|
|
// Solo procesar topics de dispositivos
|
|
if (strpos($topic, 'dispositivo/') !== 0) {
|
|
continue;
|
|
}
|
|
|
|
// Monitoreo específico del dispositivo 628347
|
|
if (strpos($topic, 'dispositivo/628347') !== false) {
|
|
error_log("SSE-Redis: Encontrado topic 628347 = {$topic}");
|
|
}
|
|
|
|
$messageData = $redis->hGet('mqtt:mensajes', $topic);
|
|
if ($messageData) {
|
|
$data = json_decode($messageData, true);
|
|
if ($data && isset($data['payload'])) {
|
|
$payload = $data['payload'];
|
|
$messageId = md5($topic . ':' . $payload);
|
|
|
|
// Procesar solo mensajes nuevos
|
|
if (!isset($processedMessages[$messageId])) {
|
|
$processedMessages[$messageId] = true;
|
|
|
|
// Parsear topic y enviar eventos SSE
|
|
if (preg_match('/^dispositivo\/([^\/]+)\/comando$/', $topic, $matches)) {
|
|
$chipid = $matches[1];
|
|
// Parsear JSON para alias/valor
|
|
$decoded = json_decode($payload, true);
|
|
if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) {
|
|
$alias = isset($decoded['alias']) ? (string)$decoded['alias'] : null;
|
|
$valor = isset($decoded['valor']) ? (string)$decoded['valor'] : null;
|
|
if ($alias && $valor !== null) {
|
|
error_log("SSE-Redis: PROCESANDO NUEVO MENSAJE - chipid={$chipid}, alias={$alias}, valor={$valor}");
|
|
sse_send('pin', [
|
|
'chipid' => $chipid,
|
|
'alias' => $alias,
|
|
'valor' => $valor,
|
|
]);
|
|
error_log("SSE-Redis: EVENTO ENVIADO AL FRONTEND");
|
|
} else {
|
|
error_log("SSE-Redis: Ignorando comando SIN ALIAS/VALOR - alias={$alias}, valor={$valor}");
|
|
}
|
|
} else {
|
|
error_log("SSE-Redis: ERROR PARSEANDO JSON COMANDO - payload={$payload}");
|
|
}
|
|
}
|
|
elseif (preg_match('/^dispositivo\/([^\/]+)\/valores\/([^\/]+)$/', $topic, $matches)) {
|
|
$chipid = $matches[1];
|
|
$alias = $matches[2];
|
|
// Parsear JSON del payload
|
|
$decoded = json_decode($payload, true);
|
|
if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) {
|
|
$valor = isset($decoded['valor']) ? (string)$decoded['valor'] : $payload;
|
|
sse_send('pin', [
|
|
'chipid' => $chipid,
|
|
'alias' => $alias,
|
|
'valor' => $valor,
|
|
]);
|
|
} else {
|
|
sse_send('pin', [
|
|
'chipid' => $chipid,
|
|
'alias' => $alias,
|
|
'valor' => $payload,
|
|
]);
|
|
}
|
|
}
|
|
elseif (preg_match('/^dispositivo\/([^\/]+)\/state$/', $topic, $matches)) {
|
|
$chipid = $matches[1];
|
|
sse_send('state', [
|
|
'chipid' => $chipid,
|
|
'estado' => normalize_state($payload),
|
|
]);
|
|
}
|
|
elseif (preg_match('/^dispositivo\/([^\/]+)\/ip$/', $topic, $matches)) {
|
|
$chipid = $matches[1];
|
|
sse_send('ip', [
|
|
'chipid' => $chipid,
|
|
'ip' => trim($payload),
|
|
]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enviar ping cada 2 segundos
|
|
if ($currentTime - $lastPing >= 2) {
|
|
sse_send('ping', ['time' => date('H:i:s')]);
|
|
$lastPing = $currentTime;
|
|
}
|
|
|
|
// Limitar memoria de mensajes procesados
|
|
if (count($processedMessages) > 1000) {
|
|
$processedMessages = array_slice($processedMessages, -500, null, true);
|
|
}
|
|
|
|
// Pequeña pausa - reducir para más tiempo real
|
|
usleep(200000); // 0.2 segundos
|
|
|
|
// Forzar check cada vez
|
|
if ($currentTime - $lastTopicsCheck >= 1) {
|
|
error_log("SSE-Redis: Active monitoring - checking for new messages...");
|
|
$lastTopicsCheck = $currentTime;
|
|
}
|
|
|
|
} catch (Throwable $e) {
|
|
sse_send('error', ['message' => 'SSE error: ' . $e->getMessage()]);
|
|
break;
|
|
}
|
|
}
|
|
?>
|