rotating passwords

This commit is contained in:
root
2026-01-02 12:38:34 -08:00
parent a153d93231
commit 308d06fcb3
4 changed files with 213 additions and 149 deletions

View File

@@ -1,45 +0,0 @@
import requests
# CONFIG
URL = "http://localhost:17170" # HTTP port, not LDAP port
ADMIN_USER = "admin"
ADMIN_PASS = "YOUR_ADMIN_PASSWORD"
# LIST OF USERS TO ADD
users = [
{"id": "jdoe", "email": "jdoe@home.arpa", "displayName": "John Doe", "password": "password123"},
{"id": "jsmith", "email": "jsmith@home.arpa", "displayName": "Jane Smith", "password": "password123"},
]
# 1. Login to get Token
session = requests.Session()
auth_payload = {"username": ADMIN_USER, "password": ADMIN_PASS}
login_resp = session.post(f"{URL}/auth/simple/login", json=auth_payload)
if login_resp.status_code != 200:
print("Login Failed")
exit(1)
# 2. Add Users
query = """
mutation CreateUser($user: CreateUserInput!) {
createUser(user: $user) {
id
}
}
"""
for user in users:
variables = {
"user": {
"id": user["id"],
"email": user["email"],
"displayName": user["displayName"],
"password": user["password"]
}
}
resp = session.post(f"{URL}/api/graphql", json={"query": query, "variables": variables})
if "errors" in resp.json():
print(f"Error adding {user['id']}: {resp.json()['errors'][0]['message']}")
else:
print(f"Successfully added {user['id']}")

View File

