dotfiles/.local/bin/pwrate.py
2025-05-28 18:33:04 +02:00

93 lines
3.0 KiB
Python

#!/usr/bin/env python3
import subprocess
import json
import sys
import time
def get_default_sink_name():
"""
Obtiene el nombre del sink predeterminado usando pactl.
Retorna el nombre del sink como una cadena, o None si no se encuentra.
"""
try:
result = subprocess.run(['pactl', 'get-default-sink'], stdout=subprocess.PIPE, text=True, check=True)
sink_name = result.stdout.strip()
if sink_name:
return sink_name
else:
return None
except subprocess.CalledProcessError as e:
print(f"Error al ejecutar pactl: {e}", file=sys.stderr)
return None
def get_sink_rate(sink_name):
"""
Ejecuta pw-dump y extrae la tasa de muestreo ('rate') del sink con el nombre dado.
Retorna la tasa como un entero, o None si no se encuentra.
"""
try:
# Ejecuta pw-dump y captura la salida
result = subprocess.run(['pw-dump'], stdout=subprocess.PIPE, text=True, check=True)
data = json.loads(result.stdout)
# Verifica si los datos son una lista o un diccionario
if isinstance(data, list):
nodes = data
elif isinstance(data, dict):
nodes = data.get('nodes', [])
else:
print(f"Tipo de datos no soportado: {type(data)}", file=sys.stderr)
return None
for node in nodes:
props = node.get('info', {}).get('props', {})
node_name = props.get('node.name', '')
media_class = props.get('media.class', '')
if media_class != 'Audio/Sink':
continue
if node_name != sink_name:
continue
formats = node.get('info', {}).get('params', {}).get('Format', [])
for fmt in formats:
rate = fmt.get('rate')
if rate:
return rate
return None
except subprocess.CalledProcessError as e:
print(f"Error al ejecutar pw-dump: {e}", file=sys.stderr)
return None
except json.JSONDecodeError as e:
print(f"Error al decodificar JSON: {e}", file=sys.stderr)
return None
def main():
default_sink = get_default_sink_name()
if not default_sink:
print("No se pudo obtener el sink predeterminado.", file=sys.stderr)
sys.exit(1)
previous_rate = get_sink_rate(default_sink)
if previous_rate:
print(f"{previous_rate} Hz")
else:
print("No se encontró una tasa de muestreo en el sink predeterminado.")
while True:
try:
time.sleep(1) # Intervalo de 1 segundo entre sondeos
current_rate = get_sink_rate(default_sink)
if current_rate != previous_rate and current_rate is not None:
print(f"{current_rate} Hz")
previous_rate = current_rate
except KeyboardInterrupt:
print("\nMonitoreo terminado por el usuario.")
sys.exit(0)
if __name__ == "__main__":
main()