commit dc17ca7bf9dc0d530cefb351e37302f3c7624f2e Author: giumbai Date: Mon Aug 11 21:51:36 2025 +0300 Add grant_container_access.py diff --git a/grant_container_access.py b/grant_container_access.py new file mode 100644 index 0000000..e65e09a --- /dev/null +++ b/grant_container_access.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Grant a Portainer user access to multiple containers. + +Usage examples: + # Using username/password auth, containers by name, on endpoint 2 + python grant_container_access.py \ + --url http://portainer.yourdomain:9000 \ + --username admin --password 'secret' \ + --endpoint-id 2 \ + --user johndoe \ + --containers web,db,redis + + # Using an API key and container IDs, dry-run first + python grant_container_access.py \ + --url https://portainer.example.com \ + --api-key 'PTA_xxx' \ + --endpoint-id 1 \ + --user johndoe \ + --containers 3d7c...,e1b2... \ + --dry-run +""" + +import argparse +import sys +import requests +from typing import Dict, List, Optional + +# ---- Helpers --------------------------------------------------------------- + +def make_session(base_url: str, username: Optional[str], password: Optional[str], api_key: Optional[str]) -> requests.Session: + s = requests.Session() + s.headers.update({"Accept": "application/json"}) + s.verify = True # change to False if you use self-signed certs (or better: add proper CA) + if api_key: + s.headers.update({"X-API-Key": api_key}) + elif username and password: + # Portainer auth: POST /api/auth -> {jwt: "..."} + resp = s.post(f"{base_url}/api/auth", json={"Username": username, "Password": password}, timeout=30) + resp.raise_for_status() + token = resp.json().get("jwt") + if not token: + raise RuntimeError("Login succeeded but no JWT returned.") + s.headers.update({"Authorization": f"Bearer {token}"}) + else: + raise ValueError("Provide either --api-key or --username/--password") + return s + +def get_user_id(session: requests.Session, base_url: str, username: str) -> int: + resp = session.get(f"{base_url}/api/users", timeout=30) + resp.raise_for_status() + users = resp.json() + for u in users: + if u.get("Username") == username: + return int(u["Id"]) + raise RuntimeError(f"User '{username}' not found in Portainer.") + +def list_containers_in_endpoint(session: requests.Session, base_url: str, endpoint_id: int) -> List[Dict]: + # Docker proxy: /api/endpoints/{id}/docker/containers/json?all=1 + resp = session.get(f"{base_url}/api/endpoints/{endpoint_id}/docker/containers/json", params={"all": 1}, timeout=60) + resp.raise_for_status() + return resp.json() + +def resolve_container_ids(targets: List[str], containers: List[Dict]) -> Dict[str, str]: + """ + Map each provided container name or short/full ID to an exact container ID. + Targets may be: + - exact ID (full or prefix) + - name (with or without leading '/') + """ + id_map: Dict[str, str] = {} + # build indexes + by_id = {c["Id"]: c for c in containers} + by_short = {c["Id"][:12]: c for c in containers} + by_name = {} + for c in containers: + for n in c.get("Names", []): + name = n.lstrip("/") + by_name[name] = c + + for t in targets: + t_clean = t.lstrip("/") + match = None + if t in by_id: + match = by_id[t] + elif t in by_short: + match = by_short[t] + elif t_clean in by_name: + match = by_name[t_clean] + else: + # try prefix match against full IDs + candidates = [c for cid, c in by_id.items() if cid.startswith(t)] + if len(candidates) == 1: + match = candidates[0] + if not match: + raise RuntimeError(f"Container '{t}' not found (by name or ID).") + id_map[t] = match["Id"] + return id_map + +def fetch_resource_controls(session: requests.Session, base_url: str) -> List[Dict]: + # CE/EE: GET /api/resource_controls + resp = session.get(f"{base_url}/api/resource_controls", timeout=60) + resp.raise_for_status() + return resp.json() + +def find_rc_for_container(resource_controls: List[Dict], container_id: str) -> Optional[Dict]: + for rc in resource_controls: + if rc.get("ResourceID") == container_id: + # Type 1 usually denotes Docker container + return rc + return None + +def ensure_user_in_rc(session: requests.Session, base_url: str, rc: Dict, user_id: int, dry_run: bool) -> None: + users = rc.get("Users", []) or [] + if any(u.get("UserID") == user_id for u in users): + # already present; nothing to do + return + users.append({"UserID": user_id, "AccessLevel": 1}) # AccessLevel is required; 1 is sufficient for containers + rc_update = { + "Public": rc.get("Public", False), + "AdministratorsOnly": rc.get("AdministratorsOnly", False), + "Users": users, + "Teams": rc.get("Teams", []) or [], + } + if dry_run: + print(f"[DRY-RUN] Would update resource control {rc.get('Id')} to add user {user_id}") + return + resp = session.put(f"{base_url}/api/resource_controls/{rc['Id']}", json=rc_update, timeout=60) + resp.raise_for_status() + +def create_rc_for_container(session: requests.Session, base_url: str, container_id: str, user_id: int, dry_run: bool) -> None: + payload = { + "Type": 1, # 1 = container + "ResourceID": container_id, + "Public": False, + "AdministratorsOnly": False, + "Users": [{"UserID": user_id, "AccessLevel": 1}], + "Teams": [], + # "SubResourceIDs": [] # optional + } + if dry_run: + print(f"[DRY-RUN] Would create resource control for container {container_id} with user {user_id}") + return + resp = session.post(f"{base_url}/api/resource_controls", json=payload, timeout=60) + # 409 may occur if RC already exists concurrently; treat as non-fatal + if resp.status_code not in (200, 201): + resp.raise_for_status() + +# ---- Main ------------------------------------------------------------------ + +def main(): + ap = argparse.ArgumentParser(description="Assign a Portainer user to multiple containers.") + ap.add_argument("--url", required=True, help="Base URL to Portainer, e.g. https://portainer.example.com") + auth = ap.add_mutually_exclusive_group(required=True) + auth.add_argument("--api-key", help="Portainer API key (recommended)") + auth.add_argument("--username", help="Portainer username (if not using --api-key)") + ap.add_argument("--password", help="Portainer password (required if --username is used)") + ap.add_argument("--endpoint-id", required=True, type=int, help="Portainer Endpoint ID where the containers live") + ap.add_argument("--user", required=True, help="Portainer username to grant access to") + ap.add_argument("--containers", required=True, help="Comma-separated container names or IDs") + ap.add_argument("--dry-run", action="store_true", help="Preview actions without changing anything") + args = ap.parse_args() + + if args.username and not args.password: + ap.error("--password is required when --username is provided") + + base_url = args.url.rstrip("/") + try: + session = make_session(base_url, args.username, args.password, args.api_key) + target_user_id = get_user_id(session, base_url, args.user) + + print(f"Target user '{args.user}' has ID {target_user_id}") + + containers = list_containers_in_endpoint(session, base_url, args.endpoint_id) + targets = [t.strip() for t in args.containers.split(",") if t.strip()] + id_map = resolve_container_ids(targets, containers) + + rcs = fetch_resource_controls(session, base_url) + + successes = 0 + for original, cid in id_map.items(): + rc = find_rc_for_container(rcs, cid) + if rc: + ensure_user_in_rc(session, base_url, rc, target_user_id, args.dry_run) + print(f"{'[DRY-RUN] ' if args.dry_run else ''}Updated RC for container {original} ({cid[:12]})") + else: + create_rc_for_container(session, base_url, cid, target_user_id, args.dry_run) + print(f"{'[DRY-RUN] ' if args.dry_run else ''}Created RC for container {original} ({cid[:12]})") + successes += 1 + + print(f"Done. {successes} container(s) processed.") + if args.dry_run: + print("No changes were made (dry-run).") + except requests.HTTPError as e: + print(f"HTTP error: {e} - response text: {getattr(e.response, 'text', '')}", file=sys.stderr) + sys.exit(2) + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + main()