@@ -3,12 +3,22 @@ import json
import requests
import sys
# Try importing ldap3
try:
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
LDAP_AVAILABLE = True
except ImportError:
print("CRITICAL: 'ldap3' library is missing.")
print("Please run: pip install ldap3")
sys.exit(1)
# --- CONFIGURATION ---
SECRETS_FILE = "/mnt/secrets/lldap.env"
URL = "http://localhost:17170"
HTTP_URL = "http://localhost:17170"
LDAP_HOST = "localhost"
LDAP_PORT = 3890
def load_env_file(filepath):
"""Manually parses the env file to handle the JSON string safely."""
config = {}
if not os.path.exists(filepath):
print(f"Error: Secrets file not found at {filepath}")
@@ -20,7 +30,6 @@ def load_env_file(filepath):
if not line or line.startswith('#'): continue
if '=' in line:
key, value = line.split('=', 1)
# Remove surrounding quotes if present
if (value.startswith("'") and value.endswith("'")) or \
(value.startswith('"') and value.endswith('"')):
value = value[1:-1]
@@ -28,78 +37,196 @@ def load_env_file(filepath):
return config
def get_token(username, password):
"""Attempts to login and returns session token."""
try:
resp = requests.post(f"{URL}/auth/simple/login", json={"username": username, "password": password})
resp = requests.post(f"{HTTP_URL}/auth/simple/login", json={"username": username, "password": password})
if resp.status_code == 200:
# lldap sets the token in a cookie named 'token'
return resp.cookies
except Exception as e:
print(f"Connection error: {e}")
return None
def change_admin_password(cookies, old_pass, new_pass):
print(">>> Attempting to rotate Admin Password...")
# --- NEW: ROTATE VIA LDAP INSTEAD OF GRAPHQL ---
def change_admin_password_ldap(old_pass, new_pass, base_dn):
print(">>> Attempting to rotate Admin Password via LDAP...")
# We bind as admin to change admin's own password
admin_dn = f"uid=admin,ou=people,{base_dn}"
try:
server = Server(LDAP_HOST, port=LDAP_PORT, get_info=ALL)
# Bind with OLD password first
conn = Connection(server, user=admin_dn, password=old_pass, auto_bind=True)
# Perform standard LDAP Password Modify Extended Operation
# We pass 'old_password' just to be safe, though admins usually don't need to.
success = conn.extend.standard.modify_password(
user=admin_dn,
new_password=new_pass,
old_password=old_pass
)
conn.unbind()
if success:
print("Success: Admin password updated via LDAP.")
return True
else:
print(f"LDAP Rotation Failed: {conn.result}")
return False
except Exception as e:
print(f"LDAP Connection Error during rotation: {e}")
return False
def get_group_map(cookies):
query = """query { groups { id displayName } }"""
resp = requests.post(f"{HTTP_URL}/api/graphql", cookies=cookies, json={"query": query})
data = resp.json()
group_map = {}
if 'data' in data and 'groups' in data['data']:
for g in data['data']['groups']:
group_map[g['displayName']] = g['id']
return group_map
def create_group_if_missing(cookies, group_name):
query = """
mutation ChangePassword($password: String!) {
changePassword(password: $password) {
ok
}
mutation CreateGroup($name: String!) {
createGroup(name: $name) { id }
}
"""
resp = requests.post(
f"{URL}/api/graphql",
f"{HTTP_URL}/api/graphql",
cookies=cookies,
json={"query": query, "variables": {"password": new_pass}}
json={"query": query, "variables": {"name": group_name}}
)
if resp.status_code == 200 and "errors" not in resp.json():
print("Success: Admin password updated.")
return True
else:
print(f"Failed to update password: {resp.text}")
return False
data = resp.json()
if 'data' in data and 'createGroup' in data['data']:
return data['data']['createGroup']['id']
return None
def create_user(cookies, user_data):
def add_user_to_group(cookies, user_id, group_id):
query = """
mutation AddUserToGroup($userId: String!, $groupId: Int!) {
addUserToGroup(userId: $userId, groupId: $groupId) { ok }
}
"""
requests.post(
f"{HTTP_URL}/api/graphql",
cookies=cookies,
json={"query": query, "variables": {"userId": user_id, "groupId": group_id}}
)
def delete_user(cookies, user_id):
print(f" -> Deleting '{user_id}' to ensure clean state...")
query = """
mutation DeleteUser($userId: String!) {
deleteUser(userId: $userId) { ok }
}
"""
requests.post(
f"{HTTP_URL}/api/graphql",
cookies=cookies,
json={"query": query, "variables": {"userId": user_id}}
)
def set_user_password_ldap(user_id, password, base_dn, admin_user, admin_pass):
print(f" -> Setting password for {user_id} via LDAP...")
user_dn = f"uid={user_id},ou=people,{base_dn}"
admin_dn = f"uid={admin_user},ou=people,{base_dn}"
try:
server = Server(LDAP_HOST, port=LDAP_PORT, get_info=ALL)
conn = Connection(server, user=admin_dn, password=admin_pass, auto_bind=True)
success = conn.extend.standard.modify_password(user=user_dn, new_password=password)
if success:
print(" Success: Password set.")
else:
print(f" LDAP Error: {conn.result}")
conn.unbind()
except Exception as e:
print(f" LDAP Connection Failed: {e}")
def create_user(cookies, user_data, group_map, base_dn, admin_user, admin_pass):
first = user_data.get('first_name', '')
last = user_data.get('last_name', '')
display_name = f"{first} {last}".strip()
user_id = user_data['id']
print(f"Processing {user_id} ({display_name})...")
# 1. Create User (GraphQL)
query = """
mutation CreateUser($user: CreateUserInput!) {
createUser(user: $user) {
id
}
createUser(user: $user) { id }
}
"""
variables = {
"user": {
"id": user_data['id'],
"id": user_id,
"email": user_data['email'],
"displayName": user_data['name'],
"password": user_data['password']
"displayName": display_name,
"firstName": first,
"lastName": last
}
}
resp = requests.post(
f"{URL}/api/graphql",
f"{HTTP_URL}/api/graphql",
cookies=cookies,
json={"query": query, "variables": variables}
)
response_json = resp.json()
# 2. Handle Conflicts
if "errors" in response_json:
err_msg = response_json['errors'][0]['message']
if "User already exists" in err_msg:
print(f"Skip: User '{user_data['id']}' already exists.")
else:
print(f"Error adding '{user_data['id']}': {err_msg}")
else:
print(f"Success: Added user '{user_data['id']}'")
if "User already exists" in err_msg or "UNIQUE constraint" in err_msg:
delete_user(cookies, user_id)
# Retry
resp = requests.post(
f"{HTTP_URL}/api/graphql",
cookies=cookies,
json={"query": query, "variables": variables}
)
if "errors" in resp.json():
print(f"Error creating '{user_id}': {resp.json()['errors'][0]['message']}")
return
print(f" -> User Created (GraphQL).")
# 3. Set Password (LDAP)
if 'password' in user_data:
set_user_password_ldap(user_id, user_data['password'], base_dn, admin_user, admin_pass)
# 4. Assign Groups
wanted_groups = user_data.get('groups', [])
for g_name in wanted_groups:
if g_name not in group_map:
new_id = create_group_if_missing(cookies, g_name)
if new_id: group_map[g_name] = new_id
if g_name in group_map:
add_user_to_group(cookies, user_id, group_map[g_name])
print(f" -> Added to group: {g_name}")
# --- MAIN EXECUTION ---
if __name__ == "__main__":
print("--- Starting LLDAP Provisioning ---")
# 1. Load Secrets
env = load_env_file(SECRETS_FILE)
admin_user = env.get("LLDAP_ADMIN_USER", "admin")
old_pass = env.get("LLDAP_OLD_PASS", "password")
new_pass = env.get("LLDAP_NEW_PASS")
# Priority: 1. LDAP_LDAP_BASE_DN (Correct), 2. LDAP_BASE_DN (Old), 3. Hardcoded (Fallback)
base_dn = env.get("LLDAP_LDAP_BASE_DN", env.get("LLDAP_BASE_DN", "dc=poppyglen,dc=cc"))
print(f"Using Base DN: {base_dn}")
user_json_str = env.get("USER_JSON", "[]")
try:
@@ -108,29 +235,35 @@ if __name__ == "__main__":
print(f"Error parsing USER_JSON: {e}")
sys.exit(1)
# 2. Authenticate
# Try logging in with the NEW password first (idempotency)
print(f"Checking access for {admin_user}...")
current_pass = new_pass
cookies = get_token(admin_user, new_pass)
if cookies:
print("Authenticated with NEW password. No rotation needed.")
else:
print("Could not auth with new password. Trying OLD password...")
if not cookies:
print("New password failed. Trying OLD password...")
cookies = get_token(admin_user, old_pass)
if cookies:
print("Authenticated with OLD password.")
# Rotate password
if change_admin_password(cookies, old_pass, new_pass):
# Re-login to get new token
print("Authenticated with OLD password. Rotating...")
# --- USE NEW LDAP ROTATION FUNCTION ---
if change_admin_password_ldap(old_pass, new_pass, base_dn):
# Update current_pass so subsequent LDAP logic uses the NEW password
current_pass = new_pass
# Re-auth HTTP to confirm and get fresh token
cookies = get_token(admin_user, new_pass)
else:
print("CRITICAL: Admin rotation failed via LDAP. Exiting.")
sys.exit(1)
else:
print("CRITICAL: Authentication failed with both old and new passwords.")
print("CRITICAL: Authentication failed completely.")
sys.exit(1)
else:
print("Authenticated with NEW password.")
print("Fetching existing groups...")
group_map = get_group_map(cookies)
# 3. Create Users
print(f"Processing {len(users)} users...")
for user in users:
create_user(cookies, user)
create_user(cookies, user, group_map, base_dn, admin_user, current_pass)
print("--- Provisioning Complete ---")

