93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import subprocess
|
|
import json
|
|
import sys
|
|
import time
|
|
|
|
def get_default_sink_name():
    """Return the name of the default audio sink reported by ``pactl``.

    Runs ``pactl get-default-sink`` and strips surrounding whitespace from
    its standard output.

    Returns:
        The sink name as a string, or ``None`` when the output is empty,
        when ``pactl`` exits with a non-zero status, or when the command
        cannot be started at all (for example, it is not installed).
        Failures are reported on standard error.
    """
    try:
        result = subprocess.run(
            ['pactl', 'get-default-sink'],
            stdout=subprocess.PIPE,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable binary, which would
        # otherwise escape to the caller as an uncaught exception.
        print(f"Error al ejecutar pactl: {e}", file=sys.stderr)
        return None

    # An empty string means pactl printed no sink name.
    return result.stdout.strip() or None
|
|
|
|
def get_sink_rate(sink_name):
    """Return the sample rate of the named audio sink from ``pw-dump``.

    Runs ``pw-dump``, parses its JSON output and looks for an object whose
    ``media.class`` property is ``Audio/Sink`` and whose ``node.name``
    property equals *sink_name*. The first truthy ``rate`` found in that
    object's ``Format`` params is returned.

    Args:
        sink_name: Value compared against each object's ``node.name``.

    Returns:
        The ``rate`` value exactly as it appears in the JSON (expected to
        be an integer), or ``None`` when no matching sink or rate exists,
        when ``pw-dump`` fails or cannot be started, or when its output is
        not valid JSON. Failures are reported on standard error.
    """
    try:
        result = subprocess.run(
            ['pw-dump'], stdout=subprocess.PIPE, text=True, check=True
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable pw-dump binary.
        print(f"Error al ejecutar pw-dump: {e}", file=sys.stderr)
        return None

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error al decodificar JSON: {e}", file=sys.stderr)
        return None

    # Accept either a top-level list of objects or a dict holding them
    # under the "nodes" key.
    if isinstance(data, list):
        nodes = data
    elif isinstance(data, dict):
        nodes = data.get('nodes') or []
    else:
        print(f"Tipo de datos no soportado: {type(data)}", file=sys.stderr)
        return None

    for node in nodes:
        if not isinstance(node, dict):
            continue

        # "or {}" also guards against keys that are present but null, which
        # a plain .get(key, {}) default would not catch.
        info = node.get('info') or {}
        if not isinstance(info, dict):
            continue
        props = info.get('props') or {}
        if not isinstance(props, dict):
            continue

        if props.get('media.class', '') != 'Audio/Sink':
            continue
        if props.get('node.name', '') != sink_name:
            continue

        params = info.get('params') or {}
        formats = params.get('Format') if isinstance(params, dict) else None
        for fmt in formats or []:
            rate = fmt.get('rate') if isinstance(fmt, dict) else None
            if rate:
                return rate

    return None
|
|
|
|
def main(poll_interval=1):
    """Print the default sink's sample rate, then print it again on change.

    The default sink name is resolved once at startup; if it cannot be
    obtained, an error is written to standard error and the process exits
    with status 1. Afterwards the sink's rate is polled forever and a new
    line is printed only when a rate is available and differs from the
    last one seen. Ctrl-C ends the monitor with exit status 0.

    Args:
        poll_interval: Seconds to sleep between polls. Defaults to 1.
    """
    try:
        default_sink = get_default_sink_name()
        if not default_sink:
            print("No se pudo obtener el sink predeterminado.", file=sys.stderr)
            sys.exit(1)

        previous_rate = get_sink_rate(default_sink)
        # Flush every rate line: when stdout is a pipe it is block-buffered,
        # and a reader would otherwise not see updates as they happen.
        if previous_rate:
            print(f"{previous_rate} Hz", flush=True)
        else:
            print(
                "No se encontró una tasa de muestreo en el sink predeterminado.",
                flush=True,
            )

        while True:
            time.sleep(poll_interval)
            current_rate = get_sink_rate(default_sink)
            # A failed poll (None) is ignored so the last known rate is kept.
            if current_rate is not None and current_rate != previous_rate:
                print(f"{current_rate} Hz", flush=True)
                previous_rate = current_rate
    except KeyboardInterrupt:
        # Covers the initial query as well as the polling loop.
        print("\nMonitoreo terminado por el usuario.")
        sys.exit(0)
|
|
|
|
# Start the monitor only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|