diff --git a/src/models.py_backup b/src/models.py_backup deleted file mode 100644 index ded23a6..0000000 --- a/src/models.py_backup +++ /dev/null @@ -1,156 +0,0 @@ -from flask_sqlalchemy import SQLAlchemy -from datetime import datetime -from werkzeug.security import generate_password_hash, check_password_hash -import os -from config import UPLOAD_FOLDER -from flask_login import UserMixin - -db = SQLAlchemy() - -class Favorite(db.Model): - id = db.Column(db.Integer, primary_key=True) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) - paste_id = db.Column(db.Integer, db.ForeignKey('paste.id'), nullable=False) - created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - - user = db.relationship('User', backref=db.backref('favorites', lazy=True)) - paste = db.relationship('Paste', backref=db.backref('favorited_by', lazy=True)) - - paste_favorites = db.Table( - 'paste_favorites', - db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), - db.Column('paste_id', db.Integer, db.ForeignKey('paste.id'), primary_key=True) - ) -class User(db.Model, UserMixin): - id = db.Column(db.Integer, primary_key=True) - username = db.Column(db.String(80), unique=True, nullable=False) - password_hash = db.Column(db.String(128), nullable=False) - role = db.Column(db.String(10), nullable=False, default='user') - storage_used = db.Column(db.BigInteger, nullable=False, default=0) # En bytes - storage_limit = db.Column(db.BigInteger, nullable=False, default=1 * 1024**3) - theme_preference = db.Column(db.String(10), default="light") # Guardar "light" o "dark" - def set_password(self, password): - self.password_hash = generate_password_hash(password) - - def check_password(self, password): - return check_password_hash(self.password_hash, password) - - def has_unlimited_storage(self): - return self.storage_limit == -1 - - def get_storage_limit(self): - """Devuelve el límite de almacenamiento del usuario.""" - return self.storage_limit - -shared_pastes = db.Table( - 'shared_pastes', - db.Column('paste_id', db.Integer, db.ForeignKey('paste.id'), primary_key=True), - db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), - db.Column('can_edit', db.Boolean, default=False) # Nuevo campo para indicar permiso de edición -) - -# Tabla intermedia para usuarios con permisos de edición -paste_editors = db.Table( - 'paste_editors', - db.Column('paste_id', db.Integer, db.ForeignKey('paste.id'), primary_key=True), - db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True) -) - -class Paste(db.Model): - id = db.Column(db.Integer, primary_key=True) - title = db.Column(db.String(255), nullable=True) - editable = db.Column(db.Boolean, default=True) - content_type = db.Column(db.String(50), nullable=False) - filename = db.Column(db.String(255), nullable=True) - owner_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) - created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - expires_at = db.Column(db.DateTime, nullable=True) # Nuevo campo de expiración - language = db.Column(db.String(50), nullable=True) - last_edited_at = db.Column(db.DateTime, nullable=True) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) - size = db.Column(db.Integer, nullable=True) - - # Relaciones - editors = db.relationship('User', secondary=paste_editors, backref=db.backref('editable_pastes', lazy='dynamic')) - owner = db.relationship('User', backref=db.backref('owned_pastes', lazy=True), foreign_keys=[owner_id]) - favorites = db.relationship('User', secondary='paste_favorites', backref=db.backref('favorite_pastes', lazy='dynamic')) - shared_with = db.relationship('User', secondary=shared_pastes, backref=db.backref('shared_pastes', lazy='dynamic')) - - owner = db.relationship( - 'User', - backref=db.backref('owned_pastes', lazy=True), - foreign_keys=[owner_id] - ) - favorites = db.relationship( - 'User', - secondary='paste_favorites', - backref=db.backref('favorite_pastes', lazy='dynamic') - ) - def is_expired(self): - """ Verifica si el paste ha expirado. """ - return self.expires_at and datetime.utcnow() > self.expires_at - def to_dict(self): - return { - "id": self.id, - "content_type": self.content_type, - "filename": self.filename, - "owner_id": self.owner_id, - "created_at": self.created_at.isoformat(), - "language": self.language, - "user_id": self.user_id, - "size": self.file_size - } - - - def get_size(self): - file_path = os.path.join(UPLOAD_FOLDER, self.filename) - try: - return os.path.getsize(file_path) # Devuelve el tamaño del archivo - except FileNotFoundError: - return 0 - - def get_type(self): - # Determinar el tipo de archivo basado en content_type - if self.content_type.startswith("video"): - return "Video" - elif self.content_type.startswith("audio"): - return "Audio" - elif self.content_type.startswith("image"): - return "Image" - elif self.content_type in ["application/zip", "application/x-tar", "application/gzip", "application/x-bzip2", "application/x-7z-compressed", "application/x-rar-compressed"]: - return "Compressed" - elif self.content_type.startswith("text"): - if self.language: - return f"Text ({self.language})" - return "Text" - else: - return "Other" - - def has_edit_permission(self, user): - """ - Verifica si un usuario tiene permiso de edición en este paste. - """ - # Si el usuario no está autenticado, no puede tener permisos - if not user.is_authenticated: - print(f"[DEBUG] Unauthenticated user tried to edit paste {self.id}. Permission denied.") - return False - - # El dueño del paste siempre tiene permisos - if user.id == self.owner_id: - print(f"[DEBUG] User {user.id} is the owner of paste {self.id}. Permission granted.") - return True - - # Verificar si el usuario tiene permisos en `shared_pastes` - shared_entry = db.session.query(shared_pastes).filter_by( - paste_id=self.id, - user_id=user.id, - can_edit=True - ).first() - - if shared_entry: - print(f"[DEBUG] User {user.id} has edit permissions for paste {self.id}.") - else: - print(f"[DEBUG] User {user.id} does NOT have edit permissions for paste {self.id}.") - - return shared_entry is not None - diff --git a/src/routes.py_backup b/src/routes.py_backup deleted file mode 100644 index aa1a9fc..0000000 --- a/src/routes.py_backup +++ /dev/null @@ -1,2586 +0,0 @@ -import smtplib -from email.mime.text import MIMEText -import os -import mimetypes -from flask import request, jsonify, send_from_directory, abort, render_template, redirect, url_for, flash, session -from src.models import db, Paste, User, shared_pastes -from src.auth import generate_token -from config import UPLOAD_FOLDER, SMTP_SERVER, SMTP_PORT, SMTP_USERNAME, SMTP_PASSWORD, SMTP_USE_TLS, SMTP_USE_SSL, ROLE_STORAGE_LIMITS -from pygments import highlight -from pygments.lexers import guess_lexer, get_lexer_by_name, guess_lexer_for_filename -from pygments.formatters import HtmlFormatter -from pygments.util import ClassNotFound -from sqlalchemy import func -import magic -from werkzeug.utils import secure_filename -import logging -from io import BytesIO -from flask import send_file, Response -import uuid -from pymediainfo import MediaInfo -import secrets -from sqlalchemy.exc import SQLAlchemyError -from pygments.styles import get_all_styles -from datetime import datetime -from flask_login import login_required -from flask_login import login_user -from flask_login import logout_user -from werkzeug.security import check_password_hash -from src.auth import jwt_required -from flask_login import current_user -from markdown import markdown -from collections import defaultdict -import jwt -from flask import current_app -from elasticsearch import Elasticsearch -from datetime import datetime -from src.models import Favorite -from config import UPLOAD_FOLDER -from rapidfuzz import process, fuzz -from sqlalchemy import or_ -import json -from datetime import datetime, timedelta - -es = Elasticsearch(hosts=["http://elasticsearch:9200"]) - -def register_error_handlers(app): - @app.errorhandler(404) - def not_found_error(error): - return render_template("errors/404.html"), 404 - - @app.errorhandler(500) - def internal_error(error): - return render_template("errors/500.html"), 500 - - @app.errorhandler(403) - def forbidden_error(error): - return render_template("errors/403.html"), 403 - -def delete_expired_pastes(): - """Elimina los pastes expirados de la base de datos y Elasticsearch.""" - now = datetime.utcnow() - - # Obtener pastes expirados - expired_pastes = Paste.query.filter(Paste.expires_at < now).all() - - if not expired_pastes: - print("[INFO] No expired pastes found.") - return - - for paste in expired_pastes: - try: - # Eliminar de Elasticsearch - es.delete(index="pastes", id=paste.id, ignore=[404]) - print(f"[INFO] Deleted paste {paste.id} from Elasticsearch.") - except Exception as e: - print(f"[ERROR] Failed to delete paste {paste.id} from Elasticsearch: {e}") - - # Eliminar de la base de datos - delete_paste_from_index(paste) - db.session.delete(paste) - - db.session.commit() - print(f"[INFO] Deleted {len(expired_pastes)} expired pastes from the database.") - - -def calculate_storage_used(user_id): - """ - Calcula el almacenamiento utilizado por el usuario. - - :param user_id: ID del usuario. - :return: Almacenamiento utilizado en MB. - """ - total = db.session.query(db.func.sum(Paste.size)).filter_by(user_id=user_id).scalar() - total = total if total else 0 - mb = total / (1024 ** 2) # Convertir bytes a MB - return mb - - - -def get_shared_pastes(user, paste_filters=None, page=1, per_page=10): - """ - Obtiene los pastes compartidos con el usuario especificado, aplicando filtros si es necesario. - - :param user: Objeto de usuario actual. - :param paste_filters: Lista de filtros adicionales para aplicar a la consulta. - :param page: Número de página para la paginación. - :param per_page: Número de elementos por página. - :return: Objeto de paginación con los pastes compartidos. - """ - query = Paste.query.join(shared_pastes).filter(shared_pastes.c.user_id == user.id) - - if paste_filters: - query = query.filter(*paste_filters) - - query = query.order_by(Paste.created_at.desc()) - - pagination = query.paginate(page=page, per_page=per_page, error_out=False) - - return pagination - - -def delete_paste_from_index(paste): - try: - es.delete(index='pastes', id=paste.id) - print(f"Deleted paste {paste.id} from index.") - except Exception as e: - print(f"Error deleting paste {paste.id} from index: {e}") - -def calculate_stats(user_id=None, start_date=None, end_date=None): - query = Paste.query - - if user_id is not None: - query = query.filter_by(user_id=user_id) - - if start_date: - if isinstance(start_date, str): - start_date = datetime.strptime(start_date, '%Y-%m-%d') - query = query.filter(Paste.created_at >= start_date) - - if end_date: - if isinstance(end_date, str): - end_date = datetime.strptime(end_date, '%Y-%m-%d').replace(hour=23, minute=59, second=59) - query = query.filter(Paste.created_at <= end_date) - - pastes = query.all() - - stats = { - "total_pastes": len(pastes), - "total_size": sum(paste.size for paste in pastes), - "total_text_pastes": query.filter(Paste.content_type.like('text/%')).count(), - "total_file_pastes": query.filter(Paste.content_type.notlike('text/%')).count(), - "total_media_pastes": query.filter( - or_( - Paste.content_type.like('image/%'), - Paste.content_type.like('video/%'), - Paste.content_type.like('audio/%') - ) - ).count(), - "total_compressed_pastes": query.filter( - Paste.content_type.in_([ - "application/zip", "application/x-tar", "application/gzip", - "application/x-bzip2", "application/x-7z-compressed", "application/x-rar-compressed" - ]) - ).count(), - "languages": list(set(paste.language for paste in pastes if paste.language)), - "counts_text": [], - "counts_file": [], - "counts_media": [], - "counts_compressed": [], - "pastes": pastes # Lista de pastes - } - - # Agrupar estadísticas por lenguaje y tipo - language_groups = query.with_entities(Paste.language, Paste.content_type).all() - language_counts = {} - for lang, ctype in language_groups: - if lang not in language_counts: - language_counts[lang] = {"text": 0, "file": 0, "media": 0} - if ctype.startswith("text/"): - language_counts[lang]["text"] += 1 - elif ctype.startswith(("image/", "video/", "audio/")): - language_counts[lang]["media"] += 1 - else: - language_counts[lang]["file"] += 1 - - stats["counts_text"] = [language_counts[lang]["text"] for lang in stats["languages"]] - stats["counts_file"] = [language_counts[lang]["file"] for lang in stats["languages"]] - stats["counts_media"] = [language_counts[lang]["media"] for lang in stats["languages"]] - - return stats - - -def index_paste(paste): - try: - file_path = os.path.join("/app/uploads", paste.filename) - with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: - content = f.read() - - doc = { - "id": paste.id, - "title": paste.title or paste.filename, - "content": content, - "owner_id": paste.owner_id, - "created_at": paste.created_at.isoformat(), - } - es.index(index="pastes", id=paste.id, document=doc) - except Exception as e: - current_app.logger.error(f"Error indexing paste {paste.id}: {e}") - -def get_current_user(): - user_id = session.get('user_id') - if not user_id: - return None - # Supongamos que tienes un modelo User para buscar el usuario - return User.query.get(user_id) - -logging.basicConfig( - level=logging.DEBUG, # Cambiar a INFO si no quieres mensajes de depuración - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[ - logging.StreamHandler() # Envía los logs a la consola - ] -) - -# Define la función highlight_code -def highlight_code(content, language=None, filename=None): - if language: - try: - lexer = get_lexer_by_name(language) - except: - lexer = guess_lexer(content) - else: - if filename: - lexer = guess_lexer_for_filename(filename, content) - else: - lexer = guess_lexer(content) - - formatter = HtmlFormatter(linenos=True, cssclass="highlight") - html_code = highlight(content, lexer, formatter) - - return html_code - -MEDIA_MIME_TYPES = ( - 'image/', - 'video/', - 'audio/', - 'application/pdf', -) - -# Tipos MIME basados en texto que no comienzan con 'text/' -TEXT_BASED_APPLICATION_MIME_TYPES = ( - 'application/json', - 'application/javascript', - 'application/xml', - 'application/xhtml+xml', - 'application/sql', - 'text/xml', - # Agrega otros tipos basados en texto según tus necesidades -) - -LANGUAGE_TO_EXTENSION = { - "python": "py", - "javascript": "js", - "java": "java", - "csharp": "cs", - "cpp": "cpp", - "ruby": "rb", - "go": "go", - "html": "html", - "css": "css", - "php": "php", - "swift": "swift", - "kotlin": "kt", - "rust": "rs", - "typescript": "ts", - "bash": "sh", - "plaintext": "txt", - "sql": "sql", - "json": "json", - "yaml": "yaml", - "xml": "xml" -} - -# Mapeo personalizado de extensiones a lenguajes -EXTENSION_TO_LANGUAGE = { - ".sh": "bash", - ".bash": "bash", - ".py": "python", - ".json": "json", - ".js": "javascript", - ".ts": "typescript", - ".sql": "sql", - ".html": "html", - ".css": "css", - ".java": "java", - ".c": "c", - ".cpp": "cpp", - ".rb": "ruby", - ".go": "go", - ".png": "image", - ".jpg": "image", - ".jpeg": "image", - ".gif": "image", - ".mp4": "video", - ".avi": "video", - ".mp3": "audio", - ".webm": "video", - ".mov": "video", - ".mkv": "video", - ".pdf": "pdf", - ",log": "plaintext", - # Agrega más mapeos según tus necesidades -} - -FILENAME_TO_LANGUAGE = { - "PKGBUILD": "bash", - # Puedes añadir más archivos específicos aquí - # "INSTALL": "bash", - # "configure": "bash", -} - - -mime = magic.Magic(mime=True, uncompress=True) - -import os -import logging -from pygments.lexers import guess_lexer -from pygments.util import ClassNotFound - -# Definir MIME types y extensiones de archivos comprimidos -COMPRESSED_MIME_TYPES = { - "application/zip": "compressed", - "application/gzip": "compressed", - "application/x-tar": "compressed", - "application/x-bzip2": "compressed", - "application/x-7z-compressed": "compressed" -} - -COMPRESSED_EXTENSIONS = { - ".zip": "compressed", - ".gz": "compressed", - ".tgz": "compressed", - ".tar": "compressed", - ".bz2": "compressed", - ".7z": "compressed" -} - -import os -import logging -from pygments.lexers import guess_lexer -from pygments.util import ClassNotFound - -# Definir MIME types y extensiones de archivos comprimidos -COMPRESSED_MIME_TYPES = { - "application/zip": "compressed", - "application/gzip": "compressed", - "application/x-tar": "compressed", - "application/x-bzip2": "compressed", - "application/x-7z-compressed": "compressed" -} - -COMPRESSED_EXTENSIONS = { - ".zip": "compressed", - ".gz": "compressed", - ".tgz": "compressed", - ".tar": "compressed", - ".bz2": "compressed", - ".7z": "compressed" -} - -def detect_language(content, unique_filename=None, original_filename=None, detected_mime_type=None): - """ - Detecta el lenguaje del contenido utilizando el nombre del archivo original, extensión o MIME type. - - :param content: Contenido del paste (str) o None. - :param unique_filename: Nombre de archivo único (UUID) si aplica. - :param original_filename: Nombre de archivo original (e.g., PKGBUILD) si aplica. - :param detected_mime_type: Tipo MIME detectado por magic. - :return: Lenguaje detectado (str) o 'compressed' si es un archivo comprimido. - """ - - # Verificar si el archivo es comprimido basándose en MIME type - if detected_mime_type and detected_mime_type in COMPRESSED_MIME_TYPES: - logging.info(f"MIME type '{detected_mime_type}' detectado como archivo comprimido.") - return COMPRESSED_MIME_TYPES[detected_mime_type] - - # Verificar si el nombre del archivo original está en FILENAME_TO_LANGUAGE - if original_filename: - base_name = os.path.basename(original_filename) - if base_name.upper() in FILENAME_TO_LANGUAGE: - language = FILENAME_TO_LANGUAGE[base_name.upper()] - logging.info(f"Filename '{base_name}' mapeado a lenguaje '{language}'") - return language - - # Detectar por extensión usando unique_filename - if unique_filename: - ext = os.path.splitext(unique_filename)[1].lower() - if ext in COMPRESSED_EXTENSIONS: - logging.info(f"Extension '{ext}' detectada como archivo comprimido.") - return COMPRESSED_EXTENSIONS[ext] - - if ext in EXTENSION_TO_LANGUAGE: - logging.info(f"Extension '{ext}' mapeada a lenguaje '{EXTENSION_TO_LANGUAGE[ext]}'") - return EXTENSION_TO_LANGUAGE[ext] - - # Si no hay mapeo por extensión, usar magic para detectar el lenguaje - if detected_mime_type: - MIME_TO_LANGUAGE = { - "application/json": "json", - "application/x-shellscript": "bash", - "application/javascript": "javascript", - "text/plain": "plaintext", - "text/xml": "xml", - "text/html": "html", - "text/css": "css", - "text/x-python": "python", - "text/x-c": "c", - "text/x-c++": "cpp", - "text/x-java": "java", - "text/x-php": "php", - "text/x-shellscript": "bash", - "application/sql": "sql", - "application/x-yaml": "yaml", - "application/x-toml": "toml", - "text/x-markdown": "markdown", - "text/markdown": "markdown", - "text/x-lua": "lua", - "text/x-rust": "rust", - "application/x-perl": "perl", - "text/x-ruby": "ruby", - } - if detected_mime_type in MIME_TO_LANGUAGE: - logging.info(f"MIME type '{detected_mime_type}' mapeado a lenguaje '{MIME_TO_LANGUAGE[detected_mime_type]}'") - return MIME_TO_LANGUAGE[detected_mime_type] - - # Fallback a Pygments solo si content no es None - if content: - try: - lexer = guess_lexer(content) - language = lexer.aliases[0] - logging.info(f"Pygments detectó el lenguaje como '{language}'") - return language - except ClassNotFound: - logging.warning("Pygments no pudo detectar el lenguaje. Asignando 'plaintext'") - return "plaintext" - - # Si content es None y no hay mapeo, asignar 'unknown' - logging.warning("Content es None y no hay mapeo de lenguaje. Asignando 'unknown'") - return "unknown" - -def init_routes(app): - @app.route('/file/', methods=['GET']) - def get_file(filename): - file_path = os.path.join(UPLOAD_FOLDER, filename) - - if not os.path.exists(file_path): - abort(404, description="file not found") - - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - return send_from_directory(UPLOAD_FOLDER, filename, mimetype=mime_type) - - - @app.route('/update-theme', methods=['POST']) - def update_theme(): - data = request.get_json() - theme = data.get('theme') - if theme not in ['light', 'dark']: - return jsonify({'error': 'Invalid theme'}), 400 - session['theme'] = theme - return jsonify({'message': 'Theme updated successfully'}), 200 - - - @app.route('/', methods=['GET']) - def index(): - base_url = request.host_url.rstrip('/') - user_name = None - - # Supongamos que el usuario está guardado en la sesión con su ID - if 'user_id' in session: - user = User.query.get(session['user_id']) # Recuperar el usuario de la base de datos - user_name = user.username if user else None # Obtener el nombre del usuario - - return render_template('index.html', base_url=base_url, user_name=user_name) - - - @app.route('/login', methods=['GET', 'POST']) - def login(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - user = User.query.filter_by(username=username).first() - - if user and user.check_password(password): - login_user(user) # Autentica al usuario - flash("Login successful!", "success") - return redirect(url_for('user_dashboard')) - - flash("Invalid username or password", "danger") - return redirect(url_for('login')) - - return render_template('login.html') - - @app.route('/paste//json', methods=['GET']) - @jwt_required - def get_paste_json(id): - paste = Paste.query.get(id) - if not paste: - return jsonify({"error": "Paste not found"}), 404 - - response_data = { - "id": paste.id, - "filename": paste.filename, - "language": paste.language, - "content_type": paste.content_type, - "size": paste.size, - "created_at": paste.created_at.strftime('%Y-%m-%d %H:%M:%S') if paste.created_at else None, - "content": None - } - - # Solo incluye el contenido si existe como atributo - if hasattr(paste, 'content') and paste.content: - response_data["content"] = paste.content - elif paste.filename: - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if os.path.exists(file_path): - with open(file_path, 'r', encoding='utf-8', errors='replace') as f: - response_data["content"] = f.read() - - return jsonify(response_data), 200 - - - @app.route('/paste//raw', methods=['GET']) - def get_paste_raw(id): - paste = Paste.query.get(id) - if not paste: - return jsonify({"error": "Paste not found"}), 404 - - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if not os.path.exists(file_path): - return jsonify({"error": "File not found"}), 404 - - # Leer el contenido del archivo para devolverlo como texto sin formato - with open(file_path, 'r', encoding='utf-8', errors='replace') as f: - content = f.read() - - return content, 200, {'Content-Type': 'text/plain; charset=utf-8'} - - - - @app.route('/paste', methods=['POST']) - @jwt_required - def create_paste(): - """Crea un nuevo paste con detección automática de lenguaje, opciones de expiración y soporte para compartir.""" - file_obj = request.files.get('c') - user_language = request.form.get('lang', '').strip().lower() - expire = request.form.get('expire', 'yes').strip().lower() # Normaliza el input - share_with = request.form.getlist('share_with') # Lista de usuarios con los que se comparte - can_edit = request.form.get('can_edit', 'false').strip().lower() == 'true' # Convertir a booleano - - logging.info(f"User provided language: {user_language}") - - if not file_obj: - logging.warning("No file provided for the paste.") - return jsonify({"error": "No content provided"}), 400 - - filename = None - language = None - content_type = None - - # Obtener usuario autenticado - user = request.user - - # Leer el archivo antes de calcular el tamaño - file_content = file_obj.read() - file_size = len(file_content) - file_obj.seek(0) # Resetear el puntero del archivo - - # Validar límites de usuario - if user.role == 'user': - if file_size > 100 * 1024**2: # 100 MB por paste - logging.warning(f"User {user.username} tried to upload a file exceeding 100 MB.") - return jsonify({"error": "File exceeds the 100 MB limit for your role"}), 403 - if user.storage_used + file_size > user.storage_limit: # Límite de almacenamiento total - logging.warning(f"User {user.username} exceeded their total storage limit.") - return jsonify({"error": "You have exceeded your total storage limit"}), 403 - - try: - # Obtener la extensión del archivo - original_filename = file_obj.filename - file_extension = os.path.splitext(original_filename)[1].lower() - - # Generar un nombre único para el archivo - unique_filename = f"{uuid.uuid4().hex}{file_extension}" - file_path = os.path.join(UPLOAD_FOLDER, unique_filename) - - # Leer una muestra del archivo para analizar MIME - file_sample = file_content[:8192] - detected_mime_type = magic.Magic(mime=True).from_buffer(file_sample) - - logging.info(f"Detected MIME type: {detected_mime_type}") - logging.info(f"Original filename: {original_filename}, Extension: {file_extension}, Size: {file_size} bytes") - content_type = detected_mime_type - - # Guardar el archivo en el sistema de archivos - with open(file_path, 'wb') as f: - f.write(file_content) - logging.info(f"Saved paste content to: {file_path}") - - # **Corrección del problema `lang=yes/no`** - if user_language in ["yes", "no", ""]: - logging.warning(f"Invalid or missing language '{user_language}', using detection.") - language = detect_language( - content=file_content.decode(errors="ignore"), - unique_filename=unique_filename, - original_filename=original_filename, - detected_mime_type=detected_mime_type - ) - else: - language = user_language - logging.info(f"User specified language: {language}") - - filename = unique_filename - - # Actualizar el almacenamiento usado por el usuario - user.storage_used += file_size - db.session.commit() - - except Exception as e: - logging.error(f"Exception occurred: {str(e)}") - return jsonify({"error": f"Error processing the content: {str(e)}"}), 500 - - # **Manejo de Expiración** - expires_at = datetime.utcnow() + timedelta(days=1) if expire == 'yes' else None - - # **Crear registro del paste en la base de datos** - paste = Paste( - content_type=content_type, - filename=filename, - owner_id=user.id, - user_id=user.id, - language=language, - size=file_size, - expires_at=expires_at - ) - - try: - db.session.add(paste) - db.session.commit() - logging.info(f"Saved paste with ID: {paste.id}") - - # **Indexar contenido en Elasticsearch** - index_paste(paste) - - # **Compartir paste con otros usuarios** - shared_users = [] - if share_with: - for username in share_with: - shared_user = User.query.filter_by(username=username).first() - if shared_user and shared_user.id != user.id: - db.session.execute(shared_pastes.insert().values( - paste_id=paste.id, - user_id=shared_user.id, - can_edit=can_edit - )) - shared_users.append(username) - - db.session.commit() - logging.info(f"Paste {paste.id} shared with: {shared_users} (Editable: {can_edit})") - - except Exception as e: - db.session.rollback() - logging.error(f"Error saving paste: {str(e)}") - return jsonify({"error": f"Error saving paste: {str(e)}"}), 500 - - base_url = request.host_url.rstrip('/') - paste_url = f"{base_url}/paste/{paste.id}" - - logging.info(f"Paste created successfully. ID: {paste.id}, URL: {paste_url}") - return jsonify({ - "url": paste_url, - "language": language, - "expires_at": expires_at, - "shared_with": shared_users, - "can_edit": can_edit - }), 201 - - - @app.route('/paste/', methods=['GET']) - def get_paste(id): - paste = Paste.query.get(id) - if not paste: - logging.warning(f"Paste ID {id} not found.") - return render_template("errors/404.html", message="This paste has been deleted or has expired."), 404 - if current_user.is_authenticated: - can_edit = paste.has_edit_permission(current_user) - else: - can_edit = False - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) if paste.filename else None - - # Recuperar usuarios con los que se compartió el paste (solo para el propietario) - shared_with = [] - if current_user.is_authenticated and paste.owner_id == current_user.id: - shared_with = db.session.query(User.username, shared_pastes.c.can_edit).join( - shared_pastes, User.id == shared_pastes.c.user_id - ).filter(shared_pastes.c.paste_id == paste.id).all() - # Definir los tipos MIME que consideras como binarios - BINARIO_MIME_TYPES = ( - 'font/', - 'model/', - # Agrega más tipos MIME según tus necesidades - ) - - # Tipos MIME basados en texto que no comienzan con 'text/' - TEXT_BASED_APPLICATION_MIME_TYPES = ( - 'application/json', - 'application/javascript', - 'application/xml', - 'application/xhtml+xml', - 'application/sql', - # Agrega otros tipos basados en texto según tus necesidades - ) - - # Tipos MIME multimedia - MEDIA_MIME_TYPES = ( - 'image/', - 'video/', - 'audio/', - 'application/pdf', - ) - - # Función para verificar si el MIME type es binario - def is_binary(mime_type): - return any(mime_type.startswith(prefix) for prefix in BINARIO_MIME_TYPES) - - # Función para verificar si el MIME type es multimedia - def is_media(mime_type): - return any(mime_type.startswith(prefix) for prefix in MEDIA_MIME_TYPES) - - # Determinar el tipo de contenido - if paste.content_type.startswith('text/') or paste.content_type in TEXT_BASED_APPLICATION_MIME_TYPES: - es_binario = False - es_media = False - elif is_media(paste.content_type): - es_binario = False - es_media = True - elif is_binary(paste.content_type): - es_binario = True - es_media = False - else: - # Por defecto, tratar como binario si no coincide con las anteriores - es_binario = True - es_media = False - - logging.debug(f"Content Type: {paste.content_type}") - logging.debug(f"Is binary: {es_binario}") - logging.debug(f"Is media: {es_media}") - - if es_binario: - if not paste.filename or not os.path.exists(file_path): - logging.error(f"File for paste ID {id} not found: {file_path}") - return jsonify({"error": "File not found"}), 404 - - try: - # Obtener información adicional - owner = User.query.get(paste.owner_id) - file_size = paste.size # Asegúrate de que el modelo Paste tenga el campo 'size' - - return render_template( - 'binary_view.html', - paste=paste, - filename=paste.filename, - mime_type=paste.content_type, - owner=owner, - size=file_size - ) - except Exception as e: - logging.error(f"Error while rendering binary view for paste ID {id}: {e}") - return jsonify({"error": "Error while processing the file"}), 500 - - elif es_media: - if not paste.filename or not os.path.exists(file_path): - logging.error(f"File for paste ID {id} not found: {file_path}") - return jsonify({"error": "File not found"}), 404 - - try: - # Extraer metadatos si es necesario - metadata = [] - if paste.content_type.startswith("image/"): - logging.info(f"Extracting metadata for image: {file_path}") - media_info = MediaInfo.parse(file_path) - for track in media_info.tracks: - track_data = {key: value for key, value in track.to_data().items() if value} - if track_data: - metadata.append(track_data) - - if not metadata: - metadata.append({"Info": "No metadata found in image file"}) - - elif paste.content_type.startswith(("video/", "audio/")): - logging.info(f"Extracting metadata for media: {file_path}") - media_info = MediaInfo.parse(file_path) - for track in media_info.tracks: - track_data = {key: value for key, value in track.to_data().items() if value} - if track_data: - metadata.append(track_data) - - if not metadata: - metadata.append({"Info": "No metadata found in media file"}) - - elif paste.content_type == "application/pdf": - logging.info(f"Extracting metadata for PDF: {file_path}") - # Implementa la extracción de metadatos para PDFs si es necesario - - return render_template( - 'media_view.html', - filename=paste.filename, - mime_type=paste.content_type, - metadata=metadata, - paste_id=paste.id - ), 200 - - except Exception as e: - logging.error(f"Error while rendering media view for paste ID {id}: {e}") - return jsonify({"error": "Error while processing the media file"}), 500 - - elif not es_binario and not es_media: - # Manejo de tipos de contenido soportados (texto) - try: - with open(file_path, 'r', encoding='utf-8', errors='replace') as f: - content = f.read() - - # Determinar si el contenido es Markdown - is_markdown = False - - # 1. Basado en la extensión del archivo - if paste.filename and paste.filename.lower().endswith('.md'): - is_markdown = True - logging.info("Detected Markdown based on file extension (.md).") - - # 2. Basado en el lenguaje especificado - elif paste.language and paste.language.lower() in ['md', 'markdown']: - is_markdown = True - logging.info("Detected Markdown based on specified language.") - - # 3. Basado en Tipo MIME - elif paste.content_type == 'text/markdown': - is_markdown = True - logging.info("Detected Markdown based on MIME type (text/markdown).") - - if is_markdown: - # Convertir Markdown a HTML usando python-markdown con extensiones - md_html = markdown( - content, - extensions=[ - 'tables', # Soporte para tablas - 'fenced_code', # Soporte para bloques de código con ``` - 'codehilite' # Soporte para resaltado de sintaxis con Pygments - ] - ) - logging.info("Converted Markdown to HTML.") - - return render_template( - 'text_paste.html', - paste=paste, - md_html_code=md_html, # Pasamos el HTML generado - is_markdown=True, - can_edit=can_edit, - shared_with=shared_with - ), 200 - else: - # Contenido no es Markdown -> resaltar con Pygments normal - html_code = highlight_code( - content, language=paste.language, filename=paste.filename - ) - logging.info("Converted text content with Pygments.") - - return render_template( - 'text_paste.html', - paste=paste, - html_code=html_code, - is_markdown=False, - can_edit=can_edit, - shared_with=shared_with - ), 200 - - except Exception as e: - logging.error(f"Error reading text file for paste ID {id}: {e}") - return jsonify({"error": "Error processing text file"}), 500 - - else: - # Si el tipo MIME no coincide con ninguno de los anteriores, manejarlo aquí - logging.warning(f"Unhandled content type: {paste.content_type}") - return jsonify({"error": "Unhandled content type"}), 400 - - @app.route('/pastes', methods=['GET']) - @jwt_required - def list_pastes(): - try: - # Obtener todos los pastes del usuario autenticado - user = request.user - pastes = Paste.query.filter_by(owner_id=user.id).all() - - if not pastes: - return jsonify({"message": "No pastes found"}), 200 - - base_url = request.host_url.rstrip('/') - response_list = [] - for paste in pastes: - response_list.append({ - "id": paste.id, - "url": f"{base_url}/paste/{paste.id}", - "title": paste.title or paste.filename, - "type": paste.get_type(), # Método definido en tu modelo Paste - "size": paste.size or 0, # Devuelve 0 si el tamaño no está definido - "created_at": paste.created_at.strftime('%Y-%m-%d %H:%M:%S') if paste.created_at else None, - "is_favorite": paste in user.favorite_pastes # Verifica si el paste es favorito - }) - - return jsonify(response_list), 200 - except Exception as e: - logging.error(f"Error retrieving pastes: {e}") - return jsonify({"error": "Error retrieving pastes"}), 500 - - @app.route('/register', methods=['POST']) - @jwt_required - def register_user(): - authenticated_user = request.user - - if authenticated_user.username != 'admin': - return jsonify({"error": "you have no access rights to register users"}), 403 - - data = request.get_json() - if not data: - return jsonify({"error": "No data provided"}), 400 - - username = data.get('username') - password = data.get('password') - - if not username or not password: - return jsonify({"error": "'username' and 'password' required"}), 400 - - existing_user = User.query.filter_by(username=username).first() - if existing_user: - return jsonify({"error": "El nombre de usuario ya existe"}), 400 - - try: - new_user = User(username=username) - new_user.set_password(password) - db.session.add(new_user) - db.session.commit() - return jsonify({"message": f"Usuario '{username}' registry successful"}), 201 - except Exception as e: - db.session.rollback() - return jsonify({"error": f"Error registering user: {str(e)}"}), 500 - - @app.route('/admin/login', methods=['GET', 'POST']) - def admin_login(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - user = User.query.filter_by(username=username).first() - - if user and user.check_password(password) and username == 'admin': - session['admin'] = username - flash('Login successful', 'success') - return redirect(url_for('admin_dashboard')) - - flash('Invalid credentials', 'danger') - return render_template('admin_login.html') - - @app.route('/admin/logout') - def admin_logout(): - session.pop('admin', None) - flash('Logged out', 'info') - return redirect(url_for('admin_login')) - - @app.route('/admin') - def admin_dashboard(): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - users_count = User.query.count() - pastes_count = Paste.query.count() - return render_template('dashboard.html', users_count=users_count, pastes_count=pastes_count) - - @app.route('/admin/users') - def admin_users(): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - users = User.query.all() - return render_template('users.html', users=users) - - @app.route('/admin/users/add', methods=['GET', 'POST']) - def admin_add_user(): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - role = request.form.get('role') # Obtén el rol del formulario - - # Validar datos - if not username or not password or not role: - flash('All fields are required', 'danger') - return redirect(url_for('admin_add_user')) - - # Validar si el rol es válido - valid_roles = ['admin', 'advanced', 'user'] - if role not in valid_roles: - flash('Invalid role selected.', 'danger') - return redirect(url_for('admin_add_user')) - - # Crear usuario - try: - user = User(username=username, role=role, storage_limit=ROLE_STORAGE_LIMITS.get(role)) - user.set_password(password) - db.session.add(user) - db.session.commit() - flash('User added successfully', 'success') - return redirect(url_for('admin_users')) - except Exception as e: - db.session.rollback() - flash(f'Error: {str(e)}', 'danger') - return redirect(url_for('admin_add_user')) - - return render_template('add_user.html') - - @app.route('/admin/users/delete/', methods=['POST']) - def admin_delete_user(user_id): - if 'admin' not in session: - # Enviar JSON de error en lugar de redirigir - return jsonify({"error": "Unauthorized"}), 401 - - user = User.query.get(user_id) - if not user: - return jsonify({"error": "User not found"}), 404 - - db.session.delete(user) - db.session.commit() - return jsonify({"message": "User deleted successfully"}), 200 - - - user = User.query.get(user_id) - if not user: - flash('User not found', 'danger') - else: - db.session.delete(user) - db.session.commit() - flash('User deleted successfully', 'success') - return redirect(url_for('admin_users')) - - @app.route('/admin/pastes') - def admin_pastes(): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - pastes = Paste.query.all() - return render_template('pastes.html', pastes=pastes) - - @app.route('/admin/pastes/delete/', methods=['POST']) - def admin_delete_paste(paste_id): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - paste = Paste.query.get(paste_id) - if not paste: - flash('Paste not found', 'danger') - else: - delete_paste_from_index(paste) - paste.shared_with.clear() - db.session.delete(paste) - db.session.commit() - flash('Paste deleted successfully', 'success') - return redirect(url_for('admin_pastes')) - - @app.route('/stats', methods=['GET']) - def anonymous_stats(): - start_date = request.args.get('start_date') - end_date = request.args.get('end_date') - - stats = calculate_stats(start_date=start_date, end_date=end_date) - - return render_template( - 'anonymous_stats.html', - total_pastes=stats["total_pastes"], - total_text_pastes=stats["total_text_pastes"], - total_file_pastes=stats["total_file_pastes"], - total_media_pastes=stats["total_media_pastes"], - languages=stats["languages"], - counts_text=stats["counts_text"], - counts_file=stats["counts_file"], - counts_media=stats["counts_media"], - ) - - - @app.route('/api/docs') - def redoc(): - return ''' - - - - Pastebin API Docs - - - - - - - - ''' - @app.route('/paste//download', methods=['GET']) - def download_paste(id): - paste = Paste.query.get(id) - if not paste: - return jsonify({"error": "Paste not found"}), 404 - - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if not os.path.exists(file_path): - return jsonify({"error": "File not found"}), 404 - - try: - return send_from_directory( - directory=UPLOAD_FOLDER, - path=paste.filename, - as_attachment=True, - download_name=paste.filename, - mimetype=paste.content_type - ) - except TypeError: - # Para Flask < 2.0 - return send_from_directory( - directory=UPLOAD_FOLDER, - path=paste.filename, - as_attachment=True, - attachment_filename=paste.filename, # <--- Distinto nombre de argumento - mimetype=paste.content_type - ) - - - @app.route('/request-account', methods=['GET', 'POST']) - def request_account(): - if request.method == 'POST': - email = request.form.get('email') - username = request.form.get('username') - - # Validate basic input - if not email or not username: - logging.warning("Validation failed: Missing email or username") - flash("Email and username are required.", "danger") - return redirect(url_for('request_account')) - - logging.info(f"Account request initiated. Username: {username}, Email: {email}") - - # Log the SMTP environment variables for debugging - logging.debug(f"SMTP_SERVER={SMTP_SERVER}") - logging.debug(f"SMTP_PORT={SMTP_PORT}") - logging.debug(f"SMTP_USERNAME={SMTP_USERNAME}") - logging.debug(f"SMTP_USE_SSL={SMTP_USE_SSL}, SMTP_USE_TLS={SMTP_USE_TLS}") - - # Send email - try: - msg = MIMEText(f"New account request:\n\nUsername: {username}\nEmail: {email}") - msg['Subject'] = 'Account Request' - msg['From'] = SMTP_USERNAME - msg['To'] = SMTP_USERNAME # Ajusta a tu dirección de correo receptora - - # Connect to the SMTP server - if SMTP_USE_SSL: - with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server: - logging.debug("Using SMTP over SSL") - server.login(SMTP_USERNAME, SMTP_PASSWORD) - logging.info("SMTP login successful") - server.send_message(msg) - else: - with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: - if SMTP_USE_TLS: - logging.debug("Starting TLS session") - server.starttls() - server.login(SMTP_USERNAME, SMTP_PASSWORD) - logging.info("SMTP login successful") - server.send_message(msg) - - logging.info("Account request email sent successfully") - flash("Your account request has been submitted successfully!", "success") - return redirect(url_for('index')) - - except smtplib.SMTPException as smtp_error: - logging.error(f"SMTP error: {smtp_error}") - flash(f"SMTP error: {smtp_error}", "danger") - except Exception as general_error: - logging.error(f"An unexpected error occurred: {general_error}") - flash(f"An error occurred: {general_error}", "danger") - - return redirect(url_for('request_account')) - - return render_template('request_account.html') - - @app.route('/user/details', methods=['GET']) - @jwt_required - def get_user_details(): - user = request.user # Usuario autenticado - if not user: - return jsonify({"error": "User not found"}), 404 - - # Calcular contadores - pastes_count = Paste.query.filter_by(owner_id=user.id).count() - favorites_count = user.favorite_pastes.count() - shared_with_me_count = db.session.query(Paste).join(shared_pastes).filter( - shared_pastes.c.user_id == user.id - ).count() - shared_with_others_count = db.session.query(Paste).join(shared_pastes).filter( - Paste.owner_id == user.id, - shared_pastes.c.user_id != user.id - ).count() - - # Construir la respuesta - response = { - "id": user.id, - "username": user.username, - "role": user.role, - "storage_used": user.storage_used, - "storage_limit": user.storage_limit, - "storage_remaining": max(user.storage_limit - user.storage_used, 0) if user.storage_limit != -1 else None, - "theme_preference": user.theme_preference, - # Suponiendo que tienes un campo user.created_at - "created_at": user.created_at.isoformat() if hasattr(user, 'created_at') and user.created_at else None, - "pastes_count": pastes_count, - "favorites_count": favorites_count, - "shared_with_me_count": shared_with_me_count, - "shared_with_others_count": shared_with_others_count - } - - return jsonify(response), 200 - - - @app.route('/admin/users//change_role', methods=['POST']) - def change_user_role(user_id): - if 'admin' not in session: - flash('Please log in to access the admin panel', 'warning') - return redirect(url_for('admin_login')) - - try: - user = User.query.get(user_id) - if not user: - return jsonify({"error": "User not found"}), 404 - - new_role = request.form.get('role') - valid_roles = ['admin', 'advanced', 'user'] - if new_role not in valid_roles: - return jsonify({"error": "Invalid role"}), 400 - - # Actualizar rol y límite de almacenamiento - user.role = new_role - user.storage_limit = ROLE_STORAGE_LIMITS[new_role] - db.session.commit() - - return jsonify({"message": f"User role updated to {new_role}"}), 200 - - except Exception as e: - db.session.rollback() - return jsonify({"error": f"Failed to update role: {str(e)}"}), 500 - - @app.route('/logout') - @login_required - def logout(): - logout_user() - flash("You have been logged out.", "info") - return redirect(url_for('login')) - - @app.route('/user-dashboard', methods=['GET']) - @login_required - def user_dashboard(): - user = current_user # Usuario autenticado automáticamente - - # Obtener el número de página desde los parámetros de consulta, por defecto es 1 - page = request.args.get('page', 1, type=int) - per_page = 10 # Número de pastes por página - favorite_page = request.args.get('favorite_page', 1, type=int) - - # Consultar los pastes del usuario con paginación - pastes_query = Paste.query.filter_by(user_id=user.id).order_by(Paste.created_at.desc()) - pagination = pastes_query.paginate(page=page, per_page=per_page, error_out=False) - pastes = pagination.items - - # Consultar los pastes favoritos del usuario con paginación - favorite_query = Paste.query.filter(Paste.favorites.any(id=user.id)) - favorite_pagination = favorite_query.paginate(page=favorite_page, per_page=10, error_out=False) - favorite_pastes = favorite_pagination.items - - # Consultar los pastes compartidos con el usuario - shared_pastes_list = Paste.query.join(shared_pastes, (shared_pastes.c.paste_id == Paste.id))\ - .filter(shared_pastes.c.user_id == user.id)\ - .all() - - # Calcular estadísticas generales - stats = calculate_stats(user.id) # Asegúrate de que esta función devuelve un diccionario con las claves usadas a continuación - - # **Nuevo Código: Cálculo de Almacenamiento** - - # Obtener el rol del usuario - user_role = user.role.lower() if user.role else 'user' # Asegúrate de que el campo 'role' existe en el modelo User - - # Obtener el límite de almacenamiento basado en el rol - storage_limits = current_app.config.get('ROLE_STORAGE_LIMITS', { - 'admin': -1, # Ilimitado - 'advanced': 2 * 1024**3, # 2GB en bytes - 'user': 1 * 1024**3, # 1GB en bytes - }) - - # Obtener límite en bytes y convertir a MB - storage_limit = storage_limits.get(user_role, 1 * 1024**3) # 1GB en bytes - storage_used = db.session.query(db.func.sum(Paste.size)).filter_by(user_id=user.id).scalar() or 0 - - # Calcular almacenamiento disponible - storage_available = storage_limit - storage_used if storage_limit != -1 else -1 - storage_available = max(storage_available, 0) # Evita valores negativos - - # CONVERSIÓN A MB - storage_used_mb = round(storage_used / (1024**2), 2) - storage_limit_mb = -1 if storage_limit == -1 else round(storage_limit / (1024**2), 2) - storage_available_mb = -1 if storage_available == -1 else round(storage_available / (1024**2), 2) - - # DEBUG LOG PARA COMPROBAR QUE SE CONVIERTE BIEN - app.logger.debug(f"Storage (MB) - Used: {storage_used_mb}, Limit: {storage_limit_mb}, Available: {storage_available_mb}") - - # Información del perfil del usuario (sin email ni bio) - user_profile = { - 'username': user.username, - 'role': user.role.capitalize() if user.role else 'User', - } - - return render_template( - 'user_dashboard.html', - user=user, - total_pastes=stats['total_pastes'], - total_size=stats['total_size'], - pastes=pastes, - pagination=pagination, # Pasar el objeto de paginación al template - total_text_pastes=stats.get('total_text_pastes', 0), - total_file_pastes=stats.get('total_file_pastes', 0), - total_media_pastes=stats.get('total_media_pastes', 0), - languages=stats.get('languages', []), - counts_text=stats.get('counts_text', []), - counts_file=stats.get('counts_file', []), - counts_media=stats.get('counts_media', []), - favorite_pastes=favorite_pastes, - favorite_pagination=favorite_pagination, - shared_pastes=shared_pastes_list, - storage_used=storage_used_mb, - storage_limit=storage_limit_mb, - storage_available=storage_available_mb, - user_profile=user_profile, - ) - - - @app.route('/paste/', methods=['DELETE']) - @jwt_required - def delete_paste(id): - paste = Paste.query.get(id) - if not paste: - return jsonify({"error": "Paste not found"}), 404 - - # Permitir eliminación solo si es el propietario o un administrador - if paste.owner_id != request.user.id and request.user.username != 'admin': - return jsonify({"error": "You do not have permission to delete this paste"}), 403 - - try: - # Eliminar favoritos asociados - paste.favorites.clear() - - # Eliminar el archivo si existe - if paste.filename: - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if os.path.exists(file_path): - os.remove(file_path) - - delete_paste_from_index(paste) - paste.shared_with.clear() - db.session.delete(paste) - db.session.commit() - return jsonify({"message": "Paste deleted successfully"}), 200 - except Exception as e: - db.session.rollback() - return jsonify({"error": f"Error deleting paste: {str(e)}"}), 500 - - - @app.route('/change-password-form', methods=['GET']) - @login_required - def change_password_form(): - return render_template('change_password.html') - - - @app.route('/change-password', methods=['POST']) - @login_required - def change_password(): - current_password = request.form.get('current_password') - new_password = request.form.get('new_password') - confirm_password = request.form.get('confirm_password') - - # Validar que los campos no estén vacíos - if not current_password or not new_password or not confirm_password: - flash("All fields are required.", "danger") - return redirect(url_for('change_password_form')) - - # Verificar que las contraseñas coincidan - if new_password != confirm_password: - flash("New passwords do not match.", "danger") - return redirect(url_for('change_password_form')) - - # Verificar contraseña actual - if not current_user.check_password(current_password): # Método check_password en el modelo User - flash("Current password is incorrect.", "danger") - return redirect(url_for('change_password_form')) - - # Actualizar la contraseña - try: - current_user.set_password(new_password) # Método set_password en el modelo User - db.session.commit() - flash("Password updated successfully.", "success") - except Exception as e: - db.session.rollback() - flash(f"An error occurred: {str(e)}", "danger") - - return redirect(url_for('user_dashboard')) - - @app.context_processor - def inject_pygments_styles(): - styles = list(get_all_styles()) - current_theme = session.get('theme', 'light') # Obtener el tema actual de la sesión - default_pygments_style = 'monokai' if current_theme == 'dark' else 'default' - - # Obtener el nombre del usuario si está logueado - user_name = None - if 'user_id' in session: - user = User.query.get(session['user_id']) - user_name = user.username if user else None - - return dict( - pygments_styles=styles, - default_pygments_style=default_pygments_style, - user_name=user_name - ) - - @app.route('/paste//download_page', methods=['GET']) - def download_page(id): - paste = Paste.query.get(id) - if not paste: - flash("Paste no encontrado", "danger") - return redirect(url_for('index')) - - return render_template('download_page.html', paste=paste) - - @app.route('/paste//download_file', methods=['GET']) - def download_file(id): - logging.info(f"Confirmed download request for paste ID: {id}") - paste = Paste.query.get(id) - if not paste: - logging.warning(f"Paste ID {id} not found.") - return jsonify({"error": "Paste not found"}), 404 - - if paste.filename: # El paste está asociado a un archivo físico - logging.info(f"Handling confirmed file download for paste ID {id} with filename: {paste.filename}") - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - - if not os.path.exists(file_path): - logging.error(f"File {paste.filename} for paste ID {id} not found.") - return jsonify({"error": "File not found"}), 404 - - return send_from_directory( - directory=UPLOAD_FOLDER, - path=paste.filename, - as_attachment=True, - download_name=paste.filename, - mimetype=paste.content_type - ) - - elif paste.content: # El paste es de texto y está almacenado en la base de datos - logging.info(f"Handling confirmed text paste ID {id}") - - language_suffix = f".{paste.language}" if paste.language and paste.language != 'unknown' else ".txt" - filename = f"paste_{paste.id}{language_suffix}" - - file_stream = BytesIO() - file_stream.write(paste.content.encode('utf-8')) - file_stream.seek(0) - return send_file( - file_stream, - as_attachment=True, - download_name=filename, - mimetype='text/plain' - ) - else: - logging.error(f"Paste ID {id} has no associated content.") - return jsonify({"error": "Paste has no associated content"}), 400 - - - @app.route('/paste//stream_download', methods=['GET']) - def stream_download(id): - paste = Paste.query.get(id) - if not paste or not paste.filename: - return jsonify({"error": "Paste or file not found"}), 404 - - def generate(): - with open(os.path.join(UPLOAD_FOLDER, paste.filename), 'rb') as f: - while True: - chunk = f.read(4096) - if not chunk: - break - yield chunk - - response = Response(generate(), mimetype=paste.content_type) - response.headers.set('Content-Disposition', 'attachment', filename=paste.filename) - return response - - @app.route('/user/paste//delete', methods=['POST']) - @login_required - def user_delete_paste(id): - paste = Paste.query.get_or_404(id) - - # Verificar si el usuario es el propietario - if paste.owner_id != current_user.id: - return jsonify({"error": "You do not have permission to delete this paste"}), 403 - - try: - # Eliminar favoritos asociados - paste.favorites.clear() - - # Eliminar el archivo asociado, si existe - if paste.filename: - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if os.path.exists(file_path): - os.remove(file_path) - - # Eliminar el paste de la base de datos - delete_paste_from_index(paste) - db.session.delete(paste) - db.session.commit() - return jsonify({"message": "Paste deleted successfully"}), 200 - except Exception as e: - db.session.rollback() - return jsonify({"error": f"Error deleting paste: {str(e)}"}), 500 - - @app.route('/user//stats', methods=['GET']) - @login_required - def user_stats(username): - if current_user.username != username: - abort(404) - - start_date_str = request.args.get('start_date') - end_date_str = request.args.get('end_date') - - app.logger.debug(f"Start Date: {start_date_str}, End Date: {end_date_str}") - - paste_filters = [] - start_date = None - end_date = None - - try: - if start_date_str: - start_date = datetime.strptime(start_date_str, '%Y-%m-%d') - paste_filters.append(Paste.created_at >= start_date) - if end_date_str: - end_date = datetime.strptime(end_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59) - paste_filters.append(Paste.created_at <= end_date) - except ValueError as ve: - app.logger.error(f"Error al parsear las fechas: {ve}") - start_date = None - end_date = None - - # Obtener los pastes y favoritos con filtros aplicados - page = request.args.get('page', 1, type=int) - per_page = 10 - pastes_query = Paste.query.filter_by(user_id=current_user.id).order_by(Paste.created_at.desc()) - if paste_filters: - pastes_query = pastes_query.filter(*paste_filters) - pagination = pastes_query.paginate(page=page, per_page=per_page, error_out=False) - - favorite_query = Paste.query.filter(Paste.favorites.any(id=current_user.id)) - if paste_filters: - favorite_query = favorite_query.filter(*paste_filters) - favorite_pagination = favorite_query.paginate(page=1, per_page=10, error_out=False) - - # Calcular estadísticas - stats = calculate_stats(current_user.id, start_date, end_date) - - # Calcular almacenamiento con valores seguros - storage_used = 0 - storage_limit = -1 - storage_available = -1 - - try: - storage_used = calculate_storage_used(current_user.id) - storage_limit = current_user.get_storage_limit() or -1 - - app.logger.debug(f"current_user: {current_user}, type: {type(current_user)}") - - storage_available = max(storage_limit - storage_used, 0) if storage_limit != -1 else -1 - - # CONVERSIÓN A MB - storage_used_mb = round(storage_used / (1024**2), 2) - storage_limit_mb = -1 if storage_limit == -1 else round(storage_limit / (1024**2), 2) - storage_available_mb = -1 if storage_available == -1 else round(storage_available / (1024**2), 2) - - app.logger.debug(f"Storage (MB) - Used: {storage_used_mb}, Limit: {storage_limit_mb}, Available: {storage_available_mb}") - except Exception as e: - app.logger.error(f"Error al calcular el almacenamiento: {e}") - storage_used_mb = 0 - storage_limit_mb = -1 - storage_available_mb = -1 - - # Obtener pastes compartidos con filtros y paginación - shared_page = request.args.get('shared_page', 1, type=int) - shared_pagination = get_shared_pastes(current_user, paste_filters=paste_filters, page=shared_page, per_page=per_page) - - return render_template( - 'user_dashboard.html', - user=current_user, - user_profile=current_user.role, - total_pastes=stats['total_pastes'], - total_size=stats['total_size'], - total_text_pastes=stats['total_text_pastes'], - total_file_pastes=stats['total_file_pastes'], - total_media_pastes=stats['total_media_pastes'], - languages=stats['languages'], - counts_text=stats['counts_text'], - counts_file=stats['counts_file'], - counts_media=stats['counts_media'], - pastes=pagination.items, - pagination=pagination, - favorite_pastes=favorite_pagination.items, - favorite_pagination=favorite_pagination, - shared_pastes=shared_pagination.items, - shared_pagination=shared_pagination, - start_date=start_date_str, - end_date=end_date_str, - storage_used=storage_used_mb, - storage_limit=storage_limit_mb, - storage_available=storage_available_mb, - ) - - @app.route('/api/token', methods=['POST']) - def get_token(): - """ - Endpoint dedicado para generar un token JWT. - """ - data = request.json - if not data or 'username' not in data or 'password' not in data: - return jsonify({"error": "Username and password are required"}), 400 - - username = data['username'] - password = data['password'] - - user = User.query.filter_by(username=username).first() - if user and check_password_hash(user.password_hash, password): - token = generate_token(username) - return jsonify({"token": token}), 200 - - return jsonify({"error": "Invalid username or password"}), 401 - - @app.route('/paste//favorite', methods=['POST']) - @login_required - def add_to_favorites(id): - user = current_user # Usuario autenticado - paste = Paste.query.get_or_404(id) - - # Verificar si ya es favorito - if user in paste.favorites: - # Sigues devolviendo 200, pero ahora con "success": True - return jsonify({ - "success": True, - "message": "Paste already in favorites" - }), 200 - - try: - paste.favorites.append(user) - db.session.commit() - # ¡Aquí pones success: True! - return jsonify({ - "success": True, - "message": "Paste added to favorites" - }), 201 - except Exception as e: - db.session.rollback() - return jsonify({"error": str(e), "success": False}), 500 - - @app.route('/paste//unfavorite', methods=['POST']) - @login_required - def remove_from_favorites(id): - user = current_user # Usuario autenticado - paste = Paste.query.get_or_404(id) - - if user not in paste.favorites: - # Devuelve success: False - return jsonify({"error": "Paste not in favorites", "success": False}), 404 - - try: - paste.favorites.remove(user) - db.session.commit() - return jsonify({ - "success": True, - "message": "Paste removed from favorites" - }), 200 - except Exception as e: - db.session.rollback() - return jsonify({"error": str(e), "success": False}), 500 - - - @app.route('/favorites', methods=['GET']) - @login_required - def list_favorites(): - user = current_user - page = request.args.get('page', 1, type=int) - per_page = 10 - favorite_query = Favorite.query.filter_by(user_id=user.id) - pagination = favorite_query.paginate(page=page, per_page=per_page, error_out=False) - - favorites = [{ - "id": fav.paste.id, - "title": fav.paste.filename, - "type": fav.paste.get_type(), # Tipo de archivo - "created_at": fav.paste.created_at.strftime('%Y-%m-%d %H:%M:%S') # Fecha de creación - } for fav in pagination.items] - - return jsonify({ - "favorites": favorites, - "pagination": { - "page": pagination.page, - "per_page": pagination.per_page, - "total_pages": pagination.pages, - "total_items": pagination.total, - "has_next": pagination.has_next, - "has_prev": pagination.has_prev, - "next_page": pagination.next_num if pagination.has_next else None, - "prev_page": pagination.prev_num if pagination.has_prev else None - } - }) - - - @app.route('/media/', methods=['GET']) - def serve_media(id): - paste = Paste.query.get_or_404(id) - - # Ruta completa del archivo - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - - if not os.path.exists(file_path): - return "File not found", 404 - - # Sirve el archivo directamente como contenido de video - return send_file(file_path, mimetype=paste.content_type) - - @app.route('/create_paste_web', methods=['GET', 'POST']) - @login_required - def create_paste_web(): - if request.method == 'POST': - title = None - language = request.form.get('language') or '' # Puede venir vacío - paste_type = request.form.get('type') - content = request.form.get('content') - file = request.files.get('file') - expire = request.form.get('expire', 'yes').strip().lower() # Nuevo parámetro de expiración - - # Validar campos obligatorios - if (paste_type == 'Text' and not content) or (paste_type != 'Text' and not file): - flash("All fields are required.", "danger") - return redirect(url_for('create_paste_web')) - - try: - filename = None - content_type = None - size = 0 - - if file: - # Caso: El usuario ha subido un archivo - original_filename = secure_filename(file.filename) - filename = f"{uuid.uuid4().hex}_{original_filename}" - file_path = os.path.join(UPLOAD_FOLDER, filename) - file.save(file_path) - content_type = mime.from_file(file_path) - size = os.path.getsize(file_path) - - elif paste_type == 'Text': - # Caso: El usuario eligió “Text” - - # Buscar la extensión adecuada al "language" del dropdown. - ext = LANGUAGE_TO_EXTENSION.get(language.lower(), "txt") - # Generar el nombre con esa extensión - filename = f"{uuid.uuid4().hex}.{ext}" - - file_path = os.path.join(UPLOAD_FOLDER, filename) - with open(file_path, 'w', encoding='utf-8') as f: - f.write(content) - - content_type = 'text/plain' - size = len(content) - - # Definir la expiración (1 día si expire="yes", permanente si expire="no") - expires_at = datetime.utcnow() + timedelta(days=1) if expire == 'yes' else None - - # Crear registro de Paste en la BD - paste = Paste( - title=title, - content_type=content_type, - filename=filename, - owner_id=current_user.id, - user_id=current_user.id, - language=language, - size=size, - expires_at=expires_at # Guardamos la fecha de expiración - ) - - db.session.add(paste) - db.session.commit() - index_paste(paste) - - flash("Paste created successfully!", "success") - return redirect(url_for('get_paste', id=paste.id)) - - except Exception as e: - db.session.rollback() - flash(f"An error occurred: {e}", "danger") - return redirect(url_for('create_paste_web')) - - # Si es GET, mostrar formulario - return render_template('create_paste_web.html') - - -# Función para descargar archivos - @app.route('/paste//download', methods=['GET']) - @login_required - def download_paste_file(id): - paste = Paste.query.get_or_404(id) - - if paste.owner_id != current_user.id: - flash('No tienes permiso para descargar este archivo.', 'danger') - return redirect(url_for('get_paste', id=id)) - - if not paste.filename: - flash('Este paste no tiene un archivo asociado.', 'warning') - return redirect(url_for('get_paste', id=id)) - - file_path = os.path.join(app.config['UPLOAD_FOLDER'], paste.filename) - - if not os.path.exists(file_path): - flash('El archivo no existe.', 'danger') - return redirect(url_for('get_paste', id=id)) - - return send_from_directory(app.config['UPLOAD_FOLDER'], paste.filename, as_attachment=True) - - @app.route('/paste/') - @login_required - def view_paste(paste_id): - paste = Paste.query.get_or_404(paste_id) - # Verificar que el usuario es el propietario o tiene permisos para ver el paste - if paste.owner_id != current_user.id: - flash('No tienes permiso para ver este paste.', 'danger') - return redirect(url_for('user_dashboard')) - return render_template('view_paste.html', paste=paste) - - @app.route('/pastes/search', methods=['GET']) - @jwt_required - def search_pastes(): - try: - # Obtener el query string - query = request.args.get('q', '') - if not query: - return jsonify({"error": "Search query is required"}), 400 - - # Obtener el usuario autenticado - user = request.user - if not user: - return jsonify({"error": "User not authenticated"}), 403 - - # Realizar la búsqueda en Elasticsearch - body = { - "query": { - "bool": { - "must": [ - {"match": {"content": query}} - ], - "filter": [ - {"term": {"owner_id": user.id}} - ] - } - } - } - results = es.search(index="pastes", body=body) - - # Construir la respuesta con la URL - base_url = request.host_url.rstrip('/') - hits = results["hits"]["hits"] - response = [{"id": hit["_id"], "url": f"{base_url}/paste/{hit['_id']}", "content": hit["_source"].get("content", "")} for hit in hits] - return jsonify(response), 200 - - except Exception as e: - current_app.logger.error(f"Error searching pastes: {e}") - return jsonify({"error": "Error searching pastes"}), 500 - - @app.route('/pastes/search_web', methods=['GET']) - @login_required - def search_pastes_web(): - try: - query = request.args.get('q', '').strip() - content_type = request.args.get('content_type', '').strip() - language = request.args.get('language', '').strip() - - # Obtener al usuario autenticado - user = current_user - if not user.is_authenticated: - return redirect(url_for('login')) - - # Construir la consulta para Elasticsearch - must_clauses = [] - if query: - must_clauses.append({"match": {"content": query}}) - if content_type: - must_clauses.append({"term": {"content_type": content_type}}) - if language: - must_clauses.append({"term": {"language": language}}) - - body = { - "query": { - "bool": { - "must": must_clauses, - "filter": [{"term": {"owner_id": user.id}}] - } - } - } - - # Consultar Elasticsearch - results = es.search(index="pastes", body=body) - hits = results["hits"]["hits"] - - # Construir los resultados - pastes = [ - { - "id": hit["_id"], - "url": f"{request.host_url.rstrip('/')}/paste/{hit['_id']}", - "content_type": hit["_source"].get("content_type", ""), - "language": hit["_source"].get("language", ""), - } - for hit in hits - ] - - # Obtener favoritos del usuario - favorite_query = Paste.query.filter(Paste.favorites.any(id=user.id)) - favorite_pagination = favorite_query.paginate(page=1, per_page=10, error_out=False) - favorite_pastes = favorite_pagination.items - - # Renderizar los resultados en la plantilla - return render_template( - 'search_results.html', - pastes=pastes, # Resultados de búsqueda - query=query, - content_type=content_type, - language=language, - favorite_pastes=favorite_pastes, # Lista de favoritos - favorite_pagination=favorite_pagination # Paginación de favoritos - ) - - except Exception as e: - current_app.logger.error(f"Error in web search: {e}") - return render_template('error.html', message="Search failed"), 500 - - @app.route('/api/paste//favorite', methods=['POST']) - @jwt_required - def api_add_to_favorites(id): - try: - # Usuario autenticado - user = request.user - paste = Paste.query.get_or_404(id) - - # Verificar si ya es favorito - if paste in user.favorite_pastes: - return jsonify({"message": "Paste already in favorites"}), 200 - - # Añadir a favoritos - user.favorite_pastes.append(paste) - db.session.commit() - return jsonify({"message": "Paste added to favorites"}), 201 - - except Exception as e: - logging.error(f"Error adding paste to favorites: {e}") - db.session.rollback() - return jsonify({"error": str(e)}), 500 - @app.route('/api/paste//unfavorite', methods=['POST']) - @jwt_required - def api_remove_from_favorites(id): - try: - # Usuario autenticado - user = request.user - paste = Paste.query.get_or_404(id) - - # Verificar si no está en favoritos - if paste not in user.favorite_pastes: - return jsonify({"message": "Paste is not in favorites"}), 200 - - # Quitar de favoritos - user.favorite_pastes.remove(paste) - db.session.commit() - return jsonify({"message": "Paste removed from favorites"}), 200 - - except Exception as e: - logging.error(f"Error removing paste from favorites: {e}") - db.session.rollback() - return jsonify({"error": str(e)}), 500 - - @app.route('/api/paste//download', methods=['GET']) - @jwt_required - def api_download_paste_file(id): - user = request.user # Usuario autenticado mediante JWT - paste = Paste.query.get_or_404(id) - - # Verificar permisos - if paste.owner_id != user.id: - return jsonify({"error": "You do not have permission to download this file"}), 403 - - # Verificar que el paste tiene un archivo asociado - if not paste.filename: - return jsonify({"error": "This paste does not have an associated file"}), 400 - - # Construir la ruta completa del archivo - file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], paste.filename) - - # Verificar que el archivo exista - if not os.path.exists(file_path): - return jsonify({"error": "The file does not exist"}), 404 - - # Determinar el tipo MIME del archivo - mime_type = paste.content_type or mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - - # Enviar el archivo con el nombre original y tipo MIME - return send_file( - file_path, - as_attachment=True, - download_name=paste.filename, # Flask >= 2.0 - mimetype=mime_type - ) - - @app.route('/api/favorites', methods=['GET']) - @jwt_required - def api_list_favorites(): - try: - # Obtener el usuario autenticado - user = request.user - - # Obtener todos los favoritos del usuario - favorite_pastes = user.favorite_pastes # Relación definida en el modelo User - - if not favorite_pastes: - return jsonify({"message": "No favorites found"}), 200 - - base_url = request.host_url.rstrip('/') - response_list = [] - for paste in favorite_pastes: - response_list.append({ - "id": paste.id, - "url": f"{base_url}/paste/{paste.id}", - "title": paste.title or paste.filename, - "type": paste.get_type(), # Método definido en tu modelo Paste - "size": paste.size or 0, # Devuelve 0 si el tamaño no está definido - "created_at": paste.created_at.strftime('%Y-%m-%d %H:%M:%S') if paste.created_at else None - }) - - return jsonify(response_list), 200 - except Exception as e: - logging.error(f"Error retrieving favorites: {e}") - return jsonify({"error": "Error retrieving favorites"}), 500 - - @app.route('/paste//toggle_editable', methods=['POST']) - @login_required - def toggle_editable(id): - paste = Paste.query.get_or_404(id) - - # Verificar que el usuario es el propietario - if paste.owner_id != current_user.id: - return jsonify({"error": "No tienes permiso para modificar este paste"}), 403 - - try: - # Toggle el estado de editable - paste.editable = not paste.editable - db.session.commit() - - status = "enabled" if paste.editable else "disabled" - message = f"Editable {status} successfully." - return jsonify({"success": True, "editable": paste.editable, "message": message}), 200 - - except Exception as e: - db.session.rollback() - logging.error(f"Error toggling editable: {e}") - return jsonify({"success": False, "error": "Error al actualizar el estado editable"}), 500 - - @app.route('/paste//edit', methods=['GET', 'POST']) - @login_required - def edit_paste_web(id): - paste = Paste.query.get_or_404(id) - - # Verificar permisos de edición - if not paste.has_edit_permission(current_user): - flash("No tienes permiso para editar este paste.", "danger") - return redirect(url_for('get_paste', id=id)) - - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) if paste.filename else None - - # Manejar GET y POST - if request.method == 'POST': - new_content = request.form.get('content') - if not new_content: - flash("El contenido no puede estar vacío.", "danger") - return redirect(url_for('edit_paste_web', id=id)) - - try: - # 1. Eliminar el paste del índice anterior en Elasticsearch - delete_paste_from_index(paste) - - # 2. Guardar el contenido actualizado en el archivo - with open(file_path, 'w', encoding='utf-8') as f: - f.write(new_content) - - # Actualizar la información del paste en la base de datos - paste.last_edited_at = datetime.utcnow() - db.session.commit() - - # 3. Indexar nuevamente el paste actualizado en Elasticsearch - index_paste(paste) - - flash("Paste actualizado correctamente.", "success") - return redirect(url_for('get_paste', id=id)) - except Exception as e: - db.session.rollback() - flash(f"Ocurrió un error al guardar el paste: {str(e)}", "danger") - return redirect(url_for('edit_paste_web', id=id)) - - # Cargar contenido actual para la edición - current_content = "" - if file_path and os.path.exists(file_path): - try: - with open(file_path, 'r', encoding='utf-8') as f: - current_content = f.read() - except Exception as e: - flash(f"Error al leer el contenido del paste: {str(e)}", "danger") - - return render_template('edit_paste.html', paste=paste, current_content=current_content) - - - @app.route('/paste//share', methods=['POST']) - @login_required - def share_paste(id): - data = request.json - username = data.get('username') - can_edit = data.get('can_edit', False) - - # Buscar al usuario con el que se compartirá el paste - user = User.query.filter_by(username=username).first() - if not user: - return jsonify({"error": "User not found"}), 404 - - # Buscar el paste - paste = Paste.query.get_or_404(id) - if paste.owner_id != current_user.id: - return jsonify({"error": "You do not own this paste"}), 403 - - try: - # Verificar si ya existe el registro - existing_entry = db.session.query(shared_pastes).filter_by( - paste_id=paste.id, - user_id=user.id - ).first() - - if existing_entry: - # Si el registro existe, actualizar el valor de `can_edit` - db.session.execute( - shared_pastes.update() - .where( - (shared_pastes.c.paste_id == paste.id) & - (shared_pastes.c.user_id == user.id) - ) - .values(can_edit=can_edit) - ) - else: - # Si el registro no existe, insertar uno nuevo - stmt = shared_pastes.insert().values( - paste_id=paste.id, - user_id=user.id, - can_edit=can_edit - ) - db.session.execute(stmt) - - db.session.commit() - return jsonify({"message": f"Paste shared with {username}, can_edit: {can_edit}."}), 200 - - except Exception as e: - db.session.rollback() - return jsonify({"error": f"Error sharing paste: {str(e)}"}), 500 - - - @app.route('/paste//check-permissions', methods=['GET']) - @login_required - def check_permissions(id): - paste = Paste.query.get_or_404(id) - if paste.owner_id == current_user.id: - return jsonify({'can_edit': True}), 200 - - shared_paste = SharedPaste.query.filter_by(paste_id=paste.id, user_id=current_user.id).first() - if shared_paste and shared_paste.can_edit: - return jsonify({'can_edit': True}), 200 - - return jsonify({'can_edit': False}), 403 - - @app.route('/api/shared_with_others', methods=['GET']) - @jwt_required - def shared_with_others(): - """ - Devuelve los pastes compartidos por el usuario autenticado con otros usuarios. - """ - try: - user = request.user # Usuario autenticado - - # Obtener los pastes compartidos por el usuario actual con otros - shared_pastes_query = db.session.query(Paste).join(shared_pastes).filter( - shared_pastes.c.user_id != user.id, # Excluir al propietario como destinatario - Paste.owner_id == user.id # El usuario es el propietario del paste - ).all() - - # Construir respuesta con información de usuarios (usando joins para obtener el username) - pastes = [] - for paste in shared_pastes_query: - shared_users = db.session.query(User.username, shared_pastes.c.can_edit).join( - shared_pastes, shared_pastes.c.user_id == User.id - ).filter( - shared_pastes.c.paste_id == paste.id, - shared_pastes.c.user_id != user.id # Excluir al propietario - ).all() - - shared_with_list = [ - {"username": shared_user.username, "can_edit": shared_user.can_edit} - for shared_user in shared_users - ] - - pastes.append({ - "id": paste.id, - "title": paste.title or "Untitled", - "created_at": paste.created_at.strftime('%Y-%m-%d %H:%M:%S'), - "shared_with": shared_with_list - }) - - return jsonify({"shared_with_others": pastes}), 200 - except Exception as e: - logging.error(f"Error retrieving shared_with_others: {e}") - return jsonify({"error": "Error retrieving shared_with_others"}), 500 - - - @app.route('/api/shared_with_me', methods=['GET']) - @jwt_required - def shared_with_me(): - """ - Devuelve los pastes compartidos con el usuario autenticado. - """ - try: - user = request.user # Usuario autenticado - - # Obtener los pastes compartidos con el usuario actual - shared_pastes_query = db.session.query(Paste).join(shared_pastes).filter( - shared_pastes.c.user_id == user.id # Pastes compartidos con el usuario - ).all() - - # Construir respuesta - pastes = [ - { - "id": paste.id, - "title": paste.title or "Untitled", - "created_at": paste.created_at.strftime('%Y-%m-%d %H:%M:%S'), - "owner": paste.owner.username, - "can_edit": db.session.query(shared_pastes).filter_by( - paste_id=paste.id, user_id=user.id - ).first().can_edit - } - for paste in shared_pastes_query - ] - - return jsonify({"shared_with_me": pastes}), 200 - except Exception as e: - logging.error(f"Error retrieving shared_with_me: {e}") - return jsonify({"error": "Error retrieving shared_with_me"}), 500 - - - @app.route('/api/paste//share', methods=['POST']) - @jwt_required # Asegúrate de usar el decorador correcto - def add_share_paste(paste_id): - try: - # Obtener el usuario actual - current_user = request.user - if not current_user: - return jsonify({"error": "User not found in request"}), 401 - - # Verificar que el paste pertenece al usuario autenticado - paste = Paste.query.filter_by(id=paste_id, owner_id=current_user.id).first() - if not paste: - return jsonify({"error": "Paste not found or you do not own it"}), 404 - - # Obtener datos del cuerpo de la solicitud - data = request.get_json() - if not data: - return jsonify({"error": "Invalid JSON data"}), 400 - - username = data.get("username") - can_edit = data.get("can_edit", False) - - if not username: - return jsonify({"error": "Username is required"}), 400 - - # Prevenir que el usuario comparta el paste consigo mismo - if username.lower() == current_user.username.lower(): - return jsonify({"error": "You cannot share a paste with yourself"}), 400 - - # Verificar que el usuario destinatario existe (sin sensibilidad a mayúsculas) - recipient = User.query.filter(func.lower(User.username) == username.lower()).first() - if not recipient: - return jsonify({"error": f"User '{username}' not found"}), 404 - - # Verificar si ya se ha compartido el paste con este usuario - existing_share = db.session.query(shared_pastes).filter( - shared_pastes.c.paste_id == paste_id, - shared_pastes.c.user_id == recipient.id - ).first() - - if existing_share: - return jsonify({"error": f"Paste is already shared with '{username}'"}), 400 - - # Asegurarse de que can_edit es un booleano - if not isinstance(can_edit, bool): - return jsonify({"error": "can_edit must be a boolean value"}), 400 - - # Añadir el registro en shared_pastes - db.session.execute(shared_pastes.insert().values( - paste_id=paste_id, - user_id=recipient.id, - can_edit=can_edit - )) - db.session.commit() - - # Registrar la acción para auditoría - app.logger.info(f"User '{current_user.username}' shared paste ID {paste_id} with '{recipient.username}' with can_edit={can_edit}") - - return jsonify({"message": f"Paste shared successfully with '{username}'", "can_edit": can_edit}), 200 - - except SQLAlchemyError as e: - db.session.rollback() - app.logger.error(f"Database error while sharing paste: {e}") - return jsonify({"error": "Database error", "details": str(e)}), 500 - except Exception as e: - app.logger.error(f"Unexpected error while sharing paste: {e}") - return jsonify({"error": "An unexpected error occurred", "details": str(e)}), 500 - - - @app.route('/api/paste//unshare', methods=['POST']) - @jwt_required - def unshare_paste(paste_id): - try: - # Verificar que el usuario autenticado está configurado correctamente - current_user = request.user - if not current_user: - return jsonify({"error": "User not found in request"}), 401 - - # Verificar que el paste pertenece al usuario autenticado - paste = Paste.query.filter_by(id=paste_id, owner_id=current_user.id).first() - if not paste: - return jsonify({"error": "Paste not found or you do not own it"}), 404 - - # Obtener datos del cuerpo de la solicitud - data = request.get_json() - username = data.get("username") - - if not username: - return jsonify({"error": "Username is required"}), 400 - - # Verificar que el usuario destinatario existe - recipient = User.query.filter_by(username=username).first() - if not recipient: - return jsonify({"error": f"User {username} not found"}), 404 - - # Verificar si el paste está compartido con este usuario - existing_share = db.session.query(shared_pastes).filter( - shared_pastes.c.paste_id == paste_id, - shared_pastes.c.user_id == recipient.id - ).scalar() - - if not existing_share: - return jsonify({"error": f"Paste is not shared with {username}"}), 400 - - # Eliminar el registro en shared_pastes - db.session.execute( - shared_pastes.delete().where( - (shared_pastes.c.paste_id == paste_id) & - (shared_pastes.c.user_id == recipient.id) - ) - ) - db.session.commit() - - return jsonify({"message": f"Paste unshared successfully from {username}"}), 200 - except Exception as e: - return jsonify({"error": "An unexpected error occurred", "details": str(e)}), 500 - - @app.route('/paste//unshare', methods=['POST']) - @login_required - def unshare_paste_web(paste_id): - """ - Versión con sesión/logueo normal para 'descompartir' un paste. - """ - try: - # Tomar el username del formulario o del JSON (como prefieras) - # Si lo envías desde un fetch con JSON, usas request.get_json() - # Si lo envías desde un formulario
, usas request.form - data = request.get_json() or {} - username = data.get("username") - - if not username: - return jsonify({"error": "Username is required"}), 400 - - # Verificar que el paste pertenece al usuario logueado - paste = Paste.query.filter_by(id=paste_id, owner_id=current_user.id).first() - if not paste: - return jsonify({"error": "Paste not found or you do not own it"}), 404 - - # Verificar que el usuario con el que se compartió existe - recipient = User.query.filter_by(username=username).first() - if not recipient: - return jsonify({"error": f"User {username} not found"}), 404 - - # Verificar si está compartido - existing_share = db.session.query(shared_pastes).filter( - (shared_pastes.c.paste_id == paste_id), - (shared_pastes.c.user_id == recipient.id) - ).scalar() - - if not existing_share: - return jsonify({"error": f"Paste is not shared with {username}"}), 400 - - # Eliminar el registro de shared_pastes - db.session.execute( - shared_pastes.delete().where( - (shared_pastes.c.paste_id == paste_id) & - (shared_pastes.c.user_id == recipient.id) - ) - ) - db.session.commit() - - return jsonify({"message": f"Paste unshared from {username} successfully"}), 200 - - except Exception as e: - db.session.rollback() - return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500 - - @app.route('/contact', methods=['GET', 'POST']) - def contact(): - if request.method == 'POST': - email = request.form.get('email') - subject = request.form.get('subject') or "Contact Form" - message = request.form.get('message') - - # Validar campos básicos - if not email or not message: - flash("Email and message are required.", "danger") - return redirect(url_for('contact')) - - # Lógica de envío de correo (idéntica a la que ya tienes en request_account) - try: - # Construir el mensaje - msg_body = f"Message from Contact Form:\n\nEmail: {email}\nSubject: {subject}\nMessage: {message}" - msg = MIMEText(msg_body) - msg['Subject'] = subject - msg['From'] = SMTP_USERNAME # El remitente que tienes configurado - msg['To'] = SMTP_USERNAME # El correo que recibirá el mensaje - - # Conexión SMTP (mismo patrón que request_account) - if SMTP_USE_SSL: - with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server: - server.login(SMTP_USERNAME, SMTP_PASSWORD) - server.send_message(msg) - else: - with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: - if SMTP_USE_TLS: - server.starttls() - server.login(SMTP_USERNAME, SMTP_PASSWORD) - server.send_message(msg) - - flash("Your message has been sent!", "success") - return redirect(url_for('contact')) - - except smtplib.SMTPException as smtp_error: - flash(f"SMTP error: {smtp_error}", "danger") - return redirect(url_for('contact')) - except Exception as e: - flash(f"An error occurred: {e}", "danger") - return redirect(url_for('contact')) - - # Si es GET, renderizar plantilla con el formulario - return render_template('contact.html') - - @app.route('/api/paste/', methods=['PUT']) - @jwt_required - def update_paste_file(id): - paste = Paste.query.get_or_404(id) - user = request.user # Usuario autenticado - - # Verificar permisos - if not paste.has_edit_permission(user): - return jsonify({"error": "No permission to edit this paste."}), 403 - - data = request.json or {} - new_content = data.get('content') - if not new_content: - return jsonify({"error": "Missing 'content' in JSON"}), 400 - - file_path = os.path.join(UPLOAD_FOLDER, paste.filename) - if not os.path.exists(file_path): - return jsonify({"error": "File not found"}), 404 - - try: - # 1. Eliminar de Elasticsearch (opcional) - delete_paste_from_index(paste) - - # 2. Guardar contenido en el archivo - with open(file_path, 'w', encoding='utf-8') as f: - f.write(new_content) - - # 3. Actualizar la fecha de edición - paste.last_edited_at = datetime.utcnow() - db.session.commit() - - # 4. Indexar de nuevo - index_paste(paste) - - return jsonify({"message": "Paste updated successfully"}), 200 - - except Exception as e: - db.session.rollback() - return jsonify({"error": str(e)}), 500 - - @app.route('/api/users', methods=['GET']) - @jwt_required - def list_users(): - try: - users = User.query.with_entities(User.username).all() - usernames = [user.username for user in users] - return jsonify({"users": usernames}), 200 - except Exception as e: - app.logger.error(f"Error retrieving users: {e}") - return jsonify({"error": "An unexpected error occurred."}), 500 - - - @app.route('/api/users/search', methods=['GET']) - @login_required # Asegura que solo usuarios autenticados puedan acceder - def search_users(): - query = request.args.get('q', '').strip() - logging.info(f"Search query received: {query}") - if not query: - return jsonify([]) # Retorna una lista vacía si la consulta está vacía - - # Obtener todos los nombres de usuario desde la base de datos - all_users = [user.username for user in User.query.all()] - logging.info(f"Total users fetched: {len(all_users)}") - - # Realizar una búsqueda difusa utilizando RapidFuzz - matches = process.extract(query, all_users, scorer=fuzz.WRatio, limit=10) - logging.info(f"Matches found: {matches}") - - # Definir un umbral para filtrar resultados poco relevantes - threshold = 60 # Puedes ajustar este valor según tus necesidades - - # Filtrar los resultados que superen el umbral - matched_users = [username for username, score, _ in matches if score >= threshold] - logging.info(f"Matched users after threshold: {matched_users}") - - # Crear una lista de diccionarios para retornar - user_list = [{'username': username} for username in matched_users] - - return jsonify(user_list) -