Files
sensors/mqtt_data_generator.py

64 lines
1.7 KiB
Python

# test script to generate sensor data for testing
# made to reprsent real world data
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi_mqtt import MQTTConfig, FastMQTT
from gmqtt import Client as MQTTClient
import fastapi_mqtt as mqtt
import uvicorn
import datetime
import logging
import random
import asyncio
mqtt_config = mqtt.MQTTConfig(
host="localhost",
port=1883,
keepalive=60,
)
fast_mqtt = mqtt.FastMQTT(config=mqtt_config)
@asynccontextmanager
async def _lifespan(_app: FastAPI):
await fast_mqtt.mqtt_startup()
asyncio.create_task(main())
yield
await fast_mqtt.mqtt_shutdown()
app = FastAPI(lifespan=_lifespan)
# weights for hourly temps, based on LKTB sample on nov, 4th 2025
TIME_WEIGHTS = [
4, 3, 3, 2, 2, 2, 2, 1, 3, 6, 8, 9, 11, 12, 12, 12, 11, 10, 9, 9, 8, 8, 5, 5
]
# weights for specific rooms, here inside/outside
ROOM_WEIGHTS = {
"outside": 0,
"inside": 10
}
@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags, rc, properties):
print("Connected: ", client, flags, rc, properties)
async def publish_sensor_data(inside: bool = True):
now = datetime.datetime.now()
base_temp = TIME_WEIGHTS[now.hour]
temp = base_temp + ROOM_WEIGHTS["inside" if inside else "outside"] + random.gauss(0,0.5)
logging.info(f"{now.isoformat()},{temp}")
fast_mqtt.publish("210/temp" if inside else "000/temp", f"{now.isoformat()},{temp}", qos=0)
async def main():
while True:
await publish_sensor_data(inside=False)
await publish_sensor_data(inside=True)
await asyncio.sleep(300)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
uvicorn.run(app)