Files
public_scripts/grant_container_access.py

203 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
Grant a Portainer user access to multiple containers.
Usage examples:
# Using username/password auth, containers by name, on endpoint 2
python grant_container_access.py \
--url http://portainer.yourdomain:9000 \
--username admin --password 'secret' \
--endpoint-id 2 \
--user johndoe \
--containers web,db,redis
# Using an API key and container IDs, dry-run first
python grant_container_access.py \
--url https://portainer.example.com \
--api-key 'PTA_xxx' \
--endpoint-id 1 \
--user johndoe \
--containers 3d7c...,e1b2... \
--dry-run
"""
import argparse
import sys
import requests
from typing import Dict, List, Optional
# ---- Helpers ---------------------------------------------------------------
def make_session(base_url: str, username: Optional[str], password: Optional[str], api_key: Optional[str]) -> requests.Session:
s = requests.Session()
s.headers.update({"Accept": "application/json"})
s.verify = True # change to False if you use self-signed certs (or better: add proper CA)
if api_key:
s.headers.update({"X-API-Key": api_key})
elif username and password:
# Portainer auth: POST /api/auth -> {jwt: "..."}
resp = s.post(f"{base_url}/api/auth", json={"Username": username, "Password": password}, timeout=30)
resp.raise_for_status()
token = resp.json().get("jwt")
if not token:
raise RuntimeError("Login succeeded but no JWT returned.")
s.headers.update({"Authorization": f"Bearer {token}"})
else:
raise ValueError("Provide either --api-key or --username/--password")
return s
def get_user_id(session: requests.Session, base_url: str, username: str) -> int:
resp = session.get(f"{base_url}/api/users", timeout=30)
resp.raise_for_status()
users = resp.json()
for u in users:
if u.get("Username") == username:
return int(u["Id"])
raise RuntimeError(f"User '{username}' not found in Portainer.")
def list_containers_in_endpoint(session: requests.Session, base_url: str, endpoint_id: int) -> List[Dict]:
# Docker proxy: /api/endpoints/{id}/docker/containers/json?all=1
resp = session.get(f"{base_url}/api/endpoints/{endpoint_id}/docker/containers/json", params={"all": 1}, timeout=60)
resp.raise_for_status()
return resp.json()
def resolve_container_ids(targets: List[str], containers: List[Dict]) -> Dict[str, str]:
"""
Map each provided container name or short/full ID to an exact container ID.
Targets may be:
- exact ID (full or prefix)
- name (with or without leading '/')
"""
id_map: Dict[str, str] = {}
# build indexes
by_id = {c["Id"]: c for c in containers}
by_short = {c["Id"][:12]: c for c in containers}
by_name = {}
for c in containers:
for n in c.get("Names", []):
name = n.lstrip("/")
by_name[name] = c
for t in targets:
t_clean = t.lstrip("/")
match = None
if t in by_id:
match = by_id[t]
elif t in by_short:
match = by_short[t]
elif t_clean in by_name:
match = by_name[t_clean]
else:
# try prefix match against full IDs
candidates = [c for cid, c in by_id.items() if cid.startswith(t)]
if len(candidates) == 1:
match = candidates[0]
if not match:
raise RuntimeError(f"Container '{t}' not found (by name or ID).")
id_map[t] = match["Id"]
return id_map
def fetch_resource_controls(session: requests.Session, base_url: str) -> List[Dict]:
# CE/EE: GET /api/resource-controls
resp = session.get(f"{base_url}/api/resource-controls", timeout=60)
resp.raise_for_status()
return resp.json()
def find_rc_for_container(resource_controls: List[Dict], container_id: str) -> Optional[Dict]:
for rc in resource_controls:
if rc.get("ResourceID") == container_id:
# Type 1 usually denotes Docker container
return rc
return None
def ensure_user_in_rc(session: requests.Session, base_url: str, rc: Dict, user_id: int, dry_run: bool) -> None:
users = rc.get("Users", []) or []
if any(u.get("UserID") == user_id for u in users):
# already present; nothing to do
return
users.append({"UserID": user_id, "AccessLevel": 1}) # AccessLevel is required; 1 is sufficient for containers
rc_update = {
"Public": rc.get("Public", False),
"AdministratorsOnly": rc.get("AdministratorsOnly", False),
"Users": users,
"Teams": rc.get("Teams", []) or [],
}
if dry_run:
print(f"[DRY-RUN] Would update resource control {rc.get('Id')} to add user {user_id}")
return
resp = session.put(f"{base_url}/api/resource-controls/{rc['Id']}", json=rc_update, timeout=60)
resp.raise_for_status()
def create_rc_for_container(session: requests.Session, base_url: str, container_id: str, user_id: int, dry_run: bool) -> None:
payload = {
"Type": 1, # 1 = container
"ResourceID": container_id,
"Public": False,
"AdministratorsOnly": False,
"Users": [{"UserID": user_id, "AccessLevel": 1}],
"Teams": [],
# "SubResourceIDs": [] # optional
}
if dry_run:
print(f"[DRY-RUN] Would create resource control for container {container_id} with user {user_id}")
return
resp = session.post(f"{base_url}/api/resource-controls", json=payload, timeout=60)
# 409 may occur if RC already exists concurrently; treat as non-fatal
if resp.status_code not in (200, 201):
resp.raise_for_status()
# ---- Main ------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description="Assign a Portainer user to multiple containers.")
ap.add_argument("--url", required=True, help="Base URL to Portainer, e.g. https://portainer.example.com")
auth = ap.add_mutually_exclusive_group(required=True)
auth.add_argument("--api-key", help="Portainer API key (recommended)")
auth.add_argument("--username", help="Portainer username (if not using --api-key)")
ap.add_argument("--password", help="Portainer password (required if --username is used)")
ap.add_argument("--endpoint-id", required=True, type=int, help="Portainer Endpoint ID where the containers live")
ap.add_argument("--user", required=True, help="Portainer username to grant access to")
ap.add_argument("--containers", required=True, help="Comma-separated container names or IDs")
ap.add_argument("--dry-run", action="store_true", help="Preview actions without changing anything")
args = ap.parse_args()
if args.username and not args.password:
ap.error("--password is required when --username is provided")
base_url = args.url.rstrip("/")
try:
session = make_session(base_url, args.username, args.password, args.api_key)
target_user_id = get_user_id(session, base_url, args.user)
print(f"Target user '{args.user}' has ID {target_user_id}")
containers = list_containers_in_endpoint(session, base_url, args.endpoint_id)
targets = [t.strip() for t in args.containers.split(",") if t.strip()]
id_map = resolve_container_ids(targets, containers)
rcs = fetch_resource_controls(session, base_url)
successes = 0
for original, cid in id_map.items():
rc = find_rc_for_container(rcs, cid)
if rc:
ensure_user_in_rc(session, base_url, rc, target_user_id, args.dry_run)
print(f"{'[DRY-RUN] ' if args.dry_run else ''}Updated RC for container {original} ({cid[:12]})")
else:
create_rc_for_container(session, base_url, cid, target_user_id, args.dry_run)
print(f"{'[DRY-RUN] ' if args.dry_run else ''}Created RC for container {original} ({cid[:12]})")
successes += 1
print(f"Done. {successes} container(s) processed.")
if args.dry_run:
print("No changes were made (dry-run).")
except requests.HTTPError as e:
print(f"HTTP error: {e} - response text: {getattr(e.response, 'text', '')}", file=sys.stderr)
sys.exit(2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()