# Listing header (code-viewer residue): 224 lines, 8.6 KiB, Python, Executable File
import os
|
|
import requests
|
|
import urllib.parse
|
|
import asyncio
|
|
from dbus_next.aio import MessageBus
|
|
from dbus_next import Variant
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
# Configure the root logger so that INFO records and above are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# File holding the Last.fm API key: taken from LASTFM_API_KEY_PATH when that
# variable is set, otherwise $HOME/apikeys/lastfm.
API_KEY_FILE = os.getenv(
    'LASTFM_API_KEY_PATH',
    os.path.expandvars('$HOME/apikeys/lastfm'),
)
|
|
|
|
|
|
def get_lastfm_api_key():
    """Read the Last.fm API key from the file named by ``API_KEY_FILE``.

    Only the first line of the file is used, with surrounding whitespace
    stripped.

    Returns:
        The key as a non-empty string, or ``None`` when the file is missing,
        cannot be read, or its first line is blank. Failures are logged and
        never raised.
    """
    try:
        with open(API_KEY_FILE, 'r', encoding='utf-8') as file:
            api_key = file.readline().strip()
    except FileNotFoundError:
        logger.error(f"Error: API key file '{API_KEY_FILE}' not found. Please check the path.")
        return None
    except (OSError, ValueError) as e:
        # OSError: unreadable path (permissions, directory, ...).
        # ValueError: undecodable bytes or an invalid path string.
        logger.error(f"Error reading API key: {e}")
        return None

    if not api_key:
        # A blank first line is not a usable key; do not report success for it.
        logger.error(f"Error: API key file '{API_KEY_FILE}' is empty.")
        return None

    logger.info(f"Successfully read Last.fm API key from {API_KEY_FILE}")
    return api_key
|
|
|
|
|
|
# Load the Last.fm API key once at start-up; abort when it is unavailable.
LASTFM_API_KEY = get_lastfm_api_key()
if LASTFM_API_KEY:
    print("API key loaded successfully. Continuing with the script.")
else:
    raise RuntimeError(
        "Last.fm API key could not be loaded. Please check the API key file path.")

# Base directory of the music library: MUSIC_LIBRARY_PATH when that variable is
# set, otherwise $HOME/media/all.
MUSIC_BASE_PATH = os.getenv(
    'MUSIC_LIBRARY_PATH',
    os.path.expandvars('$HOME/media/all'),
)
|
|
|
|
|
|
def cover_exists(album_dir):
    """Tell whether the album directory already contains a cover image.

    Only the two spellings ``cover.jpg`` and ``Cover.jpg`` are looked for.
    ``album_dir`` must support the ``/`` path-joining operator.
    """
    return any(
        (album_dir / candidate).exists()
        for candidate in ("cover.jpg", "Cover.jpg")
    )
|
|
|
|
|
|
def download_image(cover_url, output_path, timeout=30):
    """Download the image at ``cover_url`` and save it to ``output_path``.

    The body is streamed into a ``.part`` file next to the destination and is
    moved into place only after the transfer has finished, so an interrupted
    download never leaves a truncated image under the final name.

    Args:
        cover_url: URL of the image to fetch.
        output_path: Destination file (string or path object).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``output_path`` on success, ``None`` on failure. Failures are logged
        and never raised.
    """
    partial_path = f"{output_path}.part"
    try:
        # The context manager releases the connection even on early return.
        with requests.get(cover_url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                logger.error(f"Failed to download image. HTTP Status: {response.status_code}")
                return None
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(8192):
                    f.write(chunk)
        # Atomic on the same filesystem: the final name appears only when complete.
        os.replace(partial_path, output_path)
    except (requests.RequestException, OSError) as e:
        logger.error(f"Error downloading image: {e}")
        try:
            os.remove(partial_path)
        except OSError:
            pass  # nothing was written, or it is already gone
        return None

    logger.info(f"Image downloaded to {output_path}")
    return output_path
|
|
|
|
|
|
def get_largest_cover_image(data):
    """Select the largest available cover image from a Last.fm response.

    Args:
        data: Decoded JSON response. Images are read from
            ``data['album']['image']``, a list of dicts carrying ``size`` and
            ``#text`` (the URL) entries.

    Returns:
        The URL of the first non-empty image for the largest size present
        (mega > extralarge > large > medium > small), or ``None`` when the
        response holds no usable image. Malformed entries are skipped instead
        of raising.
    """
    size_priority = ("mega", "extralarge", "large", "medium", "small")

    album = data.get('album') if isinstance(data, dict) else None
    images = album.get('image') if isinstance(album, dict) else None
    if not isinstance(images, (list, tuple)):
        return None

    # One pass over the images: remember the first non-empty URL per size.
    url_by_size = {}
    for img in images:
        if not isinstance(img, dict):
            continue
        size = img.get("size")
        url = img.get("#text")
        if url and isinstance(size, str) and size not in url_by_size:
            url_by_size[size] = url

    for size in size_priority:
        if size in url_by_size:
            logger.info(f"Using {size} cover: {url_by_size[size]}")
            return url_by_size[size]
    return None
|
|
|
|
|
|
def download_cover_from_lastfm(artist, album, album_dir, timeout=30):
    """Look up an album on Last.fm and save its cover as ``cover.jpg``.

    Args:
        artist: Artist name used for the ``album.getinfo`` lookup.
        album: Album title used for the lookup.
        album_dir: Path object of the album directory; created if missing.
        timeout: Seconds to wait for the Last.fm API before giving up.

    Returns:
        The path of the saved ``cover.jpg``, or ``None`` when Last.fm offers
        no cover, the request fails, or the image could not be downloaded.
        Failures are logged and never raised.
    """
    logger.info(f"Fetching cover for '{album}' by {artist} from Last.fm...")

    # Let requests encode the query; HTTPS keeps the API key off the wire in clear text.
    query = {
        "method": "album.getinfo",
        "api_key": LASTFM_API_KEY,
        "artist": artist,
        "album": album,
        "format": "json",
    }
    try:
        response = requests.get(
            "https://ws.audioscrobbler.com/2.0/", params=query, timeout=timeout)

        cover_url = None
        if response.status_code == 200:
            cover_url = get_largest_cover_image(response.json())
        else:
            logger.warning(f"Last.fm request failed. HTTP Status: {response.status_code}")

        if not cover_url:
            logger.info(f"No cover found for '{album}' by {artist} on Last.fm.")
            return None

        # Ensure the directory exists before writing into it.
        album_dir.mkdir(parents=True, exist_ok=True)
        local_cover_path = album_dir / "cover.jpg"  # Save as 'cover.jpg'

        # download_image reports failure by returning None; do not claim success then.
        if download_image(cover_url, local_cover_path) is None:
            return None

        logger.info(f"Downloaded cover to {local_cover_path}")
        return local_cover_path
    except Exception as e:
        # Best-effort boundary: this runs from an event handler, so any
        # failure (network, JSON decoding, filesystem) is logged, not raised.
        logger.error(f"Error fetching cover from Last.fm: {e}")
        return None
|
|
|
|
|
|
def get_local_album_dir(xesam_url):
    """Derive the album directory from an MPRIS ``xesam:url`` value.

    A leading ``file://`` is removed and the rest is percent-decoded. A
    trailing ``/track...`` component that follows a ``.cue`` file is dropped,
    and a relative path is resolved against ``MUSIC_BASE_PATH``.

    Args:
        xesam_url: The ``xesam:url`` string (``file://`` URL or plain path).

    Returns:
        The directory containing the track (or CUE sheet) as a ``Path``, or
        ``None`` when the value is empty, the path does not exist, or it is
        not a file. Problems are logged and never raised.
    """
    if not xesam_url or not isinstance(xesam_url, str):
        logger.warning("Warning: xesam:url is empty or missing. Skipping cover detection.")
        return None

    try:
        logger.info(f"Raw xesam:url value: {xesam_url}")

        # Strip only the leading scheme, then decode: a decoded file name may
        # itself contain the text "file://" and must be left intact.
        raw_path = xesam_url
        if raw_path.startswith("file://"):
            raw_path = raw_path[len("file://"):]
        local_path = urllib.parse.unquote(raw_path)
        logger.info(f"Converted local path: {local_path}")

        # Remove '/trackXXXX' if it's a CUE file path. Split at the LAST
        # '/track' so directories such as '/music/tracks/' are not cut off.
        head, separator, tail = local_path.rpartition("/track")
        if separator and "/" not in tail and head.lower().endswith(".cue"):
            local_path = head
            logger.info(f"Stripped CUE path: {local_path}")

        # If the path is not absolute, add the music base directory.
        track_path = Path(local_path)
        if not track_path.is_absolute():
            track_path = Path(MUSIC_BASE_PATH) / track_path
            logger.info(f"Updated to absolute path: {track_path}")

        if not track_path.exists():
            logger.warning(f"Warning: The path '{track_path}' does not exist.")
            return None

        # A CUE sheet or a regular audio file both live inside the album directory.
        if track_path.suffix.lower() == ".cue" or track_path.is_file():
            return track_path.parent

        logger.warning(f"Path is not a valid file: {track_path}")
    except (OSError, ValueError) as e:
        # OSError: filesystem access failed; ValueError: invalid path string.
        logger.error(f"Error extracting local album directory: {e}")
    return None
|
|
|
|
|
|
class MprisCoverHandler:
    """Listen to the MPD MPRIS player and fetch missing album covers.

    On every ``Metadata`` change the album directory is derived from
    ``xesam:url``; when it holds no cover yet, one is downloaded from Last.fm.
    Afterwards Waybar is signalled via ``pkill -RTMIN+23 waybar``.
    """

    BUS_NAME = "org.mpris.MediaPlayer2.mpd"
    OBJECT_PATH = "/org/mpris/MediaPlayer2"

    def __init__(self):
        self.bus = None
        # Album directories with a download currently running, so overlapping
        # metadata signals for the same album do not start a second download.
        self._downloads_in_progress = set()

    @staticmethod
    def _metadata_value(metadata, key, default):
        """Return the unwrapped value stored under ``key``, or ``default`` if absent."""
        variant = metadata.get(key)
        return default if variant is None else variant.value

    async def _refresh_waybar(self):
        """Wait half a second, then run ``/usr/bin/pkill -RTMIN+23 waybar``."""
        await asyncio.sleep(0.5)
        try:
            # Spawned through asyncio so the event loop is not blocked.
            process = await asyncio.create_subprocess_exec(
                "/usr/bin/pkill", "-RTMIN+23", "waybar")
            await process.wait()
        except OSError as e:
            # Best effort: a missing pkill binary must not stop the listener.
            logger.warning(f"Could not signal Waybar: {e}")

    async def on_properties_changed(self, interface, changed, invalidated):
        """Handler for MPRIS property changes.

        Args:
            interface: Name of the interface whose properties changed (unused).
            changed: Mapping of property name to a value wrapper exposing
                ``.value``; only the ``Metadata`` entry is inspected.
            invalidated: Names of invalidated properties (unused).
        """
        if 'Metadata' not in changed:
            return
        metadata = changed['Metadata'].value

        # Extract metadata properties
        track_title = self._metadata_value(metadata, 'xesam:title', 'Unknown')
        artist = self._metadata_value(metadata, 'xesam:artist', [])
        album = self._metadata_value(metadata, 'xesam:album', 'Unknown')
        xesam_url = self._metadata_value(metadata, 'xesam:url', '')

        # Normalise to a list of non-empty names; an empty artist counts as missing.
        if not isinstance(artist, list):
            artist = [artist]
        artist = [str(a) for a in artist if a]

        if not artist or album == 'Unknown' or not xesam_url:
            logger.info(f"Skipping track '{track_title}' due to missing metadata.")
            return

        logger.info(f"Now playing: {track_title} by {', '.join(artist)} from the album '{album}'")

        # Determine the local album directory based on xesam:url
        album_dir = get_local_album_dir(xesam_url)
        if not album_dir:
            logger.info(f"Could not determine album directory for '{track_title}'. xesam:url: {xesam_url}")
            return

        if cover_exists(album_dir):
            logger.info(f"Cover already exists in '{album_dir}', doing nothing.")
        elif album_dir in self._downloads_in_progress:
            logger.info(f"Cover download for '{album_dir}' is already in progress.")
        else:
            logger.info(f"No cover found in '{album_dir}', attempting to download.")
            self._downloads_in_progress.add(album_dir)
            try:
                # The download uses blocking HTTP calls; run it in a worker
                # thread so the D-Bus event loop keeps processing signals.
                await asyncio.to_thread(
                    download_cover_from_lastfm, artist[0], album, album_dir)
            finally:
                self._downloads_in_progress.discard(album_dir)

        # Delay and execute the command to refresh Waybar
        await self._refresh_waybar()

    async def main(self):
        """Main function to connect to D-Bus and listen for changes."""
        print("Starting D-Bus listener...")
        self.bus = await MessageBus().connect()
        introspectable = await self.bus.introspect(self.BUS_NAME, self.OBJECT_PATH)
        obj = self.bus.get_proxy_object(
            self.BUS_NAME, self.OBJECT_PATH, introspectable)
        properties = obj.get_interface("org.freedesktop.DBus.Properties")

        # Run the refresh command once when the listener starts
        await self._refresh_waybar()

        properties.on_properties_changed(self.on_properties_changed)
        print("D-Bus listener started successfully.")
        await asyncio.Future()  # Run indefinitely
|
|
|
|
|
|
# Script entry: build the handler and drive its listener coroutine on a fresh
# asyncio event loop; main() waits on a Future that is never resolved.
fetcher = MprisCoverHandler()
asyncio.run(fetcher.main())
|