summaryrefslogtreecommitdiff
path: root/db.py
diff options
context:
space:
mode:
authorAnson Bridges <bridges.anson@gmail.com>2025-08-11 22:24:05 -0700
committerAnson Bridges <bridges.anson@gmail.com>2025-08-11 22:24:05 -0700
commit02284958a1189ffcb10b34a4c3a02417f8136a4d (patch)
tree837aac77184a3435ee686dd33878b9f2715c94b1 /db.py
Initialize git repo from local project filesHEADmaster
Diffstat (limited to 'db.py')
-rw-r--r--db.py506
1 files changed, 506 insertions, 0 deletions
diff --git a/db.py b/db.py
new file mode 100644
index 0000000..18b2d2f
--- /dev/null
+++ b/db.py
@@ -0,0 +1,506 @@
+# external libraries
+from PIL import Image
+import exif
+
+# core libraries
+import os.path
+from os import mkdir
+import json
+from datetime import datetime
+from time import time, time_ns
+import subprocess
+
+# internal code
+from .datatypes import User
+
# in-memory collection DB:
#   "collections"  : collection_id -> collection dict (loaded from disk on demand)
#   "last_queried" : collection_id -> unix timestamp of last access,
#                    used by unload_idle_collections() to evict idle entries
collections = { "last_queried" : {}, "collections" : {} }
# in-memory user DB: email -> User object (persisted in users.json)
users = {}

# thumbnail pixel sizes (width, height): small map-marker thumbs and larger UI grid thumbs
THUMBNAIL_SIZE_MAP = (60, 45)
THUMBNAIL_SIZE_GRD = (120, 120)
+
# run upon server startup
def startup():
    """Ensure a user DB file exists on disk, then load it into memory."""
    have_user_db = os.path.exists("users.json")
    if not have_user_db:
        initialize_users()
    load_users()
+
+
+#
+# Collections-related functions
+#
+
+
# save collections DB to disk
# *specific_collection_id | str | when given, persist only that collection
def save_collections(specific_collection_id=None):
    """Write each (or one specific) collection to content/<id>/collection.json."""
    if specific_collection_id:
        targets = {specific_collection_id:
                   collections["collections"][specific_collection_id]}
    else:
        targets = collections["collections"]
    for cid, coll in targets.items():
        with open(f"content/{cid}/collection.json", "w") as fh:
            json.dump(coll, fh)
+
# load collection from disk
# collection_id | str | collection to load
# RETURNS True if collection was loaded, False otherwise
def load_collection(collection_id):
    """Load one collection JSON file into the in-memory DB, stamping access time."""
    global collections
    path = f"content/{collection_id}/collection.json"
    if not os.path.exists(path):
        return False
    with open(path, "r") as fh:
        collections["collections"][collection_id] = json.load(fh)
    collections["last_queried"][collection_id] = time()
    return True
+
+
def unload_idle_collections():
    """Evict every collection that has not been queried for 5 minutes."""
    stale = [cid for cid, stamp in collections["last_queried"].items()
             if (time() - stamp) > 300]  # 5 minutes of no contact
    for cid in stale:
        collections["last_queried"].pop(cid)
        collections["collections"].pop(cid)
+
# create an empty collection with placeholder information
# owner_id | str
# RETURNS: the new collection's id (str)
def create_collection(owner_id):
    """Create a placeholder collection owned by owner_id, make its content
    directories, and persist it to disk.

    Returns the new collection id so callers can reference it (previously the
    id was generated internally and never exposed).
    """
    global collections
    collection_id = str(time_ns())
    # loop until unique: the old single "_" suffix could still collide
    while collection_id in collections["collections"]:
        collection_id += "_"
    collections["collections"][collection_id] = \
        { "id" : collection_id,
          "title" : "Title",
          "subtitle" : "Subtitle",
          "info" : "Additional information",
          "default_view" : [ 0, 0, 15], # lat, long, zoom
          "last_edited" : time(),
          "owner" : owner_id,
          "editors" : [],
          "viewers" : [],
          "public" : False,
          "media" : [],
          "notes" : [],
          "routes" : []
          }
    # stamp last_queried so the new collection participates in idle eviction
    # (previously it was only stamped on the first get_collection call)
    collections["last_queried"][collection_id] = time()
    # os.makedirs creates the intermediate "content/<id>/thumbs" level too
    os.makedirs("content/" + collection_id + "/thumbs/map")
    os.makedirs("content/" + collection_id + "/thumbs/ui")
    os.makedirs("content/" + collection_id + "/media")
    unload_idle_collections()
    save_collections(collection_id)
    return collection_id
