decouple OTP secrets from instances with a dedicated database table and CRUD API endpoints

This commit is contained in:
CPTN Cosmo 2026-04-25 17:29:53 +02:00
parent c15f965a2b
commit b8202abba5
5 changed files with 317 additions and 52 deletions

171
main.py
View file

@ -58,29 +58,58 @@ def init_db():
)
''')
# Check if instances table has user_id, if not, add it (simple migration)
c.execute("PRAGMA table_info(instances)")
columns = [col[1] for col in c.fetchall()]
if "user_id" not in columns:
print("Migrating instances table to include user_id...")
# SQLite doesn't support adding a column with a foreign key constraint easily,
# so we'll just add the column for now.
c.execute('ALTER TABLE instances ADD COLUMN user_id INTEGER')
c.execute('''
CREATE TABLE IF NOT EXISTS otp_secrets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
encrypted_secret TEXT NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Create instances table first if it doesn't exist
c.execute('''
CREATE TABLE IF NOT EXISTS instances (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
ip TEXT NOT NULL,
port INTEGER NOT NULL,
encrypted_secret TEXT NOT NULL,
encrypted_secret TEXT,
otp_secret_id INTEGER,
last_status TEXT DEFAULT 'Unknown',
last_checked REAL DEFAULT 0,
user_id INTEGER,
FOREIGN KEY (user_id) REFERENCES users (id)
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (otp_secret_id) REFERENCES otp_secrets (id)
)
''')
# Now check for column migrations
c.execute("PRAGMA table_info(instances)")
columns = [col[1] for col in c.fetchall()]
if "user_id" not in columns:
print("Migrating instances table to include user_id...")
c.execute('ALTER TABLE instances ADD COLUMN user_id INTEGER REFERENCES users(id)')
if "otp_secret_id" not in columns:
print("Migrating instances table to include otp_secret_id...")
c.execute('ALTER TABLE instances ADD COLUMN otp_secret_id INTEGER REFERENCES otp_secrets(id)')
# Migration: Move existing encrypted_secret to otp_secrets table
c.execute('SELECT id, name, encrypted_secret, user_id FROM instances WHERE encrypted_secret IS NOT NULL')
instances_to_migrate = c.fetchall()
for inst_id, inst_name, secret, user_id in instances_to_migrate:
if secret and user_id:
# Create a new OTP secret entry
secret_name = f"Migrated ({inst_name})"
c.execute('INSERT INTO otp_secrets (name, encrypted_secret, user_id) VALUES (?, ?, ?)',
(secret_name, secret, user_id))
otp_secret_id = c.lastrowid
# Update instance to point to the new OTP secret
c.execute('UPDATE instances SET otp_secret_id = ? WHERE id = ?', (otp_secret_id, inst_id))
# Create or update default admin if no users exist or only one user exists
c.execute('SELECT COUNT(*) FROM users')
count = c.fetchone()[0]
@ -243,17 +272,25 @@ async def change_own_username(request: Request):
raise HTTPException(status_code=400, detail="Username already exists")
return {"status": "ok"}
class OTPSecretCreate(BaseModel):
name: str
secret: str
class OTPSecretUpdate(BaseModel):
name: str
secret: Optional[str] = None
class InstanceCreate(BaseModel):
name: str
ip: str
port: int = 4646
secret: str
otp_secret_id: int
class InstanceUpdate(BaseModel):
name: str
ip: str
port: int = 4646
secret: Optional[str] = None
otp_secret_id: int
def clean_secret(secret_input):
secret_clean = secret_input.strip()
@ -267,13 +304,90 @@ def clean_secret(secret_input):
pass
return secret_clean.replace(" ", "").upper()
# OTP Secret Management
@app.get("/api/otp-secrets", dependencies=[Depends(is_authenticated)])
def get_otp_secrets(request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT id, name FROM otp_secrets WHERE user_id = ?', (user_id,))
secrets = [dict(row) for row in c.fetchall()]
conn.close()
return secrets
@app.post("/api/otp-secrets", dependencies=[Depends(is_authenticated)])
def create_otp_secret(secret_data: OTPSecretCreate, request: Request):
user_id = request.session.get("user_id")
cleaned = clean_secret(secret_data.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
INSERT INTO otp_secrets (name, encrypted_secret, user_id)
VALUES (?, ?, ?)
''', (secret_data.name, encrypted, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
@app.put("/api/otp-secrets/{id}", dependencies=[Depends(is_authenticated)])
def update_otp_secret(id: int, secret_data: OTPSecretUpdate, request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Verify ownership
c.execute('SELECT id FROM otp_secrets WHERE id=? AND user_id=?', (id, user_id))
if not c.fetchone():
conn.close()
raise HTTPException(status_code=404, detail="OTP Secret not found")
if secret_data.secret:
cleaned = clean_secret(secret_data.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
c.execute('''
UPDATE otp_secrets SET name=?, encrypted_secret=?
WHERE id=? AND user_id=?
''', (secret_data.name, encrypted, id, user_id))
else:
c.execute('''
UPDATE otp_secrets SET name=?
WHERE id=? AND user_id=?
''', (secret_data.name, id, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
@app.delete("/api/otp-secrets/{id}", dependencies=[Depends(is_authenticated)])
def delete_otp_secret(id: int, request: Request):
user_id = request.session.get("user_id")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Check if used by any instances
c.execute('SELECT COUNT(*) FROM instances WHERE otp_secret_id=?', (id,))
if c.fetchone()[0] > 0:
conn.close()
raise HTTPException(status_code=400, detail="Cannot delete secret because it is in use by one or more instances")
c.execute('DELETE FROM otp_secrets WHERE id=? AND user_id=?', (id, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
async def poll_instances():
while True:
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT * FROM instances')
c.execute('''
SELECT i.*, s.encrypted_secret
FROM instances i
JOIN otp_secrets s ON i.otp_secret_id = s.id
''')
instances = c.fetchall()
conn.close()
@ -332,7 +446,12 @@ def get_instances(request: Request):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT id, name, ip, port, last_status, last_checked FROM instances WHERE user_id = ?', (user_id,))
c.execute('''
SELECT i.id, i.name, i.ip, i.port, i.last_status, i.last_checked, i.otp_secret_id, s.name as otp_secret_name
FROM instances i
LEFT JOIN otp_secrets s ON i.otp_secret_id = s.id
WHERE i.user_id = ?
''', (user_id,))
instances = [dict(row) for row in c.fetchall()]
conn.close()
return instances
@ -340,14 +459,12 @@ def get_instances(request: Request):
@app.post("/api/instances", dependencies=[Depends(is_authenticated)])
def create_instance(inst: InstanceCreate, request: Request):
user_id = request.session.get("user_id")
cleaned = clean_secret(inst.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
INSERT INTO instances (name, ip, port, encrypted_secret, user_id)
INSERT INTO instances (name, ip, port, otp_secret_id, user_id)
VALUES (?, ?, ?, ?, ?)
''', (inst.name, inst.ip, inst.port, encrypted, user_id))
''', (inst.name, inst.ip, inst.port, inst.otp_secret_id, user_id))
conn.commit()
conn.close()
return {"status": "ok"}
@ -364,18 +481,10 @@ def update_instance(id: int, inst: InstanceUpdate, request: Request):
conn.close()
raise HTTPException(status_code=404, detail="Instance not found")
if inst.secret:
cleaned = clean_secret(inst.secret)
encrypted = fernet.encrypt(cleaned.encode()).decode()
c.execute('''
UPDATE instances SET name=?, ip=?, port=?, encrypted_secret=?
WHERE id=? AND user_id=?
''', (inst.name, inst.ip, inst.port, encrypted, id, user_id))
else:
c.execute('''
UPDATE instances SET name=?, ip=?, port=?
WHERE id=? AND user_id=?
''', (inst.name, inst.ip, inst.port, id, user_id))
c.execute('''
UPDATE instances SET name=?, ip=?, port=?, otp_secret_id=?
WHERE id=? AND user_id=?
''', (inst.name, inst.ip, inst.port, inst.otp_secret_id, id, user_id))
conn.commit()
conn.close()