"""Flask web front end for browsing rooms, their sensors and user accounts.

Routes:
    /login, /logout   -- session handling via Flask-Login
    /                 -- landing page listing the rooms visible to the user
    /room/<room_name> -- per-room page with the latest readings and a plot
    /users            -- password change and (for admins) user management
"""

import datetime

from flask import Flask, abort, request, flash, redirect, get_flashed_messages
from flask_login import (
    LoginManager,
    login_user,
    logout_user,
    login_required,
    current_user,
)
from jinja2 import FileSystemLoader
from plotly.subplots import make_subplots
import plotly.graph_objects as go

from db_connect import DatabaseConnect
import templating
import user_handling
import components

app = Flask(__name__)
# The session signing key is read from disk so it never lives in the source.
with open("session_key.txt", "rb") as f:
    app.secret_key = f.read().strip()

login_manager = LoginManager()
login_manager.init_app(app)
login_manager.anonymous_user = components.AnonymousUser


@login_manager.user_loader
def load_user(user_id):
    """Rebuild the user object for ``user_id`` stored in the session.

    Returns a ``components.User`` when the database knows the ID, otherwise
    ``None`` (which Flask-Login treats as "not logged in").
    """
    user_data = DatabaseConnect().username_from_id(user_id)
    if user_data:
        return components.User(user_id, user_data[0])
    return None


# Templates are loaded from the "web" directory instead of Flask's default.
Flask.jinja_options["loader"] = FileSystemLoader("web")
env = app.create_jinja_environment()

# Number of sensor types shown per room on overview pages (types 0..N-1).
DISPLAYEDTYPES = 2
# A reading older than this is flagged as late; a larger pause between two
# consecutive readings is drawn as a gap in the room plot.
TDLATE = datetime.timedelta(hours=8)


