203 lines
8.3 KiB
Python
203 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Grant a Portainer user access to multiple containers.
|
|
|
|
Usage examples:
|
|
# Using username/password auth, containers by name, on endpoint 2
|
|
python grant_container_access.py \
|
|
--url http://portainer.yourdomain:9000 \
|
|
--username admin --password 'secret' \
|
|
--endpoint-id 2 \
|
|
--user johndoe \
|
|
--containers web,db,redis
|
|
|
|
# Using an API key and container IDs, dry-run first
|
|
python grant_container_access.py \
|
|
--url https://portainer.example.com \
|
|
--api-key 'PTA_xxx' \
|
|
--endpoint-id 1 \
|
|
--user johndoe \
|
|
--containers 3d7c...,e1b2... \
|
|
--dry-run
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import requests
|
|
from typing import Dict, List, Optional
|
|
|
|
# ---- Helpers ---------------------------------------------------------------
|
|
|
|
def make_session(base_url: str, username: Optional[str], password: Optional[str], api_key: Optional[str]) -> requests.Session:
    """
    Build a requests session carrying Portainer credentials.

    A non-empty ``api_key`` wins and is sent as the ``X-API-Key`` header.
    Otherwise ``username``/``password`` are posted to ``{base_url}/api/auth``
    and the ``jwt`` field of the JSON reply is installed as a bearer token.

    Raises:
        ValueError: no API key and no complete username/password pair.
        RuntimeError: the login reply contained no ``jwt`` value.
        requests.HTTPError: the login request returned an error status.
    """
    session = requests.Session()
    session.headers["Accept"] = "application/json"
    # Certificate verification stays on; point this at a CA bundle (or, as a
    # last resort, set it to False) when Portainer uses a self-signed cert.
    session.verify = True

    if api_key:
        session.headers["X-API-Key"] = api_key
        return session

    if not (username and password):
        raise ValueError("Provide either --api-key or --username/--password")

    # Credential login: POST /api/auth answers with {"jwt": "..."}.
    credentials = {"Username": username, "Password": password}
    login = session.post(f"{base_url}/api/auth", json=credentials, timeout=30)
    login.raise_for_status()

    jwt = login.json().get("jwt")
    if not jwt:
        raise RuntimeError("Login succeeded but no JWT returned.")

    session.headers["Authorization"] = f"Bearer {jwt}"
    return session
|
|
|
|
def get_user_id(session: requests.Session, base_url: str, username: str) -> int:
    """
    Look up the numeric ID of the Portainer user called ``username``.

    Fetches ``{base_url}/api/users`` and returns the ``Id`` (converted to
    ``int``) of the first entry whose ``Username`` equals ``username``.

    Raises:
        RuntimeError: no entry in the listing has that username.
    """
    listing = session.get(f"{base_url}/api/users", timeout=30)
    listing.raise_for_status()

    same_name = (entry for entry in listing.json() if entry.get("Username") == username)
    found = next(same_name, None)
    if found is None:
        raise RuntimeError(f"User '{username}' not found in Portainer.")
    return int(found["Id"])
|
|
|
|
def list_containers_in_endpoint(session: requests.Session, base_url: str, endpoint_id: int) -> List[Dict]:
    """
    Return the decoded JSON container listing for one Portainer endpoint.

    The request goes through Portainer's Docker proxy route and passes
    ``all=1`` as a query parameter.
    """
    # Docker proxy: /api/endpoints/{id}/docker/containers/json?all=1
    listing_url = f"{base_url}/api/endpoints/{endpoint_id}/docker/containers/json"
    reply = session.get(listing_url, params={"all": 1}, timeout=60)
    reply.raise_for_status()
    return reply.json()
