myircbot/plugins/radio.py
2025-05-29 22:58:53 +02:00

508 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import socket
import subprocess
import unicodedata
import time
import threading
import requests
import re
import html
import json
SAVE_PATH = "/var/lib/radio/"
COOKIES_PATH = "/app/cookies.txt"
LIQUIDSOAP_SOCKET = "/var/run/liquidsoap/socket"
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") # API Key de Last.fm
GENIUS_API_KEY = os.getenv("GENIUS_API_KEY")
# Extensiones de audio permitidas
AUDIO_EXTENSIONS = {".mp3", ".m4a", ".opus", ".flac", ".webm"}
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY") # Tu API Key de Hugging Face
HF_API_URL = "https://api-inference.huggingface.co/models/google/gemma-2-2b-it"
OLLAMA_API_URL = "http://ollama:11434/api/generate"
HEADERS = {
"Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
"Content-Type": "application/json"
}
def get_current_song_file():
"""Devuelve el nombre de archivo de la canción actual si está disponible"""
artist, title = get_current_song()
if not artist or not title:
return None
for file in os.listdir(SAVE_PATH):
if clean_song_title(title).lower() in clean_song_title(file).lower():
return f"🎵 Reproduciendo: {clean_filename(file)}"
return " No se pudo encontrar el archivo de la canción actual."
def remove_code_fence(text):
"""
Busca un bloque de código tipo:
```json
{ ... }
```
y devuelve solo el contenido interno. Si no lo encuentra,
retorna el texto tal cual.
"""
pattern = r"```json\s*(.*?)\s*```"
match = re.search(pattern, text, re.DOTALL)
if match:
return match.group(1).strip()
return text.strip()
def generate_playlist(prompt):
"""Genera una lista de canciones usando tu instancia local de Ollama."""
query = (
f"Genera una lista de 10 canciones recomendadas para: {prompt}. "
"Devuelve SOLO un objeto JSON con la estructura:\n"
"{\n"
' \"songs\": [\n'
' \"Artista - Canción\",\n'
' \"Artista - Canción\"\n'
" ]\n"
"}\n"
"Nada de texto adicional ni bloques de código."
)
payload = {
"model": "gemma2:2b", # Ajusta al nombre exacto de tu modelo si es distinto
"prompt": query,
"stream": False
}
try:
response = requests.post(OLLAMA_API_URL, json=payload, timeout=60)
response.raise_for_status()
response_json = response.json()
raw_text = response_json.get("response", "").strip()
print(f"[DEBUG] Respuesta cruda de Ollama:\n{raw_text}")
clean_text = remove_code_fence(raw_text)
# 1) Intentar parsear como JSON
try:
parsed_data = json.loads(clean_text)
return parsed_data
except json.JSONDecodeError:
print("[ERROR] JSON inválido. Intentando regex...")
json_match = re.search(r"\{.*?\}", clean_text, re.DOTALL)
if json_match:
try:
parsed_data = json.loads(json_match.group(0))
return parsed_data
except json.JSONDecodeError:
pass
songs = re.findall(r"([^-]+) - ([^\n]+)", clean_text)
if songs:
return {"songs": [f"{a.strip()} - {t.strip()}" for a, t in songs]}
return {"songs": []}
except requests.exceptions.RequestException as e:
print(f"[ERROR] No se pudo conectar a Ollama: {e}")
return {"songs": []}
def suggest_playlist(criterion):
"""Genera y descarga una playlist basada en el criterio dado por el usuario."""
songs_data = generate_playlist(criterion) # YA DEVUELVE UN DICCIONARIO
if not isinstance(songs_data, dict) or "songs" not in songs_data:
return " No se pudo generar una playlist válida."
songs = songs_data["songs"]
if not songs:
return " No se encontraron canciones en la lista generada."
responses = []
for song in songs[:8]: # Limitar a 5 canciones para evitar spam en el IRC
print(f" Añadiendo: {song}") # Debug para verificar qué canciones se están añadiendo
response = add_song(song) # Descarga cada canción individualmente
print(f" Respuesta de add_song: {response}") # Debug para ver la respuesta de yt-dlp
responses.append(response)
send_liquidsoap_command("radio.reload") # Recargar la playlist después de añadir
return " Playlist generada con éxito:\n" + "\n".join(responses)
def monitor_radio():
"""Monitorea la reproducción y activa la eliminación de canciones."""
last_song = None
while True:
artist, title = get_current_song()
if artist and title and title != last_song:
print(f" Nueva canción detectada: {title} - {artist}")
last_song = title
# Ejecutar la eliminación de la canción en un hilo separado
threading.Thread(target=delete_song_after_playing, daemon=True).start()
time.sleep(10) # Verificar cada 10 segundos
def monitor_liquidsoap():
"""Escucha eventos de Liquidsoap y elimina canciones terminadas."""
if not os.path.exists(LIQUIDSOAP_SOCKET):
print(" Error: El socket de Liquidsoap no existe.")
return
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(LIQUIDSOAP_SOCKET)
print(" Conectado al socket de Liquidsoap para monitoreo.")
while True:
data = sock.recv(4096).decode("utf-8").strip()
if not data:
continue
print(f" Liquidsoap: {data}") # Debug: Ver lo que responde Liquidsoap
if "request.on_air" in data or "end of track" in data:
print(" Detectada canción terminada. Verificando para eliminar...")
delete_song_after_playing()
except Exception as e:
print(f" Error en monitor_liquidsoap: {e}")
def normalize_artist_name(artist):
"""Normaliza el nombre del artista eliminando caracteres extraños."""
replacements = {
"": "/", # Reemplaza el carácter Unicode extraño por "/"
"": "'", # Reemplaza apóstrofes extraños
"": "'",
"": '"',
"": '"'
}
for bad_char, good_char in replacements.items():
artist = artist.replace(bad_char, good_char)
return artist.strip()
def help():
"""Devuelve la ayuda del comando .radio"""
return "Uso: .radio <comando> [argumentos] - Control de radio por Liquidsoap."
def clear_playlist():
"""Elimina todas las canciones de la playlist, ignorando archivos .nfs* y directorios."""
if not os.path.exists(SAVE_PATH):
return f"Error: El directorio {SAVE_PATH} no existe."
files_removed = 0
for file in os.listdir(SAVE_PATH):
file_path = os.path.join(SAVE_PATH, file)
# Ignorar archivos .nfs* y directorios
if file.startswith(".nfs") or os.path.isdir(file_path):
continue
try:
os.remove(file_path)
files_removed += 1
except Exception as e:
print(f" No se pudo borrar {file}: {e}")
send_liquidsoap_command("radio.reload") # Recargar la playlist en Liquidsoap
return f" {files_removed} archivos eliminados de la playlist."
def clean_song_title(song_title):
"""Elimina información irrelevante entre paréntesis o corchetes"""
cleaned_title = re.sub(r"\s*[\(\[].*?[\)\]]", "", song_title) # Elimina (Official Video), [Live], etc.
return cleaned_title.strip()
def clean_filename(filename):
"""Elimina caracteres raros de los nombres de archivos"""
return unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode('utf-8')
def extract_artist_and_title(song_title):
"""Intenta separar el artista y el título de la canción"""
match = re.match(r"(.+?)\s*[-]\s*(.+)", song_title) # Soporta "Artista - Canción"
if match:
artist, title = match.groups()
return artist.strip(), title.strip()
return None, song_title
def send_liquidsoap_command(command):
"""Envía un comando al socket de Liquidsoap y devuelve la respuesta"""
if not os.path.exists(LIQUIDSOAP_SOCKET):
return "Error: El socket de Liquidsoap no existe. ¿Está corriendo el servidor?"
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(LIQUIDSOAP_SOCKET)
sock.sendall((command + "\n").encode("utf-8"))
response = sock.recv(4096).decode("utf-8").strip()
return response
except Exception as e:
return f"Error al enviar comando a Liquidsoap: {e}"
def get_current_song():
"""Obtiene el título y artista de la canción en reproducción con sanitización mejorada"""
index = send_liquidsoap_command("request.on_air")
if not index.isdigit():
return None, None
metadata_response = send_liquidsoap_command(f"request.metadata {index}")
if not metadata_response or "Error" in metadata_response:
return None, None
filename = None
for line in metadata_response.split("\n"):
if line.startswith("filename="):
filename = line.split("=", 1)[1].strip().replace('"', '')
if not filename:
return None, None
# Normalizar el nombre del archivo y quitar contenido irrelevante
song_title = os.path.basename(filename).rsplit(".", 1)[0]
cleaned_title = clean_song_title(song_title) # Elimina (Official Video) y demás
artist, title = extract_artist_and_title(cleaned_title)
# Si no se detecta artista, usar "Desconocido"
artist = artist if artist else "Desconocido"
artist = normalize_artist_name(artist) # Normaliza ACDC → AC/DC
return artist.strip(), title.strip()
def delete_song_after_playing():
"""Espera a que la canción termine y la elimina de la carpeta de radio."""
artist, title = get_current_song()
if not artist or not title:
return " No se está reproduciendo ninguna canción."
# Buscar el archivo correspondiente en el sistema
song_filename = None
for file in os.listdir(SAVE_PATH):
if clean_song_title(title).lower() in clean_song_title(file).lower():
song_filename = os.path.join(SAVE_PATH, file)
break
if not song_filename or not os.path.exists(song_filename):
return " No se encontró la canción en el sistema de archivos."
print(f" Monitoreando {song_filename} para eliminación...")
# Esperar hasta que la canción ya no esté en reproducción
while True:
current_artist, current_title = get_current_song()
if not current_title or current_title != title:
break
time.sleep(5) # Esperar 5 segundos antes de volver a verificar
# Borrar archivo de la carpeta de radio
try:
os.remove(song_filename)
print(f" Eliminado: {song_filename}")
send_liquidsoap_command("radio.reload") # Recargar playlist en Liquidsoap
return " Canción eliminada después de su reproducción."
except Exception as e:
return f" Error al eliminar la canción: {e}"
def get_playlist():
"""Lista directamente los archivos en el directorio de canciones"""
if not os.path.exists(SAVE_PATH):
return f"Error: El directorio {SAVE_PATH} no existe."
files = os.listdir(SAVE_PATH)
music_files = [f for f in files if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS]
if not music_files:
return "La playlist está vacía."
return "Playlist actual:\n" + "\n".join(f"{i}. {clean_filename(os.path.splitext(f)[0])}" for i, f in enumerate(sorted(music_files)))
def add_song(query):
"""Añade una canción a la playlist usando yt-dlp"""
if not query:
return help()
yt_command = [
"yt-dlp",
"--cookies", COOKIES_PATH,
f"ytsearch1:{query}",
"-f", "bestaudio",
"-x",
"--audio-format", "opus",
"--ffmpeg-location", "/usr/bin/ffmpeg",
"-o", f"{SAVE_PATH}%(title)s.%(ext)s",
"--embed-metadata", # Forzar metadatos en el archivo final
"--ppa", "ffmpeg:-metadata comment='Descargado via yt-dlp'"
]
try:
subprocess.run(yt_command, capture_output=True, text=True, check=True)
send_liquidsoap_command("radio.reload")
return f"'{query}' añadida al stream y playlist recargada."
except subprocess.CalledProcessError as e:
return f"Error al añadir la canción: {e.stderr}"
def get_track_info():
"""Obtiene información del track desde Last.fm"""
artist, track_title = get_current_song()
if not artist or not track_title:
return "No se está reproduciendo ninguna canción."
url = "http://ws.audioscrobbler.com/2.0/"
params = {
"method": "track.getInfo",
"api_key": LASTFM_API_KEY,
"format": "json",
"track": track_title
}
if artist:
params["artist"] = artist
try:
response = requests.get(url, params=params)
data = response.json()
if "track" not in data:
return f"No se encontró información para '{track_title}'."
track = data["track"]
# **Obtener artista y normalizar caracteres**
artist_name = html.unescape(track.get("artist", {}).get("name", "Desconocido"))
# **Obtener álbum si está disponible**
album = track.get("album", {}).get("title", "No disponible")
# **Obtener número de oyentes con formato**
listeners = track.get("listeners", "N/A")
if listeners.isdigit():
listeners = f"{int(listeners):,}".replace(",", ".")
# **Obtener lista de géneros si hay más de uno**
genres = track.get("toptags", {}).get("tag", [])
genre_list = ", ".join([g["name"] for g in genres[:3]]) if genres else "Sin género"
return (
f"Ahora suena: {track_title}\n"
f"Artista: {artist_name}\n"
f"Álbum: {album}\n"
f"Oyentes en Last.fm: {listeners}\n"
f"Género(s): {genre_list}"
)
except Exception as e:
return f"Error al obtener información del track: {e}"
def get_lyrics():
"""Busca la letra de la canción en Genius."""
if not GENIUS_API_KEY:
return "Error: La API Key de Genius no está configurada."
artist, title = get_current_song()
if not artist or not title:
return "No se está reproduciendo ninguna canción."
# Normalizar el artista antes de buscar
artist = normalize_artist_name(artist)
query = f"{artist} {title}"
print(f"[DEBUG] Buscando en Genius: {query}") # Debug
url = "https://api.genius.com/search"
headers = {"Authorization": f"Bearer {GENIUS_API_KEY}"}
params = {"q": query}
try:
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status()
data = response.json()
hits = data.get("response", {}).get("hits", [])
if not hits:
return f"No se encontró la letra de '{title}' de {artist}."
for hit in hits:
result = hit["result"]
genius_title = result.get("title", "").lower()
genius_artist = normalize_artist_name(result.get("primary_artist", {}).get("name", "").lower())
if title.lower() in genius_title and artist.lower() in genius_artist:
return f"Letras de '{title}' - {artist}: {result['url']}"
return f"No se encontró la letra de '{title}' de {artist}."
except requests.exceptions.RequestException as e:
return f"Error al obtener la letra: {e}"
def run(sender, *args):
"""Manejador principal de comandos de radio"""
if not args: # Si no hay argumentos, mostrar ayuda
return help()
command = args[0].lower()
command_args = args[1:] if len(args) > 1 else []
# Mapeo de comandos
if command == "suggest":
if not command_args:
return " Uso: `.radio suggest <criterio>` (Ejemplo: `.radio suggest música relajante`)"
return suggest_playlist(" ".join(command_args))
elif command == "lyrics":
return get_lyrics()
elif command == "add":
if not command_args:
return " ¡Falta el nombre de la canción! Ejemplo: .radio add Nirvana - Smells Like Teen Spirit"
return add_song(" ".join(command_args))
elif command == "playing":
return get_current_song_file() or " No hay música reproduciéndose"
elif command == "clear":
return clear_playlist()
elif command == "trackinfo":
return get_track_info()
elif command == "list":
return get_playlist()
elif command in ["skip", "next"]:
return send_liquidsoap_command("radio.skip")
elif command == "reload":
return send_liquidsoap_command("radio.reload")
else:
return help() # Comando no reconocido
def help():
return (
" Comandos de Radio: .radio <comando>\n"
"──────────────────────────\n"
"add <canción> - Añade a la cola\n"
"playing - Canción actual\n"
"trackinfo - Detalles técnicos\n"
"list - Lista de reproducción\n"
"lyrics - Letra de la canción\n"
"skip/next - Saltar tema\n"
"reload - Recargar playlist\n"
"clear - Limpiar playlist\n"
"suggest <criterio> - Generar playlist\n"
"Stream: https://radio.priet.us/stream.opus\n"
"M3U: https://radio.priet.us/stream.m3u"
)