"""Fetch missing album covers from Last.fm for the track exposed over MPRIS.

The script listens for ``PropertiesChanged`` signals on the
``org.mpris.MediaPlayer2.mpd`` D-Bus name. When the metadata of the playing
track changes and its album directory holds neither ``cover.jpg`` nor
``Cover.jpg``, the largest cover offered by Last.fm is saved there as
``cover.jpg``. Afterwards Waybar is sent ``SIGRTMIN+23`` through ``pkill``.

Configuration (environment variables):
    LASTFM_API_KEY_PATH: file whose first line is the Last.fm API key
        (default ``$HOME/apikeys/lastfm``).
    MUSIC_LIBRARY_PATH: base directory used to resolve relative track paths
        (default ``$HOME/media/all``).
"""
import asyncio
import logging
import os
import subprocess
import urllib.parse
from pathlib import Path

import requests
from dbus_next import Variant
from dbus_next.aio import MessageBus

logging.basicConfig(level=logging.INFO)  # Set logging level to INFO
logger = logging.getLogger(__name__)

# (connect, read) timeouts in seconds for every HTTP request. The requests are
# made synchronously from the asyncio handler, so an unbounded request would
# stall the whole listener.
HTTP_TIMEOUT = (5, 30)

# Last.fm web-service endpoint; HTTPS so the API key is not sent in cleartext.
LASTFM_API_URL = "https://ws.audioscrobbler.com/2.0/"

# Cover sizes as named in the Last.fm response, most preferred first.
COVER_SIZE_PRIORITY = ("mega", "extralarge", "large", "medium", "small")

# Command used to make Waybar refresh, and the delay applied before running it.
WAYBAR_REFRESH_CMD = ["/usr/bin/pkill", "-RTMIN+23", "waybar"]
WAYBAR_REFRESH_DELAY = 0.5

# Get the Last.fm API key path from an environment variable
API_KEY_FILE = os.environ.get(
    'LASTFM_API_KEY_PATH', os.path.expandvars('$HOME/apikeys/lastfm'))


def get_lastfm_api_key():
    """Read the Last.fm API key from ``API_KEY_FILE``.

    Returns:
        The stripped first line of the file, or ``None`` when the file is
        missing, unreadable, or its first line is blank.
    """
    try:
        with open(API_KEY_FILE, 'r', encoding='utf-8') as file:
            api_key = file.readline().strip()
    except FileNotFoundError:
        logger.error(
            f"Error: API key file '{API_KEY_FILE}' not found. Please check the path.")
        return None
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"Error reading API key: {e}")
        return None

    if not api_key:
        # An empty first line is not a usable key; do not report success.
        logger.error(f"Error: API key file '{API_KEY_FILE}' is empty.")
        return None

    logger.info(f"Successfully read Last.fm API key from {API_KEY_FILE}")
    return api_key


# Get the Last.fm API key
LASTFM_API_KEY = get_lastfm_api_key()
if not LASTFM_API_KEY:
    raise RuntimeError(
        "Last.fm API key could not be loaded. Please check the API key file path.")
else:
    print("API key loaded successfully. Continuing with the script.")

# Get the base path of the music library from an environment variable
MUSIC_BASE_PATH = os.environ.get(
    'MUSIC_LIBRARY_PATH', os.path.expandvars('$HOME/media/all'))


def cover_exists(album_dir):
    """Return True if 'cover.jpg' or 'Cover.jpg' exists in *album_dir*.

    Args:
        album_dir: ``pathlib.Path`` of the album directory.
    """
    return any((album_dir / name).exists() for name in ("cover.jpg", "Cover.jpg"))


