"""Gradio dashboard that plots BME680 sensor readings received over MQTT.

An MQTT client subscribes to ``MQTT_TOPIC`` and appends every decoded JSON
message to a bounded in-memory buffer.  The Gradio UI polls that buffer on a
timer to redraw the graphs and the "latest readings" panel.
"""

import json
import os
import threading
from collections import deque
from datetime import datetime

import gradio as gr
import paho.mqtt.client as mqtt
import pandas as pd
import plotly.express as px

# Data buffer to store received sensor data (max 100 records).
data_buffer = deque(maxlen=100)


def _create_client():
    """Build the MQTT client, pinning the legacy callback API when available.

    paho-mqtt 2.x adds a ``callback_api_version`` argument to ``Client``;
    ``VERSION1`` keeps the ``(client, userdata, flags, rc)`` signature that
    ``on_connect`` below is written for.  paho-mqtt 1.x has no such enum, so
    the plain constructor is used there.
    """
    api_version = getattr(mqtt, "CallbackAPIVersion", None)
    if api_version is None:
        return mqtt.Client()
    return mqtt.Client(api_version.VERSION1)


# MQTT client and configuration variables.
client = _create_client()

# NOTE(review): the fallback values below are credentials committed to source
# control.  They are kept so existing deployments behave the same, but they
# should be rotated and supplied through the environment variables instead.
MQTT_BROKER = os.environ.get(
    "MQTT_BROKER", "b6bdb89571144b3d8e5ca4bbe666ddb5.s1.eu.hivemq.cloud"
)
MQTT_PORT = 8883
MQTT_TOPIC = "sensors/bme680/data"
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "LuthiraMQ")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "jLVx8y9v83gmgERTr0AP")

# Seconds without a message before the data status falls back to "waiting".
INACTIVITY_TIMEOUT_S = 5.0

# Global variables to track connection and data status.
connection_status = "❌ Not connected"
data_status = "⌛ Waiting for data..."

# Mutex guarding both status strings: they are written from the MQTT network
# thread / timer thread and read from the Gradio polling callbacks.
status_lock = threading.Lock()

# True once client.tls_set() has succeeded, so a second click on "Connect"
# does not try to configure TLS on the same client again.
_tls_configured = False


def get_status():
    """Return a Markdown summary of the MQTT connection and data status."""
    with status_lock:
        return f"**MQTT Connection:** {connection_status}\n**Data Status:** {data_status}"


def update_data_status(new_status):
    """Replace the data reception status under ``status_lock``."""
    global data_status
    with status_lock:
        data_status = new_status


