195 lines
8.0 KiB
Python
Executable File
195 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import asyncio
import os
import subprocess
import threading
from pathlib import Path

import gi

# Select the GDK backend ("x11", or "wayland" when running on Wayland).
# Set before gi.repository is imported so it is in place before GTK starts.
os.environ["GDK_BACKEND"] = "x11"

# The GTK version must be pinned before anything is imported from
# gi.repository; calling this after the import is too late.
gi.require_version("Gtk", "4.0")

from dbus_next import Variant  # noqa: E402
from dbus_next.aio import MessageBus  # noqa: E402
from gi.repository import Gtk, GdkPixbuf, GLib  # noqa: E402

from lastfm_cover_downloader import download_cover_from_lastfm, cover_exists, get_local_album_dir  # noqa: E402
|
|
|
|
|
|
class CoverPanel:
    """Undecorated, fixed-size GTK 4 window that displays one album cover.

    The window is built and presented as soon as the panel is instantiated.
    """

    def __init__(self, music_base_path):
        """Create the window, attach the picture widget and present it.

        Args:
            music_base_path: Root directory of the music library. It is
                stored as a ``Path`` and scanned by ``find_latest_cover``.
        """
        self.music_base_path = Path(music_base_path)

        self.window = Gtk.Window(title="Album Cover")
        self.window.set_default_size(300, 300)
        self.window.set_resizable(False)
        self.window.set_decorated(False)
        self.window.set_opacity(0.8)

        # Gtk.Picture is the widget that shows (and scales) the cover image.
        self.picture = Gtk.Picture()
        self.window.set_child(self.picture)

        self.window.present()
        print("CoverPanel window initialized successfully.")

    def update_cover(self, cover_path):
        """Schedule the image at ``cover_path`` to be shown in the panel.

        The path is validated here; loading the image and touching the
        widget are deferred to a GLib idle callback so that every GTK call
        runs on the thread that iterates the GLib main loop, whichever
        thread this method is called from.

        Args:
            cover_path: ``Path`` or path string of the image file. ``None``
                or an empty value is reported and ignored.
        """
        print(f"Attempting to update cover with path: {cover_path}")

        # Callers may hand over a plain string instead of a Path.
        if cover_path:
            cover_path = Path(cover_path)

        if not cover_path or not cover_path.exists():
            print(f"Cover path does not exist or is invalid: {cover_path}")
            return

        print(f"Updating cover with path: {cover_path.resolve()}")
        GLib.idle_add(self._show_cover, cover_path)

    def _show_cover(self, cover_path):
        """Load ``cover_path`` and display it; runs as a GLib idle callback.

        Returns:
            ``False`` so that GLib removes the idle source after one run.
        """
        try:
            pixbuf = GdkPixbuf.Pixbuf.new_from_file(str(cover_path))
            # A temporary Gtk.Picture is used only to wrap the pixbuf in a
            # paintable that the visible picture widget can display.
            paintable = Gtk.Picture.new_for_pixbuf(pixbuf).get_paintable()
            self.picture.set_paintable(paintable)
            print(f"Cover panel updated with: {cover_path.resolve()}")
        except Exception as e:
            print(f"Error loading image: {e}")
        return False

    def load_initial_cover(self):
        """Show the cover returned by ``find_latest_cover``, if there is one."""
        cover_path = self.find_latest_cover()
        if cover_path:
            self.update_cover(cover_path)

    def find_latest_cover(self):
        """Return a fallback cover from the music library.

        The directories directly under ``music_base_path`` are visited in
        reverse name order (not by modification time) and the first one for
        which ``cover_exists`` is true wins.

        Returns:
            ``<album_dir>/cover.jpg`` for the first matching directory, or
            ``None`` when no directory qualifies.
        """
        for album_dir in sorted(self.music_base_path.glob("*/"), reverse=True):
            if cover_exists(album_dir):
                return album_dir / "cover.jpg"
        return None