|
|
|
|
def resolve_container_ids(targets: List[str], containers: List[Dict]) -> Dict[str, str]:
    """
    Map each provided container name or short/full ID to an exact container ID.

    Each target is tried, in order, as:
      1. a full container ID,
      2. the first 12 characters of a container ID,
      3. a container name (with or without leading '/'),
      4. any other unique prefix of a full container ID.

    Args:
        targets: Names or IDs as supplied by the user.
        containers: Container dicts; each must carry "Id" and may carry "Names".

    Returns:
        A dict from each target string (as given) to the full container ID.

    Raises:
        RuntimeError: a target matches nothing, or is an ID prefix shared by
            more than one container.
    """
    # Lookup tables, one per way a container can be addressed.
    by_id = {c["Id"]: c for c in containers}
    by_short = {c["Id"][:12]: c for c in containers}
    by_name = {}
    for c in containers:
        # "or []" tolerates an explicit null Names value as well as a missing key.
        for raw_name in c.get("Names") or []:
            by_name[raw_name.lstrip("/")] = c

    id_map: Dict[str, str] = {}
    for target in targets:
        found = None
        if target in by_id:
            found = by_id[target]
        elif target in by_short:
            found = by_short[target]
        elif target.lstrip("/") in by_name:
            found = by_name[target.lstrip("/")]
        elif target:
            # An empty string is a prefix of every ID, so it is never tried here.
            candidates = [c for cid, c in by_id.items() if cid.startswith(target)]
            if len(candidates) > 1:
                # Say why the lookup failed instead of claiming "not found".
                shorts = ", ".join(sorted(c["Id"][:12] for c in candidates))
                raise RuntimeError(
                    f"Container ID prefix '{target}' is ambiguous; "
                    f"it matches {len(candidates)} containers ({shorts})."
                )
            if candidates:
                found = candidates[0]

        if found is None:
            raise RuntimeError(f"Container '{target}' not found (by name or ID).")
        id_map[target] = found["Id"]
    return id_map
|
|
|
|
def fetch_resource_controls(session: requests.Session, base_url: str) -> List[Dict]:
    """
    Return the decoded JSON body of ``GET {base_url}/api/resource-controls``.

    Raises whatever ``raise_for_status()`` raises when the request fails.
    """
    # NOTE(review): Portainer's published API appears to spell this route
    # "/api/resource_controls" (underscore) and may not offer a list (GET)
    # operation at all - confirm against the Portainer version in use.
    listing = session.get(f"{base_url}/api/resource-controls", timeout=60)
    listing.raise_for_status()
    return listing.json()
|
|
|
|
def find_rc_for_container(resource_controls: List[Dict], container_id: str) -> Optional[Dict]:
    """
    Return the first resource control whose "ResourceID" equals ``container_id``.

    Only the "ResourceID" field is compared; the entry's "Type" is not
    inspected. Returns ``None`` when nothing matches.
    """
    matching = (rc for rc in resource_controls if rc.get("ResourceID") == container_id)
    return next(matching, None)
|
|
|
|
def ensure_user_in_rc(session: requests.Session, base_url: str, rc: Dict, user_id: int, dry_run: bool) -> None:
    """
    Add ``user_id`` to an existing resource control unless already listed.

    Does nothing when an entry of ``rc["Users"]`` already has that "UserID".
    Otherwise sends a PUT to ``{base_url}/api/resource-controls/{rc["Id"]}``
    with the current Public/AdministratorsOnly/Teams values and the extended
    user list. With ``dry_run`` set, it only prints what would be sent.

    ``rc`` is left untouched on a dry run or a failed request; its "Users"
    entry is replaced with the extended list only after the PUT succeeded, so
    the caller's snapshot never claims a grant that was not made.

    Raises:
        requests.HTTPError: the PUT returned an error status.
    """
    current_users = rc.get("Users") or []
    if any(u.get("UserID") == user_id for u in current_users):
        # already present; nothing to do
        return

    # Build a new list rather than appending to rc["Users"] in place.
    # NOTE(review): AccessLevel 1 is carried over from the original script;
    # confirm the field and value against the Portainer API.
    users = [*current_users, {"UserID": user_id, "AccessLevel": 1}]
    rc_update = {
        "Public": rc.get("Public", False),
        "AdministratorsOnly": rc.get("AdministratorsOnly", False),
        "Users": users,
        "Teams": rc.get("Teams") or [],
    }

    if dry_run:
        print(f"[DRY-RUN] Would update resource control {rc.get('Id')} to add user {user_id}")
        return

    resp = session.put(f"{base_url}/api/resource-controls/{rc['Id']}", json=rc_update, timeout=60)
    resp.raise_for_status()
    # The server accepted the change; keep the local snapshot in step with it.
    rc["Users"] = users
