Nut-upsd-fake

Iš Žinynas.
Jump to navigation Jump to search

Jeigu taip atsitiko ir network UPS'as sugrybavo, arba dėl kažkokių priežasčių reikia raportuoti, kad UPS'as gyvas ir online, galima pasinaudoti šiuo fake nut-upsd daemonu:

#!/usr/bin/env python3
import socket
import threading
import datetime

# Address and TCP port the listening socket binds to.
HOST, PORT = "0.0.0.0", 3493

# UPS names this server answers for; the primary name is also the fallback
# used when a name has to be normalized.
PRIMARY_UPSNAME = "qnapups"
UPS_ALIASES = {PRIMARY_UPSNAME, "ups"}

# Credentials a client must present (USERNAME / PASSWORD) before LOGIN succeeds.
USERNAME, PASSWORD = "monuser", "secret"

# Static variable table served by GET VAR / LIST VAR. Every value is a string
# and insertion order is the order LIST VAR emits them in.
VARS = dict(
    [
        ("ups.status", "OL"),
        ("battery.charge", "100"),
        ("battery.charge.low", "10"),
        ("battery.charge.warning", "20"),
        ("battery.runtime", "3600"),
        ("battery.runtime.low", "300"),
        ("battery.voltage", "26.9"),
        ("battery.voltage.nominal", "24"),
        ("input.voltage", "230.0"),
        ("input.voltage.nominal", "230"),
        ("output.voltage", "230.0"),
        ("ups.load", "25"),
        ("ups.mfr", "FakeNUT"),
        ("ups.model", "FakeQNAPUPS"),
        ("device.type", "ups"),
        ("device.model", "FakeQNAPUPS"),
        ("driver.name", "dummy-ups"),
    ]
)

def ts():
    """Return the current local time as an ``HH:MM:SS`` string."""
    now = datetime.datetime.now()
    return f"{now:%H:%M:%S}"

def log(msg):
    """Print *msg* to stdout prefixed with the current time, flushing at once."""
    entry = "[{}] {}".format(ts(), msg)
    print(entry, flush=True)

def send_line(conn, addr, line):
    """Log an outgoing reply and send it to the client.

    Args:
        conn: Connected socket the reply is written to.
        addr: Printable client label used in the log entry.
        line: Reply text without a trailing newline; one is appended here.
    """
    log("{} << {}".format(addr, line))
    payload = (line + "\n").encode()
    conn.sendall(payload)

def normalize_ups_name(name: str) -> str:
    """Return *name* when it is an accepted alias, otherwise the primary UPS name."""
    return name if name in UPS_ALIASES else PRIMARY_UPSNAME

def handle_client(conn, addr_tuple):
    """Serve one client connection with canned NUT-style replies.

    Reads newline-terminated commands from the socket, logs each one, and
    answers from the static tables of this module until the client sends
    LOGOUT, closes the connection, or an error occurs. Any exception raised
    while serving is logged and ends the session; it is not re-raised.

    Args:
        conn: Connected client socket. It is closed before this function
            returns.
        addr_tuple: Peer address as returned by ``accept()``; its first two
            items are used to build the ``host:port`` label seen in the log.
    """
    addr = f"{addr_tuple[0]}:{addr_tuple[1]}"
    log(f"[+] CONNECT {addr}")

    authed_user = None
    authed_pass = None
    logged_in = False
    tls_started = False
    f = None

    try:
        # Created inside the try so that a failure here still reaches the
        # cleanup in ``finally``.
        f = conn.makefile("r", encoding="utf-8", newline="\n")

        for raw in f:
            line = raw.strip()
            if not line:
                continue

            log(f"{addr} >> {line}")
            parts = line.split()
            cmd = parts[0].upper()

            if cmd == "VER":
                send_line(conn, addr, "Network UPS Tools upsd 2.8.0")

            elif cmd == "NETVER":
                send_line(conn, addr, "2.8.0")

            elif cmd == "PROTVER":
                send_line(conn, addr, "1.3")

            elif cmd == "STARTTLS":
                # TLS is never negotiated; the flag only records that the
                # client asked, for the disconnect log entry.
                tls_started = True
                send_line(conn, addr, "ERR FEATURE-NOT-CONFIGURED")

            elif cmd == "LIST" and len(parts) >= 2 and parts[1].upper() == "UPS":
                send_line(conn, addr, "BEGIN LIST UPS")
                send_line(conn, addr, 'UPS qnapups "Fake QNAP UPS"')
                send_line(conn, addr, 'UPS ups "Fake Synology Default UPS"')
                send_line(conn, addr, "END LIST UPS")

            elif cmd == "USERNAME" and len(parts) >= 2:
                # Credentials are only remembered here; they are checked
                # when the client sends LOGIN.
                authed_user = parts[1]
                send_line(conn, addr, "OK")

            elif cmd == "PASSWORD" and len(parts) >= 2:
                authed_pass = parts[1]
                send_line(conn, addr, "OK")

            elif cmd == "LOGIN" and len(parts) >= 2:
                upsname = parts[1]
                if upsname in UPS_ALIASES and authed_user == USERNAME and authed_pass == PASSWORD:
                    logged_in = True
                    send_line(conn, addr, "OK")
                else:
                    send_line(conn, addr, "ERR ACCESS-DENIED")

            elif cmd in ("MASTER", "PRIMARY") and len(parts) >= 2:
                upsname = parts[1]
                if logged_in and upsname in UPS_ALIASES:
                    send_line(conn, addr, "OK")
                else:
                    send_line(conn, addr, "ERR ACCESS-DENIED")

            elif cmd == "GET" and len(parts) >= 4 and parts[1].upper() == "VAR":
                ups = parts[2]
                # Everything after the UPS name is treated as the variable name.
                var = " ".join(parts[3:])

                if ups not in UPS_ALIASES:
                    send_line(conn, addr, "ERR UNKNOWN-UPS")
                elif var in VARS:
                    resp_ups = normalize_ups_name(ups)
                    send_line(conn, addr, f'VAR {resp_ups} {var} "{VARS[var]}"')
                else:
                    send_line(conn, addr, "ERR VAR-NOT-SUPPORTED")

            elif cmd == "LIST" and len(parts) >= 3 and parts[1].upper() == "VAR":
                ups = parts[2]

                if ups not in UPS_ALIASES:
                    send_line(conn, addr, "ERR UNKNOWN-UPS")
                else:
                    resp_ups = normalize_ups_name(ups)
                    send_line(conn, addr, f"BEGIN LIST VAR {resp_ups}")
                    for k, v in VARS.items():
                        send_line(conn, addr, f'VAR {resp_ups} {k} "{v}"')
                    send_line(conn, addr, f"END LIST VAR {resp_ups}")

            elif cmd == "HELP":
                send_line(conn, addr, "OK Commands: VER NETVER PROTVER STARTTLS LIST USERNAME PASSWORD LOGIN GET LOGOUT")

            elif cmd == "LOGOUT":
                send_line(conn, addr, "OK Goodbye")
                break

            else:
                # Also reached by known commands sent with too few arguments.
                send_line(conn, addr, "ERR UNKNOWN-COMMAND")

    except Exception as e:
        # Top-level boundary of the per-client thread: log and end the session.
        log(f"[!] ERROR {addr}: {e}")
    finally:
        log(f"[-] DISCONNECT {addr} tls={tls_started} logged_in={logged_in}")
        # Close the reader as well as the socket: while a makefile() object
        # is still open, conn.close() alone does not release the connection.
        for resource in (f, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Deliberate best-effort cleanup; the session is over anyway.
                pass

def main():
    """Run the fake NUT server: listen on HOST:PORT and serve clients forever.

    Each accepted connection is handed to ``handle_client`` on its own daemon
    thread. The function only returns by way of an exception (for example a
    failed bind or a keyboard interrupt); the listening socket is closed on
    the way out.
    """
    log(f"Starting Fake NUT DEBUG server on {HOST}:{PORT}")
    log(f"Accepted UPS names: {', '.join(sorted(UPS_ALIASES))}")

    # The context manager closes the listening socket on any exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen(20)

        while True:
            conn, addr = s.accept()
            # Daemon threads do not keep the process alive once main() exits.
            threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start()

# Start the server only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()