def handle_user_login(username: str, password: str) -> bool:
    """Verify the credentials and, on success, log the user in.

    Side effects: calls ``login_user`` on success and writes a log entry
    through ``DatabaseConnect().emit_log`` in both outcomes.

    Returns:
        True when the password matches the stored salt/key, else False.
    """
    user_data = DatabaseConnect().user_by_username(username)
    if user_data:
        user_id, _, salt, key, _ = user_data
        if user_handling.verify_password(salt, key, password.encode()):
            user = components.User(user_id, username)
            login_user(user)
            DatabaseConnect().emit_log(3, f"User {username} logged in successfully.")
            return True
    DatabaseConnect().emit_log(2, f"Failed login attempt for user {username}.")
    return False


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form; on POST try to authenticate first."""
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        if handle_user_login(username, password):
            return redirect("/")
        flash("Invalid username or password.")
    return templating.LoginPage(env, current_user, roominfo=gather_room_info()).render()


@app.route("/logout")
@login_required
def logout():
    """End the current session and return to the landing page."""
    logout_user()
    return redirect("/")


@app.route("/")
def index():
    """Render the landing page with all rooms visible to the current user."""
    rooms_info = gather_room_info()
    out_html = templating.LandingPage(env, current_user, rooms_info)
    return out_html.render()


def _missing_sensor_info():
    """Return the placeholder used when a sensor type has no data to show."""
    return components.SensorInfo(2, None, None, "")


def gather_room_info():
    """Collect a ``components.RoomInfo`` for every room the user may see.

    Admins get all rooms, everyone else only the rooms returned by
    ``view_valid_rooms``. For each room one ``components.SensorInfo`` is
    produced per displayed sensor type; its first argument is 0 for a recent
    reading, 1 for a reading older than ``TDLATE`` and 2 when there is no
    sensor of that type or no reading yet.
    """
    if DatabaseConnect().is_admin(current_user.id):
        rooms = DatabaseConnect().view_all_rooms()
    else:
        rooms = DatabaseConnect().view_valid_rooms(current_user.id)

    rooms_info = []
    for room in rooms:
        sensor_info = []
        # Materialise once: the result is scanned once per displayed type, so
        # a one-shot iterator from the DB layer would be empty on later passes.
        # room[2] is passed as the room ID -- presumably the primary key.
        sensors_avail = list(DatabaseConnect().get_sensors_in_room_id(room[2]))
        for sensor_type in range(DISPLAYEDTYPES):
            sensors_with_type = [s for s in sensors_avail if s[1] == sensor_type]
            # TODO: handle more than one sensor in one room
            if not sensors_with_type:
                sensor_info.append(_missing_sensor_info())
                continue
            reading = DatabaseConnect().get_sensorsinfo_by_sensorid(
                sensors_with_type[0][0]
            )
            if reading is None:
                sensor_info.append(_missing_sensor_info())
                continue
            is_late = reading.timestamp + TDLATE < datetime.datetime.now()
            unit = DatabaseConnect().get_unit_for_type(sensor_type) or ''
            sensor_info.append(components.SensorInfo(
                1 if is_late else 0,
                reading.type,
                reading.timestamp,
                f"{reading.reading:.2f}{unit}",
            ))
        rooms_info.append(components.RoomInfo(room[0], room[1], sensor_info))
    return rooms_info


# The bare '/room/' rule is kept for backward compatibility; the variable rule
# is what actually delivers the room short code to the view.
@app.route('/room/')
@app.route('/room/<room_name>')
def room_page(room_name=None):
    """Render one room: latest reading per sensor plus a time-series plot.

    Args:
        room_name: short code of the room taken from the URL.

    Aborts with 403 when the user is neither permitted for the room nor an
    admin. Rooms without sensors get the ``EmptyRoom`` template.
    """
    allowed = (
        DatabaseConnect().user_has_room_perms(current_user.id, room_name)
        or DatabaseConnect().is_admin(current_user.id)
    )
    if not allowed:
        abort(403)

    roominfos = gather_room_info()
    current_room = next((r for r in roominfos if r.shortcode == room_name), None)
    sensor_list = DatabaseConnect().get_sensors_in_room_shortname(room_name)
    if not sensor_list:
        return templating.EmptyRoom(
            env,
            current_user,
            roominfos,
            current_room
        ).render()

    fig = make_subplots(rows=1, cols=len(sensor_list))
    lastest_readings = []
    for idx, (sensorID, sensor_type) in enumerate(sensor_list):
        # Each row is (timestamp, value); the first row is used as the most
        # recent reading, i.e. rows are expected newest-first.
        rows = list(DatabaseConnect().get_sensor_data(sensorID))
        if rows:
            newest_ts, newest_value = rows[0][0], rows[0][1]
            unit = DatabaseConnect().get_unit_for_type(sensor_type) or ''
            lastest_readings.append(components.SensorInfo(
                1 if newest_ts + TDLATE < datetime.datetime.now() else 0,
                sensor_type,
                newest_ts,
                f"{newest_value:.2f}{unit}"
            ))

        x = []
        y = []
        last = None
        for row in rows:
            if last is not None and (last - row[0]) > TDLATE:
                # Break the line across long pauses: a point with a missing
                # y value is a gap when connectgaps=False. The timestamp
                # belongs on x (previously the two were swapped, which put a
                # datetime into the numeric readings). `last` is the newer
                # neighbour here, so step one second towards the older row.
                x.append(last - datetime.timedelta(seconds=1))
                y.append(None)
            x.append(row[0])
            y.append(row[1])
            last = row[0]

        fig.add_trace(
            go.Scatter(
                connectgaps=False,
                x=x,
                y=y,
                name=DatabaseConnect().get_sensor_type(sensorID)),
            row=1, col=idx + 1
        )

    fig.update_layout(title_text=f"Available Devices in room {room_name}:")
    return templating.RoomPage(
        env,
        current_user,
        roominfos,
        current_room,
        lastest_readings,
        fig.to_html(full_html=False, include_plotlyjs='cdn')
    ).render()


@app.route("/users", methods=["GET", "POST"])
def user_management():
    """Show the user page (GET) or process one of its forms (POST).

    POST variants, selected by the form fields present:
        old_password -- change the current user's password
        new_username -- admin only: create a user
        delete       -- admin only: delete a user by ID

    Aborts with 403 for logged-out users and non-admins using admin actions,
    and with 400 when an admin POST matches no known form.
    """
    if request.method != "POST":
        users = DatabaseConnect().display_users()
        admin = DatabaseConnect().is_admin(current_user.id)
        return templating.UserManagementPage(
            env, current_user, gather_room_info(), users, admin
        ).render()

    # no options allowed for logged out users
    # NOTE(review): ID 1 appears to be the anonymous user -- confirm in components.
    if current_user.id == 1:
        abort(403)

    if "old_password" in request.form:
        # password change request
        old_password = request.form["old_password"]
        new_password = request.form["new_password"]
        # handle_user_login re-checks the old password; it also calls
        # login_user and emits a log entry as a side effect.
        if handle_user_login(current_user.username, old_password):
            salt, key = user_handling.new_password(new_password.encode())
            DatabaseConnect().change_user_password(current_user.id, salt, key)
            flash("Password changed successfully.")
        else:
            flash("Old password incorrect.")
        return redirect("/users")

    # Everything below is an admin-only action.
    if not DatabaseConnect().is_admin(current_user.id):
        abort(403)

    if "new_username" in request.form:
        new_username = request.form["new_username"]
        new_password = request.form["new_password"]
        is_admin = request.form.get("is_admin") == "on"
        salt, key = user_handling.new_password(new_password.encode())
        DatabaseConnect().create_user(new_username, salt, key, is_admin)
        flash(f"Created user {new_username}.")
        return redirect("/users")

    if "delete" in request.form:
        value = request.form["delete"]
        # Form values are always strings while the user ID may be an int;
        # compare as strings so the self-delete guard can actually trigger.
        if str(value) == str(current_user.id):
            flash("Cannot delete currently logged in user.")
            return redirect("/users")
        DatabaseConnect().delete_user(value)
        flash(f"Deleted user ID {value}.")
        return redirect("/users")

    abort(400)


if __name__ == "__main__":
    # NOTE: debug=True enables the interactive debugger; development only.
    app.run(debug=True)