XIVLauncherRemoteOTP/main.py

386 lines
14 KiB
Python

import os
import time
import sqlite3
import asyncio
import pyotp
import httpx
import base64
import secrets
import urllib.parse
from typing import Optional
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import Response, FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from starlette.middleware.sessions import SessionMiddleware
from passlib.context import CryptContext
from pydantic import BaseModel
from cryptography.fernet import Fernet
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
WEB_PASSWORD = os.getenv("WEB_PASSWORD", "admin")
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
FIREWALL_HOST_IP = os.getenv("FIREWALL_HOST_IP")
DB_PATH = "data/instances.db"
if not ENCRYPTION_KEY or ENCRYPTION_KEY == "CHANGE_ME_TO_A_VALID_FERNET_KEY":
print("WARNING: ENCRYPTION_KEY not set or invalid.")
print("Generating a random key for this session, but secrets will be lost on restart!")
ENCRYPTION_KEY = Fernet.generate_key().decode()
try:
fernet = Fernet(ENCRYPTION_KEY)
except ValueError:
print("ERROR: ENCRYPTION_KEY must be a valid 32-byte base64 encoded string.")
exit(1)
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key=ENCRYPTION_KEY)
def get_password_hash(password):
# bcrypt has a 72-byte limit
return pwd_context.hash(password[:72])
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password[:72], hashed_password)
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Create users table
c.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
hashed_password TEXT NOT NULL,
is_admin INTEGER DEFAULT 0
)
''')
# Check if instances table has user_id, if not, add it (simple migration)
c.execute("PRAGMA table_info(instances)")
columns = [col[1] for col in c.fetchall()]
if "user_id" not in columns:
print("Migrating instances table to include user_id...")
# SQLite doesn't support adding a column with a foreign key constraint easily,
# so we'll just add the column for now.
c.execute('ALTER TABLE instances ADD COLUMN user_id INTEGER')
c.execute('''
CREATE TABLE IF NOT EXISTS instances (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
ip TEXT NOT NULL,
port INTEGER NOT NULL,
encrypted_secret TEXT NOT NULL,
last_status TEXT DEFAULT 'Unknown',
last_checked REAL DEFAULT 0,
user_id INTEGER,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Create or update default admin if no users exist or only one user exists
c.execute('SELECT COUNT(*) FROM users')
count = c.fetchone()[0]
if count == 0:
print("Creating default admin user...")
admin_hash = get_password_hash(WEB_PASSWORD)
c.execute('INSERT INTO users (username, hashed_password, is_admin) VALUES (?, ?, ?)',
('admin', admin_hash, 1))
elif count == 1:
# If there's only one user and it's 'admin', ensure its password matches WEB_PASSWORD
c.execute('SELECT id, username FROM users LIMIT 1')
user = c.fetchone()
if user[1] == 'admin':
print("Ensuring admin password matches WEB_PASSWORD...")
admin_hash = get_password_hash(WEB_PASSWORD)
c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (admin_hash, user[0]))
conn.commit()
conn.close()
init_db()
def is_authenticated(request: Request):
if not request.session.get("user_id"):
raise HTTPException(status_code=401, detail="Not authenticated")
return True
def is_admin(request: Request):
if not request.session.get("is_admin"):
raise HTTPException(status_code=403, detail="Admin access required")
return True
@app.post("/api/login")
async def login(request: Request):
data = await request.json()
username = data.get("username")
password = data.get("password")
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT * FROM users WHERE username = ?', (username,))
user = c.fetchone()
conn.close()
if user and verify_password(password, user['hashed_password']):
request.session["user_id"] = user['id']
request.session["username"] = user['username']
request.session["is_admin"] = bool(user['is_admin'])
return {"status": "ok", "user": {"username": user['username'], "is_admin": bool(user['is_admin'])}}
raise HTTPException(status_code=401, detail="Invalid username or password")
@app.get("/api/logout")
async def logout(request: Request):
request.session.clear()
return {"status": "ok"}
@app.get("/api/auth/status")
async def auth_status(request: Request):
if request.session.get("user_id"):
return {
"authenticated": True,
"user": {
"username": request.session.get("username"),
"is_admin": request.session.get("is_admin")
}
}
return {"authenticated": False}
# User Management (Admin Only)
@app.get("/api/users", dependencies=[Depends(is_authenticated), Depends(is_admin)])
def get_users():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT id, username, is_admin FROM users')
users = [dict(row) for row in c.fetchall()]
conn.close()
return users
@app.post("/api/users", dependencies=[Depends(is_authenticated), Depends(is_admin)])
async def create_user(request: Request):
data = await request.json()
username = data.get("username")
password = data.get("password")
is_admin = 1 if data.get("is_admin") else 0
if not username or not password:
raise HTTPException(status_code=400, detail="Username and password required")
hashed = get_password_hash(password)
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('INSERT INTO users (username, hashed_password, is_admin) VALUES (?, ?, ?)',
(username, hashed, is_admin))
conn.commit()
conn.close()
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Username already exists")
return {"status": "ok"}
@app.delete("/api/users/{id}", dependencies=[Depends(is_authenticated), Depends(is_admin)])
def delete_user(id: int, request: Request):
if id == request.session.get("user_id"):
raise HTTPException(status_code=400, detail="Cannot delete yourself")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('DELETE FROM users WHERE id=?', (id,))
# Also delete their instances? Let's say yes for cleanliness.
c.execute('DELETE FROM instances WHERE user_id=?', (id,))
conn.commit()
conn.close()
return {"status": "ok"}
@app.put("/api/users/{id}/password", dependencies=[Depends(is_authenticated), Depends(is_admin)])
async def admin_reset_password(id: int, request: Request):
data = await request.json()
new_password = data.get("password")
if not new_password:
raise HTTPException(status_code=400, detail="New password required")
hashed = get_password_hash(new_password)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (hashed, id))
conn.commit()
conn.close()
return {"status": "ok"}
# Own Password Change (All Users)
@app.put("/api/users/me/password", dependencies=[Depends(is_authenticated)])
async def change_own_password(request: Request):
data = await request.json()
new_password = data.get("password")
if not new_password:
raise HTTPException(status_code=400, detail="New password required")
user_id = request.session.get("user_id")
hashed = get_password_hash(new_password)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE users SET hashed_password = ? WHERE id = ?', (hashed, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
class InstanceCreate(BaseModel):
name: str
ip: str
port: int = 4646
secret: str
class InstanceUpdate(BaseModel):
name: str
ip: str
port: int = 4646
secret: Optional[str] = None
def clean_secret(secret_input):
secret_clean = secret_input.strip()
if secret_clean.lower().startswith('otpauth://'):
try:
parsed = urllib.parse.urlparse(secret_clean)
params = urllib.parse.parse_qs(parsed.query)
if 'secret' in params:
return params['secret'][0].replace(" ", "").upper()
except Exception:
pass
return secret_clean.replace(" ", "").upper()
async def poll_instances():
while True:
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT * FROM instances')
instances = c.fetchall()
conn.close()
async with httpx.AsyncClient(timeout=1.0) as client:
for inst in instances:
# Skip if we checked recently and it was successful (60s cooldown)
if inst['last_status'] == 'Sent' and (time.time() - inst['last_checked'] < 60):
continue
ip = inst['ip']
port = inst['port']
try:
url = f"http://{ip}:{port}/ffxivlauncher/"
# Poll the server endpoint to see if it's accepting connections
await client.get(url, timeout=0.5)
secret = fernet.decrypt(inst['encrypted_secret'].encode()).decode()
totp = pyotp.TOTP(secret)
otp = totp.now()
send_url = f"http://{ip}:{port}/ffxivlauncher/{otp}"
resp = await client.get(send_url, timeout=1.0)
if resp.status_code == 200:
status_msg = "Sent"
else:
status_msg = f"Failed: {resp.status_code}"
except (httpx.ConnectError, httpx.TimeoutException):
status_msg = "Offline"
except Exception as e:
status_msg = f"Error: {str(e)}"
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
UPDATE instances
SET last_status = ?, last_checked = ?
WHERE id = ?
''', (status_msg, time.time(), inst['id']))
conn.commit()
conn.close()
except Exception as e:
print(f"Poller error: {e}")
await asyncio.sleep(5)
@app.on_event("startup")
async def startup_event():
asyncio.create_task(poll_instances())
@app.get("/api/instances", dependencies=[Depends(is_authenticated)])
def get_instances(request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT id, name, ip, port, last_status, last_checked FROM instances WHERE user_id = ?', (user_id,))
instances = [dict(row) for row in c.fetchall()]
conn.close()
return instances
@app.post("/api/instances", dependencies=[Depends(is_authenticated)])
def create_instance(inst: InstanceCreate, request: Request):
user_id = request.session.get("user_id")
cleaned = clean_secret(inst.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
INSERT INTO instances (name, ip, port, encrypted_secret, user_id)
VALUES (?, ?, ?, ?, ?)
''', (inst.name, inst.ip, inst.port, encrypted, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
@app.put("/api/instances/{id}", dependencies=[Depends(is_authenticated)])
def update_instance(id: int, inst: InstanceUpdate, request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Verify ownership
c.execute('SELECT id FROM instances WHERE id=? AND user_id=?', (id, user_id))
if not c.fetchone():
conn.close()
raise HTTPException(status_code=404, detail="Instance not found")
if inst.secret:
cleaned = clean_secret(inst.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
c.execute('''
UPDATE instances SET name=?, ip=?, port=?, encrypted_secret=?
WHERE id=? AND user_id=?
''', (inst.name, inst.ip, inst.port, encrypted, id, user_id))
else:
c.execute('''
UPDATE instances SET name=?, ip=?, port=?
WHERE id=? AND user_id=?
''', (inst.name, inst.ip, inst.port, id, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
@app.get("/api/config", dependencies=[Depends(is_authenticated)])
def get_config():
return {"firewall_host_ip": FIREWALL_HOST_IP}
@app.delete("/api/instances/{id}", dependencies=[Depends(is_authenticated)])
def delete_instance(id: int, request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('DELETE FROM instances WHERE id=? AND user_id=?', (id, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
# Mount root and static
@app.get("/")
def read_root():
return FileResponse("static/index.html")
app.mount("/static", StaticFiles(directory="static"), name="static")