def _set_connection_status(new_status):
    """Replace the connection status under ``status_lock``."""
    global connection_status
    with status_lock:
        connection_status = new_status


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe on success and record the outcome.

    ``rc == 0`` is treated as success; any other value is reported verbatim
    in the connection status.
    """
    if rc == 0:
        client.subscribe(MQTT_TOPIC)
        _set_connection_status("✅ Connected")
    else:
        _set_connection_status(f"❌ Connection failed (code {rc})")


def on_message(client, userdata, msg):
    """MQTT message callback: decode one reading and append it to the buffer.

    Expects a JSON object payload with the keys "temperature", "humidity",
    "pressure" and "gas".  A missing key is stored as 0.0.  Any failure while
    decoding or converting is printed and the message is dropped.
    """
    try:
        payload = json.loads(msg.payload.decode())
        # Convert sensor values to float.
        temperature = float(payload.get("temperature", 0))
        humidity = float(payload.get("humidity", 0))
        pressure = float(payload.get("pressure", 0))
        gas = float(payload.get("gas", 0))
        # The payload's own timestamp (if any) is ignored; arrival time is used.
        dt = datetime.now()
        data_buffer.append({
            'timestamp': dt,
            'temperature': temperature,
            'humidity': humidity,
            'pressure': pressure,
            'gas': gas,
        })
        update_data_status("✅ Receiving data")
        reset_inactivity_timer()
    except Exception as e:
        # Broad on purpose: this is the callback boundary, and one malformed
        # message should not stop later messages from being processed.
        print(f"Error processing message: {e}")


def connect_mqtt(broker, username, password):
    """Connect to the MQTT broker with the given credentials.

    Stores the values in the module-level configuration, configures TLS once,
    installs the callbacks, connects and starts the background network loop.

    Returns:
        A short status string for the UI: "Connection initiated..." on
        success, or "Connection error: ..." if any step raised.
    """
    global MQTT_BROKER, MQTT_USERNAME, MQTT_PASSWORD, _tls_configured
    try:
        MQTT_BROKER = broker
        MQTT_USERNAME = username
        MQTT_PASSWORD = password
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        # Track TLS setup with our own flag instead of reading the client's
        # private ``_ssl`` attribute.  The flag is only set after tls_set()
        # returns, so a failed attempt is retried on the next click.
        if not _tls_configured:
            client.tls_set()
            _tls_configured = True
        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
        return "Connection initiated..."
    except Exception as e:
        _set_connection_status(f"❌ Connection error: {e}")
        return f"Connection error: {e}"


# Inactivity timer to detect when data stops arriving.
inactivity_timer = None


def reset_inactivity_timer():
    """Restart the countdown that flips the data status back to "waiting"."""
    global inactivity_timer
    if inactivity_timer is not None:
        inactivity_timer.cancel()
    inactivity_timer = threading.Timer(INACTIVITY_TIMEOUT_S, on_inactivity)
    # Daemon thread: a pending timer must not keep the process alive on exit.
    inactivity_timer.daemon = True
    inactivity_timer.start()


def on_inactivity():
    """Timer callback: no message arrived within ``INACTIVITY_TIMEOUT_S``."""
    update_data_status("⌛ Waiting for data...")


def _time_series_figure(column, title, y_axis_title):
    """Build a Plotly line chart of one buffered sensor column over time.

    Returns an empty placeholder figure while the buffer has no readings.
    """
    if not data_buffer:
        return px.line(title="Waiting for Data...")
    df = pd.DataFrame(list(data_buffer))
    fig = px.line(df, x='timestamp', y=column, title=title)
    fig.update_layout(
        xaxis=dict(type="date", tickformat="%H:%M:%S", title="Time"),
        yaxis=dict(title=y_axis_title),
    )
    return fig


def update_graph():
    """Return a Plotly figure showing Temperature over time from the buffer."""
    return _time_series_figure(
        'temperature', "Temperature Over Time", "Temperature (°C)"
    )


def update_humidity_graph():
    """Return a Plotly figure showing Humidity over time from the buffer."""
    return _time_series_figure('humidity', "Humidity Over Time", "Humidity (%)")


def update_metrics():
    """Return a Markdown string with the latest sensor readings and status."""
    if not data_buffer:
        return "No sensor data available yet."
    # The newest reading is the last buffer entry; no DataFrame is needed.
    latest = data_buffer[-1]
    return (
        f"**Latest Sensor Readings:** \n"
        f"- Temperature: {latest['temperature']:.2f} °C \n"
        f"- Humidity: {latest['humidity']:.2f} % \n"
        f"- Pressure: {latest['pressure']:.2f} hPa \n"
        f"- Gas: {latest['gas']:.2f} ohms \n\n"
        f"{get_status()}"
    )


# Build the Gradio Blocks UI.
with gr.Blocks() as app:
    gr.Markdown("# Temperature Sensor Monitor")
    gr.Markdown("## Connect to MQTT")
    with gr.Row():
        # The form is pre-filled from the startup configuration.
        # NOTE(review): pre-filling the password sends it to every browser
        # that opens the page — confirm this is intended.
        broker_input = gr.Textbox(label="MQTT Broker", value=MQTT_BROKER)
        username_input = gr.Textbox(label="MQTT Username", value=MQTT_USERNAME)
        password_input = gr.Textbox(
            label="MQTT Password",
            value=MQTT_PASSWORD,
            type="password",
        )
    connect_button = gr.Button("Connect")

    # Components given a callable plus ``every`` are re-evaluated on a timer.
    connection_status_md = gr.Markdown(get_status, every=10)

    gr.Markdown("### Temperature Graph")
    temp_plot = gr.Plot(update_graph, every=2.5)

    gr.Markdown("### Humidity Graph")
    humidity_plot = gr.Plot(update_humidity_graph, every=2.5)

    gr.Markdown("### Latest Sensor Readings")
    sensor_metrics_md = gr.Markdown(update_metrics, every=2.5)

    connect_button.click(
        fn=connect_mqtt,
        inputs=[broker_input, username_input, password_input],
        outputs=[connection_status_md],
    )

if __name__ == "__main__":
    app.launch()