|
|
|
|
|
|
class MprisCoverHandlerWithGTK:
    """Keep a ``CoverPanel`` in sync with the track reported over MPRIS.

    The handler reads the ``Metadata`` property of the
    ``org.mpris.MediaPlayer2.mpd`` player, resolves the album directory with
    ``get_local_album_dir`` and shows ``cover.jpg`` from it, asking
    ``download_cover_from_lastfm`` for a cover when none is present.
    """

    _BUS_NAME = "org.mpris.MediaPlayer2.mpd"
    _OBJECT_PATH = "/org/mpris/MediaPlayer2"
    # Sends the real-time signal RTMIN+23 to waybar processes.
    # NOTE(review): presumably this makes a waybar module refresh - confirm
    # against the waybar configuration.
    _WAYBAR_SIGNAL_CMD = ["/usr/bin/pkill", "-RTMIN+23", "waybar"]

    def __init__(self, music_base_path):
        """Create the cover panel; the bus is connected later.

        Args:
            music_base_path: Root directory of the music library, forwarded
                to ``CoverPanel`` and to ``get_local_album_dir``.
        """
        print("Inicializando MprisCoverHandlerWithGTK.")
        self.music_base_path = music_base_path
        self.panel = CoverPanel(music_base_path)
        self.bus = None  # set by load_current_song_cover()

    async def _get_properties_interface(self):
        """Return the ``org.freedesktop.DBus.Properties`` proxy of the player.

        Requires ``self.bus`` to be a connected message bus.
        """
        introspection = await self.bus.introspect(
            self._BUS_NAME, self._OBJECT_PATH)
        player = self.bus.get_proxy_object(
            self._BUS_NAME, self._OBJECT_PATH, introspection)
        return player.get_interface("org.freedesktop.DBus.Properties")

    @staticmethod
    def _read_metadata(metadata):
        """Unwrap the fields used by this handler from an MPRIS metadata dict.

        Args:
            metadata: Mapping of metadata keys to objects exposing the real
                value as ``.value`` (D-Bus variants).

        Returns:
            ``(track_title, artists, album, xesam_url)``. Missing text
            fields become ``''``; ``artists`` is always a list of strings.
        """
        def value_of(key, default):
            entry = metadata.get(key)
            return default if entry is None else entry.value

        track_title = value_of('xesam:title', '')
        raw_artist = value_of('xesam:artist', [])
        album = value_of('xesam:album', '')
        xesam_url = value_of('xesam:url', '')

        print(
            f"Metadata recibida: Track Title: {track_title}, "
            f"Artist: {raw_artist}, Album: {album}, URL: {xesam_url}"
        )

        # The artist may arrive as a single value instead of a list.
        if not isinstance(raw_artist, list):
            raw_artist = [str(raw_artist)]
        artists = [str(name) for name in raw_artist]
        return track_title, artists, album, xesam_url

    async def _refresh_waybar(self):
        """Wait half a second, then run the waybar signal command."""
        await asyncio.sleep(0.5)
        try:
            subprocess.run(self._WAYBAR_SIGNAL_CMD, check=False)
        except OSError as e:
            # Best effort only: e.g. pkill is not installed at that path.
            print(f"Could not signal waybar: {e}")

    async def load_current_song_cover(self):
        """Connect to the bus and show the cover of the current track.

        When the current track has no usable local cover, or anything fails
        along the way, the panel falls back to
        ``CoverPanel.load_initial_cover``. The bus connection is stored in
        ``self.bus`` and stays ``None`` if connecting failed.
        """
        try:
            self.bus = await MessageBus().connect()
            properties = await self._get_properties_interface()
            metadata_variant = await properties.call_get(
                "org.mpris.MediaPlayer2.Player", "Metadata")
            # .value unwraps the variant into the actual metadata dict.
            _, artist, album, xesam_url = self._read_metadata(
                metadata_variant.value)

            if artist and album != 'Unknown' and xesam_url:
                album_dir = get_local_album_dir(
                    xesam_url, self.music_base_path)
                print(f"Determined album directory: {album_dir}")

                if album_dir and cover_exists(album_dir):
                    self.panel.update_cover(album_dir / "cover.jpg")
                    return
        except Exception as e:
            print(f"Error loading current song cover: {e}")

        # No cover for the current song: fall back to the last cover found.
        print("No current song cover found. Loading last known cover.")
        self.panel.load_initial_cover()

    async def on_properties_changed(self, interface, changed, invalidated):
        """Handle a ``PropertiesChanged`` signal from the player.

        Args:
            interface: Name of the interface whose properties changed
                (unused).
            changed: Mapping of changed property names to variants; only
                ``'Metadata'`` is inspected.
            invalidated: Names of invalidated properties (unused).

        Tracks with incomplete metadata, or whose album directory cannot be
        determined, are skipped without signalling waybar.
        """
        print("on_properties_changed: Detectando cambios de propiedades MPRIS.")

        if 'Metadata' in changed:
            track_title, artist, album, xesam_url = self._read_metadata(
                changed['Metadata'].value)

            if not artist or album == 'Unknown' or not xesam_url:
                print(f"Skipping track '{track_title}' due to missing metadata.")
                return

            artist_names = ', '.join(artist)
            print(
                f"Now playing: {track_title} by {artist_names} "
                f"from the album '{album}'"
            )
            album_dir = get_local_album_dir(xesam_url, self.music_base_path)
            print(f"Determined album directory: {album_dir}")

            if not album_dir:
                print(
                    f"Could not determine album directory for "
                    f"'{track_title}'. xesam:url: {xesam_url}"
                )
                return

            if cover_exists(album_dir):
                print(f"Cover already exists in '{album_dir}', loading from local.")
                self.panel.update_cover(album_dir / "cover.jpg")
            else:
                print(f"No cover found in '{album_dir}', attempting to download.")
                try:
                    # Only the first artist is used for the lookup.
                    cover_path = download_cover_from_lastfm(
                        artist[0], album, album_dir)
                    if cover_path:
                        self.panel.update_cover(cover_path)
                except Exception as e:
                    print(f"Error downloading cover: {e}")

        await self._refresh_waybar()

    async def main(self):
        """Show the current cover, then listen for property changes forever.

        Returns early, after reporting the problem, when the bus connection
        could not be established or the player cannot be introspected;
        otherwise this coroutine never completes.
        """
        print("Starting D-Bus listener with GTK panel...")
        # Load the cover of the current song at start-up.
        await self.load_current_song_cover()

        if self.bus is None:
            # The initial connection failed; there is nothing to listen on.
            print("No D-Bus connection available; listener not started.")
            return

        try:
            properties = await self._get_properties_interface()
        except Exception as e:
            print(f"Error subscribing to MPRIS property changes: {e}")
            return

        await self._refresh_waybar()

        properties.on_properties_changed(self.on_properties_changed)
        print("D-Bus listener with GTK panel started successfully.")
        # Never resolved: keeps the event loop alive to receive signals.
        await asyncio.Future()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: the D-Bus listener runs on an asyncio loop in a
    # background thread while the GLib main loop owns the main thread.
    print("Iniciando script show_cover.py")
    MUSIC_BASE_PATH = os.environ.get(
        'MUSIC_LIBRARY_PATH', os.path.expandvars('$HOME/media/all'))
    print(f"MUSIC_BASE_PATH: {MUSIC_BASE_PATH}")

    fetcher = MprisCoverHandlerWithGTK(MUSIC_BASE_PATH)

    # Run the asyncio loop in a separate thread.
    def run_asyncio():
        asyncio.run(fetcher.main())

    # Daemon thread: the listener awaits forever, so a non-daemon thread
    # would keep the process alive after the GLib loop has stopped.
    asyncio_thread = threading.Thread(target=run_asyncio, daemon=True)
    asyncio_thread.start()

    # Run the GLib/GTK loop in the main thread.
    loop = GLib.MainLoop()
    loop.run()
|