#!/usr/bin/env python3
"""Show the album cover of the track playing on MPD (via MPRIS) in a GTK 4 panel.

The GTK main loop runs in the main thread; the D-Bus listener runs an asyncio
event loop in a background thread and hands widget updates to the GTK thread
through ``GLib.idle_add``.
"""
import asyncio
import os
import subprocess
import threading
from pathlib import Path
from typing import Optional

# GDK reads this variable when the display is opened, which can happen as soon
# as Gtk is imported, so it has to be set before that import.
# Use "wayland" instead if you run a native Wayland session.
os.environ["GDK_BACKEND"] = "x11"

import gi  # noqa: E402

# The namespace version must be pinned before importing from gi.repository.
gi.require_version("Gtk", "4.0")

from dbus_next import Variant  # noqa: E402
from dbus_next.aio import MessageBus  # noqa: E402
from gi.repository import Gtk, GdkPixbuf, GLib  # noqa: E402

from lastfm_cover_downloader import (  # noqa: E402
    cover_exists,
    download_cover_from_lastfm,
    get_local_album_dir,
)

MPRIS_BUS_NAME = "org.mpris.MediaPlayer2.mpd"
MPRIS_OBJECT_PATH = "/org/mpris/MediaPlayer2"
MPRIS_PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
DBUS_PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"

# File name this script appends to an album directory to locate its cover.
COVER_FILENAME = "cover.jpg"

# Command used to make waybar refresh the module bound to signal RTMIN+23.
WAYBAR_REFRESH_CMD = ("/usr/bin/pkill", "-RTMIN+23", "waybar")
WAYBAR_REFRESH_DELAY = 0.5


async def _refresh_waybar() -> None:
    """Wait briefly, then send the refresh signal to waybar (best effort)."""
    await asyncio.sleep(WAYBAR_REFRESH_DELAY)
    try:
        # Argument list, no shell; a non-zero exit (waybar not running) is ignored.
        subprocess.run(WAYBAR_REFRESH_CMD, check=False)
    except OSError as e:
        print(f"Error signalling waybar: {e}")


class CoverPanel:
    """Undecorated, semi-transparent 300x300 window showing one cover image."""

    def __init__(self, music_base_path):
        """Create and present the window.

        Args:
            music_base_path: Root directory of the music library; its direct
                sub-directories are scanned by ``find_latest_cover``.
        """
        self.music_base_path = Path(music_base_path)

        self.window = Gtk.Window(title="Album Cover")
        self.window.set_default_size(300, 300)
        self.window.set_resizable(False)
        self.window.set_decorated(False)
        self.window.set_opacity(0.8)

        # Gtk.Picture displays the image and scales it to the window.
        self.picture = Gtk.Picture()
        self.window.set_child(self.picture)

        self.window.present()
        print("CoverPanel window initialized successfully.")

    def update_cover(self, cover_path):
        """Load *cover_path* and schedule it to be shown in the panel.

        Safe to call from any thread: the image file is decoded in the calling
        thread, while the widget itself is only touched from the GLib main
        loop. Missing files and decoding errors are reported and ignored.

        Args:
            cover_path: Path (``Path`` or ``str``) of the image, or a falsy
                value when there is no cover.
        """
        print(f"Attempting to update cover with path: {cover_path}")
        path = Path(cover_path) if cover_path else None
        if path is None or not path.exists():
            print(f"Cover path does not exist or is invalid: {cover_path}")
            return

        print(f"Updating cover with path: {path.resolve()}")
        try:
            pixbuf = GdkPixbuf.Pixbuf.new_from_file(str(path))
        except Exception as e:
            print(f"Error loading image: {e}")
            return

        # GTK widgets must only be used from the thread running the main loop.
        GLib.idle_add(self._show_pixbuf, pixbuf)
        print(f"Cover panel updated with: {path.resolve()}")

    def _show_pixbuf(self, pixbuf):
        """Display *pixbuf* in the picture widget; runs as a GLib idle callback."""
        paintable = Gtk.Picture.new_for_pixbuf(pixbuf).get_paintable()
        self.picture.set_paintable(paintable)
        return GLib.SOURCE_REMOVE  # one-shot: do not reschedule

    def load_initial_cover(self):
        """Find a fallback cover in the library and show it, if there is one."""
        cover_path = self.find_latest_cover()
        if cover_path:
            self.update_cover(cover_path)

    def find_latest_cover(self) -> Optional[Path]:
        """Return the cover of the last album directory that has one.

        Album directories are the direct children of the library root, visited
        in descending path order (not by modification time).

        Returns:
            ``<album_dir>/cover.jpg`` for the first directory accepted by
            ``cover_exists``, or ``None`` when no directory qualifies.
        """
        album_dirs = sorted(self.music_base_path.glob("*/"), reverse=True)
        return next(
            (album_dir / COVER_FILENAME
             for album_dir in album_dirs if cover_exists(album_dir)),
            None,
        )


