# Listing metadata: 186 lines, 5.7 KiB, Python
import os
|
|
import subprocess
|
|
import json
|
|
import glob
|
|
import sys
|
|
|
|
# --- Configuration ---
# Absolute path of the Bitwarden CLI executable run for every vault operation.
BW_CMD = "/usr/local/bin/bw"
# Directory that is scanned for "*.env" files to sync into the vault.
SECRETS_DIR = "/mnt/secrets"
# Name of the vault folder that newly created items are filed under.
TARGET_FOLDER_NAME = "Homelab Infrastructure"
|
|
|
|
def run_bw_command(args, input_data=None):
    """Invoke the Bitwarden CLI and hand back the finished process.

    Args:
        args: List of CLI arguments appended after the ``BW_CMD`` executable.
        input_data: Optional text written to the process's stdin.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout`` and ``stderr``
        captured as UTF-8 text. The return code is NOT checked here; callers
        inspect ``returncode`` themselves.

    Exits the interpreter with status 1 when the executable cannot be found.
    """
    command = [BW_CMD] + args
    try:
        completed = subprocess.run(
            command,
            input=input_data,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
        )
    except FileNotFoundError:
        print(f"CRITICAL ERROR: Could not find executable at {BW_CMD}")
        sys.exit(1)
    return completed
|
|
|
|
def encode_payload(json_data):
    """Serialize *json_data* to JSON and pass it through ``bw encode``.

    ``bw create`` / ``bw edit`` expect their stdin payload in the encoded
    form that ``bw encode`` produces (base64 per the Bitwarden CLI docs; it
    is an encoding step, not encryption).

    Args:
        json_data: Any JSON-serializable object.

    Returns:
        The encoder's stdout with surrounding whitespace removed.

    Exits the interpreter with status 1 (after printing the CLI's stderr)
    when ``bw encode`` returns a non-zero code.
    """
    encoded = run_bw_command(["encode"], input_data=json.dumps(json_data))
    if encoded.returncode == 0:
        return encoded.stdout.strip()

    print("Error encoding data!")
    print(encoded.stderr)
    sys.exit(1)
|
|
|
|
def get_folder_id(folder_name):
    """Return the ID of the vault folder named *folder_name*, creating it if absent.

    Args:
        folder_name: Exact folder name to look up.

    Returns:
        The ``id`` of the first listed folder whose name matches exactly, or
        the ``id`` reported by ``bw create folder`` for a freshly made one.

    Exits the interpreter with status 1 when listing or creating fails.
    """
    print(f"Checking for folder: {folder_name}...")

    # Step 1: look through the folders that already exist.
    listing = run_bw_command(["list", "folders"])
    if listing.returncode != 0:
        print("Error listing folders. Is BW_SESSION set?")
        sys.exit(1)

    matching = (
        entry for entry in json.loads(listing.stdout)
        if entry['name'] == folder_name
    )
    found = next(matching, None)
    if found is not None:
        return found['id']

    # Step 2: nothing matched, so create the folder.
    print(f"Creating new folder: {folder_name}")

    # The create command takes the encoded form of the JSON body on stdin.
    payload = encode_payload({"name": folder_name})
    created = run_bw_command(["create", "folder"], input_data=payload)
    if created.returncode != 0:
        print(f"\n!!! FAILED to create folder '{folder_name}' !!!")
        print(f"BW Error Message: {created.stderr}")
        sys.exit(1)

    return json.loads(created.stdout)['id']
