508 lines
17 KiB
Python
508 lines
17 KiB
Python
import os
|
||
import socket
|
||
import subprocess
|
||
import unicodedata
|
||
import time
|
||
import threading
|
||
import requests
|
||
import re
|
||
import html
|
||
import json
|
||
|
||
SAVE_PATH = "/var/lib/radio/"
|
||
COOKIES_PATH = "/app/cookies.txt"
|
||
LIQUIDSOAP_SOCKET = "/var/run/liquidsoap/socket"
|
||
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") # API Key de Last.fm
|
||
GENIUS_API_KEY = os.getenv("GENIUS_API_KEY")
|
||
# Extensiones de audio permitidas
|
||
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".opus", ".flac", ".webm"}
|
||
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY") # Tu API Key de Hugging Face
|
||
HF_API_URL = "https://api-inference.huggingface.co/models/google/gemma-2-2b-it"
|
||
OLLAMA_API_URL = "http://ollama:11434/api/generate"
|
||
|
||
HEADERS = {
|
||
"Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
def get_current_song_file():
|
||
"""Devuelve el nombre de archivo de la canción actual si está disponible"""
|
||
artist, title = get_current_song()
|
||
if not artist or not title:
|
||
return None
|
||
|
||
for file in os.listdir(SAVE_PATH):
|
||
if clean_song_title(title).lower() in clean_song_title(file).lower():
|
||
return f"🎵 Reproduciendo: {clean_filename(file)}"
|
||
|
||
return " No se pudo encontrar el archivo de la canción actual."
|
||
|
||
def remove_code_fence(text):
|
||
"""
|
||
Busca un bloque de código tipo:
|
||
```json
|
||
{ ... }
|
||
```
|
||
y devuelve solo el contenido interno. Si no lo encuentra,
|
||
retorna el texto tal cual.
|
||
"""
|
||
pattern = r"```json\s*(.*?)\s*```"
|
||
match = re.search(pattern, text, re.DOTALL)
|
||
if match:
|
||
return match.group(1).strip()
|
||
return text.strip()
|
||
|
||
def generate_playlist(prompt):
|
||
"""Genera una lista de canciones usando tu instancia local de Ollama."""
|
||
|
||
query = (
|
||
f"Genera una lista de 10 canciones recomendadas para: {prompt}. "
|
||
"Devuelve SOLO un objeto JSON con la estructura:\n"
|
||
"{\n"
|
||
' \"songs\": [\n'
|
||
' \"Artista - Canción\",\n'
|
||
' \"Artista - Canción\"\n'
|
||
" ]\n"
|
||
"}\n"
|
||
"Nada de texto adicional ni bloques de código."
|
||
)
|
||
|
||
payload = {
|
||
"model": "gemma2:2b", # Ajusta al nombre exacto de tu modelo si es distinto
|
||
"prompt": query,
|
||
"stream": False
|
||
}
|
||
|
||
try:
|
||
response = requests.post(OLLAMA_API_URL, json=payload, timeout=60)
|
||
response.raise_for_status()
|
||
response_json = response.json()
|
||
raw_text = response_json.get("response", "").strip()
|
||
|
||
print(f"[DEBUG] Respuesta cruda de Ollama:\n{raw_text}")
|
||
|
||
clean_text = remove_code_fence(raw_text)
|
||
|
||
# 1) Intentar parsear como JSON
|
||
try:
|
||
parsed_data = json.loads(clean_text)
|
||
return parsed_data
|
||
except json.JSONDecodeError:
|
||
print("[ERROR] JSON inválido. Intentando regex...")
|
||
|
||
json_match = re.search(r"\{.*?\}", clean_text, re.DOTALL)
|
||
if json_match:
|
||
try:
|
||
parsed_data = json.loads(json_match.group(0))
|
||
return parsed_data
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
songs = re.findall(r"([^-]+) - ([^\n]+)", clean_text)
|
||
if songs:
|
||
return {"songs": [f"{a.strip()} - {t.strip()}" for a, t in songs]}
|
||
|
||
return {"songs": []}
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"[ERROR] No se pudo conectar a Ollama: {e}")
|
||
return {"songs": []}
|
||
|
||
|
||
def suggest_playlist(criterion):
|
||
"""Genera y descarga una playlist basada en el criterio dado por el usuario."""
|
||
songs_data = generate_playlist(criterion) # YA DEVUELVE UN DICCIONARIO
|
||
|
||
if not isinstance(songs_data, dict) or "songs" not in songs_data:
|
||
return "️ No se pudo generar una playlist válida."
|
||
|
||
songs = songs_data["songs"]
|
||
if not songs:
|
||
return "️ No se encontraron canciones en la lista generada."
|
||
|
||
responses = []
|
||
for song in songs[:8]: # Limitar a 5 canciones para evitar spam en el IRC
|
||
print(f" Añadiendo: {song}") # Debug para verificar qué canciones se están añadiendo
|
||
response = add_song(song) # Descarga cada canción individualmente
|
||
print(f" Respuesta de add_song: {response}") # Debug para ver la respuesta de yt-dlp
|
||
responses.append(response)
|
||
|
||
send_liquidsoap_command("radio.reload") # Recargar la playlist después de añadir
|
||
return " Playlist generada con éxito:\n" + "\n".join(responses)
|
||
|
||
|
||
|
||
def monitor_radio():
|
||
"""Monitorea la reproducción y activa la eliminación de canciones."""
|
||
last_song = None
|
||
|
||
while True:
|
||
artist, title = get_current_song()
|
||
|
||
if artist and title and title != last_song:
|
||
print(f" Nueva canción detectada: {title} - {artist}")
|
||
last_song = title
|
||
|
||
# Ejecutar la eliminación de la canción en un hilo separado
|
||
threading.Thread(target=delete_song_after_playing, daemon=True).start()
|
||
|
||
time.sleep(10) # Verificar cada 10 segundos
|
||
|
||
def monitor_liquidsoap():
|
||
"""Escucha eventos de Liquidsoap y elimina canciones terminadas."""
|
||
if not os.path.exists(LIQUIDSOAP_SOCKET):
|
||
print(" Error: El socket de Liquidsoap no existe.")
|
||
return
|
||
|
||
try:
|
||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
||
sock.connect(LIQUIDSOAP_SOCKET)
|
||
print(" Conectado al socket de Liquidsoap para monitoreo.")
|
||
|
||
while True:
|
||
data = sock.recv(4096).decode("utf-8").strip()
|
||
if not data:
|
||
continue
|
||
|
||
print(f" Liquidsoap: {data}") # Debug: Ver lo que responde Liquidsoap
|
||
|
||
if "request.on_air" in data or "end of track" in data:
|
||
print(" Detectada canción terminada. Verificando para eliminar...")
|
||
delete_song_after_playing()
|
||
|
||
except Exception as e:
|
||
print(f" Error en monitor_liquidsoap: {e}")
|
||
|
||
|
||
def normalize_artist_name(artist):
|
||
"""Normaliza el nombre del artista eliminando caracteres extraños."""
|
||
replacements = {
|
||
"⧸": "/", # Reemplaza el carácter Unicode extraño por "/"
|
||
"’": "'", # Reemplaza apóstrofes extraños
|
||
"‘": "'",
|
||
"“": '"',
|
||
"”": '"'
|
||
}
|
||
for bad_char, good_char in replacements.items():
|
||
artist = artist.replace(bad_char, good_char)
|
||
|
||
return artist.strip()
|
||
|
||
|
||
def help():
|
||
"""Devuelve la ayuda del comando .radio"""
|
||
return "Uso: .radio <comando> [argumentos] - Control de radio por Liquidsoap."
|
||
|
||
def clear_playlist():
|
||
"""Elimina todas las canciones de la playlist, ignorando archivos .nfs* y directorios."""
|
||
if not os.path.exists(SAVE_PATH):
|
||
return f"Error: El directorio {SAVE_PATH} no existe."
|
||
|
||
files_removed = 0
|
||
for file in os.listdir(SAVE_PATH):
|
||
file_path = os.path.join(SAVE_PATH, file)
|
||
|
||
# Ignorar archivos .nfs* y directorios
|
||
if file.startswith(".nfs") or os.path.isdir(file_path):
|
||
continue
|
||
|
||
try:
|
||
os.remove(file_path)
|
||
files_removed += 1
|
||
except Exception as e:
|
||
print(f" No se pudo borrar {file}: {e}")
|
||
|
||
send_liquidsoap_command("radio.reload") # Recargar la playlist en Liquidsoap
|
||
return f" {files_removed} archivos eliminados de la playlist."
|
||
|
||
|
||
|
||
def clean_song_title(song_title):
|
||
"""Elimina información irrelevante entre paréntesis o corchetes"""
|
||
cleaned_title = re.sub(r"\s*[\(\[].*?[\)\]]", "", song_title) # Elimina (Official Video), [Live], etc.
|
||
return cleaned_title.strip()
|
||
|
||
def clean_filename(filename):
|
||
"""Elimina caracteres raros de los nombres de archivos"""
|
||
return unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode('utf-8')
|
||
|
||
def extract_artist_and_title(song_title):
|
||
"""Intenta separar el artista y el título de la canción"""
|
||
match = re.match(r"(.+?)\s*[-–]\s*(.+)", song_title) # Soporta "Artista - Canción"
|
||
if match:
|
||
artist, title = match.groups()
|
||
return artist.strip(), title.strip()
|
||
return None, song_title
|
||
|
||
def send_liquidsoap_command(command):
|
||
"""Envía un comando al socket de Liquidsoap y devuelve la respuesta"""
|
||
if not os.path.exists(LIQUIDSOAP_SOCKET):
|
||
return "Error: El socket de Liquidsoap no existe. ¿Está corriendo el servidor?"
|
||
|
||
try:
|
||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
||
sock.connect(LIQUIDSOAP_SOCKET)
|
||
sock.sendall((command + "\n").encode("utf-8"))
|
||
response = sock.recv(4096).decode("utf-8").strip()
|
||
return response
|
||
except Exception as e:
|
||
return f"Error al enviar comando a Liquidsoap: {e}"
|
||
|
||
def get_current_song():
|
||
"""Obtiene el título y artista de la canción en reproducción con sanitización mejorada"""
|
||
index = send_liquidsoap_command("request.on_air")
|
||
if not index.isdigit():
|
||
return None, None
|
||
|
||
metadata_response = send_liquidsoap_command(f"request.metadata {index}")
|
||
if not metadata_response or "Error" in metadata_response:
|
||
return None, None
|
||
|
||
filename = None
|
||
for line in metadata_response.split("\n"):
|
||
if line.startswith("filename="):
|
||
filename = line.split("=", 1)[1].strip().replace('"', '')
|
||
|
||
if not filename:
|
||
return None, None
|
||
|
||
# Normalizar el nombre del archivo y quitar contenido irrelevante
|
||
song_title = os.path.basename(filename).rsplit(".", 1)[0]
|
||
cleaned_title = clean_song_title(song_title) # Elimina (Official Video) y demás
|
||
artist, title = extract_artist_and_title(cleaned_title)
|
||
|
||
# Si no se detecta artista, usar "Desconocido"
|
||
artist = artist if artist else "Desconocido"
|
||
artist = normalize_artist_name(artist) # Normaliza AC⧸DC → AC/DC
|
||
|
||
return artist.strip(), title.strip()
|
||
|
||
|
||
def delete_song_after_playing():
|
||
"""Espera a que la canción termine y la elimina de la carpeta de radio."""
|
||
artist, title = get_current_song()
|
||
|
||
if not artist or not title:
|
||
return "️ No se está reproduciendo ninguna canción."
|
||
|
||
# Buscar el archivo correspondiente en el sistema
|
||
song_filename = None
|
||
for file in os.listdir(SAVE_PATH):
|
||
if clean_song_title(title).lower() in clean_song_title(file).lower():
|
||
song_filename = os.path.join(SAVE_PATH, file)
|
||
break
|
||
|
||
if not song_filename or not os.path.exists(song_filename):
|
||
return "️ No se encontró la canción en el sistema de archivos."
|
||
|
||
print(f" Monitoreando {song_filename} para eliminación...")
|
||
|
||
# Esperar hasta que la canción ya no esté en reproducción
|
||
while True:
|
||
current_artist, current_title = get_current_song()
|
||
if not current_title or current_title != title:
|
||
break
|
||
time.sleep(5) # Esperar 5 segundos antes de volver a verificar
|
||
|
||
# Borrar archivo de la carpeta de radio
|
||
try:
|
||
os.remove(song_filename)
|
||
print(f"️ Eliminado: {song_filename}")
|
||
send_liquidsoap_command("radio.reload") # Recargar playlist en Liquidsoap
|
||
return " Canción eliminada después de su reproducción."
|
||
except Exception as e:
|
||
return f" Error al eliminar la canción: {e}"
|
||
|
||
def get_playlist():
|
||
"""Lista directamente los archivos en el directorio de canciones"""
|
||
if not os.path.exists(SAVE_PATH):
|
||
return f"Error: El directorio {SAVE_PATH} no existe."
|
||
|
||
files = os.listdir(SAVE_PATH)
|
||
music_files = [f for f in files if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS]
|
||
|
||
if not music_files:
|
||
return "La playlist está vacía."
|
||
|
||
return "Playlist actual:\n" + "\n".join(f"{i}. {clean_filename(os.path.splitext(f)[0])}" for i, f in enumerate(sorted(music_files)))
|
||
|
||
def add_song(query):
|
||
"""Añade una canción a la playlist usando yt-dlp"""
|
||
if not query:
|
||
return help()
|
||
|
||
yt_command = [
|
||
"yt-dlp",
|
||
"--cookies", COOKIES_PATH,
|
||
f"ytsearch1:{query}",
|
||
"-f", "bestaudio",
|
||
"-x",
|
||
"--audio-format", "opus",
|
||
"--ffmpeg-location", "/usr/bin/ffmpeg",
|
||
"-o", f"{SAVE_PATH}%(title)s.%(ext)s",
|
||
"--embed-metadata", # Forzar metadatos en el archivo final
|
||
"--ppa", "ffmpeg:-metadata comment='Descargado via yt-dlp'"
|
||
]
|
||
try:
|
||
subprocess.run(yt_command, capture_output=True, text=True, check=True)
|
||
send_liquidsoap_command("radio.reload")
|
||
return f"'{query}' añadida al stream y playlist recargada."
|
||
except subprocess.CalledProcessError as e:
|
||
return f"Error al añadir la canción: {e.stderr}"
|
||
|
||
def get_track_info():
|
||
"""Obtiene información del track desde Last.fm"""
|
||
artist, track_title = get_current_song()
|
||
if not artist or not track_title:
|
||
return "No se está reproduciendo ninguna canción."
|
||
|
||
url = "http://ws.audioscrobbler.com/2.0/"
|
||
params = {
|
||
"method": "track.getInfo",
|
||
"api_key": LASTFM_API_KEY,
|
||
"format": "json",
|
||
"track": track_title
|
||
}
|
||
|
||
if artist:
|
||
params["artist"] = artist
|
||
|
||
try:
|
||
response = requests.get(url, params=params)
|
||
data = response.json()
|
||
|
||
if "track" not in data:
|
||
return f"No se encontró información para '{track_title}'."
|
||
|
||
track = data["track"]
|
||
|
||
# **Obtener artista y normalizar caracteres**
|
||
artist_name = html.unescape(track.get("artist", {}).get("name", "Desconocido"))
|
||
|
||
# **Obtener álbum si está disponible**
|
||
album = track.get("album", {}).get("title", "No disponible")
|
||
|
||
# **Obtener número de oyentes con formato**
|
||
listeners = track.get("listeners", "N/A")
|
||
if listeners.isdigit():
|
||
listeners = f"{int(listeners):,}".replace(",", ".")
|
||
|
||
# **Obtener lista de géneros si hay más de uno**
|
||
genres = track.get("toptags", {}).get("tag", [])
|
||
genre_list = ", ".join([g["name"] for g in genres[:3]]) if genres else "Sin género"
|
||
|
||
return (
|
||
f"Ahora suena: {track_title}\n"
|
||
f"Artista: {artist_name}\n"
|
||
f"Álbum: {album}\n"
|
||
f"Oyentes en Last.fm: {listeners}\n"
|
||
f"Género(s): {genre_list}"
|
||
)
|
||
|
||
except Exception as e:
|
||
return f"Error al obtener información del track: {e}"
|
||
|
||
|
||
def get_lyrics():
|
||
"""Busca la letra de la canción en Genius."""
|
||
if not GENIUS_API_KEY:
|
||
return "Error: La API Key de Genius no está configurada."
|
||
|
||
artist, title = get_current_song()
|
||
if not artist or not title:
|
||
return "No se está reproduciendo ninguna canción."
|
||
|
||
# Normalizar el artista antes de buscar
|
||
artist = normalize_artist_name(artist)
|
||
|
||
query = f"{artist} {title}"
|
||
print(f"[DEBUG] Buscando en Genius: {query}") # ️ Debug
|
||
|
||
url = "https://api.genius.com/search"
|
||
headers = {"Authorization": f"Bearer {GENIUS_API_KEY}"}
|
||
params = {"q": query}
|
||
|
||
try:
|
||
response = requests.get(url, headers=headers, params=params, timeout=10)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
hits = data.get("response", {}).get("hits", [])
|
||
if not hits:
|
||
return f"No se encontró la letra de '{title}' de {artist}."
|
||
|
||
for hit in hits:
|
||
result = hit["result"]
|
||
genius_title = result.get("title", "").lower()
|
||
genius_artist = normalize_artist_name(result.get("primary_artist", {}).get("name", "").lower())
|
||
|
||
if title.lower() in genius_title and artist.lower() in genius_artist:
|
||
return f"Letras de '{title}' - {artist}: {result['url']}"
|
||
|
||
return f"No se encontró la letra de '{title}' de {artist}."
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
return f"Error al obtener la letra: {e}"
|
||
|
||
|
||
def run(sender, *args):
|
||
"""Manejador principal de comandos de radio"""
|
||
if not args: # Si no hay argumentos, mostrar ayuda
|
||
return help()
|
||
|
||
command = args[0].lower()
|
||
command_args = args[1:] if len(args) > 1 else []
|
||
|
||
# Mapeo de comandos
|
||
|
||
if command == "suggest":
|
||
if not command_args:
|
||
return " Uso: `.radio suggest <criterio>` (Ejemplo: `.radio suggest música relajante`)"
|
||
return suggest_playlist(" ".join(command_args))
|
||
|
||
elif command == "lyrics":
|
||
return get_lyrics()
|
||
|
||
elif command == "add":
|
||
if not command_args:
|
||
return " ¡Falta el nombre de la canción! Ejemplo: .radio add Nirvana - Smells Like Teen Spirit"
|
||
return add_song(" ".join(command_args))
|
||
|
||
elif command == "playing":
|
||
return get_current_song_file() or " No hay música reproduciéndose"
|
||
|
||
elif command == "clear":
|
||
return clear_playlist()
|
||
|
||
elif command == "trackinfo":
|
||
return get_track_info()
|
||
|
||
elif command == "list":
|
||
return get_playlist()
|
||
|
||
elif command in ["skip", "next"]:
|
||
return send_liquidsoap_command("radio.skip")
|
||
|
||
elif command == "reload":
|
||
return send_liquidsoap_command("radio.reload")
|
||
|
||
else:
|
||
return help() # Comando no reconocido
|
||
|
||
def help():
|
||
return (
|
||
" Comandos de Radio: .radio <comando>\n"
|
||
"──────────────────────────\n"
|
||
"add <canción> - Añade a la cola\n"
|
||
"playing - Canción actual\n"
|
||
"trackinfo - Detalles técnicos\n"
|
||
"list - Lista de reproducción\n"
|
||
"lyrics - Letra de la canción\n"
|
||
"skip/next - Saltar tema\n"
|
||
"reload - Recargar playlist\n"
|
||
"clear - Limpiar playlist\n"
|
||
"suggest <criterio> - Generar playlist\n"
|
||
"Stream: https://radio.priet.us/stream.opus\n"
|
||
"M3U: https://radio.priet.us/stream.m3u"
|
||
)
|