63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
import sqlite3
|
|
from typing import IO
|
|
import click
|
|
from flask import Flask, current_app, g
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
def get_database():
    """Return the sqlite3 connection cached on ``g``, opening it on first use.

    The connection targets the path stored under the ``DATABASE`` key of the
    current app's config, is opened with ``sqlite3.PARSE_DECLTYPES``, and
    yields ``sqlite3.Row`` objects so columns can be read by name.
    """
    # Reuse the connection already stored on g, if any.
    if "db" in g:
        return g.db

    connection = sqlite3.connect(
        current_app.config["DATABASE"],
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection
|
|
|
|
def get_jiten_db():
    """Return the SQLAlchemy ``Session`` cached on ``g``, creating it on first use.

    The session is bound to a SQLite engine whose file path is read from the
    ``JITEN_DB`` key of the current app's config.
    """
    if "jiten_db" not in g:
        engine = create_engine(f"sqlite:///{current_app.config['JITEN_DB']}")
        g.jiten_db = Session(engine)
        # Diagnostic only: go through the app logger at DEBUG level instead of
        # print(), so production output is not polluted on stdout.
        current_app.logger.debug("Session is %s", g.jiten_db)

    return g.jiten_db
|
|
|
|
def initialize_database():
    """Run the bundled ``schema.sql`` script against the app database.

    The script is opened through ``current_app.open_resource``, decoded from
    bytes to text, and executed in one ``executescript`` call on the
    connection returned by ``get_database()``.
    """
    connection = get_database()

    with current_app.open_resource("schema.sql") as schema_file:
        schema_file: IO[bytes]
        raw_schema = schema_file.read()
        connection.executescript(raw_schema.decode())
|
|
|
|
def close_database(e=None):
    """Close the per-context database handles stored on ``g``, if any.

    Handles both the sqlite3 connection kept under ``g.db`` and the SQLAlchemy
    session kept under ``g.jiten_db``. Each is removed from ``g`` before being
    closed; a handle that was never opened is skipped.

    Args:
        e: Unused. Accepted so the function can serve as a teardown callback,
            which is invoked with the exception (or ``None``).
    """
    db = g.pop("db", None)
    # Pop the session up front so it is detached from g even if closing the
    # sqlite3 connection raises.
    jiten_session = g.pop("jiten_db", None)

    try:
        if db is not None:
            db.close()
    finally:
        # Without this, the session opened by get_jiten_db() would keep its
        # connection until garbage collection.
        if jiten_session is not None:
            jiten_session.close()
|
|
|
|
@click.command("init-db")
def init_db_command():
    """Wipe the existing database and create new tables."""
    # NOTE: the docstring above is used by click as the command's help text,
    # so extra documentation lives in comments.
    #
    # click.confirm replaces a bare input() == "y" check: it accepts y/yes in
    # any letter case, re-prompts on unrecognised input, defaults to "no" on
    # an empty answer, and raises click.Abort (not a raw EOFError) when stdin
    # is closed.
    if click.confirm("Are you sure you wish to overwrite any existing database?"):
        initialize_database()
        click.echo("Initialized the database.")
    else:
        click.echo("Aborted.")
|
|
|
|
@click.command("prune-pfps")
def prune_pfps_command():
    """Removes excess profile pictures that aren't used in the database."""
    # NOTE: the docstring above is used by click as the command's help text,
    # so extra documentation lives in comments.
    import os

    db = get_database()
    # Filenames referenced by any row of user_settings.
    used_pfps = {
        row[0]
        for row in db.execute("SELECT pfp_filename FROM user_settings").fetchall()
    }

    # Look the store path up once instead of on every loop iteration.
    pfp_store = current_app.config["PFP_STORE"]
    unused_pfps = set(os.listdir(pfp_store)) - used_pfps

    for pfp in unused_pfps:
        pfp_path = os.path.join(pfp_store, pfp)
        # os.listdir also returns subdirectories; os.remove raises on those,
        # so only regular files are deleted.
        if os.path.isfile(pfp_path):
            os.remove(pfp_path)
|
|
|
|
def initialize_app(app: Flask):
    """Attach this module's teardown handler and CLI commands to ``app``.

    Registers ``close_database`` as an app-context teardown callback and adds
    the ``init-db`` and ``prune-pfps`` commands to the app's CLI group.
    """
    app.teardown_appcontext(close_database)

    cli_commands = (init_db_command, prune_pfps_command)
    for command in cli_commands:
        app.cli.add_command(command)