|
|
|
|
def create_rc_for_container(session: requests.Session, base_url: str, container_id: str, user_id: int, dry_run: bool) -> None:
    """
    Create a resource control that lists ``user_id`` for ``container_id``.

    Posts a non-public, non-administrators-only resource control of Type 1 to
    ``{base_url}/api/resource-controls``. With ``dry_run`` set, nothing is
    sent and the intended action is printed instead.

    Raises:
        requests.HTTPError: the POST returned an error status. This includes
            409, which is NOT swallowed here.
    """
    if dry_run:
        print(f"[DRY-RUN] Would create resource control for container {container_id} with user {user_id}")
        return

    # NOTE(review): the field names and the meaning of Type 1 (= container)
    # are taken from the original script; confirm against the Portainer API.
    grant = {"UserID": user_id, "AccessLevel": 1}
    body = {
        "Type": 1,
        "ResourceID": container_id,
        "Public": False,
        "AdministratorsOnly": False,
        "Users": [grant],
        "Teams": [],
    }

    reply = session.post(f"{base_url}/api/resource-controls", json=body, timeout=60)
    if reply.status_code in (200, 201):
        return
    # Any other status is handed to raise_for_status(), so a 409 from a
    # concurrently created resource control surfaces as an HTTP error.
    reply.raise_for_status()
|
|
|
|
# ---- Main ------------------------------------------------------------------
|
|
|
|
def main():
    """
    Command-line entry point: grant one Portainer user access to containers.

    Parses the arguments, authenticates, resolves the requested containers on
    the given endpoint, then for each distinct container either extends its
    existing resource control or creates a new one.

    Exit status: 0 on success, 1 on any other error, 2 on an HTTP error or
    invalid arguments (argparse's own convention).
    """
    ap = argparse.ArgumentParser(description="Assign a Portainer user to multiple containers.")
    ap.add_argument("--url", required=True, help="Base URL to Portainer, e.g. https://portainer.example.com")
    auth = ap.add_mutually_exclusive_group(required=True)
    auth.add_argument("--api-key", help="Portainer API key (recommended)")
    auth.add_argument("--username", help="Portainer username (if not using --api-key)")
    ap.add_argument("--password", help="Portainer password (required if --username is used)")
    ap.add_argument("--endpoint-id", required=True, type=int, help="Portainer Endpoint ID where the containers live")
    ap.add_argument("--user", required=True, help="Portainer username to grant access to")
    ap.add_argument("--containers", required=True, help="Comma-separated container names or IDs")
    ap.add_argument("--dry-run", action="store_true", help="Preview actions without changing anything")
    args = ap.parse_args()

    if args.username and not args.password:
        ap.error("--password is required when --username is provided")

    # Validate the container list before any network traffic: a value such as
    # "," or " " would otherwise "succeed" after doing nothing at all.
    targets = [t.strip() for t in args.containers.split(",") if t.strip()]
    if not targets:
        ap.error("--containers must name at least one container")

    base_url = args.url.rstrip("/")
    try:
        session = make_session(base_url, args.username, args.password, args.api_key)
        target_user_id = get_user_id(session, base_url, args.user)

        print(f"Target user '{args.user}' has ID {target_user_id}")

        containers = list_containers_in_endpoint(session, base_url, args.endpoint_id)
        id_map = resolve_container_ids(targets, containers)

        rcs = fetch_resource_controls(session, base_url)

        # Several targets (e.g. "web" and "/web", or a name plus its ID) can
        # resolve to one container. The resource-control list above is a
        # one-time snapshot, so handling the same container twice would act
        # on stale data; process each container ID once only.
        handled = set()
        successes = 0
        for original, cid in id_map.items():
            if cid in handled:
                print(f"Skipping {original}: container {cid[:12]} was already processed")
                continue
            handled.add(cid)

            rc = find_rc_for_container(rcs, cid)
            if rc:
                ensure_user_in_rc(session, base_url, rc, target_user_id, args.dry_run)
                print(f"{'[DRY-RUN] ' if args.dry_run else ''}Updated RC for container {original} ({cid[:12]})")
            else:
                create_rc_for_container(session, base_url, cid, target_user_id, args.dry_run)
                print(f"{'[DRY-RUN] ' if args.dry_run else ''}Created RC for container {original} ({cid[:12]})")
            successes += 1

        print(f"Done. {successes} container(s) processed.")
        if args.dry_run:
            print("No changes were made (dry-run).")
    except requests.HTTPError as e:
        # Include the response body: it usually carries the server's own error detail.
        print(f"HTTP error: {e} - response text: {getattr(e.response, 'text', '')}", file=sys.stderr)
        sys.exit(2)
    except Exception as e:
        # Top-level boundary: report any other failure and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|