Nut-upsd-fake
Jump to navigation
Jump to search
Jeigu taip atsitiko, kad network UPS'as sugrybavo, arba dėl kažkokių priežasčių reikia raportuoti, kad UPS'as gyvas ir online, galima pasinaudoti šiuo fake nut-upsd daemonu:
#!/usr/bin/env python3
import socket
import threading
import datetime
# --- Listener -------------------------------------------------------------
# Bind address and TCP port for the fake upsd listener
# ("0.0.0.0" = all IPv4 interfaces).
HOST = "0.0.0.0"
PORT = 3493

# --- UPS identity ---------------------------------------------------------
# Name used in replies when a request does not name a known alias, and the
# full set of UPS names this server answers for.
PRIMARY_UPSNAME = "qnapups"
UPS_ALIASES = {PRIMARY_UPSNAME, "ups"}

# --- Credentials ----------------------------------------------------------
# The only USERNAME/PASSWORD pair for which LOGIN succeeds.
USERNAME = "monuser"
PASSWORD = "secret"

# --- Reported variables ---------------------------------------------------
# Static values served for GET VAR / LIST VAR. Insertion order is the order
# in which LIST VAR emits them.
VARS = {
    # overall status
    "ups.status": "OL",
    # battery
    "battery.charge": "100",
    "battery.charge.low": "10",
    "battery.charge.warning": "20",
    "battery.runtime": "3600",
    "battery.runtime.low": "300",
    "battery.voltage": "26.9",
    "battery.voltage.nominal": "24",
    # mains input / output
    "input.voltage": "230.0",
    "input.voltage.nominal": "230",
    "output.voltage": "230.0",
    # load and identification
    "ups.load": "25",
    "ups.mfr": "FakeNUT",
    "ups.model": "FakeQNAPUPS",
    "device.type": "ups",
    "device.model": "FakeQNAPUPS",
    "driver.name": "dummy-ups",
}
def ts():
    """Return the current local wall-clock time formatted as ``HH:MM:SS``."""
    return datetime.datetime.now().strftime("%H:%M:%S")
def log(msg):
    """Print ``msg`` to stdout prefixed with a ``[HH:MM:SS]`` timestamp.

    Output is flushed on every call so lines show up immediately when
    stdout is redirected or piped.
    """
    print(f"[{ts()}] {msg}", flush=True)
def send_line(conn, addr, line):
    """Log one outgoing reply line and send it to the client.

    Args:
        conn: Connected socket; the whole line is written with ``sendall``.
        addr: Peer label, used only in the log line.
        line: Reply text without a trailing newline; ``"\\n"`` is appended
            before sending.
    """
    log(f"{addr} << {line}")
    conn.sendall((line + "\n").encode("utf-8"))
def normalize_ups_name(name: str) -> str:
    """Return the UPS name to echo back in a reply.

    A name listed in ``UPS_ALIASES`` is returned unchanged; anything else
    maps to ``PRIMARY_UPSNAME``.
    """
    return name if name in UPS_ALIASES else PRIMARY_UPSNAME
def handle_client(conn, addr_tuple):
    """Serve one client connection, answering NUT-style text commands.

    Reads newline-terminated commands from ``conn`` until the peer
    disconnects or sends ``LOGOUT``, and answers each one with one or more
    lines via ``send_line``. Every received and sent line is logged.

    Session rules implemented here:
        * ``USERNAME`` / ``PASSWORD`` just remember the supplied value.
        * ``LOGIN <ups>`` succeeds only when the UPS name is in
          ``UPS_ALIASES`` and the remembered credentials equal
          ``USERNAME`` / ``PASSWORD``.
        * ``MASTER`` / ``PRIMARY <ups>`` require a prior successful
          ``LOGIN`` and a known UPS name.
        * ``STARTTLS`` is always refused; the attempt is only recorded for
          the disconnect log line.
        * A recognised command with too few arguments falls through to
          ``ERR UNKNOWN-COMMAND``.

    Args:
        conn: Connected socket for this client. It is always closed before
            the function returns.
        addr_tuple: Peer address; its first two items (host, port) are used
            to build the ``host:port`` label shown in log lines.

    Exceptions raised while serving the client are logged and end the
    session; they are not propagated to the caller.
    """
    addr = f"{addr_tuple[0]}:{addr_tuple[1]}"
    log(f"[+] CONNECT {addr}")

    authed_user = None
    authed_pass = None
    logged_in = False
    tls_started = False  # True once the client has *requested* STARTTLS

    try:
        # The reader holds a reference to the socket, so it must be closed
        # before conn.close() can actually release the connection; the
        # context manager guarantees that. Undecodable bytes are replaced
        # instead of aborting the whole session with UnicodeDecodeError.
        with conn.makefile(
            "r", encoding="utf-8", errors="replace", newline="\n"
        ) as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    continue
                log(f"{addr} >> {line}")
                parts = line.split()
                cmd = parts[0].upper()

                if cmd == "VER":
                    send_line(conn, addr, "Network UPS Tools upsd 2.8.0")

                elif cmd == "NETVER":
                    send_line(conn, addr, "2.8.0")

                elif cmd == "PROTVER":
                    send_line(conn, addr, "1.3")

                elif cmd == "STARTTLS":
                    tls_started = True
                    send_line(conn, addr, "ERR FEATURE-NOT-CONFIGURED")

                elif cmd == "LIST" and len(parts) >= 2 and parts[1].upper() == "UPS":
                    send_line(conn, addr, "BEGIN LIST UPS")
                    send_line(conn, addr, 'UPS qnapups "Fake QNAP UPS"')
                    send_line(conn, addr, 'UPS ups "Fake Synology Default UPS"')
                    send_line(conn, addr, "END LIST UPS")

                elif cmd == "USERNAME" and len(parts) >= 2:
                    authed_user = parts[1]
                    send_line(conn, addr, "OK")

                elif cmd == "PASSWORD" and len(parts) >= 2:
                    authed_pass = parts[1]
                    send_line(conn, addr, "OK")

                elif cmd == "LOGIN" and len(parts) >= 2:
                    upsname = parts[1]
                    if (
                        upsname in UPS_ALIASES
                        and authed_user == USERNAME
                        and authed_pass == PASSWORD
                    ):
                        logged_in = True
                        send_line(conn, addr, "OK")
                    else:
                        send_line(conn, addr, "ERR ACCESS-DENIED")

                elif cmd in ("MASTER", "PRIMARY") and len(parts) >= 2:
                    upsname = parts[1]
                    if logged_in and upsname in UPS_ALIASES:
                        send_line(conn, addr, "OK")
                    else:
                        send_line(conn, addr, "ERR ACCESS-DENIED")

                elif cmd == "GET" and len(parts) >= 4 and parts[1].upper() == "VAR":
                    ups = parts[2]
                    # Everything after the UPS name is treated as the
                    # variable name (re-joined with single spaces).
                    var = " ".join(parts[3:])
                    if ups not in UPS_ALIASES:
                        send_line(conn, addr, "ERR UNKNOWN-UPS")
                    elif var in VARS:
                        resp_ups = normalize_ups_name(ups)
                        send_line(conn, addr, f'VAR {resp_ups} {var} "{VARS[var]}"')
                    else:
                        send_line(conn, addr, "ERR VAR-NOT-SUPPORTED")

                elif cmd == "LIST" and len(parts) >= 3 and parts[1].upper() == "VAR":
                    ups = parts[2]
                    if ups not in UPS_ALIASES:
                        send_line(conn, addr, "ERR UNKNOWN-UPS")
                    else:
                        resp_ups = normalize_ups_name(ups)
                        send_line(conn, addr, f"BEGIN LIST VAR {resp_ups}")
                        for k, v in VARS.items():
                            send_line(conn, addr, f'VAR {resp_ups} {k} "{v}"')
                        send_line(conn, addr, f"END LIST VAR {resp_ups}")

                elif cmd == "HELP":
                    send_line(conn, addr, "OK Commands: VER NETVER PROTVER STARTTLS LIST USERNAME PASSWORD LOGIN GET LOGOUT")

                elif cmd == "LOGOUT":
                    send_line(conn, addr, "OK Goodbye")
                    break

                else:
                    send_line(conn, addr, "ERR UNKNOWN-COMMAND")
    except Exception as e:
        # Thread boundary: report the failure and fall through to cleanup
        # rather than letting the worker thread die with a traceback.
        log(f"[!] ERROR {addr}: {e}")
    finally:
        log(f"[-] DISCONNECT {addr} tls={tls_started} logged_in={logged_in}")
        try:
            conn.close()
        except OSError as e:
            # Closing is best-effort, but a failure should still be visible.
            log(f"[!] ERROR closing {addr}: {e}")
def main():
    """Run the fake NUT server: accept clients forever, one thread each.

    Binds a TCP listener on ``HOST``:``PORT`` and hands every accepted
    connection to ``handle_client`` on its own daemon thread. The accept
    loop has no exit condition, so this only ends when an exception
    (for example ``KeyboardInterrupt``) propagates out.
    """
    log(f"Starting Fake NUT DEBUG server on {HOST}:{PORT}")
    log(f"Accepted UPS names: {', '.join(sorted(UPS_ALIASES))}")

    # ``with`` closes the listening socket if bind/listen/accept raises or
    # the loop is interrupted, instead of leaking it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow an immediate restart without waiting out TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen(20)
        while True:
            conn, addr = s.accept()
            threading.Thread(
                target=handle_client, args=(conn, addr), daemon=True
            ).start()
# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()