class MprisCoverHandlerWithGTK:
    """Listens to MPD's MPRIS metadata and keeps a ``CoverPanel`` up to date."""

    def __init__(self, music_base_path):
        """Create the panel; the D-Bus connection is opened lazily.

        Args:
            music_base_path: Root directory of the music library.
        """
        print("Inicializando MprisCoverHandlerWithGTK.")
        self.music_base_path = music_base_path
        self.panel = CoverPanel(music_base_path)
        self.bus = None

    async def _ensure_bus(self):
        """Return the message bus connection, opening it on first use."""
        if self.bus is None:
            self.bus = await MessageBus().connect()
        return self.bus

    async def _get_properties_interface(self):
        """Return the D-Bus Properties interface of the MPRIS player object."""
        bus = await self._ensure_bus()
        introspection = await bus.introspect(MPRIS_BUS_NAME, MPRIS_OBJECT_PATH)
        proxy = bus.get_proxy_object(
            MPRIS_BUS_NAME, MPRIS_OBJECT_PATH, introspection)
        return proxy.get_interface(DBUS_PROPERTIES_INTERFACE)

    @staticmethod
    def _parse_metadata(metadata):
        """Unpack the fields this script needs from an MPRIS metadata dict.

        Args:
            metadata: Mapping of xesam keys to ``Variant`` values.

        Returns:
            Tuple ``(track_title, artist, album, xesam_url)`` where ``artist``
            is always a list of strings (possibly empty).
        """
        track_title = metadata.get('xesam:title', Variant('s', '')).value
        artist = metadata.get('xesam:artist', Variant('as', [])).value
        album = metadata.get('xesam:album', Variant('s', '')).value
        xesam_url = metadata.get('xesam:url', Variant('s', '')).value
        print(f"Metadata recibida: Track Title: {track_title}, Artist: {artist}, Album: {album}, URL: {xesam_url}")

        # Some players report the artist as a single value instead of a list.
        if not isinstance(artist, list):
            artist = [str(artist)]
        artist = [str(a) for a in artist]
        return track_title, artist, album, xesam_url

    async def load_current_song_cover(self):
        """Show the cover of the track currently playing, read through MPRIS.

        Falls back to ``CoverPanel.load_initial_cover`` when the metadata is
        incomplete, no local cover exists, or any error occurs.
        """
        try:
            properties = await self._get_properties_interface()
            metadata_variant = await properties.call_get(
                MPRIS_PLAYER_INTERFACE, "Metadata")
            _title, artist, album, xesam_url = self._parse_metadata(
                metadata_variant.value)

            if artist and album != 'Unknown' and xesam_url:
                album_dir = get_local_album_dir(
                    xesam_url, self.music_base_path)
                print(f"Determined album directory: {album_dir}")
                if album_dir and cover_exists(album_dir):
                    self.panel.update_cover(album_dir / COVER_FILENAME)
                    return
        except Exception as e:
            print(f"Error loading current song cover: {e}")

        # No cover for the current track: show the fallback cover instead.
        print("No current song cover found. Loading last known cover.")
        self.panel.load_initial_cover()

    async def on_properties_changed(self, interface, changed, invalidated):
        """Handle ``PropertiesChanged``: update the panel on new metadata.

        Uses the local cover when the album directory has one, otherwise tries
        to download it from Last.fm, then asks waybar to refresh.

        Args:
            interface: Name of the interface whose properties changed.
            changed: Mapping of changed property names to ``Variant`` values.
            invalidated: Names of invalidated properties (unused).
        """
        print("on_properties_changed: Detectando cambios de propiedades MPRIS.")
        if 'Metadata' not in changed:
            return

        track_title, artist, album, xesam_url = self._parse_metadata(
            changed['Metadata'].value)

        if not artist or album == 'Unknown' or not xesam_url:
            print(f"Skipping track '{track_title}' due to missing metadata.")
            return

        print(f"Now playing: {track_title} by {', '.join(artist)} from the album '{album}'")

        album_dir = get_local_album_dir(xesam_url, self.music_base_path)
        print(f"Determined album directory: {album_dir}")
        if not album_dir:
            print(f"Could not determine album directory for '{track_title}'. xesam:url: {xesam_url}")
            return

        if cover_exists(album_dir):
            print(f"Cover already exists in '{album_dir}', loading from local.")
            self.panel.update_cover(album_dir / COVER_FILENAME)
        else:
            print(f"No cover found in '{album_dir}', attempting to download.")
            try:
                # NOTE(review): presumably a blocking network call; it runs on
                # the event-loop thread exactly as before - confirm whether it
                # should be moved to an executor.
                cover_path = download_cover_from_lastfm(
                    artist[0], album, album_dir)
                if cover_path:
                    self.panel.update_cover(cover_path)
            except Exception as e:
                print(f"Error downloading cover: {e}")

        await _refresh_waybar()

    async def main(self):
        """Show the current cover, subscribe to MPRIS changes and wait forever.

        Raises:
            Exception: Whatever dbus-next raises when the bus or the MPRIS
                player object cannot be reached.
        """
        print("Starting D-Bus listener with GTK panel...")

        # Show the cover of the track that is already playing at start-up.
        await self.load_current_song_cover()

        # Connects on demand, so a failed first attempt surfaces here as the
        # real connection error rather than an AttributeError on None.
        properties = await self._get_properties_interface()
        await _refresh_waybar()

        properties.on_properties_changed(self.on_properties_changed)
        print("D-Bus listener with GTK panel started successfully.")
        await asyncio.Future()  # never completes: keep the listener alive


if __name__ == "__main__":
    print("Iniciando script show_cover.py")
    MUSIC_BASE_PATH = os.environ.get(
        'MUSIC_LIBRARY_PATH', os.path.expandvars('$HOME/media/all'))
    print(f"MUSIC_BASE_PATH: {MUSIC_BASE_PATH}")

    fetcher = MprisCoverHandlerWithGTK(MUSIC_BASE_PATH)

    # Run the asyncio loop in a separate thread.
    def run_asyncio():
        asyncio.run(fetcher.main())

    # Daemon thread: it must not keep the process alive once the GTK loop ends.
    asyncio_thread = threading.Thread(target=run_asyncio, daemon=True)
    asyncio_thread.start()

    # Run the GTK/GLib main loop in the main thread.
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        pass