diff --git a/CHANGELOG.md b/CHANGELOG.md index b1c1fd19..c52c7251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ All notable changes to this project will be documented in this file. +## 1.3.11 + +### Added + +- `roboflow api-key` CLI command group and SDK methods to create, list, get, + update, protect, and revoke workspace API keys — including scoped keys, folder + restrictions, and custom metadata (scoping/metadata require the Advanced API + Keys plan feature). + ## 1.3.10 ### Added diff --git a/CLI-COMMANDS.md b/CLI-COMMANDS.md index 32064e80..18c9b0a8 100644 --- a/CLI-COMMANDS.md +++ b/CLI-COMMANDS.md @@ -357,6 +357,7 @@ Version numbers are always numeric — that's how `x/y` is disambiguated between | Command | Description | |---------|-------------| | `auth` | Login, logout, status, set default workspace | +| `api-key` | List, create, update, protect, disable, revoke workspace API keys | | `workspace` | List and inspect workspaces | | `project` | List, get, create projects | | `version` | List, get, download, export dataset versions | diff --git a/requirements.txt b/requirements.txt index 3984d070..168e9617 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ idna==3.7 cycler kiwisolver>=1.3.1 matplotlib -numpy>=1.18.5 +numpy>=1.18.5,<2.5 # 2.5.0 ships type stubs using 3.12+ `type` syntax that mypy (pinned to 3.10) rejects opencv-python-headless==4.10.0.84 Pillow>=7.1.2 # https://github.com/roboflow/roboflow-python/issues/390 diff --git a/roboflow/__init__.py b/roboflow/__init__.py index 560ffbbc..6617a18b 100644 --- a/roboflow/__init__.py +++ b/roboflow/__init__.py @@ -21,7 +21,7 @@ CLIPModel = None # type: ignore[assignment,misc] GazeModel = None # type: ignore[assignment,misc] -__version__ = "1.3.10" +__version__ = "1.3.11" def check_key(api_key, model, notebook, num_retries=0): diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py index eef2e2a7..1a5dbc4a 100644 --- a/roboflow/adapters/rfapi.py +++ b/roboflow/adapters/rfapi.py @@ -1450,3 +1450,153 @@ def get_model_eval_image_predictions( def get_model_eval_recommendations(api_key: str, workspace_url: str, eval_id: str) -> dict: """GET /{workspace}/model-evals/{evalId}/recommendations — improvement suggestions.""" return _eval_get(api_key, workspace_url, f"/{eval_id}/recommendations") + + +# --------------------------------------------------------------------------- +# API key management endpoints +# --------------------------------------------------------------------------- + + +class _FullAccess: + """Sentinel distinguishing "unscoped/full access" from "omit scopes". + + The API treats three ``scopes`` states differently: omitted inherits the + caller's own scopes, an explicit ``null`` grants full (unscoped) access, and + an empty ``[]`` grants no abilities. Passing ``None`` from Python means + "omit", so ``FULL_ACCESS`` is used to force an explicit ``"scopes": null`` + into the request body. + """ + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __repr__(self) -> str: + return "FULL_ACCESS" + + +FULL_ACCESS = _FullAccess() + + +def list_api_keys( + api_key: str, + workspace_url: str, + include_disabled: bool = False, + include_folders: bool = False, +) -> dict: + """GET /{workspace}/api-keys — list API keys for a workspace.""" + params: Dict[str, Union[str, bool]] = {"api_key": api_key} + if include_disabled: + params["includeDisabled"] = "true" + if include_folders: + params["includeFolders"] = "true" + response = requests.get(f"{API_URL}/{workspace_url}/api-keys", params=params) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() + + +def get_api_key(api_key: str, workspace_url: str, key_id: str) -> dict: + """GET /{workspace}/api-keys/{keyId} — get a single API key by ID.""" + encoded = quote(key_id, safe="") + response = requests.get(f"{API_URL}/{workspace_url}/api-keys/{encoded}", params={"api_key": api_key}) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() + + +def get_publishable_key(api_key: str, workspace_url: str) -> dict: + """GET /{workspace}/api-keys/publishable — get the workspace publishable key.""" + response = requests.get(f"{API_URL}/{workspace_url}/api-keys/publishable", params={"api_key": api_key}) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() + + +def create_api_key( + api_key: str, + workspace_url: str, + name: Optional[str] = None, + scopes: Union[List[str], _FullAccess, None] = None, + folder_ids: Optional[List[str]] = None, + custom_metadata: Optional[Dict] = None, + protected: bool = False, +) -> dict: + """POST /{workspace}/api-keys — create a new API key. + + The secret ``key`` value is returned only on creation (shown once). + Omitting ``scopes`` (or passing ``None``) inherits the calling credential's + own scopes, so a full-access credential creates a full-access key. Pass a list + to scope the key (``role:`` presets are accepted), ``[]`` for a key with + no abilities, or ``FULL_ACCESS`` to send an explicit ``null`` (unscoped/full + access). ``scopes``, ``folder_ids``, and ``custom_metadata`` require the + Advanced API Keys plan feature — the backend returns 403 if unavailable. + """ + body: Dict[str, Any] = {} + if name is not None: + body["name"] = name + if scopes is FULL_ACCESS: + body["scopes"] = None + elif scopes is not None: + body["scopes"] = scopes + if folder_ids is not None: + body["folderIds"] = folder_ids + if custom_metadata is not None: + # Canonical wire field is camelCase `customMetadata` (consistent with `folderIds`). + body["customMetadata"] = custom_metadata + if protected: + body["protected"] = True + response = requests.post(f"{API_URL}/{workspace_url}/api-keys", params={"api_key": api_key}, json=body) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() + + +def update_api_key(api_key: str, workspace_url: str, key_id: str, **fields: Any) -> dict: + """PATCH /{workspace}/api-keys/{keyId} — update an existing API key. + + Pass only the fields you want to change as keyword arguments: + ``name``, ``scopes``, ``custom_metadata``, ``protected``, ``disabled``. + ``None`` values are omitted (left unchanged). To send explicit values, + pass ``scopes=[]`` (no abilities), ``scopes=FULL_ACCESS`` (unscoped/full + access, serialized as ``null``), or ``custom_metadata={}`` (clear metadata). + The API cannot unprotect a key (``protected=False`` → 403). + Disabling a protected key returns 409. + """ + encoded = quote(key_id, safe="") + # Canonical wire field is camelCase `customMetadata` (consistent with `folderIds`); callers may + # pass the Pythonic `custom_metadata` kwarg, which is normalized here. + wire_aliases = {"custom_metadata": "customMetadata"} + body: Dict[str, Any] = {} + for k, v in fields.items(): + wire_key = wire_aliases.get(k, k) + if v is FULL_ACCESS: + body[wire_key] = None + elif v is not None: + body[wire_key] = v + response = requests.patch( + f"{API_URL}/{workspace_url}/api-keys/{encoded}", + params={"api_key": api_key}, + json=body, + ) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() + + +def revoke_api_key(api_key: str, workspace_url: str, key_id: str) -> dict: + """DELETE /{workspace}/api-keys/{keyId} — revoke (permanently delete) an API key. + + Revoking a protected key returns 409. This action is irreversible. + """ + encoded = quote(key_id, safe="") + response = requests.delete( + f"{API_URL}/{workspace_url}/api-keys/{encoded}", + params={"api_key": api_key}, + ) + if not response.ok: + raise RoboflowError(response.text, status_code=response.status_code) + return response.json() diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py index c2ff3594..54754a08 100644 --- a/roboflow/cli/__init__.py +++ b/roboflow/cli/__init__.py @@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None: # --------------------------------------------------------------------------- from roboflow.cli.handlers.annotation import annotation_app # noqa: E402 +from roboflow.cli.handlers.api_key import api_key_app # noqa: E402 from roboflow.cli.handlers.asynctasks import asynctasks_app # noqa: E402 from roboflow.cli.handlers.auth import auth_app # noqa: E402 from roboflow.cli.handlers.batch import batch_app # noqa: E402 @@ -209,6 +210,7 @@ def _walk(group: Any, prefix: str = "") -> None: # Register ALL commands in alphabetical order for clean --help output app.add_typer(annotation_app, name="annotation") +app.add_typer(api_key_app, name="api-key") app.add_typer(asynctasks_app, name="asynctasks") app.add_typer(auth_app, name="auth") app.add_typer(batch_app, name="batch", hidden=True) # All stubs — hidden until implemented diff --git a/roboflow/cli/handlers/api_key.py b/roboflow/cli/handlers/api_key.py new file mode 100644 index 00000000..d4831492 --- /dev/null +++ b/roboflow/cli/handlers/api_key.py @@ -0,0 +1,589 @@ +"""API key management commands.""" + +from __future__ import annotations + +from typing import Annotated, List, Optional + +import typer + +from roboflow.cli._compat import SortedGroup, ctx_to_args + +api_key_app = typer.Typer(cls=SortedGroup, help="Manage workspace API keys", no_args_is_help=True) + + +@api_key_app.command("list") +def list_keys( + ctx: typer.Context, + include_disabled: Annotated[bool, typer.Option("--include-disabled", help="Include disabled keys")] = False, + include_folders: Annotated[bool, typer.Option("--include-folders", help="Include folder ID details")] = False, +) -> None: + """List all API keys for the workspace.""" + args = ctx_to_args(ctx, include_disabled=include_disabled, include_folders=include_folders) + _list_keys(args) + + +@api_key_app.command("get") +def get_key( + ctx: typer.Context, + key_id: Annotated[str, typer.Argument(help="Key ID (keyId) to retrieve")], +) -> None: + """Show details for a single API key.""" + args = ctx_to_args(ctx, key_id=key_id) + _get_key(args) + + +@api_key_app.command("publishable") +def get_publishable(ctx: typer.Context) -> None: + """Print the workspace publishable key (rf_). + + This is the non-secret key used for browser / inference.js requests. + It is safe to embed in client-side code. To use it programmatically: + + roboflow --json api-key publishable | jq -r .publishableKey + """ + args = ctx_to_args(ctx) + _get_publishable(args) + + +@api_key_app.command("create") +def create_key( + ctx: typer.Context, + name: Annotated[str, typer.Argument(help="Display name for the new key")], + scope: Annotated[ + Optional[List[str]], + typer.Option( + "--scope", + help="Scope or role: preset (repeatable). Omit to inherit the calling key's scopes.", + ), + ] = None, + no_scopes: Annotated[ + bool, + typer.Option( + "--no-scopes", + help="Create a key with an empty scope list (no abilities). Mutually exclusive with --scope/--full-access.", + ), + ] = False, + full_access: Annotated[ + bool, + typer.Option( + "--full-access", + help="Create an unscoped, full-access key (sends null). Mutually exclusive with --scope/--no-scopes.", + ), + ] = False, + folder: Annotated[ + Optional[List[str]], + typer.Option("--folder", help="Folder ID to restrict access to (repeatable)."), + ] = None, + metadata: Annotated[ + Optional[List[str]], + typer.Option("--metadata", help="Custom metadata as KEY=VALUE (repeatable)."), + ] = None, + protected: Annotated[ + bool, + typer.Option("--protected", help="Mark key as protected (cannot be revoked/disabled via CLI)"), + ] = False, +) -> None: + """Create a new API key. + + The secret key value is printed ONCE — save it immediately. + To capture it programmatically use --json and pipe to jq: + + roboflow --json api-key create MY-KEY | jq -r .key + + Scope selection is three-way: omit all scope flags to inherit the calling + key's scopes, pass --scope (repeatable) to scope the key, --no-scopes for a + key with no abilities, or --full-access for an unscoped/full-access key. + """ + args = ctx_to_args( + ctx, + name=name, + scope=scope, + no_scopes=no_scopes, + full_access=full_access, + folder=folder, + metadata=metadata, + protected=protected, + ) + _create_key(args) + + +@api_key_app.command("update") +def update_key( + ctx: typer.Context, + key_id: Annotated[str, typer.Argument(help="Key ID (keyId) to update")], + name: Annotated[Optional[str], typer.Option("--name", help="New display name")] = None, + scope: Annotated[ + Optional[List[str]], + typer.Option( + "--scope", + help="Scope or role: preset (repeatable). Replaces existing scopes.", + ), + ] = None, + no_scopes: Annotated[ + bool, + typer.Option( + "--no-scopes", + help="Replace scopes with an empty list (no abilities). Mutually exclusive with --scope/--full-access.", + ), + ] = False, + full_access: Annotated[ + bool, + typer.Option( + "--full-access", + help="Make the key unscoped/full access (sends null). Mutually exclusive with --scope/--no-scopes.", + ), + ] = False, + metadata: Annotated[ + Optional[List[str]], + typer.Option("--metadata", help="Custom metadata as KEY=VALUE (repeatable)."), + ] = None, + clear_metadata: Annotated[ + bool, + typer.Option( + "--clear-metadata", + help="Clear all custom metadata (sends {}). Mutually exclusive with --metadata.", + ), + ] = False, +) -> None: + """Update an API key's display name, scopes, or metadata. + + Scopes are three-way: --scope (repeatable) replaces scopes, --no-scopes + replaces them with an empty list (no abilities), and --full-access makes the + key unscoped/full access. Metadata: --metadata sets custom KEY=VALUE pairs, + --clear-metadata removes all custom metadata. + """ + args = ctx_to_args( + ctx, + key_id=key_id, + name=name, + scope=scope, + no_scopes=no_scopes, + full_access=full_access, + metadata=metadata, + clear_metadata=clear_metadata, + ) + _update_key(args) + + +@api_key_app.command("protect") +def protect_key( + ctx: typer.Context, + key_id: Annotated[str, typer.Argument(help="Key ID (keyId) to protect")], +) -> None: + """Mark an API key as protected. + + Protected keys cannot be revoked or disabled via the CLI or API. + To unprotect a key, visit app.roboflow.com/settings/api. + """ + args = ctx_to_args(ctx, key_id=key_id) + _protect_key(args) + + +@api_key_app.command("disable") +def disable_key( + ctx: typer.Context, + key_id: Annotated[str, typer.Argument(help="Key ID (keyId) to enable/disable")], + enable: Annotated[ + bool, typer.Option("--enable/--disable", help="Enable (--enable) or disable (--disable) the key") + ] = False, +) -> None: + """Enable or disable an API key (default: --disable). + + Disabled keys are rejected by the API but can be re-enabled. + Protected keys cannot be disabled — revoke them from the dashboard. + """ + args = ctx_to_args(ctx, key_id=key_id, enable=enable) + _disable_key(args) + + +@api_key_app.command("revoke") +def revoke_key( + ctx: typer.Context, + key_id: Annotated[str, typer.Argument(help="Key ID (keyId) to revoke")], + yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip confirmation prompt")] = False, +) -> None: + """Permanently revoke an API key. This action cannot be undone. + + Protected keys cannot be revoked via the CLI — use the dashboard. + """ + args = ctx_to_args(ctx, key_id=key_id, yes=yes) + _revoke_key(args) + + +# --------------------------------------------------------------------------- +# Business logic +# --------------------------------------------------------------------------- + + +def _resolve_ws_and_key(args): # noqa: ANN001 + from roboflow.cli._resolver import resolve_ws_and_key + + return resolve_ws_and_key(args) + + +def _parse_metadata(args, pairs: Optional[List[str]]) -> Optional[dict]: # noqa: ANN001 + """Parse repeated ``KEY=VALUE`` strings into a dict, or ``None`` if empty.""" + if not pairs: + return None + from roboflow.cli._output import output_error + + metadata: dict[str, str] = {} + for pair in pairs: + key, sep, value = pair.partition("=") + if not sep or not key: + output_error( + args, + f"Invalid --metadata value '{pair}'. Expected KEY=VALUE.", + hint="Example: --metadata team=vision --metadata env=prod", + exit_code=1, + ) + metadata[key] = value + return metadata + + +# Sentinel meaning "field not provided" — distinct from an explicit ``None``/``[]``/``{}``. +_UNSET = object() + + +def _resolve_scopes(args): # noqa: ANN001 + """Resolve the three-way ``--scope`` / ``--no-scopes`` / ``--full-access`` selection. + + Returns one of: + * ``_UNSET`` — no scope flag given (inherit / leave unchanged), + * a list — explicit ``--scope`` values (``[]`` for ``--no-scopes``), + * ``rfapi.FULL_ACCESS`` — ``--full-access`` (send ``null``). + + Exits 1 if more than one of the three is supplied. + """ + from roboflow.adapters import rfapi + from roboflow.cli._output import output_error + + scope = getattr(args, "scope", None) or None + no_scopes = getattr(args, "no_scopes", False) + full_access = getattr(args, "full_access", False) + + if sum([scope is not None, no_scopes, full_access]) > 1: + output_error( + args, + "Only one of --scope, --no-scopes, or --full-access may be used.", + hint="Use --scope to scope the key, --no-scopes for no abilities, or --full-access for unscoped access.", + exit_code=1, + ) + + if full_access: + return rfapi.FULL_ACCESS + if no_scopes: + return [] + if scope is not None: + return scope + return _UNSET + + +def _resolve_metadata(args): # noqa: ANN001 + """Resolve the ``--metadata`` / ``--clear-metadata`` selection. + + Returns one of: + * ``_UNSET`` — neither flag given (leave unchanged), + * a dict — parsed ``--metadata`` pairs (``{}`` for ``--clear-metadata``). + + Exits 1 if both are supplied. + """ + from roboflow.cli._output import output_error + + metadata = getattr(args, "metadata", None) + clear_metadata = getattr(args, "clear_metadata", False) + + if metadata and clear_metadata: + output_error( + args, + "Only one of --metadata or --clear-metadata may be used.", + hint="Use --metadata KEY=VALUE to set metadata, or --clear-metadata to remove it.", + exit_code=1, + ) + + if clear_metadata: + return {} + parsed = _parse_metadata(args, metadata) + if parsed is not None: + return parsed + return _UNSET + + +def _list_keys(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + from roboflow.cli._table import format_table + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.list_api_keys( + api_key, + ws, + include_disabled=getattr(args, "include_disabled", False), + include_folders=getattr(args, "include_folders", False), + ) + except rfapi.RoboflowError as exc: + output_api_error(args, exc) + return + + keys = result.get("apiKeys", []) + rows = [] + for k in keys: + rows.append( + { + "keyId": k.get("keyId", ""), + "name": k.get("name", ""), + "prefix": k.get("prefix", ""), + "default": str(k.get("default", False)), + "protected": str(k.get("protected", False)), + "disabled": str(k.get("disabled", False)), + } + ) + table = format_table( + rows, + columns=["keyId", "name", "prefix", "default", "protected", "disabled"], + headers=["KEY ID", "NAME", "PREFIX", "DEFAULT", "PROTECTED", "DISABLED"], + ) + output(args, result, text=table) + + +def _get_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.get_api_key(api_key, ws, args.key_id) + except rfapi.RoboflowError as exc: + output_api_error(args, exc, not_found_hint=f"No API key with ID '{args.key_id}' in this workspace.") + return + + key_obj = result.get("apiKey", result) + lines = [ + f"Key ID: {key_obj.get('keyId', '')}", + f" Name: {key_obj.get('name', '')}", + f" Prefix: {key_obj.get('prefix', '')}", + f" Default: {key_obj.get('default', False)}", + f" Protected: {key_obj.get('protected', False)}", + f" Disabled: {key_obj.get('disabled', False)}", + ] + if key_obj.get("scopes"): + lines.append(f" Scopes: {', '.join(key_obj['scopes'])}") + if key_obj.get("created_on"): + lines.append(f" Created: {key_obj['created_on']}") + output(args, result, text="\n".join(lines)) + + +def _get_publishable(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.get_publishable_key(api_key, ws) + except rfapi.RoboflowError as exc: + output_api_error(args, exc) + return + + pub_key = result.get("publishableKey", "") + output(args, result, text=pub_key) + + +def _create_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + scopes = _resolve_scopes(args) + folder_ids = getattr(args, "folder", None) or None + metadata = _resolve_metadata(args) + + try: + result = rfapi.create_api_key( + api_key, + ws, + name=args.name, + scopes=None if scopes is _UNSET else scopes, + folder_ids=folder_ids, + custom_metadata=None if metadata is _UNSET else metadata, + protected=getattr(args, "protected", False), + ) + except rfapi.RoboflowError as exc: + status = getattr(exc, "status_code", None) + if status in (403, 404): + output_api_error( + args, + exc, + hint=( + "Creating API keys requires an unscoped key or one granted the " + "'api-key:create' scope (OAuth also needs the create_api_key permission). " + "A workspace-wide key additionally requires access to all folders. Use an " + "unscoped key, grant this key 'api-key:create', or create the key at " + "app.roboflow.com/settings/api." + ), + ) + else: + output_api_error( + args, + exc, + hint="Scopes, folders, and metadata require the Advanced API Keys plan feature.", + ) + return + + secret = result.get("key", "") + key_id = result.get("keyId", "") + + lines = [ + f"Created API key '{args.name}' (keyId: {key_id})", + "", + "WARNING: This is the only time the secret key will be shown.", + "Save it somewhere secure now.", + "", + f" Key: {secret}", + ] + output(args, result, text="\n".join(lines)) + + +def _update_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + fields = {} + if getattr(args, "name", None) is not None: + fields["name"] = args.name + scopes = _resolve_scopes(args) + if scopes is not _UNSET: + fields["scopes"] = scopes + metadata = _resolve_metadata(args) + if metadata is not _UNSET: + fields["custom_metadata"] = metadata + + try: + result = rfapi.update_api_key(api_key, ws, args.key_id, **fields) + except rfapi.RoboflowError as exc: + output_api_error( + args, + exc, + not_found_hint=f"No API key with ID '{args.key_id}' in this workspace.", + ) + return + + output(args, result, text=f"Updated API key '{args.key_id}'") + + +def _protect_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.update_api_key(api_key, ws, args.key_id, protected=True) + except rfapi.RoboflowError as exc: + output_api_error( + args, + exc, + hint="The API cannot unprotect a key. To unprotect, visit app.roboflow.com/settings/api.", + not_found_hint=f"No API key with ID '{args.key_id}' in this workspace.", + ) + return + + output(args, result, text=f"Marked API key '{args.key_id}' as protected.") + + +def _disable_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_api_error, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + enable = getattr(args, "enable", False) + disabled_value = not enable + + try: + result = rfapi.update_api_key(api_key, ws, args.key_id, disabled=disabled_value) + except rfapi.RoboflowError as exc: + if getattr(exc, "status_code", None) == 409: + output_error( + args, + str(exc), + hint="Protected keys cannot be disabled. To disable, visit app.roboflow.com/settings/api.", + exit_code=1, + ) + elif getattr(exc, "status_code", None) == 403: + output_error( + args, + str(exc), + hint="Enabling/disabling keys requires the Advanced API Keys plan feature.", + exit_code=1, + ) + else: + output_api_error( + args, + exc, + not_found_hint=f"No API key with ID '{args.key_id}' in this workspace.", + ) + return + + action = "Enabled" if enable else "Disabled" + output(args, result, text=f"{action} API key '{args.key_id}'.") + + +def _revoke_key(args) -> None: # noqa: ANN001 + from roboflow.adapters import rfapi + from roboflow.cli._output import confirm_destructive, output, output_api_error, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + if not confirm_destructive(args, f"Permanently revoke API key '{args.key_id}'?"): + return + + try: + result = rfapi.revoke_api_key(api_key, ws, args.key_id) + except rfapi.RoboflowError as exc: + if getattr(exc, "status_code", None) == 409: + output_error( + args, + str(exc), + hint=("Protected keys cannot be revoked via the CLI. To revoke, visit app.roboflow.com/settings/api."), + exit_code=1, + ) + else: + output_api_error( + args, + exc, + not_found_hint=f"No API key with ID '{args.key_id}' in this workspace.", + ) + return + + output(args, result, text=f"Revoked API key '{args.key_id}'.") diff --git a/tests/cli/test_api_key.py b/tests/cli/test_api_key.py new file mode 100644 index 00000000..f1489d18 --- /dev/null +++ b/tests/cli/test_api_key.py @@ -0,0 +1,1000 @@ +"""Tests for the api-key CLI handler.""" + +import json +import re +import unittest +from argparse import Namespace +from unittest.mock import patch + +from typer.testing import CliRunner + +from roboflow.cli import app + +runner = CliRunner() + +_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") + + +def _strip_ansi(text: str) -> str: + return _ANSI_RE.sub("", text) + + +# --------------------------------------------------------------------------- +# Registration / --help tests +# --------------------------------------------------------------------------- + + +class TestApiKeyRegistration(unittest.TestCase): + """Verify the api-key group and all subcommands are registered.""" + + def test_api_key_app_exists(self) -> None: + from roboflow.cli.handlers.api_key import api_key_app + + self.assertIsNotNone(api_key_app) + + def test_group_help(self) -> None: + result = runner.invoke(app, ["api-key", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + self.assertIn("api-key", _strip_ansi(result.output).lower()) + + def test_list_help(self) -> None: + result = runner.invoke(app, ["api-key", "list", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + def test_get_help(self) -> None: + result = runner.invoke(app, ["api-key", "get", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + def test_publishable_help(self) -> None: + result = runner.invoke(app, ["api-key", "publishable", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + def test_create_help(self) -> None: + result = runner.invoke(app, ["api-key", "create", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + output = _strip_ansi(result.output).lower() + self.assertIn("scope", output) + self.assertIn("folder", output) + self.assertIn("metadata", output) + self.assertIn("protected", output) + + def test_update_help(self) -> None: + result = runner.invoke(app, ["api-key", "update", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + output = _strip_ansi(result.output).lower() + self.assertIn("name", output) + self.assertIn("scope", output) + self.assertIn("metadata", output) + + def test_protect_help(self) -> None: + result = runner.invoke(app, ["api-key", "protect", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + def test_disable_help(self) -> None: + result = runner.invoke(app, ["api-key", "disable", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + def test_revoke_help(self) -> None: + result = runner.invoke(app, ["api-key", "revoke", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + + +# --------------------------------------------------------------------------- +# list +# --------------------------------------------------------------------------- + + +class TestListApiKeys(unittest.TestCase): + @patch("roboflow.adapters.rfapi.list_api_keys") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_list_text(self, _mock_key, _mock_ws, mock_list) -> None: + mock_list.return_value = { + "apiKeys": [ + { + "keyId": "k1", + "name": "My Key", + "prefix": "abc", + "default": True, + "protected": False, + "disabled": False, + } + ] + } + args = Namespace( + json=False, workspace=None, api_key=None, quiet=False, include_disabled=False, include_folders=False + ) + from roboflow.cli.handlers.api_key import _list_keys + + with patch("builtins.print") as mock_print: + _list_keys(args) + printed = mock_print.call_args[0][0] + self.assertIn("My Key", printed) + self.assertIn("k1", printed) + + @patch("roboflow.adapters.rfapi.list_api_keys") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_list_json_emits_full_envelope(self, _mock_key, _mock_ws, mock_list) -> None: + keys = [ + {"keyId": "k1", "name": "My Key", "prefix": "abc", "default": True, "protected": False, "disabled": False} + ] + mock_list.return_value = {"apiKeys": keys, "publishableKey": "rf_myworkspace"} + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, include_disabled=False, include_folders=False + ) + from roboflow.cli.handlers.api_key import _list_keys + + with patch("builtins.print") as mock_print: + _list_keys(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + # Full server envelope, not a bare array — publishableKey must be preserved. + self.assertIsInstance(data, dict) + self.assertEqual(data["apiKeys"][0]["keyId"], "k1") + self.assertEqual(data["publishableKey"], "rf_myworkspace") + + @patch("roboflow.adapters.rfapi.list_api_keys") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_list_passes_include_disabled(self, _mock_key, _mock_ws, mock_list) -> None: + mock_list.return_value = {"apiKeys": []} + args = Namespace( + json=False, workspace=None, api_key=None, quiet=False, include_disabled=True, include_folders=False + ) + from roboflow.cli.handlers.api_key import _list_keys + + with patch("builtins.print"): + _list_keys(args) + mock_list.assert_called_once_with("fake-key", "test-ws", include_disabled=True, include_folders=False) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=None) + def test_list_no_workspace(self, _mock_ws) -> None: + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, include_disabled=False, include_folders=False + ) + from roboflow.cli.handlers.api_key import _list_keys + + with self.assertRaises(SystemExit) as ctx: + _list_keys(args) + self.assertEqual(ctx.exception.code, 2) + + +# --------------------------------------------------------------------------- +# get +# --------------------------------------------------------------------------- + + +class TestGetApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.get_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_get_text(self, _mock_key, _mock_ws, mock_get) -> None: + mock_get.return_value = { + "apiKey": { + "keyId": "k1", + "name": "My Key", + "prefix": "abc", + "default": False, + "protected": False, + "disabled": False, + } + } + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1") + from roboflow.cli.handlers.api_key import _get_key + + with patch("builtins.print") as mock_print: + _get_key(args) + printed = mock_print.call_args[0][0] + self.assertIn("k1", printed) + self.assertIn("My Key", printed) + + @patch("roboflow.adapters.rfapi.get_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_get_json(self, _mock_key, _mock_ws, mock_get) -> None: + payload = {"apiKey": {"keyId": "k1", "name": "My Key"}} + mock_get.return_value = payload + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, key_id="k1") + from roboflow.cli.handlers.api_key import _get_key + + with patch("builtins.print") as mock_print: + _get_key(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIn("apiKey", data) + + +# --------------------------------------------------------------------------- +# publishable +# --------------------------------------------------------------------------- + + +class TestPublishableKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.get_publishable_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_publishable_text(self, _mock_key, _mock_ws, mock_pub) -> None: + mock_pub.return_value = {"publishableKey": "rf_myworkspace"} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False) + from roboflow.cli.handlers.api_key import _get_publishable + + with patch("builtins.print") as mock_print: + _get_publishable(args) + printed = mock_print.call_args[0][0] + self.assertIn("rf_myworkspace", printed) + + @patch("roboflow.adapters.rfapi.get_publishable_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_publishable_json(self, _mock_key, _mock_ws, mock_pub) -> None: + mock_pub.return_value = {"publishableKey": "rf_myworkspace"} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + from roboflow.cli.handlers.api_key import _get_publishable + + with patch("builtins.print") as mock_print: + _get_publishable(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["publishableKey"], "rf_myworkspace") + + +# --------------------------------------------------------------------------- +# create +# --------------------------------------------------------------------------- + + +class TestCreateApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_prints_secret_in_text_mode(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k2", "key": "super-secret-value", "name": "New Key"} + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="New Key", + scope=None, + folder=None, + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print") as mock_print: + _create_key(args) + printed = mock_print.call_args[0][0] + self.assertIn("super-secret-value", printed) + self.assertIn("WARNING", printed) + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_json_contains_key(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k2", "key": "super-secret-value", "name": "New Key"} + args = Namespace( + json=True, + workspace=None, + api_key=None, + quiet=False, + name="New Key", + scope=None, + folder=None, + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print") as mock_print: + _create_key(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["key"], "super-secret-value") + self.assertEqual(data["keyId"], "k2") + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_with_scopes_and_folders(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k3", "key": "s3cr3t", "name": "Scoped Key"} + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="Scoped Key", + scope=["image:read", "image:annotate"], + folder=["f1", "f2"], + metadata=None, + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print"): + _create_key(args) + mock_create.assert_called_once_with( + "fake-key", + "test-ws", + name="Scoped Key", + scopes=["image:read", "image:annotate"], + folder_ids=["f1", "f2"], + custom_metadata=None, + protected=False, + ) + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_with_metadata(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k5", "key": "m3ta", "name": "Meta Key"} + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="Meta Key", + scope=None, + folder=None, + metadata=["team=vision", "env=prod"], + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print"): + _create_key(args) + mock_create.assert_called_once_with( + "fake-key", + "test-ws", + name="Meta Key", + scopes=None, + folder_ids=None, + custom_metadata={"team": "vision", "env": "prod"}, + protected=False, + ) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_invalid_metadata_exits(self, _mock_key, _mock_ws) -> None: + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="Bad Meta", + scope=None, + folder=None, + metadata=["no-equals-sign"], + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with self.assertRaises(SystemExit) as ctx: + _create_key(args) + self.assertEqual(ctx.exception.code, 1) + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_no_scopes_passes_none(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k4", "key": "full-key", "name": "Full Key"} + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="Full Key", + scope=None, + folder=None, + metadata=None, + protected=False, + ) + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print"): + _create_key(args) + mock_create.assert_called_once_with( + "fake-key", + "test-ws", + name="Full Key", + scopes=None, + folder_ids=None, + custom_metadata=None, + protected=False, + ) + + +class TestCreateApiKeyScopeStates(unittest.TestCase): + """--no-scopes / --full-access three-way scope selection on create.""" + + def _args(self, **overrides): + base = dict( + json=False, + workspace=None, + api_key=None, + quiet=False, + name="K", + scope=None, + no_scopes=False, + full_access=False, + folder=None, + metadata=None, + protected=False, + ) + base.update(overrides) + return Namespace(**base) + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_no_scopes_sends_empty_list(self, _mock_key, _mock_ws, mock_create) -> None: + mock_create.return_value = {"keyId": "k", "key": "s", "name": "K"} + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print"): + _create_key(self._args(no_scopes=True)) + mock_create.assert_called_once_with( + "fake-key", + "test-ws", + name="K", + scopes=[], + folder_ids=None, + custom_metadata=None, + protected=False, + ) + + @patch("roboflow.adapters.rfapi.create_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_full_access_sends_sentinel(self, _mock_key, _mock_ws, mock_create) -> None: + from roboflow.adapters import rfapi + + mock_create.return_value = {"keyId": "k", "key": "s", "name": "K"} + from roboflow.cli.handlers.api_key import _create_key + + with patch("builtins.print"): + _create_key(self._args(full_access=True)) + mock_create.assert_called_once_with( + "fake-key", + "test-ws", + name="K", + scopes=rfapi.FULL_ACCESS, + folder_ids=None, + custom_metadata=None, + protected=False, + ) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_scope_and_no_scopes_conflict_exits_1(self, _mock_key, _mock_ws) -> None: + from roboflow.cli.handlers.api_key import _create_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _create_key(self._args(scope=["image:read"], no_scopes=True)) + self.assertEqual(ctx.exception.code, 1) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_no_scopes_and_full_access_conflict_exits_1(self, _mock_key, _mock_ws) -> None: + from roboflow.cli.handlers.api_key import _create_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _create_key(self._args(no_scopes=True, full_access=True)) + self.assertEqual(ctx.exception.code, 1) + + +# --------------------------------------------------------------------------- +# update +# --------------------------------------------------------------------------- + + +class TestUpdateApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_name(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1", "name": "Renamed"}} + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, key_id="k1", name="Renamed", scope=None, metadata=None + ) + from roboflow.cli.handlers.api_key import _update_key + + with patch("builtins.print") as mock_print: + _update_key(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIn("apiKey", data) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", name="Renamed") + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_scopes_and_metadata(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1"}} + args = Namespace( + json=False, + workspace=None, + api_key=None, + quiet=False, + key_id="k1", + name=None, + scope=["image:read"], + metadata=["team=vision"], + ) + from roboflow.cli.handlers.api_key import _update_key + + with patch("builtins.print"): + _update_key(args) + mock_update.assert_called_once_with( + "fake-key", + "test-ws", + "k1", + scopes=["image:read"], + custom_metadata={"team": "vision"}, + ) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_missing_key_exits_3(self, _mock_key, _mock_ws, mock_update) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_update.side_effect = RoboflowError("not found", status_code=404) + args = Namespace( + json=False, workspace=None, api_key=None, quiet=False, key_id="nope", name="X", scope=None, metadata=None + ) + from roboflow.cli.handlers.api_key import _update_key + + with self.assertRaises(SystemExit) as ctx: + _update_key(args) + self.assertEqual(ctx.exception.code, 3) + + +# --------------------------------------------------------------------------- +# protect +# --------------------------------------------------------------------------- + + +class TestProtectApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_protect_calls_patch_with_protected_true(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1", "protected": True}} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1") + from roboflow.cli.handlers.api_key import _protect_key + + with patch("builtins.print"): + _protect_key(args) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", protected=True) + + +# --------------------------------------------------------------------------- +# disable +# --------------------------------------------------------------------------- + + +class TestDisableApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_disable(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1", "disabled": True}} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1", enable=False) + from roboflow.cli.handlers.api_key import _disable_key + + with patch("builtins.print") as mock_print: + _disable_key(args) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", disabled=True) + printed = mock_print.call_args[0][0] + self.assertIn("Disabled", printed) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_enable(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1", "disabled": False}} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1", enable=True) + from roboflow.cli.handlers.api_key import _disable_key + + with patch("builtins.print") as mock_print: + _disable_key(args) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", disabled=False) + printed = mock_print.call_args[0][0] + self.assertIn("Enabled", printed) + + +# --------------------------------------------------------------------------- +# revoke +# --------------------------------------------------------------------------- + + +class TestRevokeApiKey(unittest.TestCase): + @patch("roboflow.adapters.rfapi.revoke_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_revoke_with_yes(self, _mock_key, _mock_ws, mock_revoke) -> None: + mock_revoke.return_value = {"status": "revoked", "keyId": "k1"} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1", yes=True) + from roboflow.cli.handlers.api_key import _revoke_key + + with patch("builtins.print") as mock_print: + _revoke_key(args) + mock_revoke.assert_called_once_with("fake-key", "test-ws", "k1") + printed = mock_print.call_args[0][0] + self.assertIn("Revoked", printed) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_revoke_without_yes_no_tty_exits(self, _mock_key, _mock_ws) -> None: + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1", yes=False) + from roboflow.cli.handlers.api_key import _revoke_key + + with patch("sys.stdin.isatty", return_value=False): + with self.assertRaises(SystemExit) as ctx: + _revoke_key(args) + self.assertEqual(ctx.exception.code, 1) + + @patch("roboflow.adapters.rfapi.revoke_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_revoke_json(self, _mock_key, _mock_ws, mock_revoke) -> None: + mock_revoke.return_value = {"status": "revoked", "keyId": "k1"} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, key_id="k1", yes=True) + from roboflow.cli.handlers.api_key import _revoke_key + + with patch("builtins.print") as mock_print: + _revoke_key(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["status"], "revoked") + + +# --------------------------------------------------------------------------- +# status-code propagation / exit-code consistency +# --------------------------------------------------------------------------- + + +class TestStatusCodePropagation(unittest.TestCase): + """The rfapi wrappers must attach response.status_code so handlers can branch.""" + + def _make_response(self, status_code, text="error body"): + from unittest.mock import MagicMock + + resp = MagicMock() + resp.ok = status_code < 400 + resp.status_code = status_code + resp.text = text + return resp + + @patch("roboflow.adapters.rfapi.requests.delete") + def test_revoke_attaches_status_code(self, mock_delete) -> None: + from roboflow.adapters.rfapi import RoboflowError, revoke_api_key + + mock_delete.return_value = self._make_response(409, "protected") + with self.assertRaises(RoboflowError) as ctx: + revoke_api_key("fake-key", "test-ws", "k1") + self.assertEqual(ctx.exception.status_code, 409) + + @patch("roboflow.adapters.rfapi.requests.patch") + def test_update_attaches_status_code(self, mock_patch) -> None: + from roboflow.adapters.rfapi import RoboflowError, update_api_key + + mock_patch.return_value = self._make_response(403, "forbidden") + with self.assertRaises(RoboflowError) as ctx: + update_api_key("fake-key", "test-ws", "k1", protected=False) + self.assertEqual(ctx.exception.status_code, 403) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_get_attaches_status_code(self, mock_get) -> None: + from roboflow.adapters.rfapi import RoboflowError, get_api_key + + mock_get.return_value = self._make_response(404, "missing") + with self.assertRaises(RoboflowError) as ctx: + get_api_key("fake-key", "test-ws", "k1") + self.assertEqual(ctx.exception.status_code, 404) + + +class TestErrorBranches(unittest.TestCase): + """End-to-end: status_code now flows through to the right exit code / hint.""" + + @patch("roboflow.adapters.rfapi.revoke_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_revoke_protected_409_hint(self, _mock_key, _mock_ws, mock_revoke) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_revoke.side_effect = RoboflowError("Key is protected", status_code=409) + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1", yes=True) + from roboflow.cli.handlers.api_key import _revoke_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _revoke_key(args) + # 409 is a non-auth/non-notfound error → exit 1, with the protected-key hint. + self.assertEqual(ctx.exception.code, 1) + + @patch("roboflow.adapters.rfapi.revoke_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_revoke_missing_exits_3(self, _mock_key, _mock_ws, mock_revoke) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_revoke.side_effect = RoboflowError("not found", status_code=404) + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="missing", yes=True) + from roboflow.cli.handlers.api_key import _revoke_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _revoke_key(args) + # Consistent with `get ` → exit 3, not 1. + self.assertEqual(ctx.exception.code, 3) + + @patch("roboflow.adapters.rfapi.get_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_get_missing_exits_3(self, _mock_key, _mock_ws, mock_get) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_get.side_effect = RoboflowError("not found", status_code=404) + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="missing") + from roboflow.cli.handlers.api_key import _get_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _get_key(args) + self.assertEqual(ctx.exception.code, 3) + + @patch("roboflow.adapters.rfapi.get_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_get_auth_error_exits_2(self, _mock_key, _mock_ws, mock_get) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_get.side_effect = RoboflowError("unauthorized", status_code=401) + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, key_id="k1") + from roboflow.cli.handlers.api_key import _get_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _get_key(args) + self.assertEqual(ctx.exception.code, 2) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_disable_protected_409_hint(self, _mock_key, _mock_ws, mock_update) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_update.side_effect = RoboflowError("Key is protected", status_code=409) + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, key_id="k1", enable=False) + from roboflow.cli.handlers.api_key import _disable_key + + printed = {} + + def _capture(payload, *a, **k): + printed["json"] = payload + + with patch("builtins.print", side_effect=_capture): + with self.assertRaises(SystemExit) as ctx: + _disable_key(args) + self.assertEqual(ctx.exception.code, 1) + data = json.loads(printed["json"]) + self.assertIn("settings/api", data["error"]["hint"]) + self.assertNotIn("settings/api-keys", data["error"]["hint"]) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_disable_plan_feature_403_hint(self, _mock_key, _mock_ws, mock_update) -> None: + from roboflow.adapters.rfapi import RoboflowError + + mock_update.side_effect = RoboflowError("forbidden", status_code=403) + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, key_id="k1", enable=False) + from roboflow.cli.handlers.api_key import _disable_key + + printed = {} + + def _capture(payload, *a, **k): + printed["json"] = payload + + with patch("builtins.print", side_effect=_capture): + with self.assertRaises(SystemExit) as ctx: + _disable_key(args) + # 403 → plan-feature hint, same as create, exit 1. + self.assertEqual(ctx.exception.code, 1) + data = json.loads(printed["json"]) + self.assertIn("Advanced API Keys", data["error"]["hint"]) + + +class TestUpdateApiKeyScopeAndMetadataStates(unittest.TestCase): + """--no-scopes / --full-access / --clear-metadata explicit states on update.""" + + def _args(self, **overrides): + base = dict( + json=False, + workspace=None, + api_key=None, + quiet=False, + key_id="k1", + name=None, + scope=None, + no_scopes=False, + full_access=False, + metadata=None, + clear_metadata=False, + ) + base.update(overrides) + return Namespace(**base) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_no_scopes_sends_empty_list(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1"}} + from roboflow.cli.handlers.api_key import _update_key + + with patch("builtins.print"): + _update_key(self._args(no_scopes=True)) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", scopes=[]) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_full_access_sends_sentinel(self, _mock_key, _mock_ws, mock_update) -> None: + from roboflow.adapters import rfapi + + mock_update.return_value = {"apiKey": {"keyId": "k1"}} + from roboflow.cli.handlers.api_key import _update_key + + with patch("builtins.print"): + _update_key(self._args(full_access=True)) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", scopes=rfapi.FULL_ACCESS) + + @patch("roboflow.adapters.rfapi.update_api_key") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_clear_metadata_sends_empty_dict(self, _mock_key, _mock_ws, mock_update) -> None: + mock_update.return_value = {"apiKey": {"keyId": "k1"}} + from roboflow.cli.handlers.api_key import _update_key + + with patch("builtins.print"): + _update_key(self._args(clear_metadata=True)) + mock_update.assert_called_once_with("fake-key", "test-ws", "k1", custom_metadata={}) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_scope_and_full_access_conflict_exits_1(self, _mock_key, _mock_ws) -> None: + from roboflow.cli.handlers.api_key import _update_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _update_key(self._args(scope=["image:read"], full_access=True)) + self.assertEqual(ctx.exception.code, 1) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_metadata_and_clear_metadata_conflict_exits_1(self, _mock_key, _mock_ws) -> None: + from roboflow.cli.handlers.api_key import _update_key + + with patch("sys.stderr"): + with self.assertRaises(SystemExit) as ctx: + _update_key(self._args(metadata=["a=b"], clear_metadata=True)) + self.assertEqual(ctx.exception.code, 1) + + +class TestApiKeyBodySerialization(unittest.TestCase): + """The rfapi wrappers must serialize the three scope/metadata states correctly.""" + + def _ok_response(self, payload=None): + from unittest.mock import MagicMock + + resp = MagicMock() + resp.ok = True + resp.status_code = 200 + resp.json.return_value = payload or {} + return resp + + @patch("roboflow.adapters.rfapi.requests.post") + def test_create_full_access_serializes_null_scopes(self, mock_post) -> None: + from roboflow.adapters.rfapi import FULL_ACCESS, create_api_key + + mock_post.return_value = self._ok_response() + create_api_key("fake-key", "test-ws", name="K", scopes=FULL_ACCESS) + body = mock_post.call_args.kwargs["json"] + self.assertIn("scopes", body) + self.assertIsNone(body["scopes"]) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_create_empty_scopes_serializes_empty_list(self, mock_post) -> None: + from roboflow.adapters.rfapi import create_api_key + + mock_post.return_value = self._ok_response() + create_api_key("fake-key", "test-ws", name="K", scopes=[]) + body = mock_post.call_args.kwargs["json"] + self.assertEqual(body["scopes"], []) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_create_omitted_scopes_absent_from_body(self, mock_post) -> None: + from roboflow.adapters.rfapi import create_api_key + + mock_post.return_value = self._ok_response() + create_api_key("fake-key", "test-ws", name="K", scopes=None) + body = mock_post.call_args.kwargs["json"] + self.assertNotIn("scopes", body) + + @patch("roboflow.adapters.rfapi.requests.patch") + def test_update_full_access_serializes_null_scopes(self, mock_patch) -> None: + from roboflow.adapters.rfapi import FULL_ACCESS, update_api_key + + mock_patch.return_value = self._ok_response() + update_api_key("fake-key", "test-ws", "k1", scopes=FULL_ACCESS) + body = mock_patch.call_args.kwargs["json"] + self.assertIn("scopes", body) + self.assertIsNone(body["scopes"]) + + @patch("roboflow.adapters.rfapi.requests.patch") + def test_update_empty_scopes_serializes_empty_list(self, mock_patch) -> None: + from roboflow.adapters.rfapi import update_api_key + + mock_patch.return_value = self._ok_response() + update_api_key("fake-key", "test-ws", "k1", scopes=[]) + body = mock_patch.call_args.kwargs["json"] + self.assertEqual(body["scopes"], []) + + @patch("roboflow.adapters.rfapi.requests.patch") + def test_update_clear_metadata_serializes_empty_dict(self, mock_patch) -> None: + from roboflow.adapters.rfapi import update_api_key + + mock_patch.return_value = self._ok_response() + update_api_key("fake-key", "test-ws", "k1", custom_metadata={}) + body = mock_patch.call_args.kwargs["json"] + # The Pythonic `custom_metadata` kwarg serializes as the canonical camelCase + # `customMetadata` wire field (consistent with `folderIds`). + self.assertIn("customMetadata", body) + self.assertEqual(body["customMetadata"], {}) + self.assertNotIn("custom_metadata", body) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_create_metadata_serializes_camelcase(self, mock_post) -> None: + from roboflow.adapters.rfapi import create_api_key + + mock_post.return_value = self._ok_response() + create_api_key("fake-key", "test-ws", name="K", custom_metadata={"env": "prod"}) + body = mock_post.call_args.kwargs["json"] + self.assertEqual(body["customMetadata"], {"env": "prod"}) + self.assertNotIn("custom_metadata", body) + + @patch("roboflow.adapters.rfapi.requests.patch") + def test_update_none_scopes_absent_from_body(self, mock_patch) -> None: + from roboflow.adapters.rfapi import update_api_key + + mock_patch.return_value = self._ok_response() + update_api_key("fake-key", "test-ws", "k1", name="X", scopes=None) + body = mock_patch.call_args.kwargs["json"] + self.assertNotIn("scopes", body) + self.assertEqual(body["name"], "X") + + +class TestApiKeyNewFlagsHelp(unittest.TestCase): + """The new flags must be discoverable in --help.""" + + def test_create_help_lists_new_flags(self) -> None: + result = runner.invoke(app, ["api-key", "create", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + output = _strip_ansi(result.output).lower() + self.assertIn("--no-scopes", output) + self.assertIn("--full-access", output) + + def test_update_help_lists_new_flags(self) -> None: + result = runner.invoke(app, ["api-key", "update", "--help"]) + self.assertEqual(result.exit_code, 0, result.output) + output = _strip_ansi(result.output).lower() + self.assertIn("--no-scopes", output) + self.assertIn("--full-access", output) + self.assertIn("--clear-metadata", output) + + +if __name__ == "__main__": + unittest.main()