+
# find collection by collection id
# collection_id | str | the collection id which identifies the desired collection
# RETURNS: None if no such collection, otherwise returns the collection
def get_collection(collection_id):
    """Return the collection for collection_id, loading it from disk on demand."""
    cached = collection_id in collections["collections"]
    if not cached and not load_collection(collection_id):
        return None

    # refresh the access stamp, then give idle entries a chance to be evicted
    collections["last_queried"][collection_id] = time()
    unload_idle_collections()

    return collections["collections"][collection_id]
+
# get collections as a list rather than a dictionary
# RETURNS: currently-loaded collections as a list
def get_collection_list():
    """Return all in-memory collections as a list."""
    # .values() instead of iterating .items() and discarding the keys
    return list(collections['collections'].values())
+
+
# set collection to either public or private
# collection_id | str
# public | bool
# RETURNS False if no such collection, otherwise True
def set_collection_public(collection_id, public):
    """Flip the collection's public flag and bump its edit timestamp."""
    collection = get_collection(collection_id)
    if not collection:
        return False
    collection.update({"public": public, "last_edited": time()})
    return True
+
# set collection info fields (title, subtitle, info)
# collection_id | str
# edits_dict | dict
# RETURNS False if collection not found, otherwise True
def set_collection_info(collection_id, edits_dict):
    """Apply whitelisted info-field edits; unknown keys are ignored."""
    collection = get_collection(collection_id)
    if not collection:
        return False

    editable = ("title", "subtitle", "info")
    for key, value in edits_dict.items():
        if key not in editable:
            continue
        collection[key] = value
        collection["last_edited"] = time()

    return True
+
# whether the given user can edit the given collection
# collection_id | str
# user_email | str
# RETURNS: True/False
def validate_user(collection_id, user_email):
    """True only for a known user who owns or is an editor of the collection."""
    collection = get_collection(collection_id)
    if not collection or user_email not in users:
        return False
    return (user_email == collection["owner"]) or (user_email in collection["editors"])
+
# set the user permissions for a given collection
# collection_id | str
# user_email | str
# perm | str | invalid permission removes permissions from user
# RETURNS False if invalid command, otherwise True in case of successful change
def set_user_permissions(collection_id, user_email, perm):
    """Strip the user's existing role(s), then grant perm ("viewer"/"editor")."""
    collection = get_collection(collection_id)
    if not collection:
        return False

    collection["last_edited"] = time()

    # drop every occurrence of the user from both role lists;
    # slice assignment keeps the original list objects alive
    collection["viewers"][:] = [u for u in collection["viewers"] if u != user_email]
    collection["editors"][:] = [u for u in collection["editors"] if u != user_email]

    # grant the requested role; anything else leaves the user with no role
    if perm == "viewer":
        collection["viewers"].append(user_email)
    elif perm == "editor":
        collection["editors"].append(user_email)

    return True
+
+
+#
+# User-related functions
+#
+
# if no user DB exists, create an empty one
def initialize_users():
    """Reset the in-memory user DB to empty and write it to disk."""
    global users
    users = dict()
    save_users()
+
# save users DB to disk
# *filename | str | filename to save to
def save_users(filename="users.json"):
    """Serialize the in-memory User objects to JSON at filename."""
    # flatten User objects into plain dicts for JSON
    serializable = {
        email: {"email": u.email, "password": u.password, "name": u.name}
        for email, u in users.items()
    }
    with open(filename, "w") as fh:
        json.dump(serializable, fh)
