KankenOnline/kanken_online/database.py

63 lines
1.8 KiB
Python

import sqlite3
from typing import IO
import click
from flask import Flask, current_app, g
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
def get_database():
    """Return the SQLite connection for the current application context.

    The connection is opened lazily on first use and cached on ``flask.g``
    under the ``db`` attribute, so repeated calls within the same context
    share one connection.

    Returns:
        A ``sqlite3.Connection`` to the file named by the ``DATABASE``
        config key, with ``sqlite3.Row`` as its row factory.
    """
    # Reuse the connection if one is already cached for this context.
    if "db" in g:
        return g.db

    connection = sqlite3.connect(
        current_app.config["DATABASE"],
        # Convert column values according to their declared column types.
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    g.db = connection
    # Rows can then be indexed by column name as well as by position.
    connection.row_factory = sqlite3.Row
    return g.db
def get_jiten_db():
    """Return the SQLAlchemy session for the jiten database.

    The session is created lazily on first use and cached on ``flask.g``
    under the ``jiten_db`` attribute, so repeated calls within the same
    application context share one session.

    Returns:
        A ``sqlalchemy.orm.Session`` bound to an engine for the SQLite file
        named by the ``JITEN_DB`` config key.
    """
    if "jiten_db" not in g:
        # NOTE: a fresh engine is built for every application context that
        # asks for the session.
        engine = create_engine(f"sqlite:///{current_app.config['JITEN_DB']}")
        g.jiten_db = Session(engine)
        # Diagnostic goes through the app logger instead of stdout so it is
        # controlled by the logging level.
        current_app.logger.debug("Session is %s", g.jiten_db)
    return g.jiten_db
def initialize_database():
    """Run the bundled ``schema.sql`` script against the main database.

    The script is loaded through ``current_app.open_resource``, decoded with
    the default codec, and executed in one ``executescript`` call on the
    connection returned by ``get_database()``.
    """
    connection = get_database()
    with current_app.open_resource("schema.sql") as schema_file:
        schema_file: IO[bytes]
        raw_script = schema_file.read()
        connection.executescript(raw_script.decode())
def close_database(e=None):
    """Release the database handles cached on ``flask.g``, if any.

    This function is registered as an application-context teardown callback
    by ``initialize_app``.

    Args:
        e: Optional value supplied by the caller (Flask passes the unhandled
            exception, if any, to teardown callbacks). It is not used.
    """
    # Main sqlite3 connection opened by get_database().
    db = g.pop("db", None)
    if db is not None:
        db.close()

    # SQLAlchemy session opened by get_jiten_db(); without this it would be
    # left open after the context ends.
    jiten_session = g.pop("jiten_db", None)
    if jiten_session is not None:
        jiten_session.close()
@click.command("init-db")
def init_db_command():
    """Wipe the existing database and create new tables."""
    # Prompts on stdin before running initialize_database(); anything other
    # than an affirmative reply aborts without touching the database.
    answer = input("Are you sure you wish to overwrite any existing database? (y/n) ")
    # Normalise the reply so "Y", " y " or "yes" are not silently treated as
    # a refusal.
    if answer.strip().lower() in {"y", "yes"}:
        initialize_database()
        click.echo("Initialized the database.")
    else:
        click.echo("Aborted.")
@click.command("prune-pfps")
def prune_pfps_command():
    """Removes excess profile pictures that aren't used in the database."""
    import os

    # Every filename referenced by the user_settings table is kept.
    db = get_database()
    used_pfps = {
        row[0]
        for row in db.execute("SELECT pfp_filename FROM user_settings").fetchall()
    }

    # Collect candidates first, then delete, so the directory is not modified
    # while it is being scanned. Only regular files are considered: calling
    # os.remove() on a subdirectory would raise and abort the prune midway.
    with os.scandir(current_app.config["PFP_STORE"]) as entries:
        unused_paths = [
            entry.path
            for entry in entries
            if entry.is_file() and entry.name not in used_pfps
        ]

    for path in unused_paths:
        os.remove(path)

    click.echo(f"Removed {len(unused_paths)} unused profile picture(s).")
def initialize_app(app: Flask):
    """Hook this module's teardown callback and CLI commands into ``app``.

    Args:
        app: The Flask application to configure.
    """
    # Run close_database whenever an application context is torn down.
    app.teardown_appcontext(close_database)

    # Expose the maintenance commands through the app's CLI group.
    for command in (init_db_command, prune_pfps_command):
        app.cli.add_command(command)