|
|
|
|
def _strip_matching_quotes(value):
    """Remove ONE pair of matching surrounding quotes from *value*, if present."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
        return value[1:-1]
    return value


def parse_env_file(filepath):
    """Parse ``KEY=VALUE`` lines of a .env file into Bitwarden custom fields.

    Blank lines, lines starting with ``#``, lines without ``=`` and lines
    whose key is empty are skipped. The line is split on the first ``=``
    only, so values may themselves contain ``=``. A value wrapped in one
    matching pair of single or double quotes has that pair removed; quote
    characters that are not part of such a pair belong to the secret and
    are kept verbatim.

    Args:
        filepath: Path of the .env file to read.

    Returns:
        A list of dicts with the keys ``name``, ``value``, ``type`` (always
        1, a hidden field) and ``linkedId`` (always ``None``). If the file
        cannot be read or decoded, the error is printed and the fields
        collected up to that point (possibly none) are returned.
    """
    fields = []
    try:
        # Decode explicitly instead of relying on the locale default;
        # "utf-8-sig" also drops a leading BOM so it cannot leak into the
        # first key.
        with open(filepath, encoding="utf-8-sig") as env_file:
            for raw_line in env_file:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue

                key, separator, value = line.partition("=")
                key = key.strip()
                if not separator or not key:
                    continue

                fields.append({
                    "name": key,
                    "value": _strip_matching_quotes(value.strip()),
                    "type": 1,  # 1 = Hidden field
                    "linkedId": None,
                })
    except (OSError, UnicodeError) as e:
        print(f"Error reading {filepath}: {e}")
    return fields
|
|
|
|
def create_or_update_item(name, fields, folder_id):
    """Create the vault item *name*, or overwrite its fields if it already exists.

    Args:
        name: Exact item name to look up or create.
        fields: List of custom-field dicts to store on the item.
        folder_id: Folder for a NEWLY created item. An existing item is
            updated in place and keeps whatever folder it already has.

    Returns:
        None. Success and failure are reported on stdout. If the search
        itself fails, nothing is created or modified.
    """
    # 1. Search for an existing item.
    search = run_bw_command(["list", "items", "--search", name])
    if search.returncode != 0:
        # Without a reliable listing we cannot know whether the item exists;
        # falling through to "create" would duplicate it on every failure.
        print(f"Error searching for {name}: {search.stderr}")
        return

    items = json.loads(search.stdout)

    # The search is fuzzy, so insist on an exact name match.
    existing_item = next((i for i in items if i['name'] == name), None)

    if existing_item is None:
        # --- CREATE PATH ---
        print(f"Creating new item: {name}...")

        item = {
            "type": 1,  # Login
            "name": name,
            "folderId": folder_id,
            "notes": "Created via Python script",
            "fields": fields,
            "login": {
                "uris": [],
                "username": "",
                "password": "",
                "totp": None,
            },
        }

        encoded_payload = encode_payload(item)
        create_res = run_bw_command(["create", "item"], input_data=encoded_payload)

        if create_res.returncode != 0:
            print(f"Error creating {name}: {create_res.stderr}")
        else:
            print(f"Successfully created {name}")
        return

    # --- UPDATE PATH ---
    item_id = existing_item['id']
    print(f"Updating existing item: {name}...")

    # Fetch the full item first so the edit payload carries its complete
    # current structure, not just the fields we change.
    get_res = run_bw_command(["get", "item", item_id])
    if get_res.returncode != 0:
        print(f"Error fetching item {name} for update.")
        return

    full_item = json.loads(get_res.stdout)

    # Replace the custom fields wholesale with the data from the .env file.
    full_item['fields'] = fields
    full_item['notes'] = "Updated via Python script from Proxmox host"

    # Encode and push.
    encoded_payload = encode_payload(full_item)
    update_res = run_bw_command(["edit", "item", item_id], input_data=encoded_payload)

    if update_res.returncode != 0:
        print(f"Error updating {name}: {update_res.stderr}")
    else:
        print(f"Successfully updated {name}")
|
|
|
|
def main():
    """Sync every ``*.env`` file in ``SECRETS_DIR`` into the Bitwarden vault.

    Requires ``BW_SESSION`` in the environment; without it a message is
    printed and nothing else happens. Each file ``<stem>.env`` becomes (or
    updates) the item named ``"<Stem> Env"``, where the stem is passed
    through ``str.capitalize()``. Files that yield no fields are skipped.
    """
    if "BW_SESSION" not in os.environ:
        print("CRITICAL: BW_SESSION variable not set. Run 'bw unlock' first.")
        return

    folder_id = get_folder_id(TARGET_FOLDER_NAME)
    # Sorted so files are processed in a deterministic order.
    env_files = sorted(glob.glob(os.path.join(SECRETS_DIR, "*.env")))

    if not env_files:
        print(f"No .env files found in {SECRETS_DIR}")
        return

    for filepath in env_files:
        filename = os.path.basename(filepath)
        # splitext drops only the trailing extension; str.replace would also
        # remove any ".env" inside the stem ("my.envoy.env" -> "myoy").
        stem = os.path.splitext(filename)[0]
        item_name = stem.capitalize() + " Env"

        fields = parse_env_file(filepath)
        if fields:
            create_or_update_item(item_name, fields, folder_id)
        else:
            print(f"Skipping {filename} (empty or invalid)")

    print("\nDone! Check your vault.")
|
|
|
|
# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|