import os import socket import subprocess import unicodedata import time import threading import requests import re import html import json SAVE_PATH = "/var/lib/radio/" COOKIES_PATH = "/app/cookies.txt" LIQUIDSOAP_SOCKET = "/var/run/liquidsoap/socket" LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") # API Key de Last.fm GENIUS_API_KEY = os.getenv("GENIUS_API_KEY") # Extensiones de audio permitidas AUDIO_EXTENSIONS = {".mp3", ".m4a", ".opus", ".flac", ".webm"} HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY") # Tu API Key de Hugging Face HF_API_URL = "https://api-inference.huggingface.co/models/google/gemma-2-2b-it" OLLAMA_API_URL = "http://ollama:11434/api/generate" HEADERS = { "Authorization": f"Bearer {HUGGINGFACE_API_KEY}", "Content-Type": "application/json" } def get_current_song_file(): """Devuelve el nombre de archivo de la canción actual si está disponible""" artist, title = get_current_song() if not artist or not title: return None for file in os.listdir(SAVE_PATH): if clean_song_title(title).lower() in clean_song_title(file).lower(): return f"🎵 Reproduciendo: {clean_filename(file)}" return " No se pudo encontrar el archivo de la canción actual." def remove_code_fence(text): """ Busca un bloque de código tipo: ```json { ... } ``` y devuelve solo el contenido interno. Si no lo encuentra, retorna el texto tal cual. """ pattern = r"```json\s*(.*?)\s*```" match = re.search(pattern, text, re.DOTALL) if match: return match.group(1).strip() return text.strip() def generate_playlist(prompt): """Genera una lista de canciones usando tu instancia local de Ollama.""" query = ( f"Genera una lista de 10 canciones recomendadas para: {prompt}. " "Devuelve SOLO un objeto JSON con la estructura:\n" "{\n" ' \"songs\": [\n' ' \"Artista - Canción\",\n' ' \"Artista - Canción\"\n' " ]\n" "}\n" "Nada de texto adicional ni bloques de código." ) payload = { "model": "gemma2:2b", # Ajusta al nombre exacto de tu modelo si es distinto "prompt": query, "stream": False } try: response = requests.post(OLLAMA_API_URL, json=payload, timeout=60) response.raise_for_status() response_json = response.json() raw_text = response_json.get("response", "").strip() print(f"[DEBUG] Respuesta cruda de Ollama:\n{raw_text}") clean_text = remove_code_fence(raw_text) # 1) Intentar parsear como JSON try: parsed_data = json.loads(clean_text) return parsed_data except json.JSONDecodeError: print("[ERROR] JSON inválido. Intentando regex...") json_match = re.search(r"\{.*?\}", clean_text, re.DOTALL) if json_match: try: parsed_data = json.loads(json_match.group(0)) return parsed_data except json.JSONDecodeError: pass songs = re.findall(r"([^-]+) - ([^\n]+)", clean_text) if songs: return {"songs": [f"{a.strip()} - {t.strip()}" for a, t in songs]} return {"songs": []} except requests.exceptions.RequestException as e: print(f"[ERROR] No se pudo conectar a Ollama: {e}") return {"songs": []} def suggest_playlist(criterion): """Genera y descarga una playlist basada en el criterio dado por el usuario.""" songs_data = generate_playlist(criterion) # YA DEVUELVE UN DICCIONARIO if not isinstance(songs_data, dict) or "songs" not in songs_data: return "️ No se pudo generar una playlist válida." songs = songs_data["songs"] if not songs: return "️ No se encontraron canciones en la lista generada." responses = [] for song in songs[:8]: # Limitar a 5 canciones para evitar spam en el IRC print(f" Añadiendo: {song}") # Debug para verificar qué canciones se están añadiendo response = add_song(song) # Descarga cada canción individualmente print(f" Respuesta de add_song: {response}") # Debug para ver la respuesta de yt-dlp responses.append(response) send_liquidsoap_command("radio.reload") # Recargar la playlist después de añadir return " Playlist generada con éxito:\n" + "\n".join(responses) def monitor_radio(): """Monitorea la reproducción y activa la eliminación de canciones.""" last_song = None while True: artist, title = get_current_song() if artist and title and title != last_song: print(f" Nueva canción detectada: {title} - {artist}") last_song = title # Ejecutar la eliminación de la canción en un hilo separado threading.Thread(target=delete_song_after_playing, daemon=True).start() time.sleep(10) # Verificar cada 10 segundos def monitor_liquidsoap(): """Escucha eventos de Liquidsoap y elimina canciones terminadas.""" if not os.path.exists(LIQUIDSOAP_SOCKET): print(" Error: El socket de Liquidsoap no existe.") return try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(LIQUIDSOAP_SOCKET) print(" Conectado al socket de Liquidsoap para monitoreo.") while True: data = sock.recv(4096).decode("utf-8").strip() if not data: continue print(f" Liquidsoap: {data}") # Debug: Ver lo que responde Liquidsoap if "request.on_air" in data or "end of track" in data: print(" Detectada canción terminada. Verificando para eliminar...") delete_song_after_playing() except Exception as e: print(f" Error en monitor_liquidsoap: {e}") def normalize_artist_name(artist): """Normaliza el nombre del artista eliminando caracteres extraños.""" replacements = { "⧸": "/", # Reemplaza el carácter Unicode extraño por "/" "’": "'", # Reemplaza apóstrofes extraños "‘": "'", "“": '"', "”": '"' } for bad_char, good_char in replacements.items(): artist = artist.replace(bad_char, good_char) return artist.strip() def help(): """Devuelve la ayuda del comando .radio""" return "Uso: .radio [argumentos] - Control de radio por Liquidsoap." def clear_playlist(): """Elimina todas las canciones de la playlist, ignorando archivos .nfs* y directorios.""" if not os.path.exists(SAVE_PATH): return f"Error: El directorio {SAVE_PATH} no existe." files_removed = 0 for file in os.listdir(SAVE_PATH): file_path = os.path.join(SAVE_PATH, file) # Ignorar archivos .nfs* y directorios if file.startswith(".nfs") or os.path.isdir(file_path): continue try: os.remove(file_path) files_removed += 1 except Exception as e: print(f" No se pudo borrar {file}: {e}") send_liquidsoap_command("radio.reload") # Recargar la playlist en Liquidsoap return f" {files_removed} archivos eliminados de la playlist." def clean_song_title(song_title): """Elimina información irrelevante entre paréntesis o corchetes""" cleaned_title = re.sub(r"\s*[\(\[].*?[\)\]]", "", song_title) # Elimina (Official Video), [Live], etc. return cleaned_title.strip() def clean_filename(filename): """Elimina caracteres raros de los nombres de archivos""" return unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode('utf-8') def extract_artist_and_title(song_title): """Intenta separar el artista y el título de la canción""" match = re.match(r"(.+?)\s*[-–]\s*(.+)", song_title) # Soporta "Artista - Canción" if match: artist, title = match.groups() return artist.strip(), title.strip() return None, song_title def send_liquidsoap_command(command): """Envía un comando al socket de Liquidsoap y devuelve la respuesta""" if not os.path.exists(LIQUIDSOAP_SOCKET): return "Error: El socket de Liquidsoap no existe. ¿Está corriendo el servidor?" try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(LIQUIDSOAP_SOCKET) sock.sendall((command + "\n").encode("utf-8")) response = sock.recv(4096).decode("utf-8").strip() return response except Exception as e: return f"Error al enviar comando a Liquidsoap: {e}" def get_current_song(): """Obtiene el título y artista de la canción en reproducción con sanitización mejorada""" index = send_liquidsoap_command("request.on_air") if not index.isdigit(): return None, None metadata_response = send_liquidsoap_command(f"request.metadata {index}") if not metadata_response or "Error" in metadata_response: return None, None filename = None for line in metadata_response.split("\n"): if line.startswith("filename="): filename = line.split("=", 1)[1].strip().replace('"', '') if not filename: return None, None # Normalizar el nombre del archivo y quitar contenido irrelevante song_title = os.path.basename(filename).rsplit(".", 1)[0] cleaned_title = clean_song_title(song_title) # Elimina (Official Video) y demás artist, title = extract_artist_and_title(cleaned_title) # Si no se detecta artista, usar "Desconocido" artist = artist if artist else "Desconocido" artist = normalize_artist_name(artist) # Normaliza AC⧸DC → AC/DC return artist.strip(), title.strip() def delete_song_after_playing(): """Espera a que la canción termine y la elimina de la carpeta de radio.""" artist, title = get_current_song() if not artist or not title: return "️ No se está reproduciendo ninguna canción." # Buscar el archivo correspondiente en el sistema song_filename = None for file in os.listdir(SAVE_PATH): if clean_song_title(title).lower() in clean_song_title(file).lower(): song_filename = os.path.join(SAVE_PATH, file) break if not song_filename or not os.path.exists(song_filename): return "️ No se encontró la canción en el sistema de archivos." print(f" Monitoreando {song_filename} para eliminación...") # Esperar hasta que la canción ya no esté en reproducción while True: current_artist, current_title = get_current_song() if not current_title or current_title != title: break time.sleep(5) # Esperar 5 segundos antes de volver a verificar # Borrar archivo de la carpeta de radio try: os.remove(song_filename) print(f"️ Eliminado: {song_filename}") send_liquidsoap_command("radio.reload") # Recargar playlist en Liquidsoap return " Canción eliminada después de su reproducción." except Exception as e: return f" Error al eliminar la canción: {e}" def get_playlist(): """Lista directamente los archivos en el directorio de canciones""" if not os.path.exists(SAVE_PATH): return f"Error: El directorio {SAVE_PATH} no existe." files = os.listdir(SAVE_PATH) music_files = [f for f in files if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS] if not music_files: return "La playlist está vacía." return "Playlist actual:\n" + "\n".join(f"{i}. {clean_filename(os.path.splitext(f)[0])}" for i, f in enumerate(sorted(music_files))) def add_song(query): """Añade una canción a la playlist usando yt-dlp""" if not query: return help() yt_command = [ "yt-dlp", "--cookies", COOKIES_PATH, f"ytsearch1:{query}", "-f", "bestaudio", "-x", "--audio-format", "opus", "--ffmpeg-location", "/usr/bin/ffmpeg", "-o", f"{SAVE_PATH}%(title)s.%(ext)s", "--embed-metadata", # Forzar metadatos en el archivo final "--ppa", "ffmpeg:-metadata comment='Descargado via yt-dlp'" ] try: subprocess.run(yt_command, capture_output=True, text=True, check=True) send_liquidsoap_command("radio.reload") return f"'{query}' añadida al stream y playlist recargada." except subprocess.CalledProcessError as e: return f"Error al añadir la canción: {e.stderr}" def get_track_info(): """Obtiene información del track desde Last.fm""" artist, track_title = get_current_song() if not artist or not track_title: return "No se está reproduciendo ninguna canción." url = "http://ws.audioscrobbler.com/2.0/" params = { "method": "track.getInfo", "api_key": LASTFM_API_KEY, "format": "json", "track": track_title } if artist: params["artist"] = artist try: response = requests.get(url, params=params) data = response.json() if "track" not in data: return f"No se encontró información para '{track_title}'." track = data["track"] # **Obtener artista y normalizar caracteres** artist_name = html.unescape(track.get("artist", {}).get("name", "Desconocido")) # **Obtener álbum si está disponible** album = track.get("album", {}).get("title", "No disponible") # **Obtener número de oyentes con formato** listeners = track.get("listeners", "N/A") if listeners.isdigit(): listeners = f"{int(listeners):,}".replace(",", ".") # **Obtener lista de géneros si hay más de uno** genres = track.get("toptags", {}).get("tag", []) genre_list = ", ".join([g["name"] for g in genres[:3]]) if genres else "Sin género" return ( f"Ahora suena: {track_title}\n" f"Artista: {artist_name}\n" f"Álbum: {album}\n" f"Oyentes en Last.fm: {listeners}\n" f"Género(s): {genre_list}" ) except Exception as e: return f"Error al obtener información del track: {e}" def get_lyrics(): """Busca la letra de la canción en Genius.""" if not GENIUS_API_KEY: return "Error: La API Key de Genius no está configurada." artist, title = get_current_song() if not artist or not title: return "No se está reproduciendo ninguna canción." # Normalizar el artista antes de buscar artist = normalize_artist_name(artist) query = f"{artist} {title}" print(f"[DEBUG] Buscando en Genius: {query}") # ️ Debug url = "https://api.genius.com/search" headers = {"Authorization": f"Bearer {GENIUS_API_KEY}"} params = {"q": query} try: response = requests.get(url, headers=headers, params=params, timeout=10) response.raise_for_status() data = response.json() hits = data.get("response", {}).get("hits", []) if not hits: return f"No se encontró la letra de '{title}' de {artist}." for hit in hits: result = hit["result"] genius_title = result.get("title", "").lower() genius_artist = normalize_artist_name(result.get("primary_artist", {}).get("name", "").lower()) if title.lower() in genius_title and artist.lower() in genius_artist: return f"Letras de '{title}' - {artist}: {result['url']}" return f"No se encontró la letra de '{title}' de {artist}." except requests.exceptions.RequestException as e: return f"Error al obtener la letra: {e}" def run(sender, *args): """Manejador principal de comandos de radio""" if not args: # Si no hay argumentos, mostrar ayuda return help() command = args[0].lower() command_args = args[1:] if len(args) > 1 else [] # Mapeo de comandos if command == "suggest": if not command_args: return " Uso: `.radio suggest ` (Ejemplo: `.radio suggest música relajante`)" return suggest_playlist(" ".join(command_args)) elif command == "lyrics": return get_lyrics() elif command == "add": if not command_args: return " ¡Falta el nombre de la canción! Ejemplo: .radio add Nirvana - Smells Like Teen Spirit" return add_song(" ".join(command_args)) elif command == "playing": return get_current_song_file() or " No hay música reproduciéndose" elif command == "clear": return clear_playlist() elif command == "trackinfo": return get_track_info() elif command == "list": return get_playlist() elif command in ["skip", "next"]: return send_liquidsoap_command("radio.skip") elif command == "reload": return send_liquidsoap_command("radio.reload") else: return help() # Comando no reconocido def help(): return ( " Comandos de Radio: .radio \n" "──────────────────────────\n" "add - Añade a la cola\n" "playing - Canción actual\n" "trackinfo - Detalles técnicos\n" "list - Lista de reproducción\n" "lyrics - Letra de la canción\n" "skip/next - Saltar tema\n" "reload - Recargar playlist\n" "clear - Limpiar playlist\n" "suggest - Generar playlist\n" "Stream: https://radio.priet.us/stream.opus\n" "M3U: https://radio.priet.us/stream.m3u" )