#!/usr/bin/env python3 """ Grant a Portainer user access to multiple containers. Usage examples: # Using username/password auth, containers by name, on endpoint 2 python grant_container_access.py \ --url http://portainer.yourdomain:9000 \ --username admin --password 'secret' \ --endpoint-id 2 \ --user johndoe \ --containers web,db,redis # Using an API key and container IDs, dry-run first python grant_container_access.py \ --url https://portainer.example.com \ --api-key 'PTA_xxx' \ --endpoint-id 1 \ --user johndoe \ --containers 3d7c...,e1b2... \ --dry-run """ import argparse import sys import requests from typing import Dict, List, Optional # ---- Helpers --------------------------------------------------------------- def make_session(base_url: str, username: Optional[str], password: Optional[str], api_key: Optional[str]) -> requests.Session: s = requests.Session() s.headers.update({"Accept": "application/json"}) s.verify = True # change to False if you use self-signed certs (or better: add proper CA) if api_key: s.headers.update({"X-API-Key": api_key}) elif username and password: # Portainer auth: POST /api/auth -> {jwt: "..."} resp = s.post(f"{base_url}/api/auth", json={"Username": username, "Password": password}, timeout=30) resp.raise_for_status() token = resp.json().get("jwt") if not token: raise RuntimeError("Login succeeded but no JWT returned.") s.headers.update({"Authorization": f"Bearer {token}"}) else: raise ValueError("Provide either --api-key or --username/--password") return s def get_user_id(session: requests.Session, base_url: str, username: str) -> int: resp = session.get(f"{base_url}/api/users", timeout=30) resp.raise_for_status() users = resp.json() for u in users: if u.get("Username") == username: return int(u["Id"]) raise RuntimeError(f"User '{username}' not found in Portainer.") def list_containers_in_endpoint(session: requests.Session, base_url: str, endpoint_id: int) -> List[Dict]: # Docker proxy: /api/endpoints/{id}/docker/containers/json?all=1 resp = session.get(f"{base_url}/api/endpoints/{endpoint_id}/docker/containers/json", params={"all": 1}, timeout=60) resp.raise_for_status() return resp.json() def resolve_container_ids(targets: List[str], containers: List[Dict]) -> Dict[str, str]: """ Map each provided container name or short/full ID to an exact container ID. Targets may be: - exact ID (full or prefix) - name (with or without leading '/') """ id_map: Dict[str, str] = {} # build indexes by_id = {c["Id"]: c for c in containers} by_short = {c["Id"][:12]: c for c in containers} by_name = {} for c in containers: for n in c.get("Names", []): name = n.lstrip("/") by_name[name] = c for t in targets: t_clean = t.lstrip("/") match = None if t in by_id: match = by_id[t] elif t in by_short: match = by_short[t] elif t_clean in by_name: match = by_name[t_clean] else: # try prefix match against full IDs candidates = [c for cid, c in by_id.items() if cid.startswith(t)] if len(candidates) == 1: match = candidates[0] if not match: raise RuntimeError(f"Container '{t}' not found (by name or ID).") id_map[t] = match["Id"] return id_map def fetch_resource_controls(session: requests.Session, base_url: str) -> List[Dict]: # CE/EE: GET /api/resource_controls resp = session.get(f"{base_url}/api/resource_controls", timeout=60) resp.raise_for_status() return resp.json() def find_rc_for_container(resource_controls: List[Dict], container_id: str) -> Optional[Dict]: for rc in resource_controls: if rc.get("ResourceID") == container_id: # Type 1 usually denotes Docker container return rc return None def ensure_user_in_rc(session: requests.Session, base_url: str, rc: Dict, user_id: int, dry_run: bool) -> None: users = rc.get("Users", []) or [] if any(u.get("UserID") == user_id for u in users): # already present; nothing to do return users.append({"UserID": user_id, "AccessLevel": 1}) # AccessLevel is required; 1 is sufficient for containers rc_update = { "Public": rc.get("Public", False), "AdministratorsOnly": rc.get("AdministratorsOnly", False), "Users": users, "Teams": rc.get("Teams", []) or [], } if dry_run: print(f"[DRY-RUN] Would update resource control {rc.get('Id')} to add user {user_id}") return resp = session.put(f"{base_url}/api/resource_controls/{rc['Id']}", json=rc_update, timeout=60) resp.raise_for_status() def create_rc_for_container(session: requests.Session, base_url: str, container_id: str, user_id: int, dry_run: bool) -> None: payload = { "Type": 1, # 1 = container "ResourceID": container_id, "Public": False, "AdministratorsOnly": False, "Users": [{"UserID": user_id, "AccessLevel": 1}], "Teams": [], # "SubResourceIDs": [] # optional } if dry_run: print(f"[DRY-RUN] Would create resource control for container {container_id} with user {user_id}") return resp = session.post(f"{base_url}/api/resource_controls", json=payload, timeout=60) # 409 may occur if RC already exists concurrently; treat as non-fatal if resp.status_code not in (200, 201): resp.raise_for_status() # ---- Main ------------------------------------------------------------------ def main(): ap = argparse.ArgumentParser(description="Assign a Portainer user to multiple containers.") ap.add_argument("--url", required=True, help="Base URL to Portainer, e.g. https://portainer.example.com") auth = ap.add_mutually_exclusive_group(required=True) auth.add_argument("--api-key", help="Portainer API key (recommended)") auth.add_argument("--username", help="Portainer username (if not using --api-key)") ap.add_argument("--password", help="Portainer password (required if --username is used)") ap.add_argument("--endpoint-id", required=True, type=int, help="Portainer Endpoint ID where the containers live") ap.add_argument("--user", required=True, help="Portainer username to grant access to") ap.add_argument("--containers", required=True, help="Comma-separated container names or IDs") ap.add_argument("--dry-run", action="store_true", help="Preview actions without changing anything") args = ap.parse_args() if args.username and not args.password: ap.error("--password is required when --username is provided") base_url = args.url.rstrip("/") try: session = make_session(base_url, args.username, args.password, args.api_key) target_user_id = get_user_id(session, base_url, args.user) print(f"Target user '{args.user}' has ID {target_user_id}") containers = list_containers_in_endpoint(session, base_url, args.endpoint_id) targets = [t.strip() for t in args.containers.split(",") if t.strip()] id_map = resolve_container_ids(targets, containers) rcs = fetch_resource_controls(session, base_url) successes = 0 for original, cid in id_map.items(): rc = find_rc_for_container(rcs, cid) if rc: ensure_user_in_rc(session, base_url, rc, target_user_id, args.dry_run) print(f"{'[DRY-RUN] ' if args.dry_run else ''}Updated RC for container {original} ({cid[:12]})") else: create_rc_for_container(session, base_url, cid, target_user_id, args.dry_run) print(f"{'[DRY-RUN] ' if args.dry_run else ''}Created RC for container {original} ({cid[:12]})") successes += 1 print(f"Done. {successes} container(s) processed.") if args.dry_run: print("No changes were made (dry-run).") except requests.HTTPError as e: print(f"HTTP error: {e} - response text: {getattr(e.response, 'text', '')}", file=sys.stderr) sys.exit(2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()