# external libraries from PIL import Image import exif # core libraries import os.path from os import mkdir import json from datetime import datetime from time import time, time_ns import subprocess # internal code from .datatypes import User collections = { "last_queried" : {}, "collections" : {} } users = {} THUMBNAIL_SIZE_MAP = (60, 45) THUMBNAIL_SIZE_GRD = (120, 120) # run upon server startup def startup(): if not os.path.exists("users.json"): initialize_users() load_users() # # Collections-related functions # # save collections DB to disk def save_collections(specific_collection_id=None): if specific_collection_id: with open(f"content/{specific_collection_id}/collection.json", "w") as f: json.dump(collections["collections"][specific_collection_id], f) return for collection_id, collection in collections["collections"].items(): with open(f"content/{collection_id}/collection.json", "w") as f: json.dump(collection, f) # load collection from disk # collection_id | str | collection to load # RETURNS True if collection was loaded, False otherwise def load_collection(collection_id): global collections if os.path.exists(f"content/{collection_id}/collection.json"): with open(f"content/{collection_id}/collection.json", "r") as f: collections["collections"][collection_id] = json.load(f) collections["last_queried"][collection_id] = time() return True else: return False def unload_idle_collections(): to_remove = [] for collection_id, age in collections["last_queried"].items(): if (time() - age) > 300: # 5 minutes of no contact to_remove.append(collection_id) for collection_id in to_remove: del collections["last_queried"][collection_id] del collections["collections"][collection_id] # create an empty collection with placeholder information # owner_id | str def create_collection(owner_id): global collections collection_id = str(time_ns()) if collection_id in collections["collections"]: collection_id += "_" collections["collections"][collection_id] = \ { "id" : collection_id, "title" : "Title", "subtitle" : "Subtitle", "info" : "Additional information", "default_view" : [ 0, 0, 15], # lat, long, zoom "last_edited" : time(), "owner" : owner_id, "editors" : [], "viewers" : [], "public" : False, "media" : [], "notes" : [], "routes" : [] } mkdir("content/"+collection_id) mkdir("content/"+collection_id+"/thumbs") mkdir("content/"+collection_id+"/thumbs/map") mkdir("content/"+collection_id+"/thumbs/ui") mkdir("content/"+collection_id+"/media") unload_idle_collections() save_collections(collection_id) # find collection by collection id # collection_id | int | the collection id which identifies the desired collection # RETURNS: None if no such collection, otherwise returns the collection def get_collection(collection_id): if collection_id not in collections["collections"]: if not load_collection(collection_id): return None collections["last_queried"][collection_id] = time() unload_idle_collections() return collections["collections"][collection_id] # get collections as a list rather than a dictionary # RETURNS: collections as a list def get_collection_list(): return [ coll for coll_id, coll in collections['collections'].items() ] # set collection to either public or private # collection_id | str # public | bool # RETURNS False if no such collection, otherwise True def set_collection_public(collection_id, public): collection = get_collection(collection_id) if not collection: return False collection["public"] = public collection["last_edited"] = time() return True # set collection info fields (title, subtitle, info) # collection_id | str # edits_dict | dict # RETURNS False if collection not found, otherwise True def set_collection_info(collection_id, edits_dict): collection = get_collection(collection_id) if not collection: return False for key, value in edits_dict.items(): if key in ["title", "subtitle", "info"]: collection[key] = value collection["last_edited"] = time() return True # whether the given user can edit the given collection # collection_id | str # user_email | str # RETURNS: True/False def validate_user(collection_id, user_email): collection = get_collection(collection_id) if not collection or user_email not in users: return False if (user_email not in collection["editors"]) and (user_email != collection["owner"]): return False return True # set the user permissions for a given collection # collection_id | str # user_email | str # perm | str | invalid permission removes permissions from user # RETURNS False if invalid command, otherwise True in case of successful change def set_user_permissions(collection_id, user_email, perm): collection = get_collection(collection_id) if not collection: return False collection["last_edited"] = time() # reset user perms while user_email in collection["viewers"]: collection["viewers"].remove(user_email) while user_email in collection["editors"]: collection["editors"].remove(user_email) # set user perms to specified value if perm == "viewer": collection["viewers"].append(user_email) elif perm == "editor": collection["editors"].append(user_email) return True # # User-related functions # # if no user DB exists, create an empty one def initialize_users(): global users users = {} save_users() # save users DB to disk # *filename | str | filename to save to def save_users(filename="users.json"): users_j = {} # convert User objects to JSON for user_email, user in users.items(): users_j[user_email] = {"email" : user.email, "password" : user.password, "name" : user.name} # save JSON-ified user DB with open(filename, "w") as f: json.dump(users_j, f) # load collections DB from disk # *filename | str | filename to load from def load_users(filename="users.json"): global users users = {} # clear user DB # load from file with open(filename, "r") as f: users_j = json.load(f) # convert from pure JSON to User objects for user_email, user_info in users_j.items(): users[user_email] = User(email=user_info["email"], password=user_info["password"], name=user_info["name"]) # returns User class by id (email) # user_id | str | user email by which the user is identified # RETURNS: User object if user exists, otherwise None def get_user(user_id): if user_id not in users: return None return users[user_id] # add a new user # email | str | user's email # password | str | user's password (hashed) # name | str | user's real/username # RETURNS: False if user already exists, True if user has been newly added def add_user(email, password, name): global users if email in users: return False users[email] = User(email=email, password=password, name=name) save_users() return True # returns User class by id (email) # user_id | str | user email by which the user is identified # RETURNS: True if user was deleted, False if not found def delete_user(user_id): global users if user_id not in users: return False del users[user_id] save_users() return True # # Media-related functions # # adds media to specified collection. info is taken from exif unless file_info is specified # collection_id | str | collection photo should be added to # file | file | file uploaded via flask # *file_info | json | dictionary describing information about the file, optional def add_media(collection_id, file, file_info=None): filename = file.filename # find valid filename filename_components = file.filename.split(".") extension = filename_components[-1].lower() i = 1 while os.path.exists("content/"+collection_id+"/media/"+filename): filename = ".".join(filename_components[:-1]) + " (" + str(i) + ")." + extension i += 1 # save file file.save(os.path.join("content/"+collection_id+"/media/", filename)) print(filename) filename_thumb = ".".join(filename.split(".")[:-1])+".png" # generate thumbnails if extension in ["mov", "mp4", "webm"]: # generate thumbnail image is_video = True video_path = os.path.join("content/"+collection_id+"/media/", filename) temp_img_path = os.path.join("content/"+collection_id+"/media/", filename+".jpg") subprocess.call(['ffmpeg', '-i', video_path, '-ss', '00:00:00.000', '-vframes', '1', temp_img_path]) photo_temp = Image.open(os.path.join("content/"+collection_id+"/media/", filename+".jpg")) photo = photo_temp.copy() photo_temp.close() os.remove(temp_img_path) else: is_video = False photo = Image.open(os.path.join("content/"+collection_id+"/media/", filename_thumb)) photo.thumbnail(THUMBNAIL_SIZE_GRD) if is_video: photo_resize = Image.new("RGBA", THUMBNAIL_SIZE_GRD, (255, 255, 255, 0)) old_w, old_h = photo.size if old_w == THUMBNAIL_SIZE_GRD[0]: photo_resize.paste(photo, (0, int((THUMBNAIL_SIZE_GRD[1] - old_h)/2), old_w, int((THUMBNAIL_SIZE_GRD[1] + old_h)/2))) else: photo_resize.paste(photo, (int((THUMBNAIL_SIZE_GRD[0] - old_w)/2), 0, int((THUMBNAIL_SIZE_GRD[0] + old_w)/2), old_h)) video_icon = Image.open("static/img/video.png") video_icon = video_icon.convert("RGBA") video_icon = video_icon.resize(THUMBNAIL_SIZE_GRD) photo_resize.paste(video_icon, (0, 0, THUMBNAIL_SIZE_GRD[0], THUMBNAIL_SIZE_GRD[1]), video_icon ) photo = photo_resize photo.save(os.path.join("content/"+collection_id+"/thumbs/ui/", filename_thumb), quality=90) photo.thumbnail(THUMBNAIL_SIZE_MAP) old_w, old_h = photo.size photo_resize = Image.new("RGBA", THUMBNAIL_SIZE_MAP, (255, 255, 255, 0)) if old_w == THUMBNAIL_SIZE_MAP[0]: photo_resize.paste(photo, (0, int((THUMBNAIL_SIZE_MAP[1] - old_h)/2), old_w, int((THUMBNAIL_SIZE_MAP[1] + old_h)/2))) else: photo_resize.paste(photo, (int((THUMBNAIL_SIZE_MAP[0] - old_w)/2), 0, int((THUMBNAIL_SIZE_MAP[0] + old_w)/2), old_h)) photo_resize.save(os.path.join("content/"+collection_id+"/thumbs/map/", filename_thumb), quality=90) photo.close() photo_resize.close() # create data structure media = { "id" : filename, # does not change "mediapath" : os.path.join("content/"+collection_id+"/media/", filename), "thumbbig" : os.path.join("content/"+collection_id+"/thumbs/ui/", filename_thumb), "thumbsmall" : os.path.join("content/"+collection_id+"/thumbs/map/", filename_thumb), "name" : filename, "info" : "", "timestamp" : "", "latitude" : None, "longitude" : None, "attached_notes" : [], "is_video" : is_video } # attempt to add info via EXIF with open(os.path.join("content/"+collection_id+"/media/", filename), "rb") as f: try: img_exif = exif.Image(f) if img_exif.has_exif: attribs = img_exif.list_all() if 'datetime_original' in attribs: timestamp = img_exif.datetime_original.split(" ") timestamp[0] = timestamp[0].replace(":", "-") timestamp = "T".join(timestamp) media["timestamp"] = timestamp if 'gps_latitude' in attribs: lat = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_latitude) ]) long = sum([ val / (60**i) for i, val in enumerate(img_exif.gps_longitude) ]) media["latitude"] = lat media["longitude"] = long except Exception as e: print(e) # manually add file info, if provided if file_info != None: for key, value in file_info.items(): media[key] = value # add data struct to collection collection = get_collection(collection_id) collections["last_queried"][collection_id] = time() collection["media"].append(media) collection["last_edited"] = time() save_collections() # performs add_media for every file in files # collection_id | str # files | [file] def add_media_bulk(collection_id, files): for file in files: add_media(collection_id, file) # edit media JSON by collection id + media_id # collection_id | str # media_id | str # attr_dict | dict # RETURNS: status def edit_media(collection_id, media_id, attr_dict): collection = get_collection(collection_id) # verify media exists if not collection: return {"status" : f"No such collection '{collection_id}'."} for index, media in enumerate(collection["media"]): if media["id"] == media_id: # update media for key, value in attr_dict.items(): if key in collection["media"][index]: collection["media"][index][key] = value collection["last_edited"] = time() collections["last_queried"][collection_id] = time() save_collections() return {"status" : "OK"} return {"status" : f"No such media '{media_id}'"} # delte media by collection id + media_id # collection_id | str # media_id | str # RETURNS: status def delete_media(collection_id, media_id): collection = get_collection(collection_id) # verify media exists if not collection: return {"status" : f"No such collection '{collection_id}'."} attached_notes = [] for index, media in enumerate(collection["media"]): if media["id"] == media_id: # clean up files files = ["mediapath", "thumbbig", "thumbsmall"] for file in files: if os.path.exists(media[file]): os.remove(media[file]) # remove media + thumbnails attached_notes = media["attached_notes"] collection["media"].pop(index) break else: return {"status" : f"No such media '{media_id}'"} for attached_note in attached_notes: # remove references to media from any notes for index, note in enumerate(collection['notes']): if note["id"] == attached_note: if media_id in note["attached_media"]: note["attached_media"].pop(note["attached_media"].index(media_id)) collection["last_edited"] = time() save_collections() return {"status" : "OK"} def add_note(collection_id, note_info=None): collection = get_collection(collection_id) # verify media exists if not collection: return {"status" : f"No such collection '{collection_id}'."} note = { "id" : str(time_ns()), "title" : "", "subtitle" : "", "datetime_start" : "", "datetime_end" : "", "content" : "", "attached_media" : [] } collection["notes"].append(note) collection["last_edited"] = time() save_collections() return {"status" : "OK"} def edit_note(collection_id, note_id, attr_dict): collection = get_collection(collection_id) # verify media exists if not collection: return {"status" : f"No such collection '{collection_id}'."} for index, note in enumerate(collection["notes"]): if note["id"] == note_id: # update media for key, value in attr_dict.items(): if key in collection["notes"][index]: collection["notes"][index][key] = value collection["last_edited"] = time() collections["last_queried"][collection_id] = time() save_collections() return {"status" : "OK"} return {"status" : f"No such media '{note_id}'"} def delete_note(collection_id, note_id): collection = get_collection(collection_id) if not collection: return False for index, note in enumerate(collection["notes"]): if note["id"] == note_id: collection["notes"].pop(index) collection["last_edited"] = time() collections["last_queried"][collection_id] = time() return True return False