"""FastAPI service that bridges MQTT sensor topics into the database.

The topic list is read from the database at import time. Each message that
arrives on a known topic is parsed as ``<ISO-format timestamp>,<value>`` and
stored as a sensor reading; connection events and problems are written to the
database log.
"""
from fastapi import FastAPI
from fastapi_mqtt import MQTTConfig, FastMQTT
from gmqtt import Client as MQTTClient
import fastapi_mqtt as mqtt
import uvicorn
from contextlib import asynccontextmanager
import datetime
import db_connect

mqtt_config = mqtt.MQTTConfig(
    host="localhost",
    port=1883,
    keepalive=60,
)
fast_mqtt = mqtt.FastMQTT(config=mqtt_config)


@asynccontextmanager
async def _lifespan(_app: FastAPI):
    """Start the MQTT client with the app and shut it down when the app stops."""
    await fast_mqtt.mqtt_startup()
    try:
        yield
    finally:
        # Run the shutdown even when an exception is thrown into the generator
        # at the yield, so the MQTT connection is not left open.
        await fast_mqtt.mqtt_shutdown()


app = FastAPI(lifespan=_lifespan)

# Loaded once at import time; topics added to the database afterwards are not
# subscribed until the process restarts.
topics = db_connect.DatabaseConnect().get_all_topics()


@fast_mqtt.on_connect()
def on_connect(client: MQTTClient, flags, rc, properties):
    """Subscribe to every known topic (QoS 0) once the broker connection is up."""
    # NOTE(review): assumes get_all_topics() yields plain topic strings rather
    # than database row tuples - confirm against db_connect.
    for topic in topics:
        client.subscribe(topic, qos=0)
    print("Connected: ", client, flags, rc, properties)
    db_connect.DatabaseConnect().emit_log(3, "MQTT client connected and subscribed to topics.")


def _parse_reading(payload: bytes):
    """Parse a ``<timestamp>,<value>`` payload into ``(datetime, float)``.

    Only the first two comma-separated fields are used; any further fields
    are ignored.

    Raises:
        ValueError: if the payload is not valid UTF-8, the timestamp is not
            in ISO format, or the value is not a number.
        IndexError: if the payload has fewer than two comma-separated fields.
    """
    fields = payload.decode().split(",")
    return datetime.datetime.fromisoformat(fields[0]), float(fields[1])


@fast_mqtt.on_message()
def on_message(client: MQTTClient, topic: str, payload: bytes, qos, properties):
    """Store an incoming sensor reading, or log why it could not be stored.

    Messages on topics with no matching sensor are logged and dropped.
    Malformed payloads are logged and dropped as well, instead of raising
    inside the MQTT callback.
    """
    # Find the sensor ID from the topic.
    sensor_id = db_connect.DatabaseConnect().get_sensor_from_topic(topic)
    if sensor_id is None:
        db_connect.DatabaseConnect().emit_log(1, f"Received MQTT message for unknown topic {topic}")
        # TODO: add device topics and handle those commands
        return

    try:
        timestamp, value = _parse_reading(payload)
    except (ValueError, IndexError) as exc:
        # NOTE(review): level 1 mirrors the unknown-topic log above; confirm
        # it is the right severity for a malformed payload.
        db_connect.DatabaseConnect().emit_log(
            1, f"Malformed MQTT payload on topic {topic}: {exc}"
        )
        return

    db_connect.DatabaseConnect().create_sensor_reading(sensor_id, timestamp, value)


@fast_mqtt.on_disconnect()
def on_disconnect(client: MQTTClient, packet, exc=None):
    """Report the disconnect on stdout and in the database log."""
    print("Disconnected: ", client, packet, exc)
    db_connect.DatabaseConnect().emit_log(2, "MQTT client disconnected.")


if __name__ == "__main__":
    uvicorn.run(app, port=8001)