import mariadb import logging from typing import Generator import datetime from components import SensorInfo # main database connector class # permission validation is not performed on this class # perform those higher on the call stack class DatabaseConnect: def __init__(self) -> None: with open("db_creds.csv","r") as f: credentials = f.read().split(",") try: self.conn = mariadb.connect( user=credentials[0], password=credentials[1], host=credentials[2], port=int(credentials[3]), database=credentials[4] ) except mariadb.Error as e: logging.fatal(f"Error connecting to database: {e}") return self.cursor = self.conn.cursor() self.credentials = f.read().split(",") super().__init__() def emit_log(self, severity:int, message: str) -> None: self.cursor.execute("INSERT INTO `log` (`timestamp`, `type` ,`message`) VALUES (?,?,?);",(datetime.datetime.now(), severity, message)) match severity: case 1: logging.error(f"Log emitted: {message}") case 2: logging.warning(f"Log emitted: {message}") case 3: logging.info(f"Log emitted: {message}") def create_user(self, username: str, salt: bytes, key: bytes, administer: bool = False) -> None: self.cursor.execute("INSERT INTO `users` (`username`, `salt`, `key`, `administer`) VALUES (?,?,?,?)",(username, salt, key, administer)) self.conn.commit() self.emit_log(2, f"Created user {username}") def change_user_password(self, id: int, salt: bytes, key: bytes) -> None: self.cursor.execute("UPDATE `users` SET `salt` = ?, `key` = ? WHERE `ID` = ?;",(salt, key, id)) self.conn.commit() self.emit_log(2, f"Changed password for user ID {id}") def delete_user(self, id) -> None: self.cursor.execute("DELETE FROM Users WHERE ID = ?",(id,)) self.conn.commit() self.emit_log(2, f"Deleted user ID {id}") def display_users(self) -> tuple[tuple[int, str, bool]]: self.cursor.execute("SELECT users.`ID`, users.`username`, users.`administer` FROM Users") return self.cursor.fetchall() def username_from_id(self, user_id): self.cursor.execute("SELECT users.`username` FROM users WHERE `ID` = ?;", (user_id,)) return self.cursor.fetchone() def user_by_username(self, username) -> tuple[int, str, bytes, bytes, bool] | None: self.cursor.execute("SELECT * FROM users WHERE `username` = ?;", (username,)) return self.cursor.fetchone() def is_admin(self, user_id) -> bool: self.cursor.execute("SELECT users.`administer` FROM users WHERE `ID` = ?;", (user_id,)) res = self.cursor.fetchone() if res is None: return False else: return res[0] == 1 def view_valid_rooms(self, user_id) -> list[tuple[str, str, int]]: self.cursor.execute( "SELECT rooms.name, rooms.shortname, rooms.ID from permissions INNER JOIN rooms ON permissions.`roomID` = rooms.`ID` WHERE permissions.`userID` = ? AND permissions.`view` = 1;", (user_id,)) return self.cursor.fetchall() def view_all_rooms(self) -> list[tuple[str, str, int]]: self.cursor.execute( "SELECT rooms.name, rooms.shortname, rooms.ID from rooms;") return self.cursor.fetchall() def user_has_room_perms(self, user_id, room_shortname) -> bool: self.cursor.execute("SELECT permissions.`view` FROM permissions LEFT JOIN rooms ON permissions.`roomID` = rooms.ID WHERE rooms.shortname = ? AND `userID` = ?;",(room_shortname, user_id)) res = self.cursor.fetchone() if res is None: return False else: return res[0] == 1 # def roomID_from_shortname(self, shortname): # self.cursor.execute("SELECT rooms.`ID` from rooms WHERE rooms.shortname = ?;",(shortname,)) # return self.cursor.fetchone() def get_sensors_in_room_shortname(self, shortname) -> list[tuple[int, int]]: self.cursor.execute("Select sensors.`ID`, sensors.`Type` FROM sensors LEFT JOIN devices ON devices.`ID` = sensors.`deviceID` LEFT JOIN rooms ON devices.`roomID` = rooms.`ID` WHERE rooms.shortname = ?;",(shortname,)) return [x for x in self.cursor.fetchall()] def get_sensors_in_room_id(self, roomid) -> list[tuple[int, int]]: self.cursor.execute("Select sensors.`ID`, sensors.`Type` FROM sensors LEFT JOIN devices ON devices.`ID` = sensors.`deviceID` LEFT JOIN rooms ON devices.`roomID` = rooms.`ID` WHERE rooms.`ID` = ?;",(roomid,)) return [(x[0], x[1]) for x in self.cursor.fetchall()] def get_sensorsinfo_by_sensorid(self, sensorid) -> SensorInfo | None: self.cursor.execute("SELECT readings.`Timestamp`, readings.`reading` FROM readings WHERE readings.`sensorID` = ? ORDER BY readings.`Timestamp` DESC FETCH FIRST 1 ROWS ONLY;",(sensorid,)) fetch = self.cursor.fetchone() if fetch is None: return None else: return SensorInfo(0,sensorid, fetch[0], fetch[1]) def get_sensor_data(self, sensor_ID: int) -> Generator[tuple[datetime.datetime, float]]: self.cursor.execute("SELECT Timestamp, reading FROM Readings WHERE `sensorID` = ? ORDER BY `Timestamp` DESC;",(sensor_ID,)) for row in self.cursor: yield row def get_sensor_type(self, sensor_ID): self.cursor.execute("SELECT types.`type_desc` from types LEFT JOIN Sensors ON types.`ID` = Sensors.`type` WHERE Sensors.`ID` = ?;",(sensor_ID,)) return self.cursor.fetchone()[0] def get_sensor_from_topic(self, topic: str) -> int | None: self.cursor.execute("SELECT Sensors.`ID` FROM Sensors WHERE Sensors.`mqttTopic` = ?;", (topic,)) return self.cursor.fetchone()[0] def get_unit_for_type(self, type_id: int) -> str | None: self.cursor.execute("SELECT `unit` FROM `types` WHERE `ID` = ?;", (type_id,)) return self.cursor.fetchone()[0] def get_all_topics(self) -> list[str]: self.cursor.execute("SELECT `mqttTopic` FROM `sensors`;") return [x[0] for x in self.cursor.fetchall()] def create_sensor_reading(self, sensor_id: int, timestamp: datetime.datetime, reading: float) -> None: self.cursor.execute("INSERT INTO `readings` (`sensorID`, `Timestamp`, `reading`) VALUES (?,?,?);",(sensor_id, timestamp, reading)) self.conn.commit() self.emit_log(3, f"Inserted reading for sensor ID {sensor_id} at {timestamp}: {reading}") if __name__ == "__main__": a = DatabaseConnect() print(a.get_sensor_type(1))