def download_image(cover_url, output_path):
    """Download the image at *cover_url* and save it to *output_path*.

    The data is first written to a sibling ``<name>.part`` file and renamed
    into place only after the transfer completed, so an interrupted download
    never leaves a truncated file under the final name.

    Args:
        cover_url: URL of the image.
        output_path: destination path (``str`` or ``pathlib.Path``).

    Returns:
        *output_path* on success, ``None`` on any failure (non-200 status,
        network error, or file-system error).
    """
    target = Path(output_path)
    partial = target.with_name(target.name + ".part")
    try:
        with requests.get(cover_url, stream=True, timeout=HTTP_TIMEOUT) as response:
            if response.status_code != 200:
                logger.warning(
                    f"Failed to download image. HTTP Status: {response.status_code}")
                return None
            with open(partial, 'wb') as f:
                for chunk in response.iter_content(8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        os.replace(partial, target)
    except (requests.RequestException, OSError) as e:
        logger.error(f"Error downloading image: {e}")
        try:
            partial.unlink()
        except OSError:
            pass  # nothing was written, or it is already gone
        return None

    logger.info(f"Image downloaded to {output_path}")
    return output_path


def get_largest_cover_image(data):
    """Select the largest available cover image from a Last.fm response.

    Args:
        data: decoded JSON of an ``album.getinfo`` call. The image list is
            read from ``data['album']['image']``; each entry is expected to
            carry ``size`` and ``#text`` (the URL).

    Returns:
        The URL of the first non-empty image in ``COVER_SIZE_PRIORITY`` order,
        or ``None`` when the response has no usable image or an unexpected
        shape.
    """
    album_info = data.get('album') if isinstance(data, dict) else None
    images = album_info.get('image') if isinstance(album_info, dict) else None
    if not isinstance(images, list):
        return None

    for size in COVER_SIZE_PRIORITY:
        for img in images:
            if not isinstance(img, dict):
                continue
            cover_url = img.get("#text")
            if img.get("size") == size and cover_url:
                logger.info(f"Using {size} cover: {cover_url}")
                return cover_url
    return None


def download_cover_from_lastfm(artist, album, album_dir):
    """Download the album cover from Last.fm into *album_dir* as 'cover.jpg'.

    Args:
        artist: artist name used for the Last.fm lookup.
        album: album title used for the Last.fm lookup.
        album_dir: ``pathlib.Path`` of the album directory; created if missing.

    Returns:
        The path of the saved cover, or ``None`` when Last.fm offers no cover,
        the image download fails, or any request/file error occurs.
    """
    logger.info(f"Fetching cover for '{album}' by {artist} from Last.fm...")
    # Let requests build and escape the query string.
    params = {
        "method": "album.getinfo",
        "api_key": LASTFM_API_KEY,
        "artist": artist,
        "album": album,
        "format": "json",
    }
    try:
        response = requests.get(LASTFM_API_URL, params=params, timeout=HTTP_TIMEOUT)
        if response.status_code == 200:
            cover_url = get_largest_cover_image(response.json())
            if cover_url:
                # Ensure the directory exists
                album_dir.mkdir(parents=True, exist_ok=True)
                local_cover_path = album_dir / "cover.jpg"  # Save as 'cover.jpg'
                if download_image(cover_url, local_cover_path) is None:
                    # download_image already logged the reason; do not
                    # report a cover that was never written.
                    return None
                logger.info(f"Downloaded cover to {local_cover_path}")
                return local_cover_path
        logger.info(f"No cover found for '{album}' by {artist} on Last.fm.")
    except (requests.RequestException, ValueError, OSError) as e:
        # ValueError covers a response body that is not valid JSON.
        logger.error(f"Error fetching cover from Last.fm: {e}")
    return None


def get_local_album_dir(xesam_url):
    """Extract the album directory from the `xesam:url` value.

    Handles ``file://`` URLs, percent-encoding, paths relative to
    ``MUSIC_BASE_PATH`` and CUE-sheet track paths of the form
    ``.../album.cue/trackNNNN``.

    Args:
        xesam_url: the ``xesam:url`` metadata string.

    Returns:
        ``pathlib.Path`` of the directory containing the track, or ``None``
        when the URL is empty, the path does not exist, or it is not a file.
    """
    if not xesam_url:
        logger.warning(
            "Warning: xesam:url is empty or missing. Skipping cover detection.")
        return None

    try:
        logger.info(f"Raw xesam:url value: {xesam_url}")

        # Convert the xesam URL to a local file path. Only the leading scheme
        # is removed; a later "file://" inside the path is left alone.
        scheme = "file://"
        raw_path = xesam_url[len(scheme):] if xesam_url.startswith(scheme) else xesam_url
        local_path = urllib.parse.unquote(raw_path)
        logger.info(f"Converted local path: {local_path}")

        # Remove '/trackXXXX' if it's a CUE file path. Anchor on '.cue/track'
        # so that directories merely starting with 'track' are not cut off.
        cue_marker = ".cue/track"
        marker_pos = local_path.rfind(cue_marker)
        if marker_pos != -1:
            local_path = local_path[:marker_pos + len(".cue")]
            logger.info(f"Stripped CUE path: {local_path}")

        # If the path is not absolute, add the music base directory
        track_path = Path(local_path)
        if not track_path.is_absolute():
            track_path = Path(MUSIC_BASE_PATH) / track_path
            logger.info(f"Updated to absolute path: {track_path}")

        # Check if the resulting path exists
        if not track_path.exists():
            logger.warning(f"Warning: The path '{track_path}' does not exist.")
            return None

        # CUE sheets and regular files both live directly in the album directory
        if track_path.suffix == ".cue" or track_path.is_file():
            return track_path.parent

        logger.info(f"Path is not a valid file: {track_path}")
    except (OSError, ValueError) as e:
        logger.error(f"Error extracting local album directory: {e}")
    return None


async def _refresh_waybar():
    """Wait briefly, then signal Waybar to refresh (best effort)."""
    await asyncio.sleep(WAYBAR_REFRESH_DELAY)
    try:
        # Argument list, no shell: nothing here needs shell interpretation.
        subprocess.run(WAYBAR_REFRESH_CMD, check=False)
    except OSError as e:
        logger.warning(f"Could not run Waybar refresh command: {e}")


class MprisCoverHandler:
    """Listen for MPRIS metadata changes and fetch missing album covers."""

    def __init__(self):
        # D-Bus connection; set by main() once connected.
        self.bus = None

    async def on_properties_changed(self, interface, changed, invalidated):
        """Handler for MPRIS property changes.

        Args:
            interface: name of the interface whose properties changed.
            changed: mapping of property name to ``Variant``; only
                ``'Metadata'`` is acted upon.
            invalidated: invalidated property names (unused).

        NOTE(review): the cover lookup and download use blocking HTTP calls
        and run directly in this coroutine; they are bounded by HTTP_TIMEOUT.
        """
        if 'Metadata' not in changed:
            return

        metadata = changed['Metadata'].value

        # Extract metadata properties
        track_title = metadata.get('xesam:title', Variant('s', 'Unknown')).value
        artist = metadata.get('xesam:artist', Variant('as', [])).value
        album = metadata.get('xesam:album', Variant('s', 'Unknown')).value
        xesam_url = metadata.get('xesam:url', Variant('s', '')).value

        # Normalize the artist value to a list of non-blank strings.
        if not isinstance(artist, list):
            artist = [artist]
        artist = [str(a) for a in artist if str(a).strip()]

        if not artist or not album or album == 'Unknown' or not xesam_url:
            logger.info(f"Skipping track '{track_title}' due to missing metadata.")
            return

        logger.info(
            f"Now playing: {track_title} by {', '.join(artist)} from the album '{album}'")

        # Determine the local album directory based on xesam:url
        album_dir = get_local_album_dir(xesam_url)
        if not album_dir:
            logger.info(
                f"Could not determine album directory for '{track_title}'. xesam:url: {xesam_url}")
            return

        # Check if a cover already exists
        if cover_exists(album_dir):
            logger.info(f"Cover already exists in '{album_dir}', doing nothing.")
        else:
            logger.info(f"No cover found in '{album_dir}', attempting to download.")
            download_cover_from_lastfm(artist[0], album, album_dir)

        # Delay and execute the command to refresh Waybar
        await _refresh_waybar()

    async def main(self):
        """Main function to connect to D-Bus and listen for changes."""
        print("Starting D-Bus listener...")
        self.bus = await MessageBus().connect()
        introspectable = await self.bus.introspect(
            "org.mpris.MediaPlayer2.mpd", "/org/mpris/MediaPlayer2")
        obj = self.bus.get_proxy_object(
            "org.mpris.MediaPlayer2.mpd", "/org/mpris/MediaPlayer2", introspectable)
        properties = obj.get_interface("org.freedesktop.DBus.Properties")

        # Run the command once when the listener starts
        # Small delay to ensure Waybar is ready
        await _refresh_waybar()

        properties.on_properties_changed(self.on_properties_changed)
        print("D-Bus listener started successfully.")
        await asyncio.Future()  # Run indefinitely


# Run the main loop to monitor MPRIS changes
fetcher = MprisCoverHandler()
asyncio.run(fetcher.main())