diff options
| author | Anson Bridges <bridges.anson@gmail.com> | 2025-08-11 22:24:05 -0700 |
|---|---|---|
| committer | Anson Bridges <bridges.anson@gmail.com> | 2025-08-11 22:24:05 -0700 |
| commit | 02284958a1189ffcb10b34a4c3a02417f8136a4d (patch) | |
| tree | 837aac77184a3435ee686dd33878b9f2715c94b1 /db.py | |
Diffstat (limited to 'db.py')
| -rw-r--r-- | db.py | 506 |
1 files changed, 506 insertions, 0 deletions
@@ -0,0 +1,506 @@ +# external libraries +from PIL import Image +import exif + +# core libraries +import os.path +from os import mkdir +import json +from datetime import datetime +from time import time, time_ns +import subprocess + +# internal code +from .datatypes import User + +collections = { "last_queried" : {}, "collections" : {} } +users = {} + +THUMBNAIL_SIZE_MAP = (60, 45) +THUMBNAIL_SIZE_GRD = (120, 120) + +# run upon server startup +def startup(): + if not os.path.exists("users.json"): + initialize_users() + load_users() + + +# +# Collections-related functions +# + + +# save collections DB to disk +def save_collections(specific_collection_id=None): + if specific_collection_id: + with open(f"content/{specific_collection_id}/collection.json", "w") as f: + json.dump(collections["collections"][specific_collection_id], f) + return + + for collection_id, collection in collections["collections"].items(): + with open(f"content/{collection_id}/collection.json", "w") as f: + json.dump(collection, f) + +# load collection from disk +# collection_id | str | collection to load +# RETURNS True if collection was loaded, False otherwise +def load_collection(collection_id): + global collections + if os.path.exists(f"content/{collection_id}/collection.json"): + with open(f"content/{collection_id}/collection.json", "r") as f: + collections["collections"][collection_id] = json.load(f) + collections["last_queried"][collection_id] = time() + return True + else: + return False + + +def unload_idle_collections(): + to_remove = [] + for collection_id, age in collections["last_queried"].items(): + if (time() - age) > 300: # 5 minutes of no contact + to_remove.append(collection_id) + for collection_id in to_remove: + del collections["last_queried"][collection_id] + del collections["collections"][collection_id] + +# create an empty collection with placeholder information +# owner_id | str +def create_collection(owner_id): + global collections + collection_id = str(time_ns()) + if collection_id in collections["collections"]: + collection_id += "_" + collections["collections"][collection_id] = \ + { "id" : collection_id, + "title" : "Title", + "subtitle" : "Subtitle", + "info" : "Additional information", + "default_view" : [ 0, 0, 15], # lat, long, zoom + "last_edited" : time(), + "owner" : owner_id, + "editors" : [], + "viewers" : [], + "public" : False, + "media" : [], + "notes" : [], + "routes" : [] + } + mkdir("content/"+collection_id) + mkdir("content/"+collection_id+"/thumbs") + mkdir("content/"+collection_id+"/thumbs/map") + mkdir("content/"+collection_id+"/thumbs/ui") + mkdir("content/"+collection_id+"/media") + unload_idle_collections() + save_collections(collection_id) + +# find collection by collection id +# collection_id | int | the collection id which identifies the desired collection +# RETURNS: None if no such collection, otherwise returns the collection +def get_collection(collection_id): + if collection_id not in collections["collections"]: + if not load_collection(collection_id): + return None + + collections["last_queried"][collection_id] = time() + unload_idle_collections() + + return collections["collections"][collection_id] + +# get collections as a list rather than a dictionary +# RETURNS: collections as a list +def get_collection_list(): + return [ coll for coll_id, coll in collections['collections'].items() ] + + +# set collection to either public or private +# collection_id | str +# public | bool +# RETURNS False if no such collection, otherwise True +def set_collection_public(collection_id, public): + collection = get_collection(collection_id) + if not collection: + return False + collection["public"] = public + collection["last_edited"] = time() + return True + +# set collection info fields (title, subtitle, info) +# collection_id | str +# edits_dict | dict +# RETURNS False if collection not found, otherwise True +def set_collection_info(collection_id, edits_dict): + collection = get_collection(collection_id) + if not collection: + return False + + for key, value in edits_dict.items(): + if key in ["title", "subtitle", "info"]: + collection[key] = value + collection["last_edited"] = time() + + return True + +# whether the given user can edit the given collection +# collection_id | str +# user_email | str +# RETURNS: True/False +def validate_user(collection_id, user_email): + collection = get_collection(collection_id) + if not collection or user_email not in users: + return False + if (user_email not in collection["editors"]) and (user_email != collection["owner"]): + return False + return True + +# set the user permissions for a given collection +# collection_id | str +# user_email | str +# perm | str | invalid permission removes permissions from user +# RETURNS False if invalid command, otherwise True in case of successful change +def set_user_permissions(collection_id, user_email, perm): + collection = get_collection(collection_id) + if not collection: + return False + + collection["last_edited"] = time() + + # reset user perms + while user_email in collection["viewers"]: + collection["viewers"].remove(user_email) + while user_email in collection["editors"]: + collection["editors"].remove(user_email) + + # set user perms to specified value + if perm == "viewer": + collection["viewers"].append(user_email) + elif perm == "editor": + collection["editors"].append(user_email) + + return True + + +# +# User-related functions +# + +# if no user DB exists, create an empty one +def initialize_users(): + global users + users = {} + save_users() + +# save users DB to disk +# *filename | str | filename to save to +def save_users(filename="users.json"): + users_j = {} + # convert User objects to JSON + for user_email, user in users.items(): + users_j[user_email] = {"email" : user.email, + "password" : user.password, + "name" : user.name} + + # save JSON-ified user DB + with open(filename, "w") as f: + json.dump(users_j, f) + +# load collections DB from disk +# *filename | str | filename to load from +def load_users(filename="users.json"): + global users + users = {} # clear user DB + + # load from file + with open(filename, "r") as f: + users_j = json.load(f) + + # convert from pure JSON to User objects + for user_email, user_info in users_j.items(): + users[user_email] = User(email=user_info["email"], + password=user_info["password"], + name=user_info["name"]) + +# returns User class by id (email) +# user_id | str | user email by which the user is identified +# RETURNS: User object if user exists, otherwise None +def get_user(user_id): + if user_id not in users: + return None + return users[user_id] + +# add a new user +# email | str | user's email +# password | str | user's password (hashed) +# name | str | user's real/username +# RETURNS: False if user already exists, True if user has been newly added +def add_user(email, password, name): + global users + if email in users: + return False + users[email] = User(email=email, password=password, name=name) + save_users() + return True + +# returns User class by id (email) +# user_id | str | user email by which the user is identified +# RETURNS: True if user was deleted, False if not found +def delete_user(user_id): + global users + if user_id not in users: + return False + del users[user_id] + save_users() + return True + + +# +# Media-related functions +# + +# adds media to specified collection. info is taken from exif unless file_info is specified +# collection_id | str | collection photo should be added to +# file | file | file uploaded via flask +# *file_info | json | dictionary describing information about the file, optional +def add_media(collection_id, file, file_info=None): + filename = file.filename + + # find valid filename + filename_components = file.filename.split(".") + extension = filename_components[-1].lower() + i = 1 + while os.path.exists("content/"+collection_id+"/media/"+filename): + filename = ".".join(filename_components[:-1]) + " (" + str(i) + ")." + extension + i += 1 + + # save file + file.save(os.path.join("content/"+collection_id+"/media/", filename)) + print(filename) + filename_thumb = ".".join(filename.split(".")[:-1])+".png" + + # generate thumbnails + + if extension in ["mov", "mp4", "webm"]: # generate thumbnail image + is_video = True + video_path = os.path.join("content/"+collection_id+"/media/", filename) + temp_img_path = os.path.join("content/"+collection_id+"/media/", filename+".jpg") + subprocess.call(['ffmpeg', '-i', video_path, '-ss', '00:00:00.000', '-vframes', '1', temp_img_path]) + photo_temp = Image.open(os.path.join("content/"+collection_id+"/media/", filename+".jpg")) + photo = photo_temp.copy() + photo_temp.close() + os.remove(temp_img_path) + else: + is_video = False + photo = Image.open(os.path.join("content/"+collection_id+"/media/", filename_thumb)) + + + photo.thumbnail(THUMBNAIL_SIZE_GRD) + + if is_video: + photo_resize = Image.new("RGBA", THUMBNAIL_SIZE_GRD, (255, 255, 255, 0)) + old_w, old_h = photo.size + if old_w == THUMBNAIL_SIZE_GRD[0]: + photo_resize.paste(photo, (0, int((THUMBNAIL_SIZE_GRD[1] - old_h)/2), old_w, int((THUMBNAIL_SIZE_GRD[1] + old_h)/2))) + else: + photo_resize.paste(photo, (int((THUMBNAIL_SIZE_GRD[0] - old_w)/2), 0, int((THUMBNAIL_SIZE_GRD[0] + old_w)/2), old_h)) + video_icon = Image.open("static/img/video.png") + video_icon = video_icon.convert("RGBA") + video_icon = video_icon.resize(THUMBNAIL_SIZE_GRD) + photo_resize.paste(video_icon, (0, 0, THUMBNAIL_SIZE_GRD[0], THUMBNAIL_SIZE_GRD[1]), video_icon ) + photo = photo_resize + + photo.save(os.path.join("content/"+collection_id+"/thumbs/ui/", filename_thumb), quality=90) + photo.thumbnail(THUMBNAIL_SIZE_MAP) + + old_w, old_h = photo.size + photo_resize = Image.new("RGBA", THUMBNAIL_SIZE_MAP, (255, 255, 255, 0)) + if old_w == THUMBNAIL_SIZE_MAP[0]: + photo_resize.paste(photo, (0, int((THUMBNAIL_SIZE_MAP[1] - old_h)/2), old_w, int((THUMBNAIL_SIZE_MAP[1] + old_h)/2))) + else: + photo_resize.paste(photo, (int((THUMBNAIL_SIZE_MAP[0] - old_w)/2), 0, int((THUMBNAIL_SIZE_MAP[0] + old_w)/2), old_h)) + + + photo_resize.save(os.path.join("content/"+collection_id+"/thumbs/map/", filename_thumb), quality=90) + + photo.close() + photo_resize.close() + + + + # create data structure + media = { + "id" : filename, # does not change + "mediapath" : os.path.join("content/"+collection_id+"/media/", filename), + "thumbbig" : os.path.join("content/"+collection_id+"/thumbs/ui/", filename_thumb), + "thumbsmall" : os.path.join("content/"+collection_id+"/thumbs/map/", filename_thumb), + "name" : filename, + "info" : "", + "timestamp" : "", + "latitude" : None, + "longitude" : None, + "attached_notes" : [], + "is_video" : is_video + } + + # attempt to add info via EXIF + with open(os.path.join("content/"+collection_id+"/media/", filename), "rb") as f: + try: + img_exif = exif.Image(f) + if img_exif.has_exif: + attribs = img_exif.list_all() + if 'datetime_original' in attribs: + timestamp = img_exif.datetime_original.split(" ") + timestamp[0] = timestamp[0].replace(":", "-") + timestamp = "T".join(timestamp) + media["timestamp"] = timestamp + if 'gps_latitude' in attribs: + lat = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_latitude) ]) + long = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_longitude) ]) + media["latitude"] = lat + media["longitude"] = long + except Exception as e: + print(e) + + # manually add file info, if provided + if file_info != None: + for key, value in file_info.items(): + media[key] = value + + # add data struct to collection + collection = get_collection(collection_id) + collections["last_queried"][collection_id] = time() + collection["media"].append(media) + collection["last_edited"] = time() + + save_collections() + +# performs add_media for every file in files +# collection_id | str +# files | [file] +def add_media_bulk(collection_id, files): + for file in files: + add_media(collection_id, file) + +# edit media JSON by collection id + media_id +# collection_id | str +# media_id | str +# attr_dict | dict +# RETURNS: status +def edit_media(collection_id, media_id, attr_dict): + collection = get_collection(collection_id) + + # verify media exists + if not collection: + return {"status" : f"No such collection '{collection_id}'."} + + for index, media in enumerate(collection["media"]): + if media["id"] == media_id: + # update media + for key, value in attr_dict.items(): + if key in collection["media"][index]: + collection["media"][index][key] = value + collection["last_edited"] = time() + collections["last_queried"][collection_id] = time() + save_collections() + return {"status" : "OK"} + + return {"status" : f"No such media '{media_id}'"} + + +# delte media by collection id + media_id +# collection_id | str +# media_id | str +# RETURNS: status +def delete_media(collection_id, media_id): + collection = get_collection(collection_id) + + # verify media exists + if not collection: + return {"status" : f"No such collection '{collection_id}'."} + + attached_notes = [] + for index, media in enumerate(collection["media"]): + if media["id"] == media_id: + # clean up files + files = ["mediapath", "thumbbig", "thumbsmall"] + for file in files: + if os.path.exists(media[file]): + os.remove(media[file]) # remove media + thumbnails + attached_notes = media["attached_notes"] + collection["media"].pop(index) + break + else: + return {"status" : f"No such media '{media_id}'"} + + + + for attached_note in attached_notes: # remove references to media from any notes + for index, note in enumerate(collection['notes']): + if note["id"] == attached_note: + if media_id in note["attached_media"]: + note["attached_media"].pop(note["attached_media"].index(media_id)) + + + + collection["last_edited"] = time() + save_collections() + + return {"status" : "OK"} + + +def add_note(collection_id, note_info=None): + collection = get_collection(collection_id) + + # verify media exists + if not collection: + return {"status" : f"No such collection '{collection_id}'."} + + note = { "id" : str(time_ns()), + "title" : "", + "subtitle" : "", + "datetime_start" : "", + "datetime_end" : "", + "content" : "", + "attached_media" : [] + } + + collection["notes"].append(note) + collection["last_edited"] = time() + save_collections() + + return {"status" : "OK"} + +def edit_note(collection_id, note_id, attr_dict): + collection = get_collection(collection_id) + + # verify media exists + if not collection: + return {"status" : f"No such collection '{collection_id}'."} + + for index, note in enumerate(collection["notes"]): + if note["id"] == note_id: + # update media + for key, value in attr_dict.items(): + if key in collection["notes"][index]: + collection["notes"][index][key] = value + collection["last_edited"] = time() + collections["last_queried"][collection_id] = time() + save_collections() + return {"status" : "OK"} + + return {"status" : f"No such media '{note_id}'"} + +def delete_note(collection_id, note_id): + collection = get_collection(collection_id) + + if not collection: + return False + + for index, note in enumerate(collection["notes"]): + if note["id"] == note_id: + collection["notes"].pop(index) + collection["last_edited"] = time() + collections["last_queried"][collection_id] = time() + return True + + return False
\ No newline at end of file |
