510 lines
19 KiB
Python
Executable file
510 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import sys
|
|
import json
|
|
import time
|
|
import subprocess
|
|
import threading
|
|
import urllib.request
|
|
import urllib.error
|
|
import hmac
|
|
import base64
|
|
import struct
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
import configparser
|
|
import argparse
|
|
try:
|
|
import keyring
|
|
HAS_KEYRING = True
|
|
except ImportError:
|
|
HAS_KEYRING = False
|
|
import getpass
|
|
import shlex
|
|
|
|
import urllib.parse
|
|
|
|
def prompt_gui_secret():
|
|
"""Prompts for the OTP secret using a DE-appropriate GUI dialog (kdialog or zenity)."""
|
|
desktop = os.environ.get('XDG_CURRENT_DESKTOP', '').lower()
|
|
|
|
dialogs = []
|
|
if 'kde' in desktop and shutil.which('kdialog'):
|
|
dialogs.append('kdialog')
|
|
if shutil.which('zenity'): dialogs.append('zenity')
|
|
else:
|
|
if shutil.which('zenity'): dialogs.append('zenity')
|
|
if shutil.which('kdialog'): dialogs.append('kdialog')
|
|
|
|
for dialog in dialogs:
|
|
if dialog == 'kdialog':
|
|
try:
|
|
result = subprocess.run(
|
|
['kdialog', '--password', 'Please enter your XIVLauncher TOTP Secret (base32) or otpauth:// URL:'],
|
|
capture_output=True, text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception as e:
|
|
print(f"Warning: kdialog failed: {e}")
|
|
elif dialog == 'zenity':
|
|
try:
|
|
result = subprocess.run(
|
|
['zenity', '--password', '--title=XIVLauncher Wrapper', '--text=Please enter your TOTP Secret (base32) or otpauth:// URL:'],
|
|
capture_output=True, text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception as e:
|
|
print(f"Warning: zenity failed: {e}")
|
|
|
|
return None
|
|
|
|
def parse_secret(secret_input):
|
|
"""Clean and extract secret from input (handles raw secret or otpauth:// URL)."""
|
|
if not secret_input:
|
|
return ""
|
|
|
|
secret_clean = secret_input.strip()
|
|
|
|
# Handle otpauth URL
|
|
if secret_clean.lower().startswith('otpauth://'):
|
|
try:
|
|
parsed = urllib.parse.urlparse(secret_clean)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
if 'secret' in params:
|
|
# parse_qs returns a list
|
|
return params['secret'][0].replace(" ", "").upper()
|
|
else:
|
|
print("Warning: otpauth URL provided but no 'secret' parameter found.")
|
|
return ""
|
|
except Exception as e:
|
|
print(f"Warning: Failed to parse otpauth URL: {e}")
|
|
return ""
|
|
|
|
return secret_clean.replace(" ", "").upper()
|
|
|
|
def get_totp_token(secret):
|
|
"""Generates a TOTP code from the secret."""
|
|
try:
|
|
# Clean/extract secret first
|
|
secret_clean = parse_secret(secret)
|
|
if not secret_clean:
|
|
return None
|
|
|
|
# Pad the secret if needed for base32 decoding
|
|
padding = '=' * ((8 - len(secret_clean) % 8) % 8)
|
|
key = base64.b32decode(secret_clean + padding)
|
|
|
|
# Time step is 30 seconds
|
|
msg = struct.pack(">Q", int(time.time() / 30))
|
|
h = hmac.new(key, msg, hashlib.sha1).digest()
|
|
o = h[19] & 15
|
|
h = (struct.unpack(">I", h[o:o+4])[0] & 0x7fffffff) % 1000000
|
|
return f"{h:06d}"
|
|
except Exception as e:
|
|
print(f"Error generating TOTP: {e}")
|
|
return None
|
|
|
|
def send_otp(secret):
|
|
"""Attempts to send the OTP to the local XIVLauncher server."""
|
|
url_base = "http://127.0.0.1:4646/ffxivlauncher/"
|
|
print("Waiting for XIVLauncher to accept OTP...")
|
|
|
|
# Try for up to 5 minutes (300 seconds) - launcher might take time to load
|
|
start_time = time.time()
|
|
while time.time() - start_time < 300:
|
|
otp = get_totp_token(secret)
|
|
if not otp:
|
|
print("Failed to generate OTP token.")
|
|
return
|
|
|
|
try:
|
|
# Short timeout so we don't block
|
|
with urllib.request.urlopen(f"{url_base}{otp}", timeout=1) as response:
|
|
if response.status == 200:
|
|
print(f"SUCCESS: OTP {otp} sent to XIVLauncher!")
|
|
# We can stop trying once successful
|
|
return
|
|
except (urllib.error.URLError, ConnectionRefusedError, ConnectionResetError):
|
|
# Server not up or not accepting connections yet
|
|
pass
|
|
except Exception as e:
|
|
# Some other error (e.g. malformed URL if secret is bad?)
|
|
print(f"Debug: Connection error: {e}")
|
|
|
|
time.sleep(1)
|
|
|
|
print("Timed out waiting for XIVLauncher OTP prompt.")
|
|
|
|
def check_and_update_launcher_ini():
|
|
"""Checks ~/.xlcore/launcher.ini for IsOtpServer setting.
|
|
|
|
If file is missing: prompts user to enable it manually in launcher.
|
|
If file exists but setting is off: prompts user to auto-enable it.
|
|
"""
|
|
ini_path = os.path.expanduser("~/.xlcore/launcher.ini")
|
|
|
|
if not os.path.exists(ini_path):
|
|
print(f"\nConfiguration Check: '{ini_path}' not found.")
|
|
print("Please open XIVLauncher, go to Settings, and enable 'Start internal OTP server'.")
|
|
print("This is required for the wrapper to function.")
|
|
if sys.stdin.isatty():
|
|
try:
|
|
input("Press Enter to continue once you have done this (or to try anyway)...")
|
|
except EOFError:
|
|
pass
|
|
return
|
|
|
|
config = configparser.ConfigParser()
|
|
config.optionxform = str # Preserve case sensitivity
|
|
|
|
try:
|
|
config.read(ini_path)
|
|
except configparser.Error as e:
|
|
print(f"Warning: Could not read {ini_path}: {e}")
|
|
return
|
|
|
|
# Check if section exists, if not we assume malformed or empty, so we proceed to check/fix
|
|
if not config.has_section('Main'):
|
|
config.add_section('Main')
|
|
|
|
current_value = config.get('Main', 'IsOtpServer', fallback='false').lower()
|
|
|
|
if current_value != 'true':
|
|
print(f"\nConfiguration Check: '{ini_path}'")
|
|
print("XIVLauncher requires 'IsOtpServer' to be enabled to accept OTP codes from this wrapper.")
|
|
|
|
choice = 'n'
|
|
if sys.stdin.isatty():
|
|
try:
|
|
choice = input("Enable 'IsOtpServer' now? [Y/n]: ").strip().lower()
|
|
except EOFError:
|
|
choice = 'y'
|
|
else:
|
|
print("Non-interactive mode detected. Auto-enabling 'IsOtpServer'.")
|
|
choice = 'y'
|
|
|
|
if choice in ('', 'y', 'yes'):
|
|
config.set('Main', 'IsOtpServer', 'true')
|
|
try:
|
|
with open(ini_path, 'w') as f:
|
|
config.write(f)
|
|
print("Updated launcher.ini successfully.")
|
|
except OSError as e:
|
|
print(f"Error writing to {ini_path}: {e}")
|
|
else:
|
|
print("Warning: OTP injection will likely fail without this setting.")
|
|
|
|
def detect_launch_command():
|
|
"""Detects available XIVLauncher installations."""
|
|
commands = []
|
|
|
|
# Check for Flatpak
|
|
if shutil.which('flatpak'):
|
|
try:
|
|
# Check if installed
|
|
result = subprocess.run(['flatpak', 'list', '--app', '--columns=application'],
|
|
capture_output=True, text=True)
|
|
if 'dev.goats.xivlauncher' in result.stdout:
|
|
commands.append(('Flatpak', 'flatpak run dev.goats.xivlauncher'))
|
|
|
|
# Check for XL Core flatpak (sometimes named differently? no, usually dev.goats.xivlauncher)
|
|
except Exception:
|
|
pass
|
|
|
|
# Check for Native
|
|
# Common names for native packages
|
|
for cmd in ['xivlauncher', 'xivlauncher-core', 'xivlauncher-rb']:
|
|
path = shutil.which(cmd)
|
|
if path:
|
|
commands.append(('Native', path))
|
|
|
|
return commands
|
|
|
|
return commands
|
|
|
|
def get_secret(config, config_path):
|
|
"""
|
|
Retrieves the OTP secret in the following priority:
|
|
1. System Keyring (SERVICE_NAME, USERNAME) - if available
|
|
2. config.json - if Keyring unavailable or secret found there (and Keyring unavailable for migration)
|
|
3. User Input - prompts user and saves to Keyring (if available) or config.json.
|
|
"""
|
|
SERVICE_NAME = "XIVLauncherWrapper"
|
|
USERNAME = "OTP_SECRET"
|
|
|
|
# 1. Try Keyring
|
|
if HAS_KEYRING:
|
|
try:
|
|
secret = keyring.get_password(SERVICE_NAME, USERNAME)
|
|
if secret:
|
|
return secret
|
|
except Exception as e:
|
|
print(f"Warning: Failed to access keyring: {e}")
|
|
|
|
# 2. Try Config
|
|
if config and "secret" in config:
|
|
secret = config["secret"]
|
|
if secret and secret != "YOUR_BASE32_SECRET_HERE":
|
|
if HAS_KEYRING:
|
|
print("Migrating secret from config.json to system keyring...")
|
|
try:
|
|
keyring.set_password(SERVICE_NAME, USERNAME, secret)
|
|
|
|
# Remove from config
|
|
del config["secret"]
|
|
with open(config_path, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
print("Secret migrated successfully and removed from config.json.")
|
|
return secret
|
|
except Exception as e:
|
|
print(f"Error migrating to keyring: {e}")
|
|
print("Continuing with secret from config...")
|
|
return secret
|
|
else:
|
|
# No keyring, just use config
|
|
return secret
|
|
|
|
# 3. Prompt User
|
|
if HAS_KEYRING:
|
|
print("\nOTP Secret not found in keyring.")
|
|
else:
|
|
print("\nOTP Secret not found in config.json and keyring module is missing.")
|
|
|
|
print("Attempting GUI prompt...")
|
|
secret_input = prompt_gui_secret()
|
|
|
|
if not secret_input:
|
|
print("GUI prompt unavailable or cancelled. Falling back to CLI.")
|
|
if not sys.stdin.isatty():
|
|
print("Error: No interactive terminal available and GUI prompt failed. Cannot prompt for secret.")
|
|
return None
|
|
|
|
print("Please enter your TOTP Secret (base32) or otpauth:// URL.")
|
|
while True:
|
|
try:
|
|
secret_input = getpass.getpass("Secret: ").strip()
|
|
if not secret_input:
|
|
print("Secret cannot be empty.")
|
|
continue
|
|
break
|
|
except KeyboardInterrupt:
|
|
print("\nOperation cancelled.")
|
|
sys.exit(1)
|
|
except EOFError:
|
|
print("\nError: EOF reading secret.")
|
|
return None
|
|
|
|
# Basic validation/parsing check
|
|
parsed = parse_secret(secret_input)
|
|
while not parsed:
|
|
print("Invalid secret format.")
|
|
secret_input = getpass.getpass("Secret: ").strip()
|
|
parsed = parse_secret(secret_input)
|
|
|
|
if HAS_KEYRING:
|
|
try:
|
|
keyring.set_password(SERVICE_NAME, USERNAME, parsed)
|
|
print("Secret saved to system keyring.")
|
|
return parsed
|
|
except Exception as e:
|
|
print(f"Error saving to keyring: {e}")
|
|
print("Proceeding with in-memory secret for this session.")
|
|
return parsed
|
|
else:
|
|
# Save to config.json
|
|
try:
|
|
config['secret'] = parsed
|
|
with open(config_path, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
print(f"Secret saved to {config_path}")
|
|
return parsed
|
|
except Exception as e:
|
|
print(f"Error saving to config.json: {e}")
|
|
return parsed
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="XIVLauncher Wrapper")
|
|
parser.add_argument('--prelaunch', action='store_true', help="Run as a prelaunch script (spawns injector and exits)")
|
|
parser.add_argument('--inject-only', action='store_true', help=argparse.SUPPRESS)
|
|
args, unknown_args = parser.parse_known_args()
|
|
|
|
# Auto-detect if running from a prelaunch.d directory
|
|
script_path = os.path.abspath(sys.argv[0])
|
|
if 'prelaunch.d' in script_path.split(os.sep):
|
|
args.prelaunch = True
|
|
|
|
# Determine config path (XDG standard)
|
|
xdg_config_home = os.environ.get('XDG_CONFIG_HOME', os.path.join(os.path.expanduser('~'), '.config'))
|
|
config_dir = os.path.join(xdg_config_home, 'xivlauncher-wrapper')
|
|
config_path = os.path.join(config_dir, 'config.json')
|
|
|
|
# Fallback to script directory (legacy/development)
|
|
if not os.path.exists(config_path):
|
|
script_dir = os.path.dirname(os.path.realpath(__file__))
|
|
local_config = os.path.join(script_dir, "config.json")
|
|
if os.path.exists(local_config):
|
|
config_path = local_config
|
|
|
|
if not os.path.exists(config_path):
|
|
print(f"Config file not found. Creating default empty configuration at {config_path}...")
|
|
os.makedirs(config_dir, exist_ok=True)
|
|
config = {"launcher_cmd": ""}
|
|
try:
|
|
with open(config_path, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
except OSError as e:
|
|
print(f"Error creating config file: {e}")
|
|
sys.exit(1)
|
|
else:
|
|
try:
|
|
with open(config_path, 'r') as f:
|
|
config = json.load(f)
|
|
except json.JSONDecodeError:
|
|
print(f"Error parsing {config_path}. Invalid JSON.")
|
|
sys.exit(1)
|
|
|
|
# 1. Check and Update launcher.ini
|
|
check_and_update_launcher_ini()
|
|
|
|
cmd = config.get("launcher_cmd")
|
|
config_dirty = False
|
|
cmd_args = None
|
|
|
|
# Determine command to launch (if not prelaunch/inject-only)
|
|
if not args.prelaunch and not args.inject_only:
|
|
if unknown_args:
|
|
# Command passed via args (Steam %command% mode)
|
|
cmd_args = unknown_args
|
|
cmd = " ".join(shlex.quote(a) for a in cmd_args)
|
|
else:
|
|
# 2. Auto-detect command if missing
|
|
if not cmd:
|
|
print("\n'launcher_cmd' not set in config. detecting installed XIVLauncher versions...")
|
|
available = detect_launch_command()
|
|
|
|
if not available:
|
|
print("Error: No XIVLauncher installation found (Flatpak or Native).")
|
|
print("Please install XIVLauncher or manually set 'launcher_cmd' in config.json.")
|
|
sys.exit(1)
|
|
|
|
elif len(available) >= 1:
|
|
name, command = available[0]
|
|
if len(available) > 1:
|
|
print(f"Multiple XIVLauncher installations found. Automatically selecting first available: {name}")
|
|
else:
|
|
print(f"Detected {name}: {command}")
|
|
cmd = command
|
|
config['launcher_cmd'] = cmd
|
|
config_dirty = True
|
|
|
|
# 2.5 Auto-detect gamemode/game-performance
|
|
if "use_gamemode" not in config:
|
|
gamemode_cmd = None
|
|
if shutil.which("game-performance"):
|
|
gamemode_cmd = "game-performance"
|
|
elif shutil.which("gamemoderun"):
|
|
gamemode_cmd = "gamemoderun"
|
|
|
|
if gamemode_cmd:
|
|
print(f"\n'{gamemode_cmd}' was detected on your system.")
|
|
print("Would you like to enable it for XIVLauncher? This can improve game performance.")
|
|
try:
|
|
if sys.stdin.isatty():
|
|
choice = input(f"Enable {gamemode_cmd}? [Y/n]: ").strip().lower()
|
|
else:
|
|
choice = 'y' # auto-enable if not interactive
|
|
except (EOFError, KeyboardInterrupt):
|
|
choice = 'n'
|
|
print("\nPrompt interrupted, defaulting to No.")
|
|
|
|
if choice in ('', 'y', 'yes'):
|
|
config["use_gamemode"] = True
|
|
config["gamemode_cmd"] = gamemode_cmd
|
|
else:
|
|
config["use_gamemode"] = False
|
|
config_dirty = True
|
|
|
|
# Save config if updated (e.g. launcher_cmd detected)
|
|
if config_dirty:
|
|
try:
|
|
with open(config_path, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
print(f"Updated configuration file: {config_path}")
|
|
except OSError as e:
|
|
print(f"Warning: Could not save updated config: {e}")
|
|
|
|
# Retrieve Secret (Keyring refactor)
|
|
secret = get_secret(config, config_path)
|
|
|
|
if not secret:
|
|
print("Error: No secret available.")
|
|
sys.exit(1)
|
|
|
|
if args.inject_only:
|
|
# Only run the OTP injection loop and exit
|
|
send_otp(secret)
|
|
sys.exit(0)
|
|
|
|
if args.prelaunch:
|
|
print("Running in prelaunch mode. Spawning OTP injector in background...")
|
|
# Spawn ourselves in background with --inject-only
|
|
subprocess.Popen([sys.executable, os.path.realpath(__file__), '--inject-only'],
|
|
start_new_session=True,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL)
|
|
sys.exit(0)
|
|
|
|
print(f"Launch Command: {cmd}")
|
|
|
|
# 3. Launch Logic
|
|
if not cmd_args:
|
|
cmd_args = shlex.split(cmd)
|
|
|
|
# Prepend gamemode wrapper if enabled
|
|
if config.get("use_gamemode"):
|
|
g_cmd = config.get("gamemode_cmd")
|
|
if not g_cmd:
|
|
if shutil.which("game-performance"):
|
|
g_cmd = "game-performance"
|
|
elif shutil.which("gamemoderun"):
|
|
g_cmd = "gamemoderun"
|
|
if g_cmd and shutil.which(g_cmd):
|
|
cmd_args.insert(0, g_cmd)
|
|
print(f"Using game performance wrapper: {g_cmd}")
|
|
|
|
try:
|
|
# Inject XL_SECRET_PROVIDER=file environment variable
|
|
env = os.environ.copy()
|
|
env['XL_SECRET_PROVIDER'] = 'file'
|
|
|
|
proc = subprocess.Popen(cmd_args, env=env)
|
|
except FileNotFoundError:
|
|
print(f"Error: Command executable not found: {cmd_args[0] if cmd_args else cmd}")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
print(f"Error launching command '{cmd}': {e}")
|
|
sys.exit(1)
|
|
|
|
# Start the OTP injector in a background thread
|
|
injector_thread = threading.Thread(target=send_otp, args=(secret,))
|
|
injector_thread.daemon = True
|
|
injector_thread.start()
|
|
|
|
try:
|
|
# Wait for the launcher to exit
|
|
proc.wait()
|
|
except KeyboardInterrupt:
|
|
print("\nStopping wrapper...")
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
|
|
sys.exit(proc.returncode)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|