156 lines
5.5 KiB
Python
156 lines
5.5 KiB
Python
import os
|
|
import json
|
|
import subprocess
|
|
import re
|
|
import time
|
|
import getpass
|
|
|
|
# --- CONFIGURATION ---
# Env file that main() reads, rewrites with rotated passwords, and saves in place.
ENV_FILE = "/root/secrets/lldap.env"
# Script that sync_to_vaultwarden() runs with python3 inside the Vaultwarden container.
IMPORT_SCRIPT_PATH = "/root/proxmox-vaultwarden/import_secrets.py"

# !!! UPDATE THESE IDs !!!
# Container IDs are kept as strings because they are passed as `pct` command-line tokens.
VAULTWARDEN_LXC_ID = "104" # Your Vaultwarden Container ID
LLDAP_LXC_ID = "126" # <--- REPLACE THIS with your LLDAP Container ID
|
|
|
|
def run_pct_command(lxc_id, args):
    """Run a command inside an LXC container via ``pct exec``.

    Args:
        lxc_id: Container ID. Strings and integers are both accepted; the
            value is converted to ``str`` because ``subprocess`` rejects
            non-string argv tokens.
        args: Iterable of command tokens to execute inside the container.

    Returns:
        The ``subprocess.CompletedProcess`` for the call, with stdout and
        stderr captured as text. The exit status is not checked here.
    """
    cmd = ["pct", "exec", str(lxc_id), "--", *args]
    return subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
def generate_password(length=25):
    """Generate a password by calling ``bw generate`` in the Vaultwarden LXC.

    Args:
        length: Number of characters requested from ``bw generate``.

    Returns:
        The generated password with surrounding whitespace stripped, or
        ``None`` when the command fails or prints nothing.
    """
    # -ulns: uppercase, lowercase, numbers and special characters.
    result = run_pct_command(
        VAULTWARDEN_LXC_ID,
        ["/usr/local/bin/bw", "generate", "-ulns", "--length", str(length)],
    )
    if result.returncode != 0:
        print(f"Error generating password: {result.stderr}")
        return None

    password = result.stdout.strip()
    if not password:
        # A zero exit status with no output must not be treated as a valid
        # (empty) password.
        print("Error generating password: 'bw generate' returned no output.")
        return None
    return password
|
|
|
|
def sync_to_vaultwarden():
    """Unlock the vault, mount the secrets, and run the import script.

    Steps:
        1. Prompt for the master password (empty input skips the sync) and
           run ``bw unlock --raw`` inside the Vaultwarden container to
           obtain a session key.
        2. Bind-mount ``/root/secrets`` read-only at ``/mnt/secrets`` as
           mount point ``mp0`` of the container.
        3. Run ``IMPORT_SCRIPT_PATH`` inside the container with
           ``BW_SESSION`` set to the session key.
        4. Remove mount point ``mp0`` again, whether or not step 3 succeeded.

    Returns:
        None. Progress and failures are reported on stdout.
    """
    print("\n--- Syncing to Vaultwarden ---")

    password = getpass.getpass("Enter Vaultwarden Master Password to unlock (or leave empty to skip): ")
    if not password:
        print("Skipping Vaultwarden sync.")
        return

    # subprocess only accepts string argv tokens, so tolerate an integer ID.
    lxc_id = str(VAULTWARDEN_LXC_ID)

    print(" 1. Unlocking Vault...")
    # NOTE(security): the master password is passed as a command-line
    # argument and is visible in the process list while `bw unlock` runs.
    unlock_res = run_pct_command(lxc_id, ["/usr/local/bin/bw", "unlock", password, "--raw"])
    if unlock_res.returncode != 0:
        print(f" Failed to unlock: {unlock_res.stderr}")
        return

    session_key = unlock_res.stdout.strip()
    if not session_key:
        print(" Failed to unlock: no session key was returned.")
        return

    print(" 2. Mounting secrets...")
    mount_res = subprocess.run(
        ["pct", "set", lxc_id, "-mp0", "/root/secrets,mp=/mnt/secrets,ro=1"],
        capture_output=True,
        text=True,
    )
    if mount_res.returncode != 0:
        # Without the mount the import script has nothing to read.
        print(f" Failed to mount secrets: {mount_res.stderr}")
        return
    # Give the container a moment to pick up the new mount point.
    time.sleep(1)

    try:
        print(" 3. Running import script...")
        # The session key is handed over as a separate `env` argv token, so
        # it never passes through a shell and needs no quoting.
        sync_res = run_pct_command(
            lxc_id,
            ["env", f"BW_SESSION={session_key}", "python3", IMPORT_SCRIPT_PATH],
        )
        if sync_res.returncode == 0:
            print(" SUCCESS: Secrets synced to Vaultwarden.")
        else:
            print(f" ERROR during import: {sync_res.stderr}")
    finally:
        print(" 4. Unmounting secrets...")
        # `pct set` removes a setting with `--delete <name>`; writing
        # `-mp0 --delete` would instead pass "--delete" as the value of mp0.
        unmount_res = subprocess.run(
            ["pct", "set", lxc_id, "--delete", "mp0"],
            capture_output=True,
            text=True,
        )
        if unmount_res.returncode != 0:
            print(f" WARNING: Failed to unmount secrets: {unmount_res.stderr}")
|
|
|
|
def restart_and_provision_lldap():
    """Run the LLDAP provisioning script inside the LLDAP container.

    Despite the name, the container restart and the subsequent boot wait
    (steps 1 and 2) are currently disabled; only step 3, the provisioning
    script, is executed. The outcome is reported on stdout.
    """
    print("\n--- Applying Changes to LLDAP ---")

    # Steps 1 and 2 are disabled: `pct restart LLDAP_LXC_ID` (to load the
    # new .env) followed by a 10 second wait for the LLDAP services.

    # 3. Run the provisioning script by its absolute path in the container.
    print(" 3. Running 'provision_from_env.py' inside container...")
    provision_cmd = ["python3", "/root/proxmox-lldap/provision_from_env.py"]
    outcome = run_pct_command(LLDAP_LXC_ID, provision_cmd)

    if outcome.returncode != 0:
        print(" FAILURE: Provisioning script crashed.")
        print(f" Error: {outcome.stderr}")
        print(f" Output: {outcome.stdout}")
        return

    print(" SUCCESS: Provisioning script completed.")
    print(outcome.stdout)
|
|
|
|
def load_env_file(filepath):
    """Return the full text of the env file at ``filepath``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale of the process running the script.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file content as a single string.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        return f.read()
|
|
|
|
class _RotationError(Exception):
    """Raised when a fresh password could not be generated."""


def _new_password_or_raise(length):
    """Return a freshly generated password or raise ``_RotationError``."""
    password = generate_password(length)
    if not password:
        raise _RotationError(f"could not generate a {length}-character password")
    return password


def _set_quoted_value(content, key, value):
    """Return ``content`` with every quoted ``key=...`` value set to ``value``."""
    pattern = re.escape(key) + r'=["\'].*?["\']'
    # A callable replacement inserts the value literally; a replacement
    # string would have its backslashes and group references interpreted.
    return re.sub(pattern, lambda _match: f'{key}="{value}"', content)


def _rotate_admin_password(content):
    """Move LLDAP_NEW_PASS into LLDAP_OLD_PASS and set a fresh LLDAP_NEW_PASS."""
    match = re.search(r'LLDAP_NEW_PASS=["\'](.*?)["\']', content)
    if not match:
        print("Warning: Could not find LLDAP_NEW_PASS variable.")
        return content

    current_pass = match.group(1)
    fresh_admin_pass = _new_password_or_raise(32)
    print(" [Admin] Rotating password...")

    content = _set_quoted_value(content, "LLDAP_OLD_PASS", current_pass)
    return _set_quoted_value(content, "LLDAP_NEW_PASS", fresh_admin_pass)


def _rotate_user_passwords(content):
    """Give every entry of the USER_JSON list a fresh ``password`` value."""
    # Single-quoted form first, then double-quoted, as two separate searches.
    json_match = re.search(r"USER_JSON='(.*?)'", content, re.DOTALL)
    if not json_match:
        json_match = re.search(r'USER_JSON="(.*?)"', content, re.DOTALL)
    if not json_match:
        return content

    try:
        users = json.loads(json_match.group(1))
    except json.JSONDecodeError as e:
        print(f"Error parsing USER_JSON: {e}")
        return content

    if not isinstance(users, list) or not all(isinstance(user, dict) for user in users):
        print("Error parsing USER_JSON: expected a JSON list of user objects.")
        return content

    print(f" [Users] Found {len(users)} users. Rotating passwords...")
    for user in users:
        user["password"] = _new_password_or_raise(24)

    # Re-emit the block with the same quote character it was found with.
    old_block = json_match.group(0)
    quote_char = "'" if old_block.startswith("USER_JSON='") else '"'
    new_block = f"USER_JSON={quote_char}{json.dumps(users)}{quote_char}"
    return content.replace(old_block, new_block)


def main():
    """Rotate the passwords in ENV_FILE, then provision LLDAP and sync Vaultwarden.

    Steps:
        A. Copy the current ``LLDAP_NEW_PASS`` into ``LLDAP_OLD_PASS`` and
           generate a new ``LLDAP_NEW_PASS``.
        B. Generate a new ``password`` for every entry in ``USER_JSON``.
        C. Write the updated content back to ``ENV_FILE``.
        D. Run the LLDAP provisioning step.
        E. Sync the secrets to Vaultwarden.

    If any password cannot be generated, the run stops before step C so the
    env file is never written with a missing or placeholder password.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    if not os.path.exists(ENV_FILE):
        print(f"Error: {ENV_FILE} not found.")
        return

    print(f"Reading {ENV_FILE}...")
    content = load_env_file(ENV_FILE)

    # --- STEP A + B: Rotate admin and user passwords in memory ---
    try:
        content = _rotate_admin_password(content)
        content = _rotate_user_passwords(content)
    except _RotationError as err:
        print(f"Error: {err}. Aborting; {ENV_FILE} was left unchanged.")
        return

    # --- STEP C: Write Back ---
    with open(ENV_FILE, "w", encoding="utf-8") as f:
        f.write(content)
    print("Success! .env file updated.")

    # --- STEP D: Provision LLDAP with the new values ---
    restart_and_provision_lldap()

    # --- STEP E: Sync to Vaultwarden ---
    sync_to_vaultwarden()

    print("---------------------------------------------------")
    print("Rotation Complete.")
|
|
|
|
# Script entry point: run the rotation only when executed directly, not on import.
if __name__ == "__main__":
    raise SystemExit(main())
|