Files
proxmox-templates/rotate_lldap_full.py
2026-01-02 13:00:39 -08:00

156 lines
5.5 KiB
Python

import os
import json
import subprocess
import re
import time
import getpass
# --- CONFIGURATION ---
ENV_FILE = "/root/secrets/lldap.env"
IMPORT_SCRIPT_PATH = "/root/proxmox-vaultwarden/import_secrets.py"
# !!! UPDATE THESE IDs !!!
VAULTWARDEN_LXC_ID = "104" # Your Vaultwarden Container ID
LLDAP_LXC_ID = "126" # <--- REPLACE THIS with your LLDAP Container ID
def run_pct_command(lxc_id, args):
"""Helper to run pct commands cleanly."""
cmd = ["pct", "exec", lxc_id, "--"] + args
return subprocess.run(cmd, capture_output=True, text=True)
def generate_password(length=25):
"""Generates a password by calling 'bw generate' inside the LXC."""
# We use Vaultwarden container to generate the password
res = run_pct_command(VAULTWARDEN_LXC_ID, ["/usr/local/bin/bw", "generate", "-ulns", "--length", str(length)])
if res.returncode != 0:
print(f"Error generating password: {res.stderr}")
return None
return res.stdout.strip()
def sync_to_vaultwarden():
"""Mounts secrets, unlocks vault, and triggers import."""
print("\n--- Syncing to Vaultwarden ---")
password = getpass.getpass("Enter Vaultwarden Master Password to unlock (or leave empty to skip): ")
if not password:
print("Skipping Vaultwarden sync.")
return
print(" 1. Unlocking Vault...")
unlock_res = run_pct_command(VAULTWARDEN_LXC_ID, ["/usr/local/bin/bw", "unlock", password, "--raw"])
if unlock_res.returncode != 0:
print(f" Failed to unlock: {unlock_res.stderr}")
return
session_key = unlock_res.stdout.strip()
print(" 2. Mounting secrets...")
subprocess.run(["pct", "set", VAULTWARDEN_LXC_ID, "-mp0", "/root/secrets,mp=/mnt/secrets,ro=1"], capture_output=True)
time.sleep(1)
try:
print(" 3. Running import script...")
bash_cmd = f"export BW_SESSION='{session_key}'; python3 {IMPORT_SCRIPT_PATH}"
sync_res = run_pct_command(VAULTWARDEN_LXC_ID, ["bash", "-c", bash_cmd])
if sync_res.returncode == 0:
print(" SUCCESS: Secrets synced to Vaultwarden.")
else:
print(f" ERROR during import: {sync_res.stderr}")
finally:
print(" 4. Unmounting secrets...")
subprocess.run(["pct", "set", VAULTWARDEN_LXC_ID, "-mp0", "--delete"], capture_output=True)
def restart_and_provision_lldap():
"""Restarts LLDAP to load new env, then runs provisioning script."""
print("\n--- Applying Changes to LLDAP ---")
# 1. Restart Container
#print(f" 1. Restarting LLDAP Container ({LLDAP_LXC_ID}) to load new .env...")
#subprocess.run(["pct", "restart", LLDAP_LXC_ID], check=True)
# 2. Wait for boot
#print(" 2. Waiting 10 seconds for LLDAP services to initialize...")
#time.sleep(10)
# 3. Run Provisioning Script
print(" 3. Running 'provision_from_env.py' inside container...")
# Assumes the script is in the default working dir. If not, provide full path (e.g. /app/provision_from_env.py)
prov_res = run_pct_command(LLDAP_LXC_ID, ["python3", "/root/proxmox-lldap/provision_from_env.py"])
if prov_res.returncode == 0:
print(" SUCCESS: Provisioning script completed.")
print(prov_res.stdout)
else:
print(" FAILURE: Provisioning script crashed.")
print(f" Error: {prov_res.stderr}")
print(f" Output: {prov_res.stdout}")
def load_env_file(filepath):
with open(filepath, 'r') as f:
return f.read()
def main():
if not os.path.exists(ENV_FILE):
print(f"Error: {ENV_FILE} not found.")
return
print(f"Reading {ENV_FILE}...")
content = load_env_file(ENV_FILE)
# --- STEP A: Rotate Admin Passwords ---
new_pass_match = re.search(r'LLDAP_NEW_PASS=["\'](.*?)["\']', content)
if new_pass_match:
old_val = new_pass_match.group(1)
fresh_admin_pass = generate_password(32)
print(f" [Admin] Rotating password...")
content = re.sub(r'LLDAP_OLD_PASS=["\'].*?["\']', f'LLDAP_OLD_PASS="{old_val}"', content)
content = re.sub(r'LLDAP_NEW_PASS=["\'].*?["\']', f'LLDAP_NEW_PASS="{fresh_admin_pass}"', content)
else:
print("Warning: Could not find LLDAP_NEW_PASS variable.")
# --- STEP B: Rotate User JSON Passwords ---
json_match = re.search(r"USER_JSON='(.*?)'", content, re.DOTALL)
if not json_match:
json_match = re.search(r'USER_JSON="(.*?)"', content, re.DOTALL)
if json_match:
raw_json = json_match.group(1)
try:
users = json.loads(raw_json)
print(f" [Users] Found {len(users)} users. Rotating passwords...")
for user in users:
user['password'] = generate_password(24)
new_json_str = json.dumps(users)
old_block = json_match.group(0)
quote_char = "'" if old_block.startswith("USER_JSON='") else '"'
new_block = f"USER_JSON={quote_char}{new_json_str}{quote_char}"
content = content.replace(old_block, new_block)
except json.JSONDecodeError as e:
print(f"Error parsing USER_JSON: {e}")
# --- STEP C: Write Back ---
with open(ENV_FILE, 'w') as f:
f.write(content)
print("Success! .env file updated.")
# --- STEP E: Restart & Provision LLDAP ---
restart_and_provision_lldap()
# --- STEP D: Sync to Vaultwarden ---
sync_to_vaultwarden()
print("---------------------------------------------------")
print("Rotation Complete.")
if __name__ == "__main__":
main()