89 lines
2.4 KiB
Python
Executable File
89 lines
2.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import time
|
|
import json
|
|
import os
|
|
from pyroute2 import IPRoute, IPDB, NetlinkError, IPRoute
|
|
from threading import Thread
|
|
|
|
# Settings
# Name of the network interface to monitor; change it to match your system.
IFACE = "wlp3s0"
# File that receives the latest speed reading as JSON.
CACHE_FILE = "/tmp/network_speed_cache.json"
# Seconds to wait between two consecutive refreshes.
UPDATE_INTERVAL = 1
|
|
|
|
|
|
def format_speed(speed_bytes):
    """Render a throughput given in bytes per second as a short label.

    The largest unit whose threshold (1024 for KB/s, 1024 * 1024 for MB/s)
    is reached gets used; anything smaller, including negative values, is
    shown in B/s. The number always carries two decimals.
    """
    kib = 1024
    mib = kib * kib
    # Ordered from the largest unit down so the first match wins.
    for threshold, unit in ((mib, "MB/s"), (kib, "KB/s")):
        if speed_bytes >= threshold:
            return f"{speed_bytes / threshold:.2f} {unit}"
    return f"{speed_bytes:.2f} B/s"
|
|
|
|
|
|
def _read_counters(ip, idx):
    """Return the cumulative ``(rx_bytes, tx_bytes)`` counters of link *idx*."""
    stats = ip.get_links(idx)[0].get('stats', {})
    return stats.get('rx_bytes', 0), stats.get('tx_bytes', 0)


def _write_cache(payload):
    """Dump *payload* as JSON into CACHE_FILE without exposing a partial file."""
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f)
    # Swap the finished file into place in one step so a process polling
    # CACHE_FILE never reads truncated JSON.
    os.replace(tmp_path, CACHE_FILE)


def monitor_network():
    """Sample the byte counters of IFACE forever and publish the speeds.

    Every UPDATE_INTERVAL seconds the receive/transmit byte counters of the
    interface are read, turned into per-second rates and written to
    CACHE_FILE as a JSON object with the keys ``text``, ``tooltip`` and
    ``class``.

    Returns early (after printing an error) when IFACE does not exist.
    Errors raised while sampling or writing are printed and the loop keeps
    going. The netlink socket is closed when the function returns.
    """
    ip = IPRoute()
    try:
        try:
            idx = ip.link_lookup(ifname=IFACE)[0]
        except IndexError:
            print(f"Error: Interfaz '{IFACE}' no encontrada.")
            return

        # No baseline yet: the first successful sample only records the
        # counters. Starting from 0 would report every byte counted so far
        # as if it had been transferred in a fraction of a second.
        prev_rx = prev_tx = prev_time = None

        while True:
            try:
                curr_rx, curr_tx = _read_counters(ip, idx)
                # Monotonic clock: wall-clock adjustments must not distort
                # the measured interval.
                current_time = time.monotonic()

                if prev_time is None:
                    rx_rate = tx_rate = 0.0
                else:
                    delta_time = current_time - prev_time
                    if delta_time <= 0:
                        delta_time = UPDATE_INTERVAL
                    # Clamp at zero so counters that went backwards do not
                    # show up as a negative speed.
                    rx_rate = max(curr_rx - prev_rx, 0) / delta_time
                    tx_rate = max(curr_tx - prev_tx, 0) / delta_time

                rx_speed = format_speed(rx_rate)
                tx_speed = format_speed(tx_rate)

                # Refresh the cache file
                _write_cache({
                    "text": f"↓ {rx_speed} ↑ {tx_speed}",
                    "tooltip": f"Velocidad de descarga: {rx_speed}\nVelocidad de subida: {tx_speed}",
                    "class": "network-speed"
                })

                # Keep this sample as the baseline for the next round
                prev_rx, prev_tx = curr_rx, curr_tx
                prev_time = current_time
            except NetlinkError as e:
                print(f"Error de Netlink: {e}")
            except Exception as e:
                # Best effort: report the problem and retry on the next tick.
                print(f"Error inesperado: {e}")

            time.sleep(UPDATE_INTERVAL)
    finally:
        ip.close()
|
|
|
|
|
|
def main():
    """Run the network monitor in a daemon thread until it stops or Ctrl+C.

    The main thread only waits. It returns as soon as the monitor thread
    finishes (for example because the interface was not found) instead of
    idling forever, and prints a message when interrupted from the keyboard.
    """
    monitor_thread = Thread(target=monitor_network, daemon=True)
    monitor_thread.start()

    try:
        # Join with a timeout so the loop re-checks the thread regularly
        # and ends once the monitor is no longer running.
        while monitor_thread.is_alive():
            monitor_thread.join(timeout=1)
    except KeyboardInterrupt:
        print("Daemon de monitoreo de red detenido.")
|
|
|
|
|
|
# Start the monitor only when the file is executed as a script.
if __name__ == "__main__":
    main()
|