+
# load users DB from disk (header previously said "collections" by mistake)
# *filename | str | filename to load from
def load_users(filename="users.json"):
    """Replace the in-memory user DB with User objects parsed from filename."""
    global users
    users = {} # clear user DB first, so a failed read leaves it empty

    with open(filename, "r") as fh:
        raw = json.load(fh)

    # rebuild User objects from the plain-JSON records
    users = {
        email: User(email=info["email"],
                    password=info["password"],
                    name=info["name"])
        for email, info in raw.items()
    }
+
# returns User class by id (email)
# user_id | str | user email by which the user is identified
# RETURNS: User object if user exists, otherwise None
def get_user(user_id):
    """Look up a user by email; dict.get yields None for unknown ids."""
    return users.get(user_id)
+
# add a new user
# email | str | user's email
# password | str | user's password (hashed)
# name | str | user's real/username
# RETURNS: False if user already exists, True if user has been newly added
def add_user(email, password, name):
    """Register a new user and persist the DB; refuse duplicates."""
    global users
    already_registered = email in users
    if already_registered:
        return False
    new_user = User(email=email, password=password, name=name)
    users[email] = new_user
    save_users()
    return True
+
# delete a user by id (email); header previously copy-pasted "returns User class"
# user_id | str | user email by which the user is identified
# RETURNS: True if user was deleted, False if not found
def delete_user(user_id):
    """Remove a user from the DB and persist the change."""
    global users
    removed = users.pop(user_id, None)
    if removed is None:
        return False
    save_users()
    return True
+
+
+#
+# Media-related functions
+#
+
# adds media to specified collection. info is taken from exif unless file_info is specified
# collection_id | str | collection photo should be added to
# file | file | file uploaded via flask
# *file_info | json | dictionary describing information about the file, optional
def add_media(collection_id, file, file_info=None):
    """Save an uploaded file, generate UI and map thumbnails, and register
    a media record in the collection.

    EXIF timestamp/GPS data is applied when readable; file_info entries are
    applied last and override everything else.
    """
    media_dir = "content/" + collection_id + "/media/"

    # find a filename that does not collide with existing media
    filename = file.filename
    filename_components = file.filename.split(".")
    extension = filename_components[-1].lower()
    suffix = 1
    while os.path.exists(media_dir + filename):
        filename = ".".join(filename_components[:-1]) + " (" + str(suffix) + ")." + extension
        suffix += 1

    # save file
    file.save(os.path.join(media_dir, filename))
    filename_thumb = ".".join(filename.split(".")[:-1]) + ".png"

    # generate thumbnails
    is_video = extension in ["mov", "mp4", "webm"]
    if is_video:
        photo = _first_video_frame(os.path.join(media_dir, filename))
    else:
        # BUG FIX: open the file that was actually saved; the previous code
        # opened the not-yet-existing .png thumbnail name, which failed for
        # every non-png upload
        photo = Image.open(os.path.join(media_dir, filename))

    # UI (grid) thumbnail; videos are letterboxed and get a play-icon overlay
    photo.thumbnail(THUMBNAIL_SIZE_GRD)
    if is_video:
        photo = _letterbox(photo, THUMBNAIL_SIZE_GRD)
        video_icon = Image.open("static/img/video.png").convert("RGBA").resize(THUMBNAIL_SIZE_GRD)
        photo.paste(video_icon, (0, 0, THUMBNAIL_SIZE_GRD[0], THUMBNAIL_SIZE_GRD[1]), video_icon)
    photo.save(os.path.join("content/" + collection_id + "/thumbs/ui/", filename_thumb), quality=90)

    # map thumbnail is always letterboxed onto a fixed-size canvas
    photo.thumbnail(THUMBNAIL_SIZE_MAP)
    photo_small = _letterbox(photo, THUMBNAIL_SIZE_MAP)
    photo_small.save(os.path.join("content/" + collection_id + "/thumbs/map/", filename_thumb), quality=90)

    photo.close()
    photo_small.close()

    # create data structure
    media = {
        "id" : filename, # does not change
        "mediapath" : os.path.join(media_dir, filename),
        "thumbbig" : os.path.join("content/" + collection_id + "/thumbs/ui/", filename_thumb),
        "thumbsmall" : os.path.join("content/" + collection_id + "/thumbs/map/", filename_thumb),
        "name" : filename,
        "info" : "",
        "timestamp" : "",
        "latitude" : None,
        "longitude" : None,
        "attached_notes" : [],
        "is_video" : is_video
        }

    # attempt to add info via EXIF (best-effort)
    _apply_exif(os.path.join(media_dir, filename), media)

    # manually supplied file info wins over EXIF
    if file_info != None:
        for key, value in file_info.items():
            media[key] = value

    # add data struct to collection
    collection = get_collection(collection_id)
    collections["last_queried"][collection_id] = time()
    collection["media"].append(media)
    collection["last_edited"] = time()

    save_collections()


# grab the first frame of a video via ffmpeg and return it as a PIL image
def _first_video_frame(video_path):
    frame_path = video_path + ".jpg"
    subprocess.call(['ffmpeg', '-i', video_path, '-ss', '00:00:00.000', '-vframes', '1', frame_path])
    frame = Image.open(frame_path)
    photo = frame.copy()  # copy so the temp file can be closed and deleted
    frame.close()
    os.remove(frame_path)
    return photo


# paste img centered onto a transparent canvas of the given (w, h) size
def _letterbox(img, size):
    canvas = Image.new("RGBA", size, (255, 255, 255, 0))
    old_w, old_h = img.size
    if old_w == size[0]:
        canvas.paste(img, (0, int((size[1] - old_h) / 2), old_w, int((size[1] + old_h) / 2)))
    else:
        canvas.paste(img, (int((size[0] - old_w) / 2), 0, int((size[0] + old_w) / 2), old_h))
    return canvas


# best-effort EXIF extraction: fills media["timestamp"]/["latitude"]/["longitude"]
def _apply_exif(path, media):
    with open(path, "rb") as f:
        try:
            img_exif = exif.Image(f)
            if img_exif.has_exif:
                attribs = img_exif.list_all()
                if 'datetime_original' in attribs:
                    # EXIF "YYYY:MM:DD HH:MM:SS" -> ISO "YYYY-MM-DDTHH:MM:SS"
                    timestamp = img_exif.datetime_original.split(" ")
                    timestamp[0] = timestamp[0].replace(":", "-")
                    media["timestamp"] = "T".join(timestamp)
                if 'gps_latitude' in attribs:
                    # degrees/minutes/seconds -> decimal degrees
                    lat = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_latitude) ])
                    long = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_longitude) ])
                    # BUG FIX: EXIF stores unsigned magnitudes; the *_ref tags
                    # carry the hemisphere, so negate for south/west
                    if 'gps_latitude_ref' in attribs and img_exif.gps_latitude_ref == "S":
                        lat = -lat
                    if 'gps_longitude_ref' in attribs and img_exif.gps_longitude_ref == "W":
                        long = -long
                    media["latitude"] = lat
                    media["longitude"] = long
        except Exception as e:
            print(e)
+
# performs add_media for every file in files
# collection_id | str
# files | [file]
def add_media_bulk(collection_id, files):
    """Add each uploaded file in files to the collection, in order."""
    for single_file in files:
        add_media(collection_id, single_file)
+
# edit media JSON by collection id + media_id
# collection_id | str
# media_id | str
# attr_dict | dict
# RETURNS: status
def edit_media(collection_id, media_id, attr_dict):
    """Update existing fields of a media record; unknown keys are ignored."""
    collection = get_collection(collection_id)
    if not collection:
        return {"status" : f"No such collection '{collection_id}'."}

    for media in collection["media"]:
        if media["id"] != media_id:
            continue
        # copy over only keys the record already has
        for key, value in attr_dict.items():
            if key in media:
                media[key] = value
        collection["last_edited"] = time()
        collections["last_queried"][collection_id] = time()
        save_collections()
        return {"status" : "OK"}

    return {"status" : f"No such media '{media_id}'"}
+
+
# delete media by collection id + media_id
# collection_id | str
# media_id | str
# RETURNS: status
def delete_media(collection_id, media_id):
    """Remove a media record, its files on disk, and references from notes."""
    collection = get_collection(collection_id)
    if not collection:
        return {"status" : f"No such collection '{collection_id}'."}

    # locate the record; bail out with an error if it is not present
    target_index = None
    for index, media in enumerate(collection["media"]):
        if media["id"] == media_id:
            target_index = index
            break
    if target_index is None:
        return {"status" : f"No such media '{media_id}'"}

    media = collection["media"][target_index]
    # remove the stored file and both thumbnails from disk
    for path_key in ["mediapath", "thumbbig", "thumbsmall"]:
        if os.path.exists(media[path_key]):
            os.remove(media[path_key])
    attached_notes = media["attached_notes"]
    collection["media"].pop(target_index)

    # strip references to this media from any notes it was attached to
    for attached_note in attached_notes:
        for note in collection['notes']:
            if note["id"] == attached_note and media_id in note["attached_media"]:
                note["attached_media"].remove(media_id)

    collection["last_edited"] = time()
    save_collections()

    return {"status" : "OK"}
+
+
# add a new note to the collection
# collection_id | str
# *note_info | dict | optional initial values for the note's fields
# RETURNS: status
def add_note(collection_id, note_info=None):
    """Append a new (optionally pre-filled) note to the collection."""
    collection = get_collection(collection_id)

    # verify collection exists
    if not collection:
        return {"status" : f"No such collection '{collection_id}'."}

    note = { "id" : str(time_ns()),
             "title" : "",
             "subtitle" : "",
             "datetime_start" : "",
             "datetime_end" : "",
             "content" : "",
             "attached_media" : []
             }

    # BUG FIX: note_info was accepted but never used; apply it the same way
    # add_media applies file_info and edit_note applies attr_dict (only to
    # fields the note already has)
    if note_info != None:
        for key, value in note_info.items():
            if key in note:
                note[key] = value

    collection["notes"].append(note)
    collection["last_edited"] = time()
    save_collections()

    return {"status" : "OK"}
+
# edit note JSON by collection id + note_id
# collection_id | str
# note_id | str
# attr_dict | dict
# RETURNS: status
def edit_note(collection_id, note_id, attr_dict):
    """Update existing fields of a note; unknown keys are ignored."""
    collection = get_collection(collection_id)

    # verify collection exists
    if not collection:
        return {"status" : f"No such collection '{collection_id}'."}

    for note in collection["notes"]:
        if note["id"] == note_id:
            # copy over only keys the note already has
            for key, value in attr_dict.items():
                if key in note:
                    note[key] = value
            collection["last_edited"] = time()
            collections["last_queried"][collection_id] = time()
            save_collections()
            return {"status" : "OK"}

    # BUG FIX: this error message previously said "No such media"
    return {"status" : f"No such note '{note_id}'"}
+
# delete note by collection id + note_id
# collection_id | str
# note_id | str
# RETURNS: True if the note was deleted, False otherwise
def delete_note(collection_id, note_id):
    """Remove a note from the collection and persist the change."""
    collection = get_collection(collection_id)

    if not collection:
        return False

    for index, note in enumerate(collection["notes"]):
        if note["id"] == note_id:
            collection["notes"].pop(index)
            collection["last_edited"] = time()
            collections["last_queried"][collection_id] = time()
            # BUG FIX: persist like every other mutator (edit_note, add_note,
            # delete_media) — previously the deletion was lost on eviction/restart
            save_collections()
            return True

    return False