import os import time import sqlite3 import asyncio import pyotp import httpx import base64 import secrets import urllib.parse from typing import Optional from fastapi import FastAPI, Request, Depends, HTTPException from fastapi.responses import Response, FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from starlette.middleware.sessions import SessionMiddleware from passlib.context import CryptContext from pydantic import BaseModel from cryptography.fernet import Fernet pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") WEB_PASSWORD = os.getenv("WEB_PASSWORD", "admin") ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY") FIREWALL_HOST_IP = os.getenv("FIREWALL_HOST_IP") DB_PATH = "data/instances.db" if not ENCRYPTION_KEY or ENCRYPTION_KEY == "CHANGE_ME_TO_A_VALID_FERNET_KEY": print("WARNING: ENCRYPTION_KEY not set or invalid.") print("Generating a random key for this session, but secrets will be lost on restart!") ENCRYPTION_KEY = Fernet.generate_key().decode() try: fernet = Fernet(ENCRYPTION_KEY) except ValueError: print("ERROR: ENCRYPTION_KEY must be a valid 32-byte base64 encoded string.") exit(1) app = FastAPI() app.add_middleware(SessionMiddleware, secret_key=ENCRYPTION_KEY) def get_password_hash(password): # bcrypt has a 72-byte limit return pwd_context.hash(password[:72]) def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password[:72], hashed_password) def init_db(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() # Create users table c.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, hashed_password TEXT NOT NULL, is_admin INTEGER DEFAULT 0 ) ''') # Check if instances table has user_id, if not, add it (simple migration) c.execute("PRAGMA table_info(instances)") columns = [col[1] for col in c.fetchall()] if "user_id" not in columns: print("Migrating instances table to include user_id...") # SQLite doesn't support adding a column with a foreign key constraint easily, # so we'll just add the column for now. c.execute('ALTER TABLE instances ADD COLUMN user_id INTEGER') c.execute(''' CREATE TABLE IF NOT EXISTS instances ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, ip TEXT NOT NULL, port INTEGER NOT NULL, encrypted_secret TEXT NOT NULL, last_status TEXT DEFAULT 'Unknown', last_checked REAL DEFAULT 0, user_id INTEGER, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') # Create or update default admin if no users exist or only one user exists c.execute('SELECT COUNT(*) FROM users') count = c.fetchone()[0] if count == 0: print("Creating default admin user...") admin_hash = get_password_hash(WEB_PASSWORD) c.execute('INSERT INTO users (username, hashed_password, is_admin) VALUES (?, ?, ?)', ('admin', admin_hash, 1)) elif count == 1: # If there's only one user and it's 'admin', ensure its password matches WEB_PASSWORD c.execute('SELECT id, username FROM users LIMIT 1') user = c.fetchone() if user[1] == 'admin': print("Ensuring admin password matches WEB_PASSWORD...") admin_hash = get_password_hash(WEB_PASSWORD) c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (admin_hash, user[0])) conn.commit() conn.close() init_db() def is_authenticated(request: Request): if not request.session.get("user_id"): raise HTTPException(status_code=401, detail="Not authenticated") return True def is_admin(request: Request): if not request.session.get("is_admin"): raise HTTPException(status_code=403, detail="Admin access required") return True @app.post("/api/login") async def login(request: Request): data = await request.json() username = data.get("username") password = data.get("password") conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute('SELECT * FROM users WHERE username = ?', (username,)) user = c.fetchone() conn.close() if user and verify_password(password, user['hashed_password']): request.session["user_id"] = user['id'] request.session["username"] = user['username'] request.session["is_admin"] = bool(user['is_admin']) return {"status": "ok", "user": {"username": user['username'], "is_admin": bool(user['is_admin'])}} raise HTTPException(status_code=401, detail="Invalid username or password") @app.get("/api/logout") async def logout(request: Request): request.session.clear() return {"status": "ok"} @app.get("/api/auth/status") async def auth_status(request: Request): if request.session.get("user_id"): return { "authenticated": True, "user": { "username": request.session.get("username"), "is_admin": request.session.get("is_admin") } } return {"authenticated": False} # User Management (Admin Only) @app.get("/api/users", dependencies=[Depends(is_authenticated), Depends(is_admin)]) def get_users(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute('SELECT id, username, is_admin FROM users') users = [dict(row) for row in c.fetchall()] conn.close() return users @app.post("/api/users", dependencies=[Depends(is_authenticated), Depends(is_admin)]) async def create_user(request: Request): data = await request.json() username = data.get("username") password = data.get("password") is_admin = 1 if data.get("is_admin") else 0 if not username or not password: raise HTTPException(status_code=400, detail="Username and password required") hashed = get_password_hash(password) try: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('INSERT INTO users (username, hashed_password, is_admin) VALUES (?, ?, ?)', (username, hashed, is_admin)) conn.commit() conn.close() except sqlite3.IntegrityError: raise HTTPException(status_code=400, detail="Username already exists") return {"status": "ok"} @app.delete("/api/users/{id}", dependencies=[Depends(is_authenticated), Depends(is_admin)]) def delete_user(id: int, request: Request): if id == request.session.get("user_id"): raise HTTPException(status_code=400, detail="Cannot delete yourself") conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('DELETE FROM users WHERE id=?', (id,)) # Also delete their instances? Let's say yes for cleanliness. c.execute('DELETE FROM instances WHERE user_id=?', (id,)) conn.commit() conn.close() return {"status": "ok"} @app.put("/api/users/{id}/password", dependencies=[Depends(is_authenticated), Depends(is_admin)]) async def admin_reset_password(id: int, request: Request): data = await request.json() new_password = data.get("password") if not new_password: raise HTTPException(status_code=400, detail="New password required") hashed = get_password_hash(new_password) conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (hashed, id)) conn.commit() conn.close() return {"status": "ok"} # Own Password Change (All Users) @app.put("/api/users/me/password", dependencies=[Depends(is_authenticated)]) async def change_own_password(request: Request): data = await request.json() new_password = data.get("password") if not new_password: raise HTTPException(status_code=400, detail="New password required") user_id = request.session.get("user_id") hashed = get_password_hash(new_password) conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (hashed, user_id)) conn.commit() conn.close() return {"status": "ok"} class InstanceCreate(BaseModel): name: str ip: str port: int = 4646 secret: str def clean_secret(secret_input): secret_clean = secret_input.strip() if secret_clean.lower().startswith('otpauth://'): try: parsed = urllib.parse.urlparse(secret_clean) params = urllib.parse.parse_qs(parsed.query) if 'secret' in params: return params['secret'][0].replace(" ", "").upper() except Exception: pass return secret_clean.replace(" ", "").upper() async def poll_instances(): while True: try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute('SELECT * FROM instances') instances = c.fetchall() conn.close() async with httpx.AsyncClient(timeout=1.0) as client: for inst in instances: # Skip if we checked recently and it was successful (60s cooldown) if inst['last_status'] == 'Sent' and (time.time() - inst['last_checked'] < 60): continue ip = inst['ip'] port = inst['port'] try: url = f"http://{ip}:{port}/ffxivlauncher/" # Poll the server endpoint to see if it's accepting connections await client.get(url, timeout=0.5) secret = fernet.decrypt(inst['encrypted_secret'].encode()).decode() totp = pyotp.TOTP(secret) otp = totp.now() send_url = f"http://{ip}:{port}/ffxivlauncher/{otp}" resp = await client.get(send_url, timeout=1.0) if resp.status_code == 200: status_msg = "Sent" else: status_msg = f"Failed: {resp.status_code}" except (httpx.ConnectError, httpx.TimeoutException): status_msg = "Offline" except Exception as e: status_msg = f"Error: {str(e)}" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(''' UPDATE instances SET last_status = ?, last_checked = ? WHERE id = ? ''', (status_msg, time.time(), inst['id'])) conn.commit() conn.close() except Exception as e: print(f"Poller error: {e}") await asyncio.sleep(5) @app.on_event("startup") async def startup_event(): asyncio.create_task(poll_instances()) @app.get("/api/instances", dependencies=[Depends(is_authenticated)]) def get_instances(request: Request): user_id = request.session.get("user_id") conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute('SELECT id, name, ip, port, last_status, last_checked FROM instances WHERE user_id = ?', (user_id,)) instances = [dict(row) for row in c.fetchall()] conn.close() return instances @app.post("/api/instances", dependencies=[Depends(is_authenticated)]) def create_instance(inst: InstanceCreate, request: Request): user_id = request.session.get("user_id") cleaned = clean_secret(inst.secret) encrypted = fernet.encrypt(cleaned.encode()).decode() conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(''' INSERT INTO instances (name, ip, port, encrypted_secret, user_id) VALUES (?, ?, ?, ?, ?) ''', (inst.name, inst.ip, inst.port, encrypted, user_id)) conn.commit() conn.close() return {"status": "ok"} @app.put("/api/instances/{id}", dependencies=[Depends(is_authenticated)]) def update_instance(id: int, inst: InstanceUpdate, request: Request): user_id = request.session.get("user_id") conn = sqlite3.connect(DB_PATH) c = conn.cursor() # Verify ownership c.execute('SELECT id FROM instances WHERE id=? AND user_id=?', (id, user_id)) if not c.fetchone(): conn.close() raise HTTPException(status_code=404, detail="Instance not found") if inst.secret: cleaned = clean_secret(inst.secret) encrypted = fernet.encrypt(cleaned.encode()).decode() c.execute(''' UPDATE instances SET name=?, ip=?, port=?, encrypted_secret=? WHERE id=? AND user_id=? ''', (inst.name, inst.ip, inst.port, encrypted, id, user_id)) else: c.execute(''' UPDATE instances SET name=?, ip=?, port=? WHERE id=? AND user_id=? ''', (inst.name, inst.ip, inst.port, id, user_id)) conn.commit() conn.close() return {"status": "ok"} @app.get("/api/config", dependencies=[Depends(is_authenticated)]) def get_config(): return {"firewall_host_ip": FIREWALL_HOST_IP} @app.delete("/api/instances/{id}", dependencies=[Depends(is_authenticated)]) def delete_instance(id: int, request: Request): user_id = request.session.get("user_id") conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('DELETE FROM instances WHERE id=? AND user_id=?', (id, user_id)) conn.commit() conn.close() return {"status": "ok"} # Mount root and static @app.get("/") def read_root(): return FileResponse("static/index.html") app.mount("/static", StaticFiles(directory="static"), name="static")