View File

@@ -1,55 +0,0 @@
#!/bin/bash
# Configuration
LLDAP_VERSION="v0.6.1"
INSTALL_DIR="/opt/lldap"
# Change these defaults or edit the TOML after install
ADMIN_EMAIL="admin@home.arpa"
ADMIN_PASSWORD="password"
echo ">>> Updating System and Installing Dependencies..."
if [ -f /etc/debian_version ]; then
apt update && apt install -y wget curl tar
elif [ -f /etc/alpine-release ]; then
apk add wget curl tar
fi
echo ">>> creating install directory..."
mkdir -p "$INSTALL_DIR"
cd "$INSTALL_DIR"
echo ">>> Downloading lldap $LLDAP_VERSION..."
wget -q "https://github.com/lldap/lldap/releases/download/${LLDAP_VERSION}/lldap-x86_64-unknown-linux-musl.tar.gz" -O lldap.tar.gz
tar xzf lldap.tar.gz --strip-components=1
rm lldap.tar.gz
echo ">>> Configuring lldap..."
if [ ! -f lldap_config.toml ]; then
cp lldap_config.toml.example lldap_config.toml
# Simple sed replacement to set admin password/email - customize as needed
sed -i "s/admin@example.com/${ADMIN_EMAIL}/" lldap_config.toml
sed -i "s/password/${ADMIN_PASSWORD}/" lldap_config.toml
sed -i 's|database_url = .*|database_url = "sqlite://./users.db?mode=rwc"|' lldap_config.toml
fi
echo ">>> Creating Systemd Service..."
cat <<EOF > /etc/systemd/system/lldap.service
[Unit]
Description=LLDAP Server
After=network.target
[Service]
WorkingDirectory=$INSTALL_DIR
ExecStart=$INSTALL_DIR/lldap run
Restart=always
User=root
[Install]
WantedBy=multi-user.target
EOF
echo ">>> Starting lldap..."
systemctl daemon-reload
systemctl enable --now lldap
echo ">>> Done! LLDAP is running on port 17170 (HTTP) and 3890 (LDAP)."

31
test_bind.py Normal file
View File

@@ -0,0 +1,31 @@
from ldap3 import Server, Connection, ALL
# CONFIG
HOST = "localhost"
PORT = 3890
USER = "admin"
# Use the NEW password since your last rotation succeeded
PASS = "password"
def try_bind(domain):
dn = f"uid={USER},ou=people,{domain}"
print(f"Testing Bind DN: {dn}")
try:
server = Server(HOST, port=PORT, get_info=ALL)
conn = Connection(server, user=dn, password=PASS)
if conn.bind():
print(f"✅ SUCCESS! The server is using: {domain}")
return True
else:
print(f"❌ Failed ({conn.result['description']})")
return False
except Exception as e:
print(f"Error: {e}")
return False
print("--- STARTING BLIND BIND TEST ---")
# Test 1: Your desired domain
if not try_bind("dc=poppyglen,dc=cc"):
print("--------------------------------")
# Test 2: The default domain
try_bind("dc=example,dc=com")