""" Onlylegs - API endpoints """ from uuid import uuid4 import os import pathlib import logging import platformdirs from flask import Blueprint, send_from_directory, abort, flash, request, current_app from werkzeug.utils import secure_filename from flask_login import login_required, current_user from colorthief import ColorThief from sqlalchemy.orm import sessionmaker from gallery import db from gallery.utils import metadata as mt from gallery.utils.generate_image import generate_thumbnail blueprint = Blueprint("api", __name__, url_prefix="/api") db_session = sessionmaker(bind=db.engine) db_session = db_session() @blueprint.route("/file/", methods=["GET"]) def file(file_name): """ Returns a file from the uploads folder r for resolution, 400x400 or thumb for thumbnail """ res = request.args.get("r", default=None, type=str) # Type of file (thumb, etc) ext = request.args.get("e", default=None, type=str) # File extension file_name = secure_filename(file_name) # Sanitize file name # if no args are passed, return the raw file if not res and not ext: if not os.path.exists( os.path.join(current_app.config["UPLOAD_FOLDER"], file_name) ): abort(404) return send_from_directory(current_app.config["UPLOAD_FOLDER"], file_name) thumb = generate_thumbnail(file_name, res, ext) if not thumb: abort(404) return send_from_directory(os.path.dirname(thumb), os.path.basename(thumb)) # @blueprint.route('/page', methods=['get']) # def page(): # """ # Returns a json object with all the images # info required to generate a page # """ # page_num = request.args.get('page', default=1, type=int) # limit = current_app.config['UPLOAD_CONF']['max-load'] # if not page_num: # abort(400, 'No page number specified') # if not isinstance(page_num, int): # abort(400, 'Page number is not a number') # if int(page_num) < 1: # abort(400, 'Page number is less than 1') # # get the total number of images in the database # # calculate the total number of pages, and make sure the page number is valid # total_images = db_session.query(db.Posts.id).count() # pages = ceil(max(total_images, limit) / limit) # if page_num > pages: # abort(404, 'You have gone above and beyond, but this isnt a fairy tale.') # # get the images for the current page # images = (db_session.query(db.Posts.filename, db.Posts.alt, db.Posts.colours, # db.Posts.created_at, db.Posts.id) # .order_by(db.Posts.id.desc()) # .offset((page_num - 1) * limit) # .limit(limit) # .all()) # image_list = [] # for image in images: # image_list.append({ # 'id': image.id, # 'filename': image.filename, # 'alt': image.alt, # 'colours': image.colours, # 'created_at': image.created_at, # }) # return jsonify(image_list) @blueprint.route("/upload", methods=["POST"]) @login_required def upload(): """ Uploads an image to the server and saves it to the database """ form_file = request.files["file"] form = request.form # If no image is uploaded, return 404 error if not form_file: return abort(404) # Get file extension, generate random name and set file path img_ext = pathlib.Path(form_file.filename).suffix.replace(".", "").lower() img_name = "GWAGWA_" + str(uuid4()) img_path = os.path.join( current_app.config["UPLOAD_FOLDER"], img_name + "." + img_ext ) # Check if file extension is allowed if img_ext not in current_app.config["ALLOWED_EXTENSIONS"].keys(): logging.info("File extension not allowed: %s", img_ext) abort(403) # Save file try: form_file.save(img_path) except OSError as err: logging.info("Error saving file %s because of %s", img_path, err) abort(500) img_exif = mt.Metadata(img_path).yoink() # Get EXIF data img_colors = ColorThief(img_path).get_palette(color_count=3) # Get color palette # Save to database query = db.Posts( author_id=current_user.id, filename=img_name + "." + img_ext, mimetype=img_ext, exif=img_exif, colours=img_colors, description=form["description"], alt=form["alt"], ) db_session.add(query) db_session.commit() return "Gwa Gwa" # Return something so the browser doesn't show an error @blueprint.route("/delete/", methods=["POST"]) @login_required def delete_image(image_id): """ Deletes an image from the server and database """ img = db_session.query(db.Posts).filter_by(id=image_id).first() # Check if image exists and if user is allowed to delete it (author) if img is None: abort(404) if img.author_id != current_user.id: abort(403) # Delete file try: os.remove(os.path.join(current_app.config["UPLOAD_FOLDER"], img.filename)) except FileNotFoundError: logging.warning( "File not found: %s, already deleted or never existed", img.filename ) # Delete cached files cache_path = os.path.join(platformdirs.user_config_dir("onlylegs"), "cache") cache_name = img.filename.rsplit(".")[0] for cache_file in pathlib.Path(cache_path).glob(cache_name + "*"): os.remove(cache_file) # Delete from database db_session.query(db.Posts).filter_by(id=image_id).delete() # Remove all entries in junction table groups = db_session.query(db.GroupJunction).filter_by(post_id=image_id).all() for group in groups: db_session.delete(group) # Commit all changes db_session.commit() logging.info("Removed image (%s) %s", image_id, img.filename) flash(["Image was all in Le Head!", "1"]) return "Gwa Gwa" @blueprint.route("/group/create", methods=["POST"]) @login_required def create_group(): """ Creates a group """ new_group = db.Groups( name=request.form["name"], description=request.form["description"], author_id=current_user.id, ) db_session.add(new_group) db_session.commit() return ":3" @blueprint.route("/group/modify", methods=["POST"]) @login_required def modify_group(): """ Changes the images in a group """ group_id = request.form["group"] image_id = request.form["image"] action = request.form["action"] group = db_session.query(db.Groups).filter_by(id=group_id).first() if group is None: abort(404) elif group.author_id != current_user.id: abort(403) if action == "add": if not ( db_session.query(db.GroupJunction) .filter_by(group_id=group_id, post_id=image_id) .first() ): db_session.add(db.GroupJunction(group_id=group_id, post_id=image_id)) elif request.form["action"] == "remove": ( db_session.query(db.GroupJunction) .filter_by(group_id=group_id, post_id=image_id) .delete() ) db_session.commit() return ":3" @blueprint.route("/group/delete", methods=["POST"]) def delete_group(): """ Deletes a group """ group_id = request.form["group"] group = db_session.query(db.Groups).filter_by(id=group_id).first() if group is None: abort(404) elif group.author_id != current_user.id: abort(403) db_session.query(db.Groups).filter_by(id=group_id).delete() db_session.query(db.GroupJunction).filter_by(group_id=group_id).delete() db_session.commit() flash(["Group yeeted!", "1"]) return ":3"