Files
sensors/db_connect.py

126 lines
5.8 KiB
Python

import mariadb
import logging
from typing import Generator
import datetime
from components import SensorInfo
# Main database connector class.
# Permission validation is not performed in this class;
# perform it higher up the call stack.
class DatabaseConnect:
def __init__(self) -> None:
with open("db_creds.csv","r") as f:
credentials = f.read().split(",")
try:
self.conn = mariadb.connect(
user=credentials[0],
password=credentials[1],
host=credentials[2],
port=int(credentials[3]),
database=credentials[4]
)
except mariadb.Error as e:
logging.fatal(f"Error connecting to database: {e}")
return
self.cursor = self.conn.cursor()
self.credentials = f.read().split(",")
super().__init__()
def emit_log(self, severity:int, message: str) -> None:
self.cursor.execute("INSERT INTO `log` (`timestamp`, `type` ,`message`) VALUES (?,?,?);",(datetime.datetime.now(), severity, message))
match severity:
case 1:
logging.error(f"Log emitted: {message}")
case 2:
logging.warning(f"Log emitted: {message}")
case 3:
logging.info(f"Log emitted: {message}")
def create_user(self, username: str, salt: bytes, key: bytes) -> None:
self.cursor.execute("INSERT INTO `users` (`username`, `salt`, `key`) VALUES (?,?,?)",(username, salt, key))
self.conn.commit()
self.emit_log(2, f"Created user {username}")
def delete_user(self, id) -> None:
self.cursor.execute("DELETE FROM Users WHERE ID = ?",(id))
self.emit_log(2, f"Deleted user ID {id}")
def display_users(self) -> tuple[tuple[int, str, int]]:
self.cursor.execute("SELECT * FROM Users")
return self.cursor.fetchall()
def username_from_id(self, user_id):
self.cursor.execute("SELECT users.`username` FROM users WHERE `ID` = ?;", (user_id,))
return self.cursor.fetchone()
def user_by_username(self, username) -> tuple[int, str, bytes, bytes] | None:
self.cursor.execute("SELECT * FROM users WHERE `username` = ?;", (username,))
return self.cursor.fetchone()
def view_valid_rooms(self, user_id) -> list[tuple[str, str, int]]:
self.cursor.execute(
"SELECT rooms.name, rooms.shortname, rooms.ID from permissions INNER JOIN rooms ON permissions.`roomID` = rooms.`ID` WHERE permissions.`userID` = ? AND permissions.`view` = 1;",
(user_id,))
return self.cursor.fetchall()
def user_has_room_perms(self, user_id, room_shortname) -> bool:
self.cursor.execute("SELECT permissions.`view` FROM permissions LEFT JOIN rooms ON permissions.`roomID` = rooms.ID WHERE rooms.shortname = ? AND `userID` = ?;",(room_shortname, user_id))
res = self.cursor.fetchone()
if res is None:
return False
else:
return res[0] == 1
# def roomID_from_shortname(self, shortname):
# self.cursor.execute("SELECT rooms.`ID` from rooms WHERE rooms.shortname = ?;",(shortname,))
# return self.cursor.fetchone()
def get_sensors_in_room_shortname(self, shortname) -> list[tuple[int, int]]:
self.cursor.execute("SELECT Sensors.`ID`, Sensors.`Type` from Sensors LEFT JOIN rooms ON Sensors.`roomID` = rooms.ID WHERE rooms.shortname = ?;",(shortname,))
return [x for x in self.cursor.fetchall()]
def get_sensors_in_room_id(self, roomid) -> list[tuple[int, int]]:
self.cursor.execute("SELECT Sensors.`ID`, Sensors.`Type` from Sensors LEFT JOIN rooms ON Sensors.`roomID` = rooms.ID WHERE rooms.`ID` = ?;",(roomid,))
return [(x[0], x[1]) for x in self.cursor.fetchall()]
def get_sensorsinfo_by_sensorid(self, sensorid) -> SensorInfo | None:
self.cursor.execute("SELECT readings.`Timestamp`, readings.`reading` FROM readings WHERE readings.`sensorID` = ? ORDER BY readings.`Timestamp` DESC FETCH FIRST 1 ROWS ONLY;",(sensorid,))
fetch = self.cursor.fetchone()
if fetch is None:
return None
else:
return SensorInfo(0,sensorid, fetch[0], fetch[1])
def get_sensor_data(self, sensor_ID: int) -> Generator[tuple[datetime.datetime, float]]:
self.cursor.execute("SELECT Timestamp, reading FROM Readings WHERE `sensorID` = ? ORDER BY `Timestamp` DESC;",(sensor_ID,))
for row in self.cursor:
yield row
def get_sensor_type(self, sensor_ID):
self.cursor.execute("SELECT types.`type_desc` from types LEFT JOIN Sensors ON types.`ID` = Sensors.`type` WHERE Sensors.`ID` = ?;",(sensor_ID,))
return self.cursor.fetchone()[0]
def get_sensor_from_topic(self, topic: str) -> int | None:
self.cursor.execute("SELECT Sensors.`ID` FROM Sensors WHERE Sensors.`mqttTopic` = ?;", (topic,))
return self.cursor.fetchone()[0]
def get_unit_for_type(self, type_id: int) -> str | None:
self.cursor.execute("SELECT `unit` FROM `types` WHERE `ID` = ?;", (type_id,))
return self.cursor.fetchone()[0]
def get_all_topics(self) -> list[str]:
self.cursor.execute("SELECT `mqttTopic` FROM `sensors`;")
return [x[0] for x in self.cursor.fetchall()]
def create_sensor_reading(self, sensor_id: int, timestamp: datetime.datetime, reading: float) -> None:
self.cursor.execute("INSERT INTO `readings` (`sensorID`, `Timestamp`, `reading`) VALUES (?,?,?);",(sensor_id, timestamp, reading))
self.conn.commit()
self.emit_log(3, f"Inserted reading for sensor ID {sensor_id} at {timestamp}: {reading}")
if __name__ == "__main__":
    # Manual smoke test: connect with the stored credentials and print the
    # type description of the sensor whose ID is 1.
    connector = DatabaseConnect()
    sensor_type = connector.get_sensor_type(1)
    print(sensor_type)