From 72eab0cb0ae7bb4bda0d80bc2ed431b5f574a403 Mon Sep 17 00:00:00 2001 From: Murad Akhundov Date: Sat, 20 Dec 2025 01:29:14 +0100 Subject: [PATCH 1/3] feat: Add `typeid explain` command for human/machine-readable TypeID introspection and optional schema support Changes: - Introduced a new CLI command: `typeid explain`, enabling human-readable and machine-readable (JSON) explanations for TypeID values. - Enhanced documentation in README.md with detailed usage, configuration, and design principles for the new explain feature. - Added support for schema-based explanations via JSON (default) and YAML (optional extra dependency). - Described schema discovery rules, including support for both local and user config directory schemas. - Updated `pyproject.toml` and `poetry.lock` to define a new optional extra dependency group: `yaml` (using PyYAML). - Registered the `yaml` extra in [tool.poetry.extras], enabling users to install YAML support as needed. - Added explanation formatting utilities and proper schema loading logic in CLI, robustly handling both errors and absence of schema. - Provided new examples in documentation for using explain with and without schema, and illustrating output formats. - Ensured backward compatibility by preserving all existing APIs and CLI commands. --- README.md | 168 +++++++++++++++++++ poetry.lock | 3 +- pyproject.toml | 1 + tests/explain/__init__.py | 0 tests/explain/test_cli.py | 93 +++++++++++ tests/explain/test_cli_yaml.py | 64 +++++++ tests/explain/test_discovery.py | 70 ++++++++ tests/explain/test_engine.py | 179 ++++++++++++++++++++ tests/explain/test_formatters.py | 23 +++ tests/explain/test_registry.py | 106 ++++++++++++ tests/explain/test_registry_yaml.py | 175 ++++++++++++++++++++ typeid/cli.py | 95 ++++++++++- typeid/explain/__init__.py | 81 +++++++++ typeid/explain/discovery.py | 128 ++++++++++++++ typeid/explain/engine.py | 247 ++++++++++++++++++++++++++++ typeid/explain/formatters.py | 228 +++++++++++++++++++++++++ typeid/explain/model.py | 152 +++++++++++++++++ typeid/explain/registry.py | 217 ++++++++++++++++++++++++ 18 files changed, 2027 insertions(+), 3 deletions(-) create mode 100644 tests/explain/__init__.py create mode 100644 tests/explain/test_cli.py create mode 100644 tests/explain/test_cli_yaml.py create mode 100644 tests/explain/test_discovery.py create mode 100644 tests/explain/test_engine.py create mode 100644 tests/explain/test_formatters.py create mode 100644 tests/explain/test_registry.py create mode 100644 tests/explain/test_registry_yaml.py create mode 100644 typeid/explain/__init__.py create mode 100644 typeid/explain/discovery.py create mode 100644 typeid/explain/engine.py create mode 100644 typeid/explain/formatters.py create mode 100644 typeid/explain/model.py create mode 100644 typeid/explain/registry.py diff --git a/README.md b/README.md index 0798a0c..b012236 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,19 @@ This particular implementation provides an pip package that can be used by any P poetry add typeid-python ``` +### Optional dependencies + +TypeID supports schema-based ID explanations using JSON (always available) and +YAML (optional). + +To enable YAML support: + +```console +pip install typeid-python[yaml] +``` + +If the extra is not installed, JSON schemas will still work. 
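+
+If you installed the package with Poetry (as shown above), the same extra can
+be requested when adding the dependency:
+
+```console
+poetry add "typeid-python[yaml]"
+```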
+ ## Usage ### Basic @@ -109,3 +122,158 @@ This particular implementation provides an pip package that can be used by any P $ typeid encode 0188bac7-4afa-78aa-bc3b-bd1eef28d881 --prefix prefix prefix_01h2xcejqtf2nbrexx3vqjhp41 ``` + +## ✨ NEW: `typeid explain` — “What is this ID?” + +TypeID can now **explain a TypeID** in a human-readable way. + +This is useful when: + +* debugging logs +* inspecting database records +* reviewing production incidents +* understanding IDs shared via Slack, tickets, or dashboards + +### Basic usage (no schema required) + +```console +$ typeid explain user_01h45ytscbebyvny4gc8cr8ma2 +``` + +Example output: + +```yaml +id: user_01h45ytscbebyvny4gc8cr8ma2 +valid: true + +parsed: + prefix: user + suffix: 01h45ytscbebyvny4gc8cr8ma2 + uuid: 01890bf0-846f-7762-8605-5a3abb40e0e5 + created_at: 2025-03-12T10:41:23Z + sortable: true + +schema: + found: false +``` + +Even without configuration, `typeid explain` can: + +* validate the ID +* extract the UUID +* derive creation time (UUIDv7) +* determine sortability + +## Schema-based explanations + +To make explanations richer, you can define a **TypeID schema** describing what each +prefix represents. + +### Example schema (`typeid.schema.json`) + +```json +{ + "schema_version": 1, + "types": { + "user": { + "name": "User", + "description": "End-user account", + "owner_team": "identity-platform", + "pii": true, + "retention": "7y", + "links": { + "logs": "https://logs.company/search?q={id}", + "trace": "https://traces.company/?id={id}" + } + } + } +} +``` + +### Explain using schema + +```console +$ typeid explain user_01h45ytscbebyvny4gc8cr8ma2 +``` + +Output (excerpt): + +```yaml +schema: + found: true + name: User + owner_team: identity-platform + pii: true + retention: 7y + +links: + logs: https://logs.company/search?q=user_01h45ytscbebyvny4gc8cr8ma2 +``` + +## Schema discovery rules + +If `--schema` is not provided, TypeID looks for a schema in the following order: + +1. Environment variable: + + ```console + TYPEID_SCHEMA=/path/to/schema.json + ``` +2. Current directory: + + * `typeid.schema.json` + * `typeid.schema.yaml` +3. User config directory: + + * `~/.config/typeid/schema.json` + * `~/.config/typeid/schema.yaml` + +If no schema is found, the command still works with derived information only. 
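+
+The same parsing and schema handling are also available from Python via the
+`typeid.explain` package. A minimal sketch (the ID is the sample used above;
+whether `schema` is populated depends on the discovery rules described in this
+section):
+
+```python
+from typeid.explain import explain
+
+# With no schema_path argument, the discovery rules above are applied.
+exp = explain("user_01h45ytscbebyvny4gc8cr8ma2")
+
+print(exp.valid)               # True for a well-formed TypeID
+print(exp.parsed.prefix)       # "user"
+print(exp.parsed.uuid)         # UUID derived from the suffix
+print(exp.schema is not None)  # True only if a discovered schema covers "user"
+```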
+ +## YAML schemas (optional) + +YAML schemas are supported if the optional dependency is installed: + +```console +pip install typeid-python[yaml] +``` + +Example (`typeid.schema.yaml`): + +```yaml +schema_version: 1 +types: + user: + name: User + owner_team: identity-platform + links: + logs: "https://logs.company/search?q={id}" +``` + +## JSON output (machine-readable) + +```console +$ typeid explain user_01h45ytscbebyvny4gc8cr8ma2 --json +``` + +Useful for: + +* scripts +* CI pipelines +* IDE integrations + +## Design principles + +* **Non-breaking**: existing APIs and CLI commands remain unchanged +* **Schema-optional**: works fully offline +* **Read-only**: no side effects or external mutations +* **Declarative**: meaning is defined by users, not inferred by the tool + +You can think of `typeid explain` as: + +> **OpenAPI — but for identifiers instead of HTTP endpoints** + +## License + +MIT + diff --git a/poetry.lock b/poetry.lock index f060c87..f0ce401 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1262,8 +1262,9 @@ type = ["pytest-mypy"] [extras] cli = [] +yaml = [] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4" -content-hash = "f66215f5241552eee740c17308e9127cf905fb4d19f8da749dd7dce47a2745da" +content-hash = "74841c80ed43987ac54e9949a0732f02edd2ee2094050e8a49ab69feea00bac0" diff --git a/pyproject.toml b/pyproject.toml index 6935817..d26baee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ twine = "^6.2.0" [tool.poetry.extras] cli = ["click"] +yaml = ["PyYAML"] [tool.poetry.scripts] typeid = "typeid.cli:cli" diff --git a/tests/explain/__init__.py b/tests/explain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/explain/test_cli.py b/tests/explain/test_cli.py new file mode 100644 index 0000000..3f25304 --- /dev/null +++ b/tests/explain/test_cli.py @@ -0,0 +1,93 @@ +import json +from pathlib import Path + +from click.testing import CliRunner + +from typeid import TypeID +from typeid.cli import cli + + +def _make_valid_id(prefix: str = "usr") -> str: + return str(TypeID(prefix=prefix)) + + +def test_cli_explain_pretty_offline_no_schema(): + runner = CliRunner() + tid = _make_valid_id("usr") + + result = runner.invoke(cli, ["explain", tid, "--no-schema"]) + assert result.exit_code == 0 + out = result.output + + assert f"id: {tid}" in out + assert "valid: true" in out + assert "schema:" in out + assert "found: false" in out or "found: false" in out.lower() + + +def test_cli_explain_json_offline(): + runner = CliRunner() + tid = _make_valid_id("usr") + + result = runner.invoke(cli, ["explain", tid, "--no-schema", "--json"]) + assert result.exit_code == 0 + + payload = json.loads(result.output) + assert payload["id"] == tid + assert payload["valid"] is True + assert payload["schema"] is None + assert payload["parsed"]["prefix"] == "usr" + assert payload["parsed"]["uuid"] is not None + + +def test_cli_explain_with_schema_file(tmp_path: Path): + runner = CliRunner() + tid = _make_valid_id("usr") + + schema = { + "schema_version": 1, + "types": { + "usr": {"name": "User", "owner_team": "identity-platform", "links": {"logs": "https://logs?q={id}"}} + }, + } + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = runner.invoke(cli, ["explain", tid, "--schema", str(p)]) + assert result.exit_code == 0 + out = result.output + + assert "schema:" in out + assert "found: true" in out + assert "name: User" in out + assert "owner_team: identity-platform" in out + assert "links:" in out + assert 
"logs:" in out + + +def test_cli_explain_schema_load_failure_still_works(tmp_path: Path): + runner = CliRunner() + tid = _make_valid_id("usr") + + p = tmp_path / "typeid.schema.json" + p.write_text("{not json", encoding="utf-8") + + result = runner.invoke(cli, ["explain", tid, "--schema", str(p)]) + assert result.exit_code == 0 + out = result.output + + # Should still explain derived facts and surface warning + assert f"id: {tid}" in out + assert "valid: true" in out + assert "warnings:" in out.lower() + + +def test_cli_explain_invalid_id_exit_code_zero_but_valid_false(): + # We keep exit_code 0 for "explain" so it can be used in scripts without + # failing pipelines; the content will indicate validity. + runner = CliRunner() + + result = runner.invoke(cli, ["explain", "not_a_typeid", "--no-schema"]) + assert result.exit_code == 0 + assert "valid: false" in result.output.lower() + assert "errors:" in result.output.lower() diff --git a/tests/explain/test_cli_yaml.py b/tests/explain/test_cli_yaml.py new file mode 100644 index 0000000..178b485 --- /dev/null +++ b/tests/explain/test_cli_yaml.py @@ -0,0 +1,64 @@ +import json +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from typeid import TypeID +from typeid.cli import cli + + +yaml = pytest.importorskip("yaml") # skip if PyYAML not installed + + +def test_cli_explain_with_yaml_schema(tmp_path: Path): + runner = CliRunner() + tid = str(TypeID(prefix="usr")) + + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User + owner_team: identity-platform + links: + logs: "https://logs?q={id}" +""", + encoding="utf-8", + ) + + result = runner.invoke(cli, ["explain", tid, "--schema", str(p)]) + assert result.exit_code == 0 + out = result.output + + assert "schema:" in out + assert "found: true" in out + assert "name: User" in out + assert "owner_team: identity-platform" in out + assert "logs:" in out + + +def test_cli_explain_with_yaml_schema_json_output(tmp_path: Path): + runner = CliRunner() + tid = str(TypeID(prefix="usr")) + + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User +""", + encoding="utf-8", + ) + + result = runner.invoke(cli, ["explain", tid, "--schema", str(p), "--json"]) + assert result.exit_code == 0 + + payload = json.loads(result.output) + assert payload["valid"] is True + assert payload["schema"] is not None + assert payload["schema"]["name"] == "User" diff --git a/tests/explain/test_discovery.py b/tests/explain/test_discovery.py new file mode 100644 index 0000000..2708039 --- /dev/null +++ b/tests/explain/test_discovery.py @@ -0,0 +1,70 @@ +from pathlib import Path + +import pytest + +from typeid.explain.discovery import discover_schema_path + + +def test_discovery_env_var_wins(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + schema = tmp_path / "schema.json" + schema.write_text('{"schema_version": 1, "types": {}}', encoding="utf-8") + + monkeypatch.setenv("TYPEID_SCHEMA", str(schema)) + + # even if cwd has other candidates, env must win + cwd = tmp_path / "cwd" + cwd.mkdir() + (cwd / "typeid.schema.json").write_text('{"schema_version": 1, "types": {}}', encoding="utf-8") + + res = discover_schema_path(cwd=cwd) + assert res.path == schema + assert res.source.startswith("env:") + + +def test_discovery_cwd_candidate(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.delenv("TYPEID_SCHEMA", raising=False) + + cwd = tmp_path / "cwd" + cwd.mkdir() + schema = cwd / 
"typeid.schema.json" + schema.write_text('{"schema_version": 1, "types": {}}', encoding="utf-8") + + res = discover_schema_path(cwd=cwd) + assert res.path == schema + assert res.source == "cwd" + + +def test_discovery_user_config_when_no_env_and_no_cwd(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.delenv("TYPEID_SCHEMA", raising=False) + + # Force XDG_CONFIG_HOME to a temp dir + xdg = tmp_path / "xdg" + xdg.mkdir() + monkeypatch.setenv("XDG_CONFIG_HOME", str(xdg)) + monkeypatch.delenv("APPDATA", raising=False) + + # Put schema in user config location: /typeid/schema.json + base = xdg / "typeid" + base.mkdir() + schema = base / "schema.json" + schema.write_text('{"schema_version": 1, "types": {}}', encoding="utf-8") + + cwd = tmp_path / "cwd" + cwd.mkdir() + + res = discover_schema_path(cwd=cwd) + assert res.path == schema + assert res.source == "user_config" + + +def test_discovery_none_when_missing_everywhere(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.delenv("TYPEID_SCHEMA", raising=False) + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "xdg_missing")) + monkeypatch.delenv("APPDATA", raising=False) + + cwd = tmp_path / "cwd" + cwd.mkdir() + + res = discover_schema_path(cwd=cwd) + assert res.path is None + assert res.source in {"none", "env:TYPEID_SCHEMA (not found)"} diff --git a/tests/explain/test_engine.py b/tests/explain/test_engine.py new file mode 100644 index 0000000..a79cdac --- /dev/null +++ b/tests/explain/test_engine.py @@ -0,0 +1,179 @@ +import json +from datetime import datetime + +from typeid import TypeID +from typeid.explain.engine import explain +from typeid.explain.model import Provenance, TypeSchema + + +def _make_valid_id(prefix: str = "usr") -> str: + return str(TypeID(prefix=prefix)) + + +def test_explain_valid_id_without_schema_has_derived_fields(): + tid = _make_valid_id("usr") + + exp = explain(tid, schema_lookup=None, enable_schema=True, enable_links=True) + + assert exp.id == tid + assert exp.valid is True + assert exp.parsed.valid is True + assert exp.parsed.prefix == "usr" + assert exp.parsed.suffix is not None + assert exp.parsed.uuid is not None + + # UUIDv7 timestamp should be derivable + assert exp.parsed.created_at is not None + assert isinstance(exp.parsed.created_at, datetime) + assert exp.parsed.created_at.tzinfo is not None + + # Provenance should mark derived fields + assert exp.provenance["prefix"] == Provenance.DERIVED_FROM_ID + assert exp.provenance["suffix"] == Provenance.DERIVED_FROM_ID + assert exp.provenance["uuid"] == Provenance.DERIVED_FROM_ID + assert exp.provenance["created_at"] == Provenance.DERIVED_FROM_ID + + +def test_explain_invalid_id_returns_valid_false_and_errors(): + exp = explain("not_a_typeid", schema_lookup=None) + + assert exp.valid is False + assert exp.parsed.valid is False + assert exp.errors, "Should include parse/validation errors" + assert any(e.code in {"invalid_typeid", "parse_error"} for e in exp.errors) + + +def test_explain_best_effort_split_on_invalid_but_contains_underscore(): + # invalid suffix, but prefix/suffix should still be split best-effort + exp = explain("usr_badSuffix", schema_lookup=None) + + assert exp.valid is False + assert exp.parsed.prefix == "usr" + assert exp.parsed.suffix == "badSuffix" + + +def test_explain_schema_lookup_applies_schema_fields_and_provenance(): + tid = _make_valid_id("ord") + + schema = TypeSchema( + prefix="ord", + raw={ + "name": "Order", + "owner_team": "commerce-platform", + "pii": False, + "retention": "7y", + }, + name="Order", 
+ owner_team="commerce-platform", + pii=False, + retention="7y", + links={}, + ) + + def lookup(prefix: str): + assert prefix == "ord" + return schema + + exp = explain(tid, schema_lookup=lookup, enable_schema=True, enable_links=False) + + assert exp.valid is True + assert exp.schema is not None + assert exp.schema.name == "Order" + assert exp.schema.owner_team == "commerce-platform" + assert exp.provenance["name"] == Provenance.SCHEMA + assert exp.provenance["owner_team"] == Provenance.SCHEMA + assert exp.provenance["pii"] == Provenance.SCHEMA + assert exp.provenance["retention"] == Provenance.SCHEMA + + +def test_explain_schema_lookup_exception_does_not_crash_and_adds_warning(): + tid = _make_valid_id("usr") + + def lookup(_prefix: str): + raise RuntimeError("boom") + + exp = explain(tid, schema_lookup=lookup, enable_schema=True) + + assert exp.valid is True + assert exp.schema is None + assert any("Schema lookup failed" in w for w in exp.warnings) + + +def test_explain_disable_schema_skips_lookup(): + tid = _make_valid_id("usr") + + called = {"n": 0} + + def lookup(_prefix: str): + called["n"] += 1 + return None + + exp = explain(tid, schema_lookup=lookup, enable_schema=False) + + assert exp.valid is True + assert exp.schema is None + assert called["n"] == 0 + + +def test_explain_link_rendering_basic_placeholders(): + tid = _make_valid_id("usr") + + schema = TypeSchema( + prefix="usr", + raw={}, + name="User", + links={ + "logs": "https://logs.local/search?q={id}", + "trace": "https://trace.local/?id={id}&uuid={uuid}", + }, + ) + + exp = explain(tid, schema_lookup=lambda p: schema if p == "usr" else None, enable_schema=True, enable_links=True) + + assert "logs" in exp.links + assert tid in exp.links["logs"] + assert "trace" in exp.links + assert tid in exp.links["trace"] + assert (exp.parsed.uuid or "") in exp.links["trace"] + + assert exp.provenance["links.logs"] == Provenance.SCHEMA + assert exp.provenance["links.trace"] == Provenance.SCHEMA + + +def test_explain_link_rendering_unknown_placeholder_is_left_intact(): + tid = _make_valid_id("usr") + + schema = TypeSchema( + prefix="usr", + raw={}, + links={"x": "http://x/{does_not_exist}/{id}"}, + ) + + exp = explain(tid, schema_lookup=lambda p: schema if p == "usr" else None) + + assert exp.links["x"].startswith("http://x/") + assert "{does_not_exist}" in exp.links["x"] + assert tid in exp.links["x"] + + +def test_explain_link_rendering_non_string_template_is_skipped_with_warning(): + tid = _make_valid_id("usr") + + schema = TypeSchema( + prefix="usr", + raw={}, + links={"bad": 123}, # type: ignore + ) + + exp = explain(tid, schema_lookup=lambda p: schema if p == "usr" else None) + + assert "bad" not in exp.links + assert any("not a string" in w.lower() for w in exp.warnings) + + +def test_to_dict_is_json_serializable(): + tid = _make_valid_id("usr") + exp = explain(tid) + + payload = exp.to_dict() + json.dumps(payload) # should not raise diff --git a/tests/explain/test_formatters.py b/tests/explain/test_formatters.py new file mode 100644 index 0000000..f175d6d --- /dev/null +++ b/tests/explain/test_formatters.py @@ -0,0 +1,23 @@ +import json + +from typeid import TypeID +from typeid.explain.engine import explain +from typeid.explain.formatters import format_explanation_json, format_explanation_pretty + + +def test_pretty_formatter_contains_sections(): + tid = str(TypeID(prefix="usr")) + exp = explain(tid) + + out = format_explanation_pretty(exp) + assert "parsed:" in out + assert "schema:" in out + assert "links:" in out + + +def 
test_json_formatter_is_valid_json(): + tid = str(TypeID(prefix="usr")) + exp = explain(tid) + + out = format_explanation_json(exp) + json.loads(out) # should not raise diff --git a/tests/explain/test_registry.py b/tests/explain/test_registry.py new file mode 100644 index 0000000..6ee94db --- /dev/null +++ b/tests/explain/test_registry.py @@ -0,0 +1,106 @@ +import json +from pathlib import Path + +from typeid.explain.registry import load_registry + + +def test_load_registry_json_happy_path(tmp_path: Path): + schema = { + "schema_version": 1, + "types": { + "usr": { + "name": "User", + "description": "End-user account", + "owner_team": "identity-platform", + "pii": True, + "retention": "7y", + "links": { + "logs": "https://logs?q={id}", + }, + } + }, + } + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = load_registry(p) + assert result.registry is not None + assert result.error is None + + s = result.registry.get("usr") + assert s is not None + assert s.name == "User" + assert s.pii is True + assert s.links["logs"].startswith("https://") + + +def test_load_registry_missing_schema_version(tmp_path: Path): + schema = {"types": {"usr": {"name": "User"}}} + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "missing_schema_version" + + +def test_load_registry_unsupported_schema_version(tmp_path: Path): + schema = {"schema_version": 999, "types": {}} + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "unsupported_schema_version" + + +def test_load_registry_types_not_a_map(tmp_path: Path): + schema = {"schema_version": 1, "types": ["usr"]} + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "invalid_types" + + +def test_load_registry_skips_invalid_type_entries(tmp_path: Path): + schema = { + "schema_version": 1, + "types": { + "usr": {"name": "User"}, + "": {"name": "EmptyPrefixShouldSkip"}, + "bad": "not a map", + }, + } + p = tmp_path / "typeid.schema.json" + p.write_text(json.dumps(schema), encoding="utf-8") + + result = load_registry(p) + assert result.registry is not None + + assert result.registry.get("usr") is not None + assert result.registry.get("") is None + assert result.registry.get("bad") is None + + +def test_load_registry_unknown_extension_tries_json_then_fails(tmp_path: Path): + p = tmp_path / "schema.weird" + p.write_text('{"schema_version": 1, "types": {"usr": {"name": "User"}}}', encoding="utf-8") + + result = load_registry(p) + assert result.registry is not None + assert result.error is None + + +def test_load_registry_invalid_json_returns_error(tmp_path: Path): + p = tmp_path / "typeid.schema.json" + p.write_text("{not json", encoding="utf-8") + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "read_failed" diff --git a/tests/explain/test_registry_yaml.py b/tests/explain/test_registry_yaml.py new file mode 100644 index 0000000..ea4c66d --- /dev/null +++ b/tests/explain/test_registry_yaml.py @@ -0,0 +1,175 @@ +from pathlib import Path + +import pytest + +from 
typeid.explain.registry import load_registry + + +yaml = pytest.importorskip("yaml") # skip entire file if PyYAML is not installed + + +def test_load_registry_yaml_happy_path(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User + description: End-user account + owner_team: identity-platform + pii: true + retention: 7y + links: + logs: "https://logs?q={id}" + trace: "https://trace?id={id}&uuid={uuid}" +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is not None + assert result.error is None + + s = result.registry.get("usr") + assert s is not None + assert s.name == "User" + assert s.description == "End-user account" + assert s.owner_team == "identity-platform" + assert s.pii is True + assert s.retention == "7y" + assert "logs" in s.links + assert "{id}" in s.links["logs"] + + +def test_load_registry_yaml_missing_schema_version(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +types: + usr: + name: User +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "missing_schema_version" + + +def test_load_registry_yaml_unsupported_schema_version(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 2 +types: {} +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "unsupported_schema_version" + + +def test_load_registry_yaml_types_not_a_map(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + - usr + - ord +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "invalid_types" + + +def test_load_registry_yaml_root_not_a_map(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +- not +- a +- map +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is None + assert result.error is not None + assert result.error.code == "invalid_schema" + + +def test_load_registry_yaml_skips_invalid_type_entries(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User + "": # invalid prefix key -> should be skipped + name: EmptyPrefix + bad: "not a map" # invalid value -> should be skipped +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is not None + assert result.error is None + + assert result.registry.get("usr") is not None + assert result.registry.get("") is None + assert result.registry.get("bad") is None + + +def test_load_registry_yaml_links_not_a_map_becomes_empty(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User + links: "not a map" +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is not None + s = result.registry.get("usr") + assert s is not None + assert s.links == {} + + +def test_load_registry_yaml_malformed_yaml_returns_read_failed(tmp_path: Path): + p = tmp_path / "typeid.schema.yaml" + p.write_text( + """ +schema_version: 1 +types: + usr: + name: User + links: + logs: "https://logs?q={id}" + bad: [unclosed +""", + encoding="utf-8", + ) + + result = load_registry(p) + assert result.registry is None + assert 
result.error is not None + assert result.error.code == "read_failed" diff --git a/typeid/cli.py b/typeid/cli.py index dac000e..8cb9d56 100644 --- a/typeid/cli.py +++ b/typeid/cli.py @@ -5,6 +5,11 @@ from typeid import TypeID, base32, from_uuid, get_prefix_and_suffix +from typeid.explain.engine import explain as explain_engine +from typeid.explain.discovery import discover_schema_path +from typeid.explain.formatters import format_explanation_json, format_explanation_pretty +from typeid.explain.registry import load_registry, make_lookup + @click.group() def cli(): @@ -30,13 +35,99 @@ def encode(uuid: str, prefix: Optional[str] = None) -> None: @click.argument("encoded") def decode(encoded: str) -> None: prefix, suffix = get_prefix_and_suffix(encoded) - decoded_bytes = bytes(base32.decode(suffix)) uuid = UUID(bytes=decoded_bytes) - click.echo(f"type: {prefix}") click.echo(f"uuid: {uuid}") +@cli.command() +@click.argument("encoded") +@click.option( + "--schema", + "schema_path", + type=click.Path(exists=True, dir_okay=False, path_type=str), + required=False, + help="Path to TypeID schema file (JSON, or YAML if PyYAML is installed). " + "If omitted, TypeID will try to discover a schema automatically.", +) +@click.option( + "--json", + "as_json", + is_flag=True, + help="Output machine-readable JSON.", +) +@click.option( + "--no-schema", + is_flag=True, + help="Disable schema lookup (derived facts only).", +) +@click.option( + "--no-links", + is_flag=True, + help="Disable link template rendering.", +) +def explain( + encoded: str, + schema_path: Optional[str], + as_json: bool, + no_schema: bool, + no_links: bool, +) -> None: + """ + Explain a TypeID: parse/validate it, derive facts (uuid, created_at), + and optionally enrich explanation from a user-provided schema. + """ + enable_schema = not no_schema + enable_links = not no_links + + schema_lookup = None + warnings: list[str] = [] + + # Load schema (optional) + if enable_schema: + resolved_path = None + + if schema_path: + resolved_path = schema_path + else: + discovery = discover_schema_path() + if discovery.path is not None: + resolved_path = str(discovery.path) + # If env var was set but invalid, discovery returns source info; + # we keep CLI robust and simply proceed without schema. + + if resolved_path: + result = load_registry(click.Path(resolved_path)) + # NOTE: click.Path is not a real filesystem path. Convert to pathlib Path. + # We'll do it safely: + from pathlib import Path + result = load_registry(Path(resolved_path)) + + if result.registry is not None: + schema_lookup = make_lookup(result.registry) + else: + if result.error is not None: + warnings.append(f"Schema load failed: {result.error.message}") + + # Build explanation (never raises on normal errors) + exp = explain_engine( + encoded, + schema_lookup=schema_lookup, + enable_schema=enable_schema, + enable_links=enable_links, + ) + + # Surface schema-load warnings (if any) + if warnings: + exp.warnings.extend(warnings) + + # Print + if as_json: + click.echo(format_explanation_json(exp)) + else: + click.echo(format_explanation_pretty(exp)) + + if __name__ == "__main__": cli() diff --git a/typeid/explain/__init__.py b/typeid/explain/__init__.py new file mode 100644 index 0000000..cebf75b --- /dev/null +++ b/typeid/explain/__init__.py @@ -0,0 +1,81 @@ +""" +Explain subsystem for TypeID. + +This package provides a high-level, non-breaking API and CLI support +for answering the question: + + "What is this TypeID?" 
+ +It is intentionally: +- additive (no changes to existing TypeID semantics), +- schema-optional (works fully offline), +- safe by default (read-only, no side effects). + +Public API: + explain(id_str, schema_path=None, **options) -> Explanation +""" + +from pathlib import Path +from typing import Optional + +from .engine import explain as _explain_engine +from .registry import load_registry, make_lookup +from .discovery import discover_schema_path +from .model import Explanation + +__all__ = [ + "explain", + "Explanation", +] + + +def explain( + id_str: str, + *, + schema_path: Optional[str | Path] = None, + enable_schema: bool = True, + enable_links: bool = True, +) -> Explanation: + """ + High-level convenience API for explaining a TypeID. + + This function: + - parses and validates the TypeID, + - discovers and loads schema if enabled, + - executes the explain engine, + - never raises on normal user errors. + + Args: + id_str: TypeID string to explain. + schema_path: Optional explicit path to schema file. + If None, discovery rules are applied. + enable_schema: Disable schema usage entirely if False. + enable_links: Disable link rendering if False. + + Returns: + Explanation object. + """ + lookup = None + + if enable_schema: + path = None + + if schema_path is not None: + path = Path(schema_path).expanduser() + else: + discovery = discover_schema_path() + path = discovery.path + + if path is not None: + result = load_registry(path) + if result.registry is not None: + lookup = make_lookup(result.registry) + # Note: load errors are intentionally not raised here. + # They will be surfaced as warnings by the CLI layer if desired. + + return _explain_engine( + id_str, + schema_lookup=lookup, + enable_schema=enable_schema, + enable_links=enable_links, + ) diff --git a/typeid/explain/discovery.py b/typeid/explain/discovery.py new file mode 100644 index 0000000..8cf9e14 --- /dev/null +++ b/typeid/explain/discovery.py @@ -0,0 +1,128 @@ +""" +Schema discovery for `typeid explain`. + +This module implements a conservative, non-breaking discovery mechanism: +- If nothing is found, callers proceed without schema (feature still works). +- No new mandatory dependencies. +- Paths are resolved deterministically with clear precedence. + +Precedence (first match wins): +1) explicit CLI arg: --schema PATH (handled by caller; use discover_schema only if not provided) +2) environment variable: TYPEID_SCHEMA +3) current working directory: + - typeid.schema.json + - typeid.schema.yaml / typeid.schema.yml +4) user config directory: + - /typeid/schema.json + - /typeid/schema.yaml / schema.yml +""" + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Optional + + +DEFAULT_CWD_CANDIDATES = ( + "typeid.schema.json", + "typeid.schema.yaml", + "typeid.schema.yml", +) + +DEFAULT_USER_CANDIDATES = ( + "schema.json", + "schema.yaml", + "schema.yml", +) + + +@dataclass(frozen=True, slots=True) +class DiscoveryResult: + """Result of schema discovery.""" + path: Optional[Path] + source: str # e.g., "env:TYPEID_SCHEMA", "cwd", "user_config", "none" + + +def discover_schema_path( + *, + env_var: str = "TYPEID_SCHEMA", + cwd: Optional[Path] = None, +) -> DiscoveryResult: + """ + Discover schema file path using the configured precedence rules. + + Args: + env_var: environment variable name to check first. + cwd: optional cwd override (useful for tests). + + Returns: + DiscoveryResult with found path or None. 
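+
+    Example (illustrative; the result depends on the caller's environment):
+
+        res = discover_schema_path()
+        if res.path is None:
+            print("no schema found; explain will use derived facts only")
+        else:
+            print(f"schema from {res.source}: {res.path}")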
+ """ + # 1) Environment variable + env_value = os.environ.get(env_var) + if env_value: + p = Path(env_value).expanduser() + if p.is_file(): + return DiscoveryResult(path=p, source=f"env:{env_var}") + # If provided but invalid, we treat it as "not found" but caller can + # warn separately if they want. + return DiscoveryResult(path=None, source=f"env:{env_var} (not found)") + + # 2) Current working directory + cwd_path = (cwd or Path.cwd()) + for name in DEFAULT_CWD_CANDIDATES: + p = cwd_path / name + if p.is_file(): + return DiscoveryResult(path=p, source="cwd") + + # 3) User config directory + user_cfg = _user_config_dir() + if user_cfg is not None: + base = user_cfg / "typeid" + for name in DEFAULT_USER_CANDIDATES: + p = base / name + if p.is_file(): + return DiscoveryResult(path=p, source="user_config") + + return DiscoveryResult(path=None, source="none") + + +def _user_config_dir() -> Optional[Path]: + """ + Return OS-appropriate user config directory. + + - Linux/macOS: ~/.config + - Windows: %APPDATA% + """ + # Windows: APPDATA is the typical location for roaming config + appdata = os.environ.get("APPDATA") + if appdata: + return Path(appdata).expanduser() + + # XDG on Linux, also often present on macOS + xdg = os.environ.get("XDG_CONFIG_HOME") + if xdg: + return Path(xdg).expanduser() + + # Fallback to ~/.config + home = Path.home() + if home: + return home / ".config" + return None + + +def iter_default_candidate_paths(*, cwd: Optional[Path] = None) -> Iterable[Path]: + """ + Yield all candidate paths in discovery order (excluding env var). + + Useful for debugging or `typeid explain --debug-discovery` style features. + """ + cwd_path = (cwd or Path.cwd()) + for name in DEFAULT_CWD_CANDIDATES: + yield cwd_path / name + + user_cfg = _user_config_dir() + if user_cfg is not None: + base = user_cfg / "typeid" + for name in DEFAULT_USER_CANDIDATES: + yield base / name diff --git a/typeid/explain/engine.py b/typeid/explain/engine.py new file mode 100644 index 0000000..c07de10 --- /dev/null +++ b/typeid/explain/engine.py @@ -0,0 +1,247 @@ +""" +Explain engine for the `typeid explain` feature. + +This module is intentionally: +- Additive (doesn't change existing TypeID behavior) +- Defensive (never crashes on normal user input) +- Dependency-light (stdlib only) + +It builds an Explanation by combining: +1) parsed + derived facts from the ID (always available if parsable) +2) optional schema (registry) data looked up by prefix +3) optional rendered links (from schema templates) +""" + +from dataclasses import replace +from datetime import datetime, timezone +from typing import Any, Callable, Dict, Optional + +from typeid import TypeID +from typeid.errors import TypeIDException + +from .model import Explanation, ParseError, ParsedTypeID, Provenance, TypeSchema + + +SchemaLookup = Callable[[str], Optional[TypeSchema]] + + +def explain( + id_str: str, + *, + schema_lookup: Optional[SchemaLookup] = None, + enable_schema: bool = True, + enable_links: bool = True, +) -> Explanation: + """ + Produce an Explanation for a TypeID string. + + Args: + id_str: The TypeID string to explain. + schema_lookup: Optional callable to fetch TypeSchema by prefix. + If provided and enable_schema=True, we will look up schema. + enable_schema: If False, do not attempt schema lookup (offline mode). + enable_links: If True, render link templates from schema (if any). + + Returns: + Explanation (always returned; valid=False if parse/validation fails). 
+ """ + parsed = _parse_typeid(id_str) + + # Start building explanation; keep it useful even if invalid. + exp = Explanation( + id=id_str, + valid=parsed.valid, + parsed=parsed, + schema=None, + derived={}, + links={}, + provenance={}, + warnings=[], + errors=list(parsed.errors), + ) + + # If parse failed, nothing more we can deterministically derive. + if not parsed.valid or parsed.prefix is None or parsed.suffix is None: + return exp + + # Schema lookup (optional) + schema: Optional[TypeSchema] = None + if enable_schema and schema_lookup is not None and parsed.prefix: + try: + schema = schema_lookup(parsed.prefix) + except Exception as e: # never let schema backend break explain + exp.warnings.append(f"Schema lookup failed: {e!s}") + schema = None + + if schema is not None: + exp = replace(exp, schema=schema) + _apply_schema_provenance(exp) + + # Render links (optional) + if enable_links and schema is not None and schema.links: + rendered, warnings = _render_links(schema.links, exp) + exp.links.update(rendered) + exp.warnings.extend(warnings) + for k in rendered.keys(): + exp.provenance.setdefault(f"links.{k}", Provenance.SCHEMA) + + # Derived facts provenance + _apply_derived_provenance(exp) + + return exp + + +def _parse_typeid(id_str: str) -> ParsedTypeID: + """ + Parse and validate a TypeID using the library's existing logic. + + Implementation detail: + - We rely on TypeID.from_string() to ensure behavior matches existing users. + - On error, we still attempt to extract prefix/suffix best-effort to show + something helpful (without promising correctness). + """ + try: + tid = TypeID.from_string(id_str) + except TypeIDException as e: + # Best-effort split so users can see what's wrong. + prefix, suffix = _best_effort_split(id_str) + return ParsedTypeID( + raw=id_str, + prefix=prefix, + suffix=suffix, + valid=False, + errors=[ParseError(code="invalid_typeid", message=str(e))], + uuid=None, + created_at=None, + sortable=None, + ) + except Exception as e: + prefix, suffix = _best_effort_split(id_str) + return ParsedTypeID( + raw=id_str, + prefix=prefix, + suffix=suffix, + valid=False, + errors=[ParseError(code="parse_error", message=f"Unexpected error: {e!s}")], + uuid=None, + created_at=None, + sortable=None, + ) + + # Derived facts from the validated TypeID + uuid_obj = tid.uuid # library returns a UUID object (uuid6.UUID) + uuid_str = str(uuid_obj) + + created_at = _uuid7_created_at(uuid_obj) + sortable = True # UUIDv7 is time-ordered by design + + return ParsedTypeID( + raw=id_str, + prefix=tid.prefix, + suffix=tid.suffix, + valid=True, + errors=[], + uuid=uuid_str, + created_at=created_at, + sortable=sortable, + ) + + +def _best_effort_split(id_str: str) -> tuple[Optional[str], Optional[str]]: + """ + Split by the last underscore (TypeID allows underscores in prefix). + Returns (prefix, suffix) or (None, None) if not splittable. + """ + if "_" not in id_str: + return None, None + prefix, suffix = id_str.rsplit("_", 1) + if not prefix or not suffix: + return None, None + return prefix, suffix + + +def _uuid7_created_at(uuid_obj: Any) -> Optional[datetime]: + """ + Extract created_at from a UUIDv7. + + UUIDv7 layout: the top 48 bits are unix epoch time in milliseconds. + Python's uuid.UUID.int is a 128-bit integer with the most significant bits first, + so unix_ms = int >> 80 (128-48). + + Returns: + UTC datetime or None if extraction fails. 
+ """ + try: + # uuid_obj is likely uuid6.UUID, but supports .int like uuid.UUID + u_int = int(uuid_obj.int) + unix_ms = u_int >> 80 + unix_s = unix_ms / 1000.0 + return datetime.fromtimestamp(unix_s, tz=timezone.utc) + except Exception: + return None + + +class _SafeFormatDict(dict): + """dict that leaves unknown placeholders intact instead of raising KeyError.""" + def __missing__(self, key: str) -> str: + return "{" + key + "}" + + +def _render_links(templates: Dict[str, str], exp: Explanation) -> tuple[Dict[str, str], list[str]]: + """ + Render schema link templates using known values. + + Supported placeholders: + {id}, {prefix}, {suffix}, {uuid} + {created_at} (ISO8601 if available) + + Unknown placeholders remain unchanged. + """ + mapping = _SafeFormatDict( + id=exp.id, + prefix=exp.parsed.prefix or "", + suffix=exp.parsed.suffix or "", + uuid=exp.parsed.uuid or "", + created_at=exp.parsed.created_at.isoformat() if exp.parsed.created_at else "", + ) + + rendered: Dict[str, str] = {} + warnings: list[str] = [] + + for name, tmpl in templates.items(): + if not isinstance(tmpl, str): + warnings.append(f"Link template '{name}' is not a string; skipping.") + continue + try: + rendered[name] = tmpl.format_map(mapping) + except Exception as e: + warnings.append(f"Failed to render link '{name}': {e!s}") + + return rendered, warnings + + +def _apply_schema_provenance(exp: Explanation) -> None: + """ + Mark common schema fields as coming from schema. + (We keep this small; schema.raw stays schema by definition.) + """ + if exp.schema is None: + return + + for key in ("name", "description", "owner_team", "pii", "retention"): + if getattr(exp.schema, key, None) is not None: + exp.provenance.setdefault(key, Provenance.SCHEMA) + + +def _apply_derived_provenance(exp: Explanation) -> None: + """Mark parsed-derived fields as coming from the ID itself.""" + if exp.parsed.prefix is not None: + exp.provenance.setdefault("prefix", Provenance.DERIVED_FROM_ID) + if exp.parsed.suffix is not None: + exp.provenance.setdefault("suffix", Provenance.DERIVED_FROM_ID) + if exp.parsed.uuid is not None: + exp.provenance.setdefault("uuid", Provenance.DERIVED_FROM_ID) + if exp.parsed.created_at is not None: + exp.provenance.setdefault("created_at", Provenance.DERIVED_FROM_ID) + if exp.parsed.sortable is not None: + exp.provenance.setdefault("sortable", Provenance.DERIVED_FROM_ID) diff --git a/typeid/explain/formatters.py b/typeid/explain/formatters.py new file mode 100644 index 0000000..588e754 --- /dev/null +++ b/typeid/explain/formatters.py @@ -0,0 +1,228 @@ +""" +Formatting helpers for `typeid explain`. + +This module is intentionally small and dependency-free. +It supports: +- YAML-ish pretty output (human-friendly) +- JSON output via Explanation.to_dict() (machine-friendly) + +It also provides a minimal "safe formatter" for link templates +(kept here so CLI and engine can share behavior if needed). + +Note: This file does NOT require PyYAML. We output YAML-like text +without claiming it's strict YAML. +""" + +import json +from datetime import datetime +from typing import Any, Dict, List, Mapping, Optional + +from .model import Explanation, Provenance + + +def format_explanation_pretty(exp: Explanation) -> str: + """ + Render an Explanation as readable YAML-ish text. + + We intentionally keep it stable-ish and human-friendly: + - predictable section ordering + - indentation + - lists rendered as "- item" + + This is NOT guaranteed to be strict YAML; it is "YAML-like". 
+ For strict machine consumption, use JSON output. + """ + lines: List[str] = [] + + def add(line: str = "") -> None: + lines.append(line) + + add(f"id: {exp.id}") + add(f"valid: {str(exp.valid).lower()}") + + if exp.errors: + add("errors:") + for e in exp.errors: + add(f" - code: {e.code}") + add(f" message: {_quote_if_needed(e.message)}") + + add() + add("parsed:") + _emit_kv(lines, " ", "prefix", exp.parsed.prefix) + _emit_kv(lines, " ", "suffix", exp.parsed.suffix) + _emit_kv(lines, " ", "uuid", exp.parsed.uuid) + _emit_kv(lines, " ", "created_at", _iso(exp.parsed.created_at)) + _emit_kv(lines, " ", "sortable", exp.parsed.sortable) + + # Schema section + add() + add("schema:") + if exp.schema is None: + add(" found: false") + else: + add(" found: true") + _emit_kv(lines, " ", "prefix", exp.schema.prefix) + _emit_kv(lines, " ", "name", exp.schema.name) + _emit_kv(lines, " ", "description", exp.schema.description) + _emit_kv(lines, " ", "owner_team", exp.schema.owner_team) + _emit_kv(lines, " ", "pii", exp.schema.pii) + _emit_kv(lines, " ", "retention", exp.schema.retention) + + # Show extra raw keys (optional, but helpful) + extra = _schema_extras(exp.schema.raw) + if extra: + add(" extra:") + for k in sorted(extra.keys()): + _emit_any(lines, " ", k, extra[k]) + + # Derived + if exp.derived: + add() + add("derived:") + for k in sorted(exp.derived.keys()): + _emit_any(lines, " ", k, exp.derived[k]) + + # Links + add() + add("links:") + if not exp.links: + add(" {}") + else: + for k in sorted(exp.links.keys()): + _emit_kv(lines, " ", k, exp.links[k]) + + # Provenance + if exp.provenance: + add() + add("provenance:") + for k in sorted(exp.provenance.keys()): + prov = exp.provenance[k] + add(f" {k}: {prov.value if isinstance(prov, Provenance) else str(prov)}") + + # Warnings + if exp.warnings: + add() + add("warnings:") + for w in exp.warnings: + add(f" - {_quote_if_needed(w)}") + + return "\n".join(lines).rstrip() + "\n" + + +def format_explanation_json(exp: Explanation, *, indent: int = 2) -> str: + """ + Render Explanation as JSON string. + """ + return json.dumps(exp.to_dict(), indent=indent, ensure_ascii=False) + "\n" + + +class SafeFormatDict(dict): + """dict that leaves unknown placeholders intact rather than raising KeyError.""" + def __missing__(self, key: str) -> str: + return "{" + key + "}" + + +def render_template(template: str, mapping: Mapping[str, Any]) -> str: + """ + Render a template using str.format_map with SafeFormatDict. + + Unknown placeholders remain unchanged. + """ + safe = SafeFormatDict({k: _stringify(v) for k, v in mapping.items()}) + return template.format_map(safe) + + +def _iso(dt: Optional[datetime]) -> Optional[str]: + return dt.isoformat() if dt else None + + +def _emit_kv(lines: List[str], indent: str, key: str, value: Any) -> None: + if value is None: + lines.append(f"{indent}{key}: null") + return + if isinstance(value, bool): + lines.append(f"{indent}{key}: {str(value).lower()}") + return + if isinstance(value, (int, float)): + lines.append(f"{indent}{key}: {value}") + return + lines.append(f"{indent}{key}: {_quote_if_needed(str(value))}") + + +def _emit_any(lines: List[str], indent: str, key: str, value: Any) -> None: + """ + Emit arbitrary JSON-y values in YAML-ish style. 
+ """ + if value is None or isinstance(value, (str, bool, int, float)): + _emit_kv(lines, indent, key, value) + return + + if isinstance(value, list): + lines.append(f"{indent}{key}:") + if not value: + lines.append(f"{indent} []") + return + for item in value: + if isinstance(item, (str, int, float, bool)) or item is None: + lines.append(f"{indent} - {_quote_if_needed(_stringify(item))}") + else: + # nested complex item: render as JSON inline + lines.append(f"{indent} - {_quote_if_needed(json.dumps(item, ensure_ascii=False))}") + return + + if isinstance(value, dict): + lines.append(f"{indent}{key}:") + if not value: + lines.append(f"{indent} {{}}") + return + for k in sorted(value.keys(), key=lambda x: str(x)): + _emit_any(lines, indent + " ", str(k), value[k]) + return + + # Fallback: stringify + _emit_kv(lines, indent, key, _stringify(value)) + + +def _stringify(v: Any) -> str: + if v is None: + return "null" + if isinstance(v, bool): + return str(v).lower() + if isinstance(v, (int, float)): + return str(v) + if isinstance(v, datetime): + return v.isoformat() + return str(v) + + +def _quote_if_needed(s: str) -> str: + """ + Add quotes if the string contains characters that could confuse YAML-ish output. + """ + if s == "": + return '""' + # Minimal quoting rules for readability; not strict YAML. + needs = any(ch in s for ch in [":", "#", "{", "}", "[", "]", ",", "\n", "\r", "\t"]) + if s.strip() != s: + needs = True + if s.lower() in {"true", "false", "null", "none"}: + needs = True + if needs: + escaped = s.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + return s + + +def _schema_extras(raw: Dict[str, Any]) -> Dict[str, Any]: + """ + Return schema keys excluding the ones we already print as normalized fields. + """ + exclude = { + "name", + "description", + "owner_team", + "pii", + "retention", + "links", + } + return {k: v for k, v in raw.items() if k not in exclude} diff --git a/typeid/explain/model.py b/typeid/explain/model.py new file mode 100644 index 0000000..065daab --- /dev/null +++ b/typeid/explain/model.py @@ -0,0 +1,152 @@ +""" +Data models for the `typeid explain` feature. + +Design goals: +- Additive, non-breaking: does not modify existing TypeID behavior. +- Stable-ish: callers can rely on these dataclasses, but we keep flexibility + by storing schema/derived sections as dicts (schema evolves without breaking). +- Provenance: every top-level field can be tagged by where it came from. +""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +class Provenance(str, Enum): + """Where a piece of information came from.""" + DERIVED_FROM_ID = "derived_from_id" + SCHEMA = "schema" + EXTERNAL = "external" + UNKNOWN = "unknown" + + +@dataclass(frozen=True, slots=True) +class ParseError: + """Represents a recoverable parse/validation error.""" + code: str + message: str + + +@dataclass(frozen=True, slots=True) +class ParsedTypeID: + """ + Facts extracted from the TypeID string without any schema lookup. + + Notes: + - `prefix` is the full prefix as per TypeID spec (may contain underscores). + - `suffix` is the encoded UUIDv7 portion (base32 string). + - `uuid` and `created_at` are *derived* from suffix if possible. 
+ """ + raw: str + prefix: Optional[str] + suffix: Optional[str] + + valid: bool + errors: List[ParseError] = field(default_factory=list) + + # Derived (best-effort) + uuid: Optional[str] = None # keep as string to avoid uuid/uuid6 typing bleed + created_at: Optional[datetime] = None + sortable: Optional[bool] = None # TypeIDs w/ UUIDv7 are typically sortable + + +@dataclass(frozen=True, slots=True) +class TypeSchema: + """ + Schema info for a given prefix, loaded from a registry file. + + This is intentionally flexible to keep the schema format evolving without + breaking the Python API: we store raw dict and also normalize a few + commonly-used fields for nicer UX. + """ + prefix: str + raw: Dict[str, Any] = field(default_factory=dict) + + # Common optional fields (convenience) + name: Optional[str] = None + description: Optional[str] = None + owner_team: Optional[str] = None + pii: Optional[bool] = None + retention: Optional[str] = None + + # Link templates (e.g. {"logs": "https://...q={id}"}) + links: Dict[str, str] = field(default_factory=dict) + + +@dataclass(frozen=True, slots=True) +class Explanation: + """ + Final explanation object produced by the explain engine. + + Sections: + - parsed: always present (even if invalid; fields may be None) + - schema: may be None if no schema found or schema loading disabled + - derived: small dict for extra derived facts (extensible) + - links: rendered links (from schema templates), safe for display + - provenance: per-field provenance labels for transparency + """ + id: str + valid: bool + + parsed: ParsedTypeID + schema: Optional[TypeSchema] = None + + # Additional derived facts that aren't worth dedicated fields yet + derived: Dict[str, Any] = field(default_factory=dict) + + # Rendered (not templates) links + links: Dict[str, str] = field(default_factory=dict) + + # Field -> provenance label; keep keys simple (e.g. "created_at", "retention") + provenance: Dict[str, Provenance] = field(default_factory=dict) + + # Non-fatal warnings (e.g. schema loaded but link template failed) + warnings: List[str] = field(default_factory=list) + + # Errors copied from parsed.errors for convenience (and future external errors) + errors: List[ParseError] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """ + Convert to a JSON-serializable dict. + + We avoid serializing complex objects directly (datetime, Enums) without + conversion to keep `--json` output stable and easy to consume. 
+ """ + parsed = { + "raw": self.parsed.raw, + "prefix": self.parsed.prefix, + "suffix": self.parsed.suffix, + "valid": self.parsed.valid, + "errors": [e.__dict__ for e in self.parsed.errors], + "uuid": self.parsed.uuid, + "created_at": self.parsed.created_at.isoformat() if self.parsed.created_at else None, + "sortable": self.parsed.sortable, + } + + schema = None + if self.schema is not None: + schema = { + "prefix": self.schema.prefix, + "name": self.schema.name, + "description": self.schema.description, + "owner_team": self.schema.owner_team, + "pii": self.schema.pii, + "retention": self.schema.retention, + "links": dict(self.schema.links), + "raw": dict(self.schema.raw), + } + + return { + "id": self.id, + "valid": self.valid, + "parsed": parsed, + "derived": dict(self.derived), + "schema": schema, + "links": dict(self.links), + "provenance": {k: str(v.value) for k, v in self.provenance.items()}, + "warnings": list(self.warnings), + "errors": [e.__dict__ for e in self.errors], + } diff --git a/typeid/explain/registry.py b/typeid/explain/registry.py new file mode 100644 index 0000000..14f3781 --- /dev/null +++ b/typeid/explain/registry.py @@ -0,0 +1,217 @@ +""" +Schema registry loader for `typeid explain`. + +This module loads a schema file (JSON by default, YAML optionally) and exposes +a lookup function: prefix -> TypeSchema. + +Goals: +- Non-breaking: schema is optional; failures are handled gracefully. +- Minimal dependencies: JSON uses stdlib; YAML support is optional. +- Future-proof: schema versioning with a light validation layer. + +Schema shape (v1) - JSON/YAML: +{ + "schema_version": 1, + "types": { + "usr": { + "name": "User", + "description": "...", + "owner_team": "...", + "pii": true, + "retention": "7y", + "links": { + "logs": "https://...q={id}", + "trace": "https://...?id={id}" + } + } + } +} +""" + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, Optional, Tuple + +from .model import TypeSchema + + +@dataclass(frozen=True, slots=True) +class RegistryLoadError: + code: str + message: str + + +@dataclass(frozen=True, slots=True) +class RegistryLoadResult: + registry: Optional["SchemaRegistry"] + error: Optional[RegistryLoadError] = None + + +class SchemaRegistry: + """ + In-memory registry of TypeSchema objects loaded from a schema file. + + Lookup is by full TypeID prefix (which may contain underscores). + """ + def __init__(self, *, schema_version: int, types: Dict[str, TypeSchema], source_path: Path): + self.schema_version = schema_version + self._types = types + self.source_path = source_path + + def get(self, prefix: str) -> Optional[TypeSchema]: + return self._types.get(prefix) + + def __contains__(self, prefix: str) -> bool: + return prefix in self._types + + def __len__(self) -> int: + return len(self._types) + + +def load_registry(path: Path) -> RegistryLoadResult: + """ + Load a schema registry from the given path. 
+ + Returns RegistryLoadResult: + - registry != None on success + - error != None on failure (never raises for normal user mistakes) + """ + try: + data, fmt = _read_schema_file(path) + except Exception as e: + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="read_failed", message=f"Failed to read schema: {e!s}"), + ) + + if not isinstance(data, dict): + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="invalid_schema", message="Schema root must be an object/map."), + ) + + schema_version = data.get("schema_version") + if schema_version is None: + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="missing_schema_version", message="Schema missing 'schema_version'."), + ) + if not isinstance(schema_version, int): + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="invalid_schema_version", message="'schema_version' must be an integer."), + ) + if schema_version != 1: + return RegistryLoadResult( + registry=None, + error=RegistryLoadError( + code="unsupported_schema_version", + message=f"Unsupported schema_version={schema_version}. Supported: 1.", + ), + ) + + types_raw = data.get("types") + if types_raw is None: + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="missing_types", message="Schema missing 'types' map."), + ) + if not isinstance(types_raw, dict): + return RegistryLoadResult( + registry=None, + error=RegistryLoadError(code="invalid_types", message="'types' must be an object/map."), + ) + + types: Dict[str, TypeSchema] = {} + for prefix, spec in types_raw.items(): + if not isinstance(prefix, str) or not prefix: + # skip invalid keys but don't fail entire load + continue + if not isinstance(spec, dict): + # skip invalid type spec entries + continue + types[prefix] = _to_type_schema(prefix, spec) + + return RegistryLoadResult(registry=SchemaRegistry(schema_version=schema_version, types=types, source_path=path)) + + +def make_lookup(registry: Optional[SchemaRegistry]): + """ + Convenience helper to make a schema_lookup callable for engine.explain(). + + Example: + reg = load_registry(path).registry + lookup = make_lookup(reg) + explanation = explain(id, schema_lookup=lookup) + """ + def _lookup(prefix: str) -> Optional[TypeSchema]: + if registry is None: + return None + return registry.get(prefix) + return _lookup + + +def _read_schema_file(path: Path) -> Tuple[Dict[str, Any], str]: + """ + Read schema file and parse it into a dict. + + Returns: + (data, format) where format is 'json' or 'yaml' + + JSON is always supported. + YAML is supported only if PyYAML is installed. + """ + suffix = path.suffix.lower() + raw = path.read_text(encoding="utf-8") + + if suffix == ".json": + return json.loads(raw), "json" + + if suffix in (".yaml", ".yml"): + # Optional dependency + try: + import yaml # type: ignore + except Exception as e: + raise RuntimeError( + "YAML schema requires optional dependency. " + "Install PyYAML (or `typeid[yaml]` if you provide extras)." + ) from e + data = yaml.safe_load(raw) + return data, "yaml" + + # If extension unknown, try JSON first for convenience. + try: + return json.loads(raw), "json" + except Exception: + raise RuntimeError(f"Unsupported schema file extension: {path.suffix!s} (supported: .json, .yaml, .yml)") + + +def _to_type_schema(prefix: str, spec: Dict[str, Any]) -> TypeSchema: + """ + Normalize a raw type spec into TypeSchema. + + We keep `raw` for forward-compatibility but also extract a few common fields + for nicer UX. 
+ """ + links = spec.get("links") or {} + if not isinstance(links, dict): + links = {} + + # Extract common fields safely + name = spec.get("name") + description = spec.get("description") + owner_team = spec.get("owner_team") + pii = spec.get("pii") + retention = spec.get("retention") + + return TypeSchema( + prefix=prefix, + raw=dict(spec), + name=name if isinstance(name, str) else None, + description=description if isinstance(description, str) else None, + owner_team=owner_team if isinstance(owner_team, str) else None, + pii=pii if isinstance(pii, bool) else None, + retention=retention if isinstance(retention, str) else None, + links={str(k): str(v) for k, v in links.items() if isinstance(k, str) and isinstance(v, str)}, + ) From b200efc70902a324d043ae0c875cd8d88be80463 Mon Sep 17 00:00:00 2001 From: Murad Akhundov Date: Sat, 20 Dec 2025 01:43:05 +0100 Subject: [PATCH 2/3] feat: Add advanced usage examples for `typeid explain` --- examples/README.md | 86 ++++++++++++++++++ examples/explain/__init__.py | 0 examples/explain/explain_complex.py | 87 ++++++++++++++++++ examples/explain/explain_report.py | 98 +++++++++++++++++++++ examples/explain/sample_ids.txt | 10 +++ examples/explain/schemas/typeid.schema.json | 78 ++++++++++++++++ examples/explain/schemas/typeid.schema.yaml | 24 +++++ 7 files changed, 383 insertions(+) create mode 100644 examples/README.md create mode 100644 examples/explain/__init__.py create mode 100644 examples/explain/explain_complex.py create mode 100644 examples/explain/explain_report.py create mode 100644 examples/explain/sample_ids.txt create mode 100644 examples/explain/schemas/typeid.schema.json create mode 100644 examples/explain/schemas/typeid.schema.yaml diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..25a7314 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,86 @@ +# TypeID Examples + +This directory contains **independent, self-contained examples** demonstrating +different ways to use **TypeID in real projects**. + +Each example focuses on a specific integration or use case and can be studied +and used on its own. + +## `examples/explain/` — `typeid explain` feature + +This directory contains **advanced examples** for the `typeid explain` feature. + +These examples demonstrate how to: + +* inspect TypeIDs (“what is this ID?”) +* enrich IDs using schemas (JSON / YAML) +* batch-process IDs for automation +* safely handle invalid or unknown IDs +* generate machine-readable reports + +📄 See **`examples/explain/README.md`** for full documentation and usage instructions. + +## `examples/sqlalchemy.py` — SQLAlchemy integration + +This example demonstrates how to use **TypeID with SQLAlchemy** in a clean and +database-friendly way. + +### Purpose + +* Store **native UUIDs** in the database +* Expose **TypeID objects** at the application level +* Enforce prefix correctness automatically +* Keep database schema simple and efficient + +This example is **independent** of the `typeid explain` feature. 
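+
+As a rough, illustrative sketch of the idea (not the shipped code), such a type can
+look roughly like the snippet below. It assumes SQLAlchemy 2.x (`types.Uuid`), a
+`.uuid` accessor on `TypeID`, and a `from_uuid(uuid, prefix)` helper; treat
+`examples/sqlalchemy.py` as the reference implementation.
+
+```python
+from sqlalchemy import types
+from typeid import from_uuid  # assumed helper: builds a TypeID from a UUID + prefix
+
+
+class TypeIDType(types.TypeDecorator):
+    impl = types.Uuid  # the database column stores a plain UUID
+    cache_ok = True
+
+    def __init__(self, prefix: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.prefix = prefix
+
+    def process_bind_param(self, value, dialect):
+        # Enforce prefix correctness on write (".prefix"/".uuid" accessors assumed)
+        if value is None:
+            return None
+        if value.prefix != self.prefix:
+            raise ValueError(f"Expected prefix {self.prefix!r}, got {value.prefix!r}")
+        return value.uuid
+
+    def process_result_value(self, value, dialect):
+        # Rebuild the prefixed TypeID on read (assumed signature for from_uuid)
+        if value is None:
+            return None
+        return from_uuid(value, self.prefix)
+```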
+
+### What this example shows
+
+* How to implement a custom `TypeDecorator` for TypeID
+* How to:
+
+  * bind a `TypeID` to a UUID column
+  * reconstruct a `TypeID` on read
+* How to ensure:
+
+  * prefixes are validated
+  * Alembic autogeneration preserves constructor arguments
+
+### Usage snippet
+
+```python
+id = mapped_column(
+    TypeIDType("user"),
+    primary_key=True,
+    default=lambda: TypeID("user")
+)
+```
+
+Resulting identifiers look like:
+
+```text
+user_01h45ytscbebyvny4gc8cr8ma2
+```
+
+while the database stores only the UUID value.
+
+## Choosing the right example
+
+| Use case                      | Example                               |
+| ----------------------------- | ------------------------------------- |
+| Understand `typeid explain`   | `examples/explain/`                   |
+| Batch / CI / reporting        | `examples/explain/explain_report.py`  |
+| SQLAlchemy ORM integration    | `examples/sqlalchemy.py`              |
+| UUID-native database storage  | `examples/sqlalchemy.py`              |
+
+## Design Principles
+
+All examples in this directory follow these principles:
+
+* ✅ non-breaking
+* ✅ production-oriented
+* ✅ minimal dependencies
+* ✅ explicit and readable
+* ✅ safe handling of invalid input
+
+Examples are meant to be **copied, adapted, and extended**.
diff --git a/examples/explain/__init__.py b/examples/explain/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/explain/explain_complex.py b/examples/explain/explain_complex.py
new file mode 100644
index 0000000..8c312b8
--- /dev/null
+++ b/examples/explain/explain_complex.py
@@ -0,0 +1,87 @@
+"""
+Complex example: schema discovery + taxonomy prefixes + robust handling.
+
+Run:
+    # (recommended) set schema location so discovery works
+    export TYPEID_SCHEMA=examples/explain/schemas/typeid.schema.json
+
+    python examples/explain/explain_complex.py
+
+Optional:
+    pip install typeid-python[yaml]
+    export TYPEID_SCHEMA=examples/explain/schemas/typeid.schema.yaml
+"""
+
+import os
+from typing import Iterable
+
+from typeid import TypeID
+from typeid.explain.discovery import discover_schema_path
+from typeid.explain.registry import load_registry, make_lookup
+from typeid.explain.engine import explain as explain_engine
+from typeid.explain.formatters import format_explanation_pretty
+
+
+def _load_schema_lookup():
+    discovery = discover_schema_path()
+    if discovery.path is None:
+        print("No schema discovered. 
Proceeding without schema.")
+        return None
+
+    result = load_registry(discovery.path)
+    if result.registry is None:
+        print(f"Schema load failed: {result.error.message if result.error else 'unknown error'}")
+        return None
+
+    print(f"Schema loaded from: {discovery.path} ({discovery.source})")
+    return make_lookup(result.registry)
+
+
+def _banner(title: str) -> None:
+    print("\n" + "=" * 80)
+    print(title)
+    print("=" * 80)
+
+
+def _explain_many(ids: Iterable[str], lookup) -> None:
+    for tid in ids:
+        exp = explain_engine(tid, schema_lookup=lookup, enable_schema=True, enable_links=True)
+        print(format_explanation_pretty(exp))
+
+
+def main() -> None:
+    _banner("TypeID explain — complex demo")
+
+    # Use schema discovery (env/cwd/user-config)
+    lookup = _load_schema_lookup()
+
+    # Create a bunch of IDs:
+    # - standard prefixes
+    # - taxonomy prefix (env/region in prefix)
+    # - unknown prefix
+    # - invalid string
+    user_id = str(TypeID(prefix="user"))
+    order_id = str(TypeID(prefix="order"))
+    evt_id = str(TypeID(prefix="evt_payment"))
+    user_live_eu_id = str(TypeID(prefix="user_live_eu"))
+    unknown_id = str(TypeID(prefix="something_new"))
+    invalid_id = "user_NOT_A_SUFFIX"
+
+    _banner("Explaining generated IDs")
+    ids = [user_id, order_id, evt_id, user_live_eu_id, unknown_id, invalid_id]
+    _explain_many(ids, lookup)
+
+    _banner("Notes")
+    print("- IDs still explain offline (derived facts always present).")
+    print("- Schema adds meaning, ownership, policies, and links.")
+    print("- Prefix taxonomy works because TypeID prefixes allow underscores.")
+    print("- Invalid IDs never crash; they return valid=false and errors.")
+    print("- Unknown prefixes still show derived facts, schema found=false.")
+
+
+if __name__ == "__main__":
+    # Helpful hint for users
+    if "TYPEID_SCHEMA" not in os.environ:
+        print("Tip: set TYPEID_SCHEMA to enable schema discovery, e.g.:")
+        print("    export TYPEID_SCHEMA=examples/explain/schemas/typeid.schema.json\n")
+    main()
diff --git a/examples/explain/explain_report.py b/examples/explain/explain_report.py
new file mode 100644
index 0000000..aeab92a
--- /dev/null
+++ b/examples/explain/explain_report.py
@@ -0,0 +1,98 @@
+"""
+Batch report example:
+- Reads TypeIDs from a file (sample_ids.txt)
+- Explains each one
+- Prints summary stats
+- Optionally writes JSON report
+
+Run:
+    export TYPEID_SCHEMA=examples/explain/schemas/typeid.schema.json
+    python examples/explain/explain_report.py examples/explain/sample_ids.txt --json-out /tmp/report.json
+"""
+
+import argparse
+import json
+from pathlib import Path
+
+from typeid.explain.discovery import discover_schema_path
+from typeid.explain.engine import explain as explain_engine
+from typeid.explain.registry import load_registry, make_lookup
+
+
+def _read_ids(path: Path) -> list[str]:
+    ids: list[str] = []
+    for line in path.read_text(encoding="utf-8").splitlines():
+        line = line.strip()
+        if not line or line.startswith("#"):
+            continue
+        ids.append(line)
+    return ids
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("file", type=str, help="Path to file with TypeIDs (one per line).")
+    parser.add_argument("--json-out", type=str, default=None, help="Optional path to write JSON report.")
+    args = parser.parse_args()
+
+    ids = _read_ids(Path(args.file))
+
+    # Discover schema (optional)
+    discovery = discover_schema_path()
+    lookup = None
+    schema_info = {"found": False}
+
+    if discovery.path is not None:
+        r = load_registry(discovery.path)
+        if r.registry is not None:
+            lookup = make_lookup(r.registry)
+            schema_info = 
{"found": True, "path": str(discovery.path), "source": discovery.source} + else: + schema_info = {"found": False, "error": r.error.message if r.error else "unknown"} + + explanations = [] + valid_count = 0 + schema_hit = 0 + + for tid in ids: + exp = explain_engine(tid, schema_lookup=lookup, enable_schema=True, enable_links=True) + explanations.append(exp) + if exp.valid: + valid_count += 1 + if exp.schema is not None: + schema_hit += 1 + + # Summary + print("TypeID explain report") + print("--------------------") + print(f"IDs processed: {len(ids)}") + print(f"Valid IDs: {valid_count}") + print(f"Schema hits: {schema_hit}") + print(f"Schema: {schema_info}") + print() + + # Print concise table + for exp in explanations: + prefix = exp.parsed.prefix or "-" + ok = "OK" if exp.valid else "ERR" + name = exp.schema.name if exp.schema and exp.schema.name else "-" + print(f"{ok:>3} {prefix:<16} {name:<22} {exp.id}") + + # Optional JSON output + if args.json_out: + payload = { + "summary": { + "count": len(ids), + "valid": valid_count, + "schema_hits": schema_hit, + "schema": schema_info, + }, + "items": [e.to_dict() for e in explanations], + } + out_path = Path(args.json_out) + out_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8") + print(f"\nWrote JSON report to: {out_path}") + + +if __name__ == "__main__": + main() diff --git a/examples/explain/sample_ids.txt b/examples/explain/sample_ids.txt new file mode 100644 index 0000000..53b9f44 --- /dev/null +++ b/examples/explain/sample_ids.txt @@ -0,0 +1,10 @@ +# Valid +user_01h45ytscbebyvny4gc8cr8ma2 +order_01h2xcejqtf2nbrexx3vqjhp41 + +# Unknown prefix (still valid TypeID) +mystery_01h2xcejqtf2nbrexx3vqjhp41 + +# Invalid +user_NOT_A_SUFFIX +not_a_typeid diff --git a/examples/explain/schemas/typeid.schema.json b/examples/explain/schemas/typeid.schema.json new file mode 100644 index 0000000..06b49ef --- /dev/null +++ b/examples/explain/schemas/typeid.schema.json @@ -0,0 +1,78 @@ +{ + "schema_version": 1, + "types": { + "user": { + "name": "User", + "description": "End-user account", + "owner_team": "identity-platform", + "pii": true, + "retention": "7y", + "services": ["user-service", "auth-service"], + "storage": { + "primary": { "kind": "postgres", "table": "users", "shard_by": "tenant_id" } + }, + "events": ["user.created", "user.updated", "user.deleted"], + "policies": { + "delete": { "allowed": false, "reason": "GDPR retention policy" } + }, + "links": { + "docs": "https://docs.company/entities/user", + "logs": "https://logs.company/search?q={id}", + "trace": "https://traces.company/?q={id}", + "admin": "https://admin.company/users/{id}" + } + }, + + "order": { + "name": "Order", + "description": "Customer purchase order", + "owner_team": "commerce-platform", + "pii": false, + "retention": "10y", + "services": ["order-service", "billing-service"], + "storage": { + "primary": { "kind": "postgres", "table": "orders", "shard_by": "region" } + }, + "events": ["order.created", "order.paid", "order.refunded"], + "policies": { + "delete": { "allowed": true, "reason": "No compliance hold" } + }, + "links": { + "admin": "https://admin.company/orders/{id}", + "logs": "https://logs.company/search?q={id}" + } + }, + + "evt_payment": { + "name": "PaymentEvent", + "description": "Event emitted by payment pipeline", + "owner_team": "payments", + "pii": false, + "retention": "30d", + "services": ["payment-service"], + "events": ["payment.authorized", "payment.failed", "payment.captured"], + "policies": { + "replay": { 
"allowed": false, "reason": "Non-idempotent event stream" } + }, + "links": { + "kafka": "https://kafka-ui.company/topics/payment-events?key={id}", + "trace": "https://traces.company/?q={id}" + } + }, + + "user_live_eu": { + "name": "User (prod EU)", + "description": "Production EU user identifier (prefix taxonomy demo)", + "owner_team": "identity-platform", + "pii": true, + "retention": "7y", + "policies": { + "cross_region": { "allowed": false, "reason": "EU PII must not leave EU" } + }, + "links": { + "logs": "https://logs.company/search?q={id}®ion=eu", + "admin": "https://admin.company/eu/users/{id}" + } + } + } +} diff --git a/examples/explain/schemas/typeid.schema.yaml b/examples/explain/schemas/typeid.schema.yaml new file mode 100644 index 0000000..9afa8f8 --- /dev/null +++ b/examples/explain/schemas/typeid.schema.yaml @@ -0,0 +1,24 @@ +schema_version: 1 +types: + user: + name: User + description: End-user account + owner_team: identity-platform + pii: true + retention: 7y + services: [user-service, auth-service] + storage: + primary: + kind: postgres + table: users + shard_by: tenant_id + events: [user.created, user.updated, user.deleted] + policies: + delete: + allowed: false + reason: GDPR retention policy + links: + docs: "https://docs.company/entities/user" + logs: "https://logs.company/search?q={id}" + trace: "https://traces.company/?q={id}" + admin: "https://admin.company/users/{id}" From fe128f6f3db4de4318507230b203bdfc2a9fb84c Mon Sep 17 00:00:00 2001 From: Murad Akhundov Date: Sat, 20 Dec 2025 01:49:31 +0100 Subject: [PATCH 3/3] fix: resolve small linting and typing issues --- tests/explain/test_cli_yaml.py | 1 - tests/explain/test_registry_yaml.py | 1 - typeid/cli.py | 10 +++------- typeid/explain/__init__.py | 4 ++-- typeid/explain/discovery.py | 6 +++--- typeid/explain/engine.py | 4 ++-- typeid/explain/formatters.py | 1 + typeid/explain/model.py | 5 +++++ typeid/explain/registry.py | 9 +++++++-- 9 files changed, 23 insertions(+), 18 deletions(-) diff --git a/tests/explain/test_cli_yaml.py b/tests/explain/test_cli_yaml.py index 178b485..a13f6d1 100644 --- a/tests/explain/test_cli_yaml.py +++ b/tests/explain/test_cli_yaml.py @@ -7,7 +7,6 @@ from typeid import TypeID from typeid.cli import cli - yaml = pytest.importorskip("yaml") # skip if PyYAML not installed diff --git a/tests/explain/test_registry_yaml.py b/tests/explain/test_registry_yaml.py index ea4c66d..d1ac20e 100644 --- a/tests/explain/test_registry_yaml.py +++ b/tests/explain/test_registry_yaml.py @@ -4,7 +4,6 @@ from typeid.explain.registry import load_registry - yaml = pytest.importorskip("yaml") # skip entire file if PyYAML is not installed diff --git a/typeid/cli.py b/typeid/cli.py index 8cb9d56..8e32e98 100644 --- a/typeid/cli.py +++ b/typeid/cli.py @@ -1,12 +1,12 @@ +from pathlib import Path from typing import Optional import click from uuid6 import UUID from typeid import TypeID, base32, from_uuid, get_prefix_and_suffix - -from typeid.explain.engine import explain as explain_engine from typeid.explain.discovery import discover_schema_path +from typeid.explain.engine import explain as explain_engine from typeid.explain.formatters import format_explanation_json, format_explanation_pretty from typeid.explain.registry import load_registry, make_lookup @@ -49,7 +49,7 @@ def decode(encoded: str) -> None: type=click.Path(exists=True, dir_okay=False, path_type=str), required=False, help="Path to TypeID schema file (JSON, or YAML if PyYAML is installed). 
" - "If omitted, TypeID will try to discover a schema automatically.", + "If omitted, TypeID will try to discover a schema automatically.", ) @click.option( "--json", @@ -98,10 +98,6 @@ def explain( # we keep CLI robust and simply proceed without schema. if resolved_path: - result = load_registry(click.Path(resolved_path)) - # NOTE: click.Path is not a real filesystem path. Convert to pathlib Path. - # We'll do it safely: - from pathlib import Path result = load_registry(Path(resolved_path)) if result.registry is not None: diff --git a/typeid/explain/__init__.py b/typeid/explain/__init__.py index cebf75b..949fd40 100644 --- a/typeid/explain/__init__.py +++ b/typeid/explain/__init__.py @@ -18,10 +18,10 @@ from pathlib import Path from typing import Optional -from .engine import explain as _explain_engine -from .registry import load_registry, make_lookup from .discovery import discover_schema_path +from .engine import explain as _explain_engine from .model import Explanation +from .registry import load_registry, make_lookup __all__ = [ "explain", diff --git a/typeid/explain/discovery.py b/typeid/explain/discovery.py index 8cf9e14..bc581bc 100644 --- a/typeid/explain/discovery.py +++ b/typeid/explain/discovery.py @@ -22,7 +22,6 @@ from pathlib import Path from typing import Iterable, Optional - DEFAULT_CWD_CANDIDATES = ( "typeid.schema.json", "typeid.schema.yaml", @@ -39,6 +38,7 @@ @dataclass(frozen=True, slots=True) class DiscoveryResult: """Result of schema discovery.""" + path: Optional[Path] source: str # e.g., "env:TYPEID_SCHEMA", "cwd", "user_config", "none" @@ -69,7 +69,7 @@ def discover_schema_path( return DiscoveryResult(path=None, source=f"env:{env_var} (not found)") # 2) Current working directory - cwd_path = (cwd or Path.cwd()) + cwd_path = cwd or Path.cwd() for name in DEFAULT_CWD_CANDIDATES: p = cwd_path / name if p.is_file(): @@ -117,7 +117,7 @@ def iter_default_candidate_paths(*, cwd: Optional[Path] = None) -> Iterable[Path Useful for debugging or `typeid explain --debug-discovery` style features. 
""" - cwd_path = (cwd or Path.cwd()) + cwd_path = cwd or Path.cwd() for name in DEFAULT_CWD_CANDIDATES: yield cwd_path / name diff --git a/typeid/explain/engine.py b/typeid/explain/engine.py index c07de10..6b6ac0a 100644 --- a/typeid/explain/engine.py +++ b/typeid/explain/engine.py @@ -19,8 +19,7 @@ from typeid import TypeID from typeid.errors import TypeIDException -from .model import Explanation, ParseError, ParsedTypeID, Provenance, TypeSchema - +from .model import Explanation, ParsedTypeID, ParseError, Provenance, TypeSchema SchemaLookup = Callable[[str], Optional[TypeSchema]] @@ -183,6 +182,7 @@ def _uuid7_created_at(uuid_obj: Any) -> Optional[datetime]: class _SafeFormatDict(dict): """dict that leaves unknown placeholders intact instead of raising KeyError.""" + def __missing__(self, key: str) -> str: return "{" + key + "}" diff --git a/typeid/explain/formatters.py b/typeid/explain/formatters.py index 588e754..0d7491f 100644 --- a/typeid/explain/formatters.py +++ b/typeid/explain/formatters.py @@ -118,6 +118,7 @@ def format_explanation_json(exp: Explanation, *, indent: int = 2) -> str: class SafeFormatDict(dict): """dict that leaves unknown placeholders intact rather than raising KeyError.""" + def __missing__(self, key: str) -> str: return "{" + key + "}" diff --git a/typeid/explain/model.py b/typeid/explain/model.py index 065daab..e5cdb90 100644 --- a/typeid/explain/model.py +++ b/typeid/explain/model.py @@ -16,6 +16,7 @@ class Provenance(str, Enum): """Where a piece of information came from.""" + DERIVED_FROM_ID = "derived_from_id" SCHEMA = "schema" EXTERNAL = "external" @@ -25,6 +26,7 @@ class Provenance(str, Enum): @dataclass(frozen=True, slots=True) class ParseError: """Represents a recoverable parse/validation error.""" + code: str message: str @@ -39,6 +41,7 @@ class ParsedTypeID: - `suffix` is the encoded UUIDv7 portion (base32 string). - `uuid` and `created_at` are *derived* from suffix if possible. """ + raw: str prefix: Optional[str] suffix: Optional[str] @@ -61,6 +64,7 @@ class TypeSchema: breaking the Python API: we store raw dict and also normalize a few commonly-used fields for nicer UX. """ + prefix: str raw: Dict[str, Any] = field(default_factory=dict) @@ -87,6 +91,7 @@ class Explanation: - links: rendered links (from schema templates), safe for display - provenance: per-field provenance labels for transparency """ + id: str valid: bool diff --git a/typeid/explain/registry.py b/typeid/explain/registry.py index 14f3781..f9ae415 100644 --- a/typeid/explain/registry.py +++ b/typeid/explain/registry.py @@ -54,6 +54,7 @@ class SchemaRegistry: Lookup is by full TypeID prefix (which may contain underscores). """ + def __init__(self, *, schema_version: int, types: Dict[str, TypeSchema], source_path: Path): self.schema_version = schema_version self._types = types @@ -145,10 +146,12 @@ def make_lookup(registry: Optional[SchemaRegistry]): lookup = make_lookup(reg) explanation = explain(id, schema_lookup=lookup) """ + def _lookup(prefix: str) -> Optional[TypeSchema]: if registry is None: return None return registry.get(prefix) + return _lookup @@ -183,8 +186,10 @@ def _read_schema_file(path: Path) -> Tuple[Dict[str, Any], str]: # If extension unknown, try JSON first for convenience. 
try: return json.loads(raw), "json" - except Exception: - raise RuntimeError(f"Unsupported schema file extension: {path.suffix!s} (supported: .json, .yaml, .yml)") + except Exception as e: + raise RuntimeError( + f"Unsupported schema file extension: {path.suffix!s} (supported: .json, .yaml, .yml)" + ) from e def _to_type_schema(prefix: str, spec: Dict[str, Any]) -> TypeSchema: