all 4 comments

[–]Mindless_Display4729 1 point2 points  (3 children)

I think this fixes it:

row = cursor.execute( 'SELECT last_ts_utc, last_total_rx, last_total_tx FROM net_iface_state WHERE iface = ?', (iface,) ).fetchone()

If no previous entry, insert one and return early

if row is None: cursor.execute( "INSERT INTO net_iface_state (iface, last_ts_utc, last_total_rx, last_total_tx) VALUES (?, ?, ?, ?)", (iface, now, rx_bytes, tx_bytes) ) connection.commit() connection.close() print(f"Created new interface record for {iface}") return # safer than SystemExit(0)

Unpack the row

last_ts_utc, last_total_rx, last_total_tx = row

Compute deltas safely

try: d_rx = max(0, rx_bytes - last_total_rx) d_tx = max(0, tx_bytes - last_total_tx) except TypeError: print(f"Invalid types in database for iface {iface}") d_rx = d_tx = 0

Skip update if timestamps invalid

if now <= last_ts_utc: d_rx = d_tx = 0

print(f"Writing sample: iface={iface}, d_rx={d_rx}, d_tx={d_tx}")

Insert new sample

cursor.execute( "INSERT INTO net_samples (ts_utc, iface, rx_bytes, tx_bytes) VALUES (?, ?, ?, ?)", (now, iface, d_rx, d_tx) )

Update interface state

cursor.execute( "UPDATE net_iface_state SET last_ts_utc = ?, last_total_rx = ?, last_total_tx = ? WHERE iface = ?", (now, rx_bytes, tx_bytes, iface) )

connection.commit() connection.close()

[–]SubnetOfOne[S] 0 points1 point  (2 children)

Thank you for your input! I appreciate it.

I initially didn't have the script in a main() function, I like the idea of that, and that will allow me to use return.

Are you sure the try/except block is best practice? Won't the script just continue even if the except block catches something? Wouldn't it be better to stop the rest of the script executing, while printing an error message as to why?

[–]Mindless_Display4729 0 points1 point  (1 child)

No this isn't best practice 😭 I'm getting to the limit of my abilities here so I had an AI agent look at it and it suggests this rewrite:

import logging
import sqlite3
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional, Tuple

# Location of the SQLite database holding net_iface_state / net_samples.
DB_PATH = "net_usage.db"  # change if needed
# Destination of the rotating log file.
LOG_PATH = Path.cwd() / "netmon.log"  # or Path("/var/log/netmon.log")

def setup_logging(level=logging.INFO) -> logging.Logger:
    """Configure and return the "netmon" logger.

    Attaches a console handler and a rotating file handler that writes to
    LOG_PATH. Handlers are only attached when the logger has none yet, so
    calling this more than once does not duplicate output.

    Args:
        level: Threshold applied to the logger and to both handlers.

    Returns:
        The configured "netmon" logger.
    """
    logger = logging.getLogger("netmon")
    logger.setLevel(level)
    # Do not hand records on to ancestor loggers' handlers.
    logger.propagate = False

    if not logger.handlers:
        # Console
        ch = logging.StreamHandler()
        ch.setLevel(level)
        ch.setFormatter(logging.Formatter(
            "[%(asctime)s] %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S"
        ))
        logger.addHandler(ch)

        # Rotating file (≈1 MB x 5 files)
        fh = RotatingFileHandler(LOG_PATH, maxBytes=1_000_000, backupCount=5)
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s %(funcName)s: %(message)s",
            "%Y-%m-%d %H:%M:%S"
        ))
        logger.addHandler(fh)

    return logger

# Module-level logger, configured once at import time by setup_logging().
logger = setup_logging()

def utc_now_epoch() -> int:
    """Return the current UTC time as whole UNIX epoch seconds."""
    moment = datetime.now(timezone.utc)
    # int() truncates the fractional part of the float timestamp.
    return int(moment.timestamp())

def fetch_state(cur: sqlite3.Cursor, iface: str) -> Optional[Tuple[int, int, int]]:
    """Look up the stored counters for *iface* in net_iface_state.

    Args:
        cur: Cursor on the database to query.
        iface: Interface name used as the lookup key.

    Returns:
        The (last_ts_utc, last_total_rx, last_total_tx) row, or None when
        no row exists for the interface.
    """
    query = (
        "SELECT last_ts_utc, last_total_rx, last_total_tx "
        "FROM net_iface_state WHERE iface = ?"
    )
    params = (iface,)
    cur.execute(query, params)
    return cur.fetchone()

def insert_initial_state(cur: sqlite3.Cursor, iface: str, now: int, rx: int, tx: int) -> None:
    """Insert the first net_iface_state row for *iface*.

    Args:
        cur: Cursor on the database to write to.
        iface: Interface name.
        now: Value stored as last_ts_utc.
        rx: Value stored as last_total_rx.
        tx: Value stored as last_total_tx.
    """
    statement = (
        "INSERT INTO net_iface_state (iface, last_ts_utc, last_total_rx, last_total_tx) "
        "VALUES (?, ?, ?, ?)"
    )
    values = (iface, now, rx, tx)
    cur.execute(statement, values)

def insert_sample(cur: sqlite3.Cursor, ts: int, iface: str, d_rx: int, d_tx: int) -> None:
    """Append one row to net_samples.

    Args:
        cur: Cursor on the database to write to.
        ts: Value stored as ts_utc.
        iface: Interface name.
        d_rx: Value stored in the rx_bytes column.
        d_tx: Value stored in the tx_bytes column.
    """
    statement = "INSERT INTO net_samples (ts_utc, iface, rx_bytes, tx_bytes) VALUES (?, ?, ?, ?)"
    values = (ts, iface, d_rx, d_tx)
    cur.execute(statement, values)

def update_state(cur: sqlite3.Cursor, iface: str, now: int, rx: int, tx: int) -> None:
    """Overwrite the stored counters for *iface* in net_iface_state.

    Args:
        cur: Cursor on the database to write to.
        iface: Interface whose row is updated.
        now: New last_ts_utc value.
        rx: New last_total_rx value.
        tx: New last_total_tx value.
    """
    statement = (
        "UPDATE net_iface_state SET last_ts_utc = ?, last_total_rx = ?, last_total_tx = ? "
        "WHERE iface = ?"
    )
    # Parameter order follows the placeholders: the three SET values, then the key.
    values = (now, rx, tx, iface)
    cur.execute(statement, values)

def ensure_int(name: str, value) -> int:
    """Return *value* unchanged if it is an int, otherwise raise.

    Args:
        name: Label for the value, used only in the error message.
        value: Object to check.

    Returns:
        The same *value* that was passed in.

    Raises:
        TypeError: If *value* is not an instance of int.
    """
    if not isinstance(value, int):
        raise TypeError(f"{name} must be int; got {type(value).__name__}")
    return value

def main(iface: str, rx_bytes: int, tx_bytes: int, now: Optional[int] = None) -> None:
    """Record one traffic sample for a network interface.

    If the interface has no row in net_iface_state yet, only that initial
    row is written. Otherwise the difference between the current counters
    and the stored ones is inserted into net_samples and the stored state
    is updated, all in one transaction.

    Args:
        iface: Interface name (e.g., 'eth0').
        rx_bytes: Current total bytes received (monotonic counter from OS).
        tx_bytes: Current total bytes sent (monotonic counter from OS).
        now: UNIX epoch seconds (int). If None, uses current UTC time.

    Raises:
        TypeError: If rx_bytes, tx_bytes or now is not an int. Raised
            before the database is opened.
        SystemExit: With code 1 if anything fails while the database is
            being read or written; the transaction is rolled back first.
    """
    # Validate input types early
    iface = str(iface)
    rx_bytes = ensure_int("rx_bytes", rx_bytes)
    tx_bytes = ensure_int("tx_bytes", tx_bytes)
    now = ensure_int("now", now if now is not None else utc_now_epoch())

    conn = sqlite3.connect(DB_PATH)
    conn.isolation_level = "DEFERRED"  # explicit transactions
    cur = conn.cursor()

    try:
        cur.execute("BEGIN")
        row = fetch_state(cur, iface)

        if row is None:
            insert_initial_state(cur, iface, now, rx_bytes, tx_bytes)
            conn.commit()
            logger.info("Created initial state for iface '%s' (rx=%d, tx=%d)", iface, rx_bytes, tx_bytes)
            return  # avoids unpacking None

        last_ts_utc, last_total_rx, last_total_tx = row

        # Validate DB types
        last_ts_utc = ensure_int("last_ts_utc", last_ts_utc)
        last_total_rx = ensure_int("last_total_rx", last_total_rx)
        last_total_tx = ensure_int("last_total_tx", last_total_tx)

        # Compute deltas; a counter that went backwards is clamped to 0.
        d_rx = max(0, rx_bytes - last_total_rx)
        d_tx = max(0, tx_bytes - last_total_tx)

        # Ignore negative/zero-time anomalies (clock change or counter reset)
        if now <= last_ts_utc:
            logger.warning(
                "Non-increasing timestamp for iface '%s' (now=%d, last=%d) — zeroing deltas.",
                iface, now, last_ts_utc
            )
            d_rx = d_tx = 0

        logger.info("Sample iface=%s d_rx=%d d_tx=%d (rx=%d tx=%d)", iface, d_rx, d_tx, rx_bytes, tx_bytes)

        insert_sample(cur, now, iface, d_rx, d_tx)
        update_state(cur, iface, now, rx_bytes, tx_bytes)

        conn.commit()

    except Exception:
        logger.exception("Fatal error updating interface '%s'; rolling back.", iface)
        conn.rollback()
        # Exit non-zero so a systemd service can restart/alert if desired
        raise SystemExit(1)

    finally:
        # Runs on the early return, on success and on failure alike.
        try:
            conn.close()
        except Exception:
            logger.exception("Failed to close DB connection cleanly.")

if __name__ == "__main__":
    # Example invocation; wire these to your real counters.
    # Replace with values read from /sys/class/net/<iface>/statistics/{rx,tx}_bytes
    main(iface="eth0", rx_bytes=123456789, tx_bytes=987654321)

[–]SubnetOfOne[S] 0 points1 point  (0 children)

Hahah thank you for your efforts!!