Files
proxmox-lldap/provision_from_env.py
2026-01-02 23:44:10 -08:00

352 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import requests
import sys
import time
import subprocess
import psycopg2
import base64
import shutil
# --- CONFIGURATION ---
SECRETS_FILE = "/mnt/secrets/lldap.env"
POSTGRES_FILE = "/mnt/secrets/postgres.env"
REDIS_SECRETS_FILE = "/mnt/secrets/redis.env"
LLDAP_CONFIG_FILE = "/etc/lldap/lldap_config.toml"
SERVER_KEY_FILE = "/var/lib/lldap/server_key"
HTTP_URL = "http://localhost:17170"
LDAP_DOMAIN = "ldap.poppyglen.cc"
REDIS_HOST = "192.168.0.120"
REDIS_PORT = "6379"
REDIS_DB = "2"
def load_env_file(filepath):
config = {}
if not os.path.exists(filepath):
# Redis file is optional (we might not be using it if testing locally)
if filepath == REDIS_SECRETS_FILE:
return {}
print(f"Error: Secrets file not found at {filepath}")
sys.exit(1)
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'): continue
if '=' in line:
key, value = line.split('=', 1)
if (value.startswith("'") and value.endswith("'")) or \
(value.startswith('"') and value.endswith('"')):
value = value[1:-1]
config[key.strip()] = value
return config
def fetch_real_certs():
print(f"--- 🔐 FETCHING CERTS FOR {LDAP_DOMAIN} ---")
dest_dir = "/etc/lldap/certs"
os.makedirs(dest_dir, exist_ok=True)
# Load Redis Auth
redis_env = load_env_file(REDIS_SECRETS_FILE)
redis_pass = redis_env.get("REDIS_PASSWORD")
if not shutil.which("redis-cli"):
print("❌ Error: 'redis-cli' is missing. Cannot fetch certs.")
sys.exit(1)
# Keys exactly as Caddy stores them
cert_redis_key = f"caddy/certificates/acme-v02.api.letsencrypt.org-directory/{LDAP_DOMAIN}/{LDAP_DOMAIN}.crt"
priv_redis_key = f"caddy/certificates/acme-v02.api.letsencrypt.org-directory/{LDAP_DOMAIN}/{LDAP_DOMAIN}.key"
cmd_base = ["redis-cli", "-h", REDIS_HOST, "-p", REDIS_PORT, "-n", REDIS_DB, "--raw"]
if redis_pass:
cmd_base.extend(["-a", redis_pass])
success = True
for label, r_key, filename in [("Certificate", cert_redis_key, "cert.pem"), ("Private Key", priv_redis_key, "key.pem")]:
try:
# 1. Fetch raw JSON string from Redis
res = subprocess.run(cmd_base + ["GET", r_key], capture_output=True, text=True)
if res.returncode != 0:
print(f"❌ Redis Error fetching {label}: {res.stderr}")
success = False
continue
raw_output = res.stdout.strip()
if not raw_output:
print(f"❌ Error: Key '{r_key}' not found in Redis. Has Caddy generated it yet?")
success = False
continue
# 2. Parse JSON
# Caddy Redis storage wraps the data in a JSON object: {"value": "base64..."}
data = json.loads(raw_output)
b64_val = data['value']
# 3. Decode Base64
file_bytes = base64.b64decode(b64_val)
# 4. Write to file
out_path = os.path.join(dest_dir, filename)
with open(out_path, "wb") as f:
f.write(file_bytes)
# Permissions: Key must be restrictive
if filename == "key.pem":
os.chmod(out_path, 0o640)
else:
os.chmod(out_path, 0o644)
print(f"{label} saved to {out_path}")
except json.JSONDecodeError:
print(f"❌ Error: Redis returned invalid JSON for {label}. Is the key correct?")
success = False
except Exception as e:
print(f"❌ Error processing {label}: {e}")
success = False
return success
def wipe_state(pg_env):
print("--- 🧹 WIPING STATE 🧹 ---")
if os.path.exists(SERVER_KEY_FILE):
try:
os.remove(SERVER_KEY_FILE)
print("Server key file removed.")
except OSError:
pass
print("Wiping Database Tables...")
try:
conn = psycopg2.connect(
dbname=pg_env.get("DB_NAME"),
user=pg_env.get("DB_USER"),
password=pg_env.get("DB_PASS"),
host=pg_env.get("DB_HOST"),
port=pg_env.get("DB_PORT", "5432")
)
conn.autocommit = True
cur = conn.cursor()
tables = ["users", "groups", "user_groups", "token", "schema_migrations"]
for table in tables:
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
print("✅ Database tables dropped.")
cur.close()
conn.close()
except Exception as e:
print(f"❌ Database Error: {e}")
sys.exit(1)
def setup_systemd_overrides(env, admin_pass, force_reset=False):
override_dir = "/etc/systemd/system/lldap.service.d"
os.makedirs(override_dir, exist_ok=True)
mode_str = "RESET MODE" if force_reset else "RUN MODE"
print(f"Configuring Systemd ({mode_str})...")
with open(f"{override_dir}/override.conf", 'w') as f:
f.write("[Service]\n")
f.write(f"Environment=\"LLDAP_JWT_SECRET={env.get('LLDAP_JWT_SECRET')}\"\n")
f.write(f"Environment=\"LLDAP_LDAP_USER_PASS={admin_pass}\"\n")
if force_reset:
f.write("Environment=\"LLDAP_FORCE_LDAP_USER_PASS_RESET=true\"\n")
else:
f.write("Environment=\"LLDAP_FORCE_LDAP_USER_PASS_RESET=false\"\n")
f.write("Environment=\"LLDAP_FORCE_UPDATE_PRIVATE_KEY=false\"\n")
subprocess.run(["systemctl", "daemon-reload"], check=True)
def write_lldap_config(env, pg_env):
print("Writing LLDAP Config...")
base_dn = env.get("LLDAP_LDAP_BASE_DN", env.get("LLDAP_BASE_DN", "dc=example,dc=com"))
db_uri = f"postgres://{pg_env.get('DB_USER')}:{pg_env.get('DB_PASS')}@{pg_env.get('DB_HOST')}:{pg_env.get('DB_PORT')}/{pg_env.get('DB_NAME')}"
config_content = f"""
[general]
base_dn = "{base_dn}"
key_file = "server_key"
[database]
type = "postgres"
uri = "{db_uri}"
# --- LDAPS CONFIGURATION ---
[ldaps]
enabled = true
port = 6360
cert_file = "/etc/lldap/certs/cert.pem"
key_file = "/etc/lldap/certs/key.pem"
[mail]
host = "{env.get('SMTP_HOST', '')}"
port = {env.get('SMTP_PORT', 587)}
user = "{env.get('SMTP_USER', '')}"
password = "{env.get('SMTP_PASS', '')}"
from = "{env.get('SMTP_FROM', 'LLDAP <notify@example.com>')}"
reply_to = "{env.get('SMTP_REPLY_TO', '')}"
[ldap_user]
id = "{env.get('ADMIN_USER', 'admin')}"
email = "{env.get('ADMIN_EMAIL', 'admin@example.com')}"
"""
with open(LLDAP_CONFIG_FILE, "w") as f:
f.write(config_content)
def wait_for_lldap():
print("Waiting for LLDAP to start...")
for _ in range(30):
try:
requests.get(HTTP_URL, timeout=1)
print("✅ LLDAP is Online.")
return True
except:
time.sleep(1)
sys.stdout.write(".")
sys.stdout.flush()
print("\n❌ LLDAP Timed Out.")
return False
def get_token(username, password):
try:
resp = requests.post(f"{HTTP_URL}/auth/simple/login", json={"username": username, "password": password})
if resp.status_code == 200:
return resp.cookies
except Exception as e:
print(f"Connection error: {e}")
return None
def change_password_rest(cookies, user_id, new_pass):
url = f"{HTTP_URL}/api/v1/users/{user_id}/password"
try:
resp = requests.post(url, cookies=cookies, json={"password": new_pass})
if resp.status_code == 200:
print(f" ✅ Password updated via REST API for '{user_id}'")
return True
else:
print(f" ❌ Failed to update password for '{user_id}': {resp.text}")
return False
except Exception as e:
print(f" ❌ API Error: {e}")
return False
def get_group_map(cookies):
query = """query { groups { id displayName } }"""
resp = requests.post(f"{HTTP_URL}/api/graphql", cookies=cookies, json={"query": query})
data = resp.json()
group_map = {}
if 'data' in data and 'groups' in data['data']:
for g in data['data']['groups']:
group_map[g['displayName']] = g['id']
return group_map
def create_group_if_missing(cookies, group_name):
query = """mutation CreateGroup($name: String!) { createGroup(name: $name) { id } }"""
resp = requests.post(f"{HTTP_URL}/api/graphql", cookies=cookies, json={"query": query, "variables": {"name": group_name}})
data = resp.json()
if 'data' in data and 'createGroup' in data['data']:
return data['data']['createGroup']['id']
return None
def add_user_to_group(cookies, user_id, group_id):
query = """mutation AddUserToGroup($userId: String!, $groupId: Int!) { addUserToGroup(userId: $userId, groupId: $groupId) { ok } }"""
requests.post(f"{HTTP_URL}/api/graphql", cookies=cookies, json={"query": query, "variables": {"userId": user_id, "groupId": group_id}})
def create_user(cookies, user_data, group_map):
user_id = user_data['id']
print(f"Processing {user_id}...")
query = """mutation CreateUser($user: CreateUserInput!) { createUser(user: $user) { id } }"""
variables = {
"user": {
"id": user_id,
"email": user_data['email'],
"displayName": f"{user_data.get('first_name','')} {user_data.get('last_name','')}".strip(),
"firstName": user_data.get('first_name',''),
"lastName": user_data.get('last_name','')
}
}
# FIX: Send request exactly ONCE
resp = requests.post(f"{HTTP_URL}/api/graphql", cookies=cookies, json={"query": query, "variables": variables})
if "errors" in resp.json():
print(f" User already exists (Skipping creation).")
else:
print(f" ✅ User created.")
if 'password' in user_data:
change_password_rest(cookies, user_id, user_data['password'])
for g_name in user_data.get('groups', []):
if g_name not in group_map:
new_id = create_group_if_missing(cookies, g_name)
if new_id: group_map[g_name] = new_id
if g_name in group_map:
add_user_to_group(cookies, user_id, group_map[g_name])
# --- MAIN EXECUTION ---
if __name__ == "__main__":
print("--- STARTING FRESH LLDAP PROVISIONING (TWO-STAGE START + REDIS CERTS) ---")
env = load_env_file(SECRETS_FILE)
pg_env = load_env_file(POSTGRES_FILE)
admin_user = env.get("ADMIN_USER", "admin")
admin_pass = env.get("ADMIN_PASS")
if not admin_pass:
print("❌ CRITICAL: ADMIN_PASS not found in lldap.env")
sys.exit(1)
print("Stopping LLDAP service...")
subprocess.run(["systemctl", "stop", "lldap"], check=False)
# 1. Fetch Real Certs first (so config can point to them)
if not fetch_real_certs():
print("⚠️ Warning: Failed to fetch real certs. LDAPS might fail if config points to missing files.")
wipe_state(pg_env)
write_lldap_config(env, pg_env)
# --- STAGE 1: FORCE RESET (Starts, Resets Password, and Dies) ---
print("\n[Stage 1] Forcing Password Reset...")
setup_systemd_overrides(env, admin_pass, force_reset=True)
subprocess.run(["systemctl", "start", "lldap"], check=False)
time.sleep(5)
print("[Stage 1] Reset should be complete.")
# --- STAGE 2: NORMAL RUN (Starts and Stays Alive) ---
print("\n[Stage 2] Starting for Production...")
setup_systemd_overrides(env, admin_pass, force_reset=False)
subprocess.run(["systemctl", "start", "lldap"], check=True)
if not wait_for_lldap():
sys.exit(1)
print(f"\nLogging in as '{admin_user}'...")
cookies = get_token(admin_user, admin_pass)
if not cookies:
print("❌ Authentication Failed! Please check journalctl -u lldap")
sys.exit(1)
print("✅ Authenticated successfully.")
print("Fetching groups...")
group_map = get_group_map(cookies)
user_json_str = env.get("USER_JSON", "[]")
try:
users = json.loads(user_json_str)
except:
users = []
for user in users:
create_user(cookies, user, group_map)
print("--- 🏁 Provisioning Complete 🏁 ---")