Submitted to PyLints needs :3

This commit is contained in:
Michał 2023-03-04 13:45:26 +00:00
parent 4cfcd178f1
commit 7ed3b455dd
12 changed files with 509 additions and 460 deletions

View file

@ -1,115 +1,131 @@
print("""
"""
___ _ _
/ _ \\ _ __ | |_ _| | ___ __ _ ___
| | | | '_ \\| | | | | | / _ \\/ _` / __|
| |_| | | | | | |_| | |__| __/ (_| \\__ \\
\\___/|_| |_|_|\\__, |_____\\___|\\__, |___/
/ _ \ _ __ | |_ _| | ___ __ _ ___
| | | | '_ \| | | | | | / _ \/ _` / __|
| |_| | | | | | |_| | |__| __/ (_| \__ \
\___/|_| |_|_|\__, |_____\___|\__, |___/
|___/ |___/
Created by Fluffy Bean - Version 23.03.03
""")
Created by Fluffy Bean - Version 23.03.04
"""
from flask import Flask, render_template
# Import system modules
import os
import sys
import logging
# Flask
from flask_compress import Compress
from flask import Flask, render_template
# Configuration
from dotenv import load_dotenv
import platformdirs
from gallery.logger import logger
logger.innit_logger()
import yaml
import os
from . import theme_manager
USER_DIR = platformdirs.user_config_dir('onlylegs')
INSTANCE_PATH = os.path.join(USER_DIR, 'instance')
# Check if any of the required files are missing
if not os.path.exists(platformdirs.user_config_dir('onlylegs')):
from .setup import setup
setup()
user_dir = platformdirs.user_config_dir('onlylegs')
instance_path = os.path.join(user_dir, 'instance')
from . import setup
setup.SetupApp()
# Get environment variables
if os.path.exists(os.path.join(user_dir, '.env')):
load_dotenv(os.path.join(user_dir, '.env'))
if os.path.exists(os.path.join(USER_DIR, '.env')):
load_dotenv(os.path.join(USER_DIR, '.env'))
print("Loaded environment variables")
else:
print("No environment variables found!")
exit(1)
sys.exit(1)
# Get config file
if os.path.exists(os.path.join(user_dir, 'conf.yml')):
with open(os.path.join(user_dir, 'conf.yml'), 'r') as f:
if os.path.exists(os.path.join(USER_DIR, 'conf.yml')):
with open(os.path.join(USER_DIR, 'conf.yml'), encoding='utf-8') as f:
conf = yaml.load(f, Loader=yaml.FullLoader)
print("Loaded gallery config")
else:
print("No config file found!")
exit(1)
sys.exit(1)
# Setup the logging config
LOGS_PATH = os.path.join(platformdirs.user_config_dir('onlylegs'), 'logs')
if not os.path.isdir(LOGS_PATH):
os.mkdir(LOGS_PATH)
logging.getLogger('werkzeug').disabled = True
logging.basicConfig(
filename=os.path.join(LOGS_PATH, 'only.log'),
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s',
encoding='utf-8')
def create_app(test_config=None):
# create and configure the app
app = Flask(__name__,instance_path=instance_path)
"""
Create and configure the main app
"""
app = Flask(__name__,instance_path=INSTANCE_PATH)
compress = Compress()
# App configuration
app.config.from_mapping(
SECRET_KEY=os.environ.get('FLASK_SECRET'),
DATABASE=os.path.join(app.instance_path, 'gallery.sqlite'),
UPLOAD_FOLDER=os.path.join(user_dir, 'uploads'),
UPLOAD_FOLDER=os.path.join(USER_DIR, 'uploads'),
ALLOWED_EXTENSIONS=conf['upload']['allowed-extensions'],
MAX_CONTENT_LENGTH=1024 * 1024 * conf['upload']['max-size'],
WEBSITE=conf['website'],
)
if test_config is None:
# load the instance config, if it exists, when not testing
app.config.from_pyfile('config.py', silent=True)
else:
# load the test config if passed in
app.config.from_mapping(test_config)
# ensure the instance folder exists
try:
os.makedirs(app.instance_path)
except OSError:
pass
# Load theme
from . import sassy
sassy.compile('default', app.root_path)
theme_manager.CompileTheme('default', app.root_path)
@app.errorhandler(405)
def method_not_allowed(e):
def method_not_allowed(err):
error = '405'
msg = e.description
return render_template('error.html', error=error, msg=e), 404
msg = err.description
return render_template('error.html', error=error, msg=msg), 404
@app.errorhandler(404)
def page_not_found(e):
def page_not_found(err):
error = '404'
msg = e.description
msg = err.description
return render_template('error.html', error=error, msg=msg), 404
@app.errorhandler(403)
def forbidden(e):
def forbidden(err):
error = '403'
msg = e.description
msg = err.description
return render_template('error.html', error=error, msg=msg), 403
@app.errorhandler(410)
def gone(e):
def gone(err):
error = '410'
msg = e.description
msg = err.description
return render_template('error.html', error=error, msg=msg), 410
@app.errorhandler(500)
def internal_server_error(e):
def internal_server_error(err):
error = '500'
msg = e.description
msg = err.description
return render_template('error.html', error=error, msg=msg), 500
# Load login, registration and logout manager
@ -130,4 +146,4 @@ def create_app(test_config=None):
app.register_blueprint(api.blueprint)
compress.init_app(app)
return app
return app

View file

@ -1,38 +1,48 @@
from flask import Blueprint, current_app, send_from_directory, send_file, request, g, abort, flash, jsonify
"""
Onlylegs - API endpoints
Used intermally by the frontend and possibly by other applications
"""
from uuid import uuid4
import os
import io
import logging
from flask import (
Blueprint, current_app, send_from_directory, send_file, request, g, abort, flash, jsonify)
from werkzeug.utils import secure_filename
from PIL import Image, ImageOps # ImageFilter
from sqlalchemy.orm import sessionmaker
from gallery.auth import login_required
from . import db
from sqlalchemy.orm import sessionmaker
db_session = sessionmaker(bind=db.engine)
db_session = db_session()
from PIL import Image, ImageOps, ImageFilter
from . import db # Import db to create a session
from . import metadata as mt
from .logger import logger
from uuid import uuid4
import io
import os
blueprint = Blueprint('api', __name__, url_prefix='/api')
db_session = sessionmaker(bind=db.engine)
db_session = db_session()
@blueprint.route('/uploads/<file>', methods=['GET'])
def uploads(file):
"""
Returns a file from the uploads folder
w and h are the width and height of the image for resizing
f is whether to apply filters to the image, such as blurring NSFW images
"""
# Get args
width = request.args.get('w', default=0, type=int) # Width of image
height = request.args.get('h', default=0, type=int) # Height of image
filtered = request.args.get('f', default=False, type=bool) # Whether to apply filters to image,
# such as blur for NSFW images
filtered = request.args.get('f', default=False, type=bool) # Whether to apply filters
# if no args are passed, return the raw file
if width == 0 and height == 0 and not filtered:
return send_from_directory(current_app.config['UPLOAD_FOLDER'],
secure_filename(file),
as_attachment=True)
if not os.path.exists(os.path.join(current_app.config['UPLOAD_FOLDER'],
secure_filename(file))):
abort(404)
return send_from_directory(current_app.config['UPLOAD_FOLDER'], file ,as_attachment=True)
# Of either width or height is 0, set it to the other value to keep aspect ratio
if width > 0 and height == 0:
@ -45,29 +55,28 @@ def uploads(file):
# Open image and set extension
try:
img = Image.open(
os.path.join(current_app.config['UPLOAD_FOLDER'],
secure_filename(file)))
except Exception as e:
logger.server(600, f"Error opening image: {e}")
img = Image.open(os.path.join(current_app.config['UPLOAD_FOLDER'],file))
except FileNotFoundError:
logging.error('File not found: %s, possibly broken upload', file)
abort(404)
except Exception as err:
logging.error('Error opening image: %s', err)
abort(500)
img_ext = os.path.splitext(secure_filename(file))[-1].lower().replace(
'.', '')
img_ext = os.path.splitext(file)[-1].lower().replace('.', '')
img_ext = set_ext[img_ext]
img_icc = img.info.get(
"icc_profile") # Get ICC profile as it alters colours
# Get ICC profile as it alters colours when saving
img_icc = img.info.get("icc_profile")
# Resize image and orientate correctly
img.thumbnail((width, height), Image.LANCZOS)
img = ImageOps.exif_transpose(img)
# TODO: Add filters
# If has NSFW tag, blur image, etc.
if filtered:
#pass
img = img.filter(ImageFilter.GaussianBlur(20))
#img = img.filter(ImageFilter.GaussianBlur(20))
pass
try:
img.save(buff, img_ext, icc_profile=img_icc)
except OSError:
@ -75,8 +84,8 @@ def uploads(file):
# Convert to RGB and try again
img = img.convert('RGB')
img.save(buff, img_ext, icc_profile=img_icc)
except:
logger.server(600, f"Error resizing image: {file}")
except Exception as err:
logging.error('Could not resize image %s, error: %s', file, err)
abort(500)
img.close()
@ -89,47 +98,51 @@ def uploads(file):
@blueprint.route('/upload', methods=['POST'])
@login_required
def upload():
"""
Uploads an image to the server and saves it to the database
"""
form_file = request.files['file']
form = request.form
if not form_file:
return abort(404)
img_ext = os.path.splitext(secure_filename(
form_file.filename))[-1].replace('.', '').lower()
img_name = f"GWAGWA_{uuid4().__str__()}.{img_ext}"
img_ext = os.path.splitext(form_file.filename)[-1].replace('.', '').lower()
img_name = f"GWAGWA_{str(uuid4())}.{img_ext}"
if not img_ext in current_app.config['ALLOWED_EXTENSIONS'].keys():
logger.add(303, f"File extension not allowed: {img_ext}")
logging.info('File extension not allowed: %s', img_ext)
abort(403)
if os.path.isdir(current_app.config['UPLOAD_FOLDER']) == False:
if os.path.isdir(current_app.config['UPLOAD_FOLDER']) is False:
os.mkdir(current_app.config['UPLOAD_FOLDER'])
# Save to database
try:
tr = db.posts(img_name, form['description'], form['alt'], g.user.id)
db_session.add(tr)
db_session.add(db.posts(img_name, form['description'], form['alt'], g.user.id))
db_session.commit()
except Exception as e:
logger.server(600, f"Error saving to database: {e}")
except Exception as err:
logging.error('Could not save to database: %s', err)
abort(500)
# Save file
try:
form_file.save(
os.path.join(current_app.config['UPLOAD_FOLDER'], img_name))
except Exception as e:
logger.server(600, f"Error saving file: {e}")
except Exception as err:
logging.error('Could not save file: %s', err)
abort(500)
return 'Gwa Gwa'
@blueprint.route('/remove/<int:id>', methods=['POST'])
@blueprint.route('/remove/<int:img_id>', methods=['POST'])
@login_required
def remove(id):
img = db_session.query(db.posts).filter_by(id=id).first()
def remove(img_id):
"""
Deletes an image from the server and database
"""
img = db_session.query(db.posts).filter_by(id=img_id).first()
if img is None:
abort(404)
@ -137,28 +150,32 @@ def remove(id):
abort(403)
try:
os.remove(
os.path.join(current_app.config['UPLOAD_FOLDER'],
img.file_name))
except Exception as e:
logger.server(600, f"Error removing file: {e}")
os.remove(os.path.join(current_app.config['UPLOAD_FOLDER'],img.file_name))
except FileNotFoundError:
# File was already deleted or doesn't exist
logging.warning('File not found: %s, already deleted or never existed', img.file_name)
except Exception as err:
logging.error('Could not remove file: %s', err)
abort(500)
try:
db_session.query(db.posts).filter_by(id=id).delete()
db_session.query(db.posts).filter_by(id=img_id).delete()
db_session.commit()
except Exception as e:
logger.server(600, f"Error removing from database: {e}")
except Exception as err:
logging.error('Could not remove from database: %s', err)
abort(500)
logger.server(301, f"Removed image {id}")
logging.info('Removed image (%s) %s', img_id, img.file_name)
flash(['Image was all in Le Head!', 1])
return 'Gwa Gwa'
@blueprint.route('/metadata/<int:id>', methods=['GET'])
def metadata(id):
img = db_session.query(db.posts).filter_by(id=id).first()
@blueprint.route('/metadata/<int:img_id>', methods=['GET'])
def metadata(img_id):
"""
Yoinks metadata from an image
"""
img = db_session.query(db.posts).filter_by(id=img_id).first()
if img is None:
abort(404)
@ -172,12 +189,15 @@ def metadata(id):
@blueprint.route('/logfile')
@login_required
def logfile():
filename = logger.filename()
"""
Gets the log file and returns it as a JSON object
"""
filename = logging.getLoggerClass().root.handlers[0].baseFilename
log_dict = {}
i = 0
with open(filename) as f:
for line in f:
with open(filename, encoding='utf-8') as file:
for line in file:
line = line.split(' : ')
event = line[0].strip().split(' ')
@ -194,11 +214,14 @@ def logfile():
'code': int(message[1:4]),
'message': message[5:].strip()
}
except:
except ValueError:
message_data = {'code': 0, 'message': message}
except Exception as err:
logging.error('Could not parse log file: %s', err)
abort(500)
log_dict[i] = {'event': event_data, 'message': message_data}
i += 1 # Line number, starts at 0
return jsonify(log_dict)
return jsonify(log_dict)

View file

@ -1,56 +1,84 @@
"""
OnlyLegs - Authentification
User registration, login and logout and locking access to pages behind a login
"""
import re
import uuid
import logging
import functools
from flask import Blueprint, flash, g, redirect, request, session, url_for, abort, jsonify, current_app
from flask import Blueprint, flash, g, redirect, request, session, url_for, abort, jsonify
from werkzeug.security import check_password_hash, generate_password_hash
from gallery import db
from sqlalchemy.orm import sessionmaker
from sqlalchemy import exc
from gallery import db
blueprint = Blueprint('auth', __name__, url_prefix='/auth')
db_session = sessionmaker(bind=db.engine)
db_session = db_session()
from .logger import logger
import re
import uuid
def login_required(view):
"""
Decorator to check if a user is logged in before accessing a page
"""
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None or session.get('uuid') is None:
logging.error('Authentification failed')
session.clear()
return redirect(url_for('gallery.index'))
blueprint = Blueprint('auth', __name__, url_prefix='/auth')
return view(**kwargs)
return wrapped_view
@blueprint.before_app_request
def load_logged_in_user():
"""
Runs before every request and checks if a user is logged in
"""
user_id = session.get('user_id')
user_uuid = session.get('uuid')
if user_id is None or user_uuid is None:
# This is not needed as the user is not logged in anyway, also spams the server logs with useless data
#add_log(103, 'Auth error before app request')
g.user = None
session.clear()
else:
is_alive = db_session.query(db.sessions).filter_by(session_uuid=user_uuid).first()
if is_alive is None:
logger.add(103, 'Session expired')
logging.info('Session expired')
flash(['Session expired!', '3'])
session.clear()
else:
g.user = db_session.query(db.users).filter_by(id=user_id).first()
@blueprint.route('/register', methods=['POST'])
def register():
"""
Register a new user
"""
username = request.form['username']
email = request.form['email']
password = request.form['password']
password_repeat = request.form['password-repeat']
error = []
if not username:
error.append('Username is empty!')
email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b')
username_regex = re.compile(r'\b[A-Za-z0-9._%+-]+\b')
if not email:
error.append('Email is empty!')
elif not re.match(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', email):
if not username or not username_regex.match(username):
error.append('Username is invalid!')
if not email or not email_regex.match(email):
error.append('Email is invalid!')
if not password:
@ -59,73 +87,77 @@ def register():
error.append('Password is too short! Longer than 8 characters pls')
if not password_repeat:
error.append('Password repeat is empty!')
error.append('Enter password again!')
elif password_repeat != password:
error.append('Passwords do not match!')
if not error:
try:
tr = db.users(username, email, generate_password_hash(password))
db_session.add(tr)
db_session.commit()
except Exception as e:
error.append(f"User {username} is already registered!")
else:
logger.add(103, f"User {username} registered")
return 'gwa gwa'
if error:
return jsonify(error)
return jsonify(error)
try:
db_session.add(db.users(username, email, generate_password_hash(password)))
db_session.commit()
except exc.IntegrityError:
return f'User {username} is already registered!'
except Exception as err:
logging.error('User %s could not be registered: %s', username, err)
return 'Something went wrong!'
logging.info('User %s registered', username)
return 'gwa gwa'
@blueprint.route('/login', methods=['POST'])
def login():
"""
Log in a registered user by adding the user id to the session
"""
username = request.form['username']
password = request.form['password']
error = None
user = db_session.query(db.users).filter_by(username=username).first()
error = []
if user is None:
logger.add(101, f"User {username} does not exist from {request.remote_addr}")
abort(403)
logging.error('User %s does not exist. Login attempt from %s',
username, request.remote_addr)
error.append('Username or Password is incorrect!')
elif not check_password_hash(user.password, password):
logger.add(102, f"User {username} password error from {request.remote_addr}")
logging.error('User %s entered wrong password. Login attempt from %s',
username, request.remote_addr)
error.append('Username or Password is incorrect!')
if error:
abort(403)
try:
session.clear()
session['user_id'] = user.id
session['uuid'] = str(uuid.uuid4())
tr = db.sessions(user.id, session.get('uuid'), request.remote_addr, request.user_agent.string, 1)
db_session.add(tr)
db_session.add(db.sessions(user.id,
session.get('uuid'),
request.remote_addr,
request.user_agent.string,
1))
db_session.commit()
except error as err:
logger.add(105, f"User {username} auth error: {err}")
except Exception as err:
logging.error('User %s could not be logged in: %s', username, err)
abort(500)
if error is None:
logger.add(100, f"User {username} logged in from {request.remote_addr}")
flash(['Logged in successfully!', '4'])
return 'gwa gwa'
abort(500)
logging.info('User %s logged in from %s', username, request.remote_addr)
flash(['Logged in successfully!', '4'])
return 'gwa gwa'
@blueprint.route('/logout')
def logout():
logger.add(103, f"User {g.user.username} - id: {g.user.id} logged out")
"""
Clear the current session, including the stored user id
"""
logging.info('User (%s) %s logged out', session.get('user_id'), g.user.username)
session.clear()
return redirect(url_for('index'))
def login_required(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None or session.get('uuid') is None:
logger.add(103, "Auth error")
session.clear()
return redirect(url_for('gallery.index'))
return view(**kwargs)
return wrapped_view

View file

@ -1,6 +1,10 @@
"""
OnlyLegs - Database
Database models and functions for SQLAlchemy
"""
import os
import platformdirs
from datetime import datetime
import platformdirs
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
@ -11,7 +15,11 @@ engine = create_engine(f'sqlite:///{path_to_db}', echo=False)
base = declarative_base()
class users (base):
class users (base): # pylint: disable=too-few-public-methods, C0103
"""
User table
Joins with post, groups, session and log
"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
@ -19,7 +27,7 @@ class users (base):
email = Column(String, unique=True, nullable=False)
password = Column(String, nullable=False)
created_at = Column(DateTime, nullable=False)
posts = relationship('posts')
groups = relationship('groups')
session = relationship('sessions')
@ -30,19 +38,24 @@ class users (base):
self.email = email
self.password = password
self.created_at = datetime.now()
class posts (base):
class posts (base): # pylint: disable=too-few-public-methods, C0103
"""
Post table
Joins with group_junction
"""
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
file_name = Column(String, unique=True, nullable=False)
description = Column(String, nullable=False)
alt = Column(String, nullable=False)
author_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, nullable=False)
junction = relationship('group_junction')
def __init__(self, file_name, description, alt, author_id):
self.file_name = file_name
self.description = description
@ -50,84 +63,108 @@ class posts (base):
self.author_id = author_id
self.created_at = datetime.now()
class groups (base):
class groups (base): # pylint: disable=too-few-public-methods, C0103
"""
Group table
Joins with group_junction
"""
__tablename__ = 'groups'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
description = Column(String, nullable=False)
author_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, nullable=False)
junction = relationship('group_junction')
def __init__(self, name, description, author_id):
self.name = name
self.description = description
self.author_id = author_id
self.created_at = datetime.now()
class group_junction (base):
class group_junction (base): # pylint: disable=too-few-public-methods, C0103
"""
Junction table for posts and groups
Joins with posts and groups
"""
__tablename__ = 'group_junction'
id = Column(Integer, primary_key=True)
group_id = Column(Integer, ForeignKey('groups.id'))
post_id = Column(Integer, ForeignKey('posts.id'))
def __init__(self, group_id, post_id):
self.group_id = group_id
self.post_id = post_id
class sessions (base):
class sessions (base): # pylint: disable=too-few-public-methods, C0103
"""
Session table
Joins with user
"""
__tablename__ = 'sessions'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
session_uuid = Column(String, nullable=False)
ip = Column(String, nullable=False)
ip_address = Column(String, nullable=False)
user_agent = Column(String, nullable=False)
active = Column(Boolean, nullable=False)
created_at = Column(DateTime, nullable=False)
def __init__(self, user_id, session_uuid, ip, user_agent, active):
def __init__(self, user_id, session_uuid, ip_address, user_agent, active): # pylint: disable=too-many-arguments, C0103
self.user_id = user_id
self.session_uuid = session_uuid
self.ip = ip
self.ip_address = ip_address
self.user_agent = user_agent
self.active = active
self.created_at = datetime.now()
class logs (base):
class logs (base): # pylint: disable=too-few-public-methods, C0103
"""
Log table
Joins with user
"""
__tablename__ = 'logs'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
ip = Column(String, nullable=False)
ip_address = Column(String, nullable=False)
code = Column(Integer, nullable=False)
msg = Column(String, nullable=False)
created_at = Column(DateTime, nullable=False)
def __init__(self, user_id, ip, code, msg):
def __init__(self, user_id, ip_address, code, msg):
self.user_id = user_id
self.ip = ip
self.ip_address = ip_address
self.code = code
self.msg = msg
self.created_at = datetime.now()
class bans (base):
class bans (base): # pylint: disable=too-few-public-methods, C0103
"""
Bans table
"""
__tablename__ = 'bans'
id = Column(Integer, primary_key=True)
ip = Column(String, nullable=False)
ip_address = Column(String, nullable=False)
code = Column(Integer, nullable=False)
msg = Column(String, nullable=False)
created_at = Column(DateTime, nullable=False)
def __init__(self, ip, code, msg):
self.ip = ip
def __init__(self, ip_address, code, msg):
self.ip_address = ip_address
self.code = code
self.msg = msg
self.created_at = datetime.now()
base.metadata.create_all(engine)
base.metadata.create_all(engine)

View file

@ -1,112 +0,0 @@
import logging
import os
from datetime import datetime
import platformdirs
# Prevent werkzeug from logging
logging.getLogger('werkzeug').disabled = True
class logger:
def innit_logger():
filepath = os.path.join(platformdirs.user_config_dir('onlylegs'), 'logs')
#filename = f'onlylogs_{datetime.now().strftime("%Y%m%d")}.log'
filename = 'only.log'
if not os.path.isdir(filepath):
os.mkdir(filepath)
logging.basicConfig(
filename=os.path.join(filepath, filename),
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
format=
'%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s',
encoding='utf-8')
"""
Login and Auth error codes
--------------------------
100: Login
101: Login attempt
102: Login attempt (password error)
103: Logout
104: Registration
105: Auth error
Account error codes - User actions
----------------------------------
200: Account password reset
201: Account email change
202: Account delete
203: Account error
Image error codes
-----------------
300: Image upload
301: Image delete
302: Image edit
303: Image error
Group error codes
-----------------
400: Group create
401: Group delete
402: Group edit
403: Group error
User error codes - Admin actions
--------------------------------
500: User delete
501: User edit
502: User ban
503: User unban
504: User permission change
505: User error
Server and Website errors - Internal
------------------------------------
600: Server error
601: Server crash
602: Website error
603: Website crash
604: Maintenance
605: Startup
606: Other
621: :3
"""
def add(error, message):
# Allowed error codes, as listed above
log_levels = [
100, 101, 102, 103, 104, 105, 200, 201, 202, 203, 300, 301, 302,
303, 400, 401, 402, 403, 500, 501, 502, 503, 504, 505
]
if error in log_levels:
logging.log(logging.INFO, f'[{error}] {message}')
else:
logging.log(logging.WARN, f'[606] Improper use of error code {error}')
def server(error, message):
log_levels = {
600: logging.ERROR,
601: logging.CRITICAL,
602: logging.ERROR,
603: logging.CRITICAL,
604: logging.DEBUG,
605: logging.DEBUG,
606: logging.INFO,
621: logging.INFO,
}
if error in log_levels:
logging.log(log_levels[error], f'[{error}] {message}')
else:
logging.log(logging.WARN, f'[606] Invalid error code {error}')
def filename():
handler = logging.getLogger().handlers[0]
filename = handler.baseFilename
return filename

View file

@ -1,23 +1,27 @@
from flask import Blueprint, render_template, current_app
from werkzeug.exceptions import abort
from werkzeug.utils import secure_filename
from gallery.auth import login_required
from . import db
from sqlalchemy.orm import sessionmaker
db_session = sessionmaker(bind=db.engine)
db_session = db_session()
from . import metadata as mt
"""
Onlylegs Gallery - Routing
"""
import os
from flask import Blueprint, render_template, current_app
from werkzeug.exceptions import abort
from sqlalchemy.orm import sessionmaker
from . import db
from . import metadata as mt
blueprint = Blueprint('gallery', __name__)
db_session = sessionmaker(bind=db.engine)
db_session = db_session()
@blueprint.route('/')
def index():
"""
Home page of the website, shows the feed of latest images
"""
images = db_session.query(db.posts).order_by(db.posts.id.desc()).all()
return render_template('index.html',
@ -26,10 +30,12 @@ def index():
name=current_app.config['WEBSITE']['name'],
motto=current_app.config['WEBSITE']['motto'])
@blueprint.route('/image/<int:id>')
def image(id):
img = db_session.query(db.posts).filter_by(id=id).first()
@blueprint.route('/image/<int:image_id>')
def image(image_id):
"""
Image view, shows the image and its metadata
"""
img = db_session.query(db.posts).filter_by(id=image_id).first()
if img is None:
abort(404)
@ -39,28 +45,30 @@ def image(id):
return render_template('image.html', image=img, exif=exif)
@blueprint.route('/group')
def groups():
"""
Group overview, shows all image groups
"""
return render_template('group.html', group_id='gwa gwa')
@blueprint.route('/group/<int:id>')
def group(id):
return render_template('group.html', group_id=id)
@blueprint.route('/upload')
@login_required
def upload():
return render_template('upload.html')
@blueprint.route('/group/<int:group_id>')
def group(group_id):
"""
Group view, shows all images in a group
"""
return render_template('group.html', group_id=group_id)
@blueprint.route('/profile')
def profile():
"""
Profile overview, shows all profiles on the onlylegs gallery
"""
return render_template('profile.html', user_id='gwa gwa')
@blueprint.route('/profile/<int:id>')
def profile_id(id):
return render_template('profile.html', user_id=id)
@blueprint.route('/profile/<int:user_id>')
def profile_id(user_id):
"""
Shows user ofa given id, displays their uploads and other info
"""
return render_template('profile.html', user_id=user_id)

View file

@ -1,67 +0,0 @@
import datetime
now = datetime.datetime.now()
import sys
import shutil
import os
import sass
class compile():
def __init__(self, theme, dir):
print(f"Loading '{theme}' theme...")
theme_path = os.path.join(dir, 'themes', theme)
font_path = os.path.join(dir, 'themes', theme, 'fonts')
dest = os.path.join(dir, 'static', 'theme')
# print(f"Theme path: {theme_path}")
if os.path.exists(theme_path):
if os.path.exists(os.path.join(theme_path, 'style.scss')):
theme_path = os.path.join(theme_path, 'style.scss')
elif os.path.exists(os.path.join(theme_path, 'style.sass')):
theme_path = os.path.join(theme_path, 'style.sass')
else:
print("Theme does not contain a style file!")
sys.exit(1)
self.sass = sass
self.loadTheme(theme_path, dest)
self.loadFonts(font_path, dest)
else:
print("No theme found!")
sys.exit(1)
print(f"{now.hour}:{now.minute}:{now.second} - Done!\n")
def loadTheme(self, theme, dest):
with open(os.path.join(dest, 'style.css'), 'w') as f:
try:
f.write(
self.sass.compile(filename=theme,
output_style='compressed'))
print("Compiled successfully!")
except self.sass.CompileError as e:
print("Failed to compile!\n", e)
sys.exit(1)
def loadFonts(self, source, dest):
dest = os.path.join(dest, 'fonts')
if os.path.exists(dest):
print("Updating fonts...")
try:
shutil.rmtree(dest)
except Exception as e:
print("Failed to remove old fonts!\n", e)
sys.exit(1)
try:
shutil.copytree(source, dest)
# print("Copied fonts to:", dest)
print("Copied new fonts!")
except Exception as e:
print("Failed to copy fonts!\n", e)
sys.exit(1)

View file

@ -1,31 +1,42 @@
from flask import Blueprint, render_template, url_for
from werkzeug.exceptions import abort
"""
OnlyLegs - Settings page
"""
from flask import Blueprint, render_template
from gallery.auth import login_required
from datetime import datetime
now = datetime.now()
blueprint = Blueprint('settings', __name__, url_prefix='/settings')
@blueprint.route('/')
@login_required
def general():
"""
General settings page
"""
return render_template('settings/general.html')
@blueprint.route('/server')
@login_required
def server():
"""
Server settings page
"""
return render_template('settings/server.html')
@blueprint.route('/account')
@login_required
def account():
"""
Account settings page
"""
return render_template('settings/account.html')
@blueprint.route('/logs')
@login_required
def logs():
return render_template('settings/logs.html')
"""
Logs settings page
"""
return render_template('settings/logs.html')

View file

@ -1,49 +1,66 @@
# Import dependencies
import platformdirs
"""
OnlyLegs - Setup
Runs when the app detects that there is no user directory
"""
import os
import sys
import platformdirs
import yaml
class setup:
def __init__(self):
self.user_dir = platformdirs.user_config_dir('onlylegs')
USER_DIR = platformdirs.user_config_dir('onlylegs')
class SetupApp:
"""
Setup the application on first run
"""
def __init__(self):
"""
Main setup function
"""
print("Running setup...")
if not os.path.exists(self.user_dir):
if not os.path.exists(USER_DIR):
self.make_dir()
if not os.path.exists(os.path.join(self.user_dir, '.env')):
if not os.path.exists(os.path.join(USER_DIR, '.env')):
self.make_env()
if not os.path.exists(os.path.join(self.user_dir, 'conf.yml')):
if not os.path.exists(os.path.join(USER_DIR, 'conf.yml')):
self.make_yaml()
def make_dir(self):
"""
Create the user directory
"""
try:
os.makedirs(self.user_dir)
os.makedirs(os.path.join(self.user_dir, 'instance'))
print("Created user directory at:", self.user_dir)
except Exception as e:
print("Error creating user directory:", e)
exit(1) # exit with error code
os.makedirs(USER_DIR)
os.makedirs(os.path.join(USER_DIR, 'instance'))
print("Created user directory at:", USER_DIR)
except Exception as err:
print("Error creating user directory:", err)
sys.exit(1) # exit with error code
def make_env(self):
# Create .env file with default values
"""
Create the .env file with default values
"""
env_conf = {
'FLASK_SECRETE': 'dev',
}
try:
with open(os.path.join(self.user_dir, '.env'), 'w') as f:
with open(os.path.join(USER_DIR, '.env'), encoding='utf-8') as file:
for key, value in env_conf.items():
f.write(f"{key}={value}\n")
file.write(f"{key}={value}\n")
print("Created environment variables")
except Exception as e:
print("Error creating environment variables:", e)
exit(1)
except Exception as err:
print("Error creating environment variables:", err)
sys.exit(1)
print("Generated default .env file. EDIT IT BEFORE RUNNING THE APP AGAIN!")
def make_yaml(self):
# Create yaml config file with default values
"""
Create the YAML config file with default values
"""
yaml_conf = {
'admin': {
'name': 'Real Person',
@ -71,11 +88,11 @@ class setup:
},
}
try:
with open(os.path.join(self.user_dir, 'conf.yml'), 'w') as f:
yaml.dump(yaml_conf, f, default_flow_style=False)
with open(os.path.join(USER_DIR, 'conf.yml'), encoding='utf-8') as file:
yaml.dump(yaml_conf, file, default_flow_style=False)
print("Created default gallery config")
except Exception as e:
print("Error creating default gallery config:", e)
exit(1)
print("Generated default YAML config. EDIT IT BEFORE RUNNING THE APP AGAIN!")
except Exception as err:
print("Error creating default gallery config:", err)
sys.exit(1)
print("Generated default YAML config. EDIT IT BEFORE RUNNING THE APP AGAIN!")

78
gallery/theme_manager.py Normal file
View file

@ -0,0 +1,78 @@
"""
OnlyLegs - Theme Manager
"""
import os
import sys
import shutil
from datetime import datetime
import sass
class CompileTheme():
"""
Compiles the theme into the static folder
"""
def __init__(self, theme_name, app_path):
"""
Initialize the theme manager
Compiles the theme into the static folder and loads the fonts
"""
print(f"Loading '{theme_name}' theme...")
theme_path = os.path.join(app_path, 'themes', theme_name)
theme_dest = os.path.join(app_path, 'static', 'theme')
if not os.path.exists(theme_path):
print("Theme does not exist!")
sys.exit(1)
self.load_sass(theme_path, theme_dest)
self.load_fonts(theme_path, theme_dest)
now = datetime.now()
print(f"{now.hour}:{now.minute}:{now.second} - Done!\n")
def load_sass(self, source_path, css_dest):
"""
Compile the sass (or scss) file into css and save it to the static folder
"""
if os.path.join(source_path, 'style.sass'):
sass_path = os.path.join(source_path, 'style.sass')
elif os.path.join(source_path, 'style.scss'):
sass_path = os.path.join(source_path, 'style.scss')
else:
print("No sass file found!")
sys.exit(1)
with open(os.path.join(css_dest, 'style.css'), encoding='utf-8') as file:
try:
file.write(sass.compile(filename=sass_path,output_style='compressed'))
except sass.CompileError as err:
print("Failed to compile!\n", err)
sys.exit(1)
print("Compiled successfully!")
def load_fonts(self, source_path, font_dest):
"""
Copy the fonts folder to the static folder
"""
# Append fonts to the destination path
source_path = os.path.join(source_path, 'fonts')
font_dest = os.path.join(font_dest, 'fonts')
if os.path.exists(font_dest):
print("Updating fonts...")
try:
shutil.rmtree(font_dest)
except Exception as err:
print("Failed to remove old fonts!\n", err)
sys.exit(1)
try:
shutil.copytree(source_path, font_dest)
print("Copied new fonts!")
except Exception as err:
print("Failed to copy fonts!\n", err)
sys.exit(1)

6
poetry.lock generated
View file

@ -465,14 +465,14 @@ tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "pa
[[package]]
name = "platformdirs"
version = "3.0.0"
version = "3.1.0"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "platformdirs-3.0.0-py3-none-any.whl", hash = "sha256:b1d5eb14f221506f50d6604a561f4c5786d9e80355219694a1b244bcd96f4567"},
{file = "platformdirs-3.0.0.tar.gz", hash = "sha256:8a1228abb1ef82d788f74139988b137e78692984ec7b08eaa6c65f1723af28f9"},
{file = "platformdirs-3.1.0-py3-none-any.whl", hash = "sha256:13b08a53ed71021350c9e300d4ea8668438fb0046ab3937ac9a29913a1a1350a"},
{file = "platformdirs-3.1.0.tar.gz", hash = "sha256:accc3665857288317f32c7bebb5a8e482ba717b474f3fc1d18ca7f9214be0cef"},
]
[package.extras]

View file

@ -24,3 +24,9 @@ SQLAlchemy = "^2.0.3"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.pylint.messages_control]
# C0415: Flask uses it to register blueprints
# W1401: Anomalous backslash in string used in __init__
# W0718: Exception are logged so we don't need to raise them
disable = "C0415, W1401, W0718"