#!/usr/bin/env python3 import time import json import os from pyroute2 import IPRoute, IPDB, NetlinkError, IPRoute from threading import Thread # Configuración IFACE = "wlp3s0" # Reemplaza con el nombre de tu interfaz de red CACHE_FILE = "/tmp/network_speed_cache.json" UPDATE_INTERVAL = 1 # Intervalo de actualización en segundos def format_speed(speed_bytes): if speed_bytes >= 1024 * 1024: return f"{speed_bytes / (1024 * 1024):.2f} MB/s" elif speed_bytes >= 1024: return f"{speed_bytes / 1024:.2f} KB/s" else: return f"{speed_bytes:.2f} B/s" def monitor_network(): ip = IPRoute() try: idx = ip.link_lookup(ifname=IFACE)[0] except IndexError: print(f"Error: Interfaz '{IFACE}' no encontrada.") return prev_rx, prev_tx = 0, 0 prev_time = time.time() while True: try: stats = ip.get_links(idx)[0].get('stats', {}) curr_rx = stats.get('rx_bytes', 0) curr_tx = stats.get('tx_bytes', 0) current_time = time.time() delta_time = current_time - prev_time if delta_time == 0: delta_time = UPDATE_INTERVAL delta_rx = (curr_rx - prev_rx) / delta_time delta_tx = (curr_tx - prev_tx) / delta_time rx_speed = format_speed(delta_rx) tx_speed = format_speed(delta_tx) # Actualizar el archivo de caché data_output = { "text": f"↓ {rx_speed} ↑ {tx_speed}", "tooltip": f"Velocidad de descarga: {rx_speed}\nVelocidad de subida: {tx_speed}", "class": "network-speed" } with open(CACHE_FILE, "w") as f: json.dump(data_output, f) # Actualizar variables para el siguiente cálculo prev_rx, prev_tx = curr_rx, curr_tx prev_time = current_time time.sleep(UPDATE_INTERVAL) except NetlinkError as e: print(f"Error de Netlink: {e}") time.sleep(UPDATE_INTERVAL) except Exception as e: print(f"Error inesperado: {e}") time.sleep(UPDATE_INTERVAL) def main(): monitor_thread = Thread(target=monitor_network, daemon=True) monitor_thread.start() try: while True: time.sleep(1) except KeyboardInterrupt: print("Daemon de monitoreo de red detenido.") if __name__ == "__main__": main()