diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 13ff137..5d88da0 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -10,6 +10,7 @@ on: env: CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} DOCKER_USERNAME: jeffschnittercortex DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_ORGANIZATION: cortexapp @@ -95,6 +96,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post pypi deploy event to Cortex @@ -164,6 +168,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post docker deploy event to Cortex @@ -211,6 +218,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post homebrew deploy event to Cortex diff --git a/HISTORY.md b/HISTORY.md index abf5dd1..0f3a9e8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.2.0](https://github.com/cortexapps/cli/releases/tag/1.2.0) - 2025-11-04 + +[Compare with 1.1.0](https://github.com/cortexapps/cli/compare/1.1.0...1.2.0) + +### Fixed + +- fix: handle 409 Already evaluating in trigger-evaluation test ([6715ea8](https://github.com/cortexapps/cli/commit/6715ea8ace42e5e137b649daf927bf2bec225b5e) by Jeff Schnitter). +- fix: remove entity_relationships imports from wrong branch ([3d467f6](https://github.com/cortexapps/cli/commit/3d467f699a0d4883316e039fcca571bde03d7f0a) by Jeff Schnitter). +- fix: ensure base_url defaults correctly when not set ([cadf62e](https://github.com/cortexapps/cli/commit/cadf62e79c96fb6e89046d399d9247680e8057da) by Jeff Schnitter). + ## [1.1.0](https://github.com/cortexapps/cli/releases/tag/1.1.0) - 2025-11-04 [Compare with 1.0.6](https://github.com/cortexapps/cli/compare/1.0.6...1.1.0) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index fa31212..a09fdc8 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -9,6 +9,7 @@ from rich.console import Console from enum import Enum import yaml +from concurrent.futures import ThreadPoolExecutor, as_completed import cortexapps_cli.commands.scorecards as scorecards import cortexapps_cli.commands.catalog as catalog @@ -60,7 +61,7 @@ def _file_name(directory, tag, content, extension): def _write_file(content, file, is_json=False): with open(file, 'w') as f: if is_json: - print(content, file=f) + json.dump(content, f, indent=2) else: f.write(str(content) + "\n") f.close() @@ -108,9 +109,27 @@ def _export_plugins(ctx, directory): list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None) tags = [plugin["tag"] for plugin in list["plugins"]] tags_sorted = sorted(tags) - for tag in tags_sorted: - content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) - _file_name(directory, tag, content, "json") + + def fetch_plugin(tag): + try: + content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) + return (tag, content, None) + except Exception as e: + return (tag, None, str(e)) + + # Fetch all plugins in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Sort results alphabetically and write in order + for tag, content, error in sorted(results, key=lambda x: x[0]): + if error: + print(f"Failed to export plugin {tag}: {error}") + else: + _file_name(directory, tag, content, "json") def _export_scorecards(ctx, directory): directory = _directory_name(directory, "scorecards") @@ -118,22 +137,44 @@ def _export_scorecards(ctx, directory): list = scorecards.list(ctx, show_drafts=True, page=None, page_size=None, _print=False) tags = [scorecard["tag"] for scorecard in list["scorecards"]] tags_sorted = sorted(tags) - for tag in tags_sorted: - content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False) - _file_name(directory, tag, content, "yaml") + + def fetch_scorecard(tag): + try: + content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False) + return (tag, content, None) + except Exception as e: + return (tag, None, str(e)) + + # Fetch all scorecards in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Sort results alphabetically and write in order + for tag, content, error in sorted(results, key=lambda x: x[0]): + if error: + print(f"Failed to export scorecard {tag}: {error}") + else: + _file_name(directory, tag, content, "yaml") def _export_workflows(ctx, directory): directory = _directory_name(directory, "workflows") - list = workflows.list(ctx, _print=False, include_actions="false", page=None, page_size=None, search_query=None) - tags = [workflow["tag"] for workflow in list["workflows"]] - tags_sorted = sorted(tags) - for tag in tags_sorted: + # Get all workflows with actions in one API call + list = workflows.list(ctx, _print=False, include_actions="true", page=None, page_size=None, search_query=None) + workflows_data = sorted(list["workflows"], key=lambda x: x["tag"]) + + # Convert JSON workflows to YAML and write them + for workflow in workflows_data: + tag = workflow["tag"] try: - content = workflows.get(ctx, tag=tag, yaml="true", _print=False) - _file_name(directory, tag, content, "yaml") - except: - print("failed for " + tag) + # Convert the JSON workflow data to YAML format + workflow_yaml = yaml.dump(workflow, default_flow_style=False, sort_keys=False) + _file_name(directory, tag, workflow_yaml, "yaml") + except Exception as e: + print(f"Failed to export workflow {tag}: {e}") backupTypes = { "catalog", @@ -257,38 +298,122 @@ def _import_entity_types(ctx, force, directory): def _import_catalog(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - catalog.create(ctx, file_input=open(file_path), _print=False) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_catalog_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + catalog.create(ctx, file_input=f, _print=False) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + # Import all files in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_plugins(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - plugins.create(ctx, file_input=open(file_path), force=True) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_plugin_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + plugins.create(ctx, file_input=f, force=True) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + # Import all files in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_scorecards(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - scorecards.create(ctx, file_input=open(file_path), dry_run=False) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_scorecard_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + scorecards.create(ctx, file_input=f, dry_run=False) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + # Import all files in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_workflows(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - workflows.create(ctx, file_input=open(file_path)) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_workflow_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + workflows.create(ctx, file_input=f) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + # Import all files in parallel + with ThreadPoolExecutor(max_workers=30) as executor: + futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files} + results = [] + for future in as_completed(futures): + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") @app.command("import") def import_tenant( diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py index b84eafe..1e35713 100644 --- a/cortexapps_cli/cortex_client.py +++ b/cortexapps_cli/cortex_client.py @@ -1,4 +1,6 @@ import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry import json import typer from rich import print @@ -20,6 +22,20 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort logging.basicConfig(level=numeric_level) self.logger = logging.getLogger(__name__) + # Create a session with connection pooling for better performance + self.session = requests.Session() + + # Configure connection pool to support concurrent requests + # pool_connections: number of connection pools to cache + # pool_maxsize: maximum number of connections to save in the pool + adapter = HTTPAdapter( + pool_connections=10, + pool_maxsize=50, + max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504]) + ) + self.session.mount('https://', adapter) + self.session.mount('http://', adapter) + def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=False, raw_response=False, content_type='application/json'): req_headers = { 'Authorization': f'Bearer {self.api_key}', @@ -33,7 +49,8 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F if content_type == 'application/json' and isinstance(data, dict): req_data = json.dumps(data) - response = requests.request(method, url, params=params, headers=req_headers, data=req_data) + # Use session for connection pooling and reuse + response = self.session.request(method, url, params=params, headers=req_headers, data=req_data) self.logger.debug(f"Request Headers: {response.request.headers}") self.logger.debug(f"Response Status Code: {response.status_code}") diff --git a/tests/test_scorecards.py b/tests/test_scorecards.py index 9ec4216..801f556 100644 --- a/tests/test_scorecards.py +++ b/tests/test_scorecards.py @@ -1,5 +1,6 @@ from tests.helpers.utils import * import yaml +import time # Get rule id to be used in exemption tests. # TODO: check for and revoke any PENDING exemptions. @@ -10,7 +11,18 @@ def _get_rule(title): return rule_id[0] def test_scorecards(): - cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"]) + # Retry scorecard create in case there's an active evaluation + # (can happen if test_import.py just triggered an evaluation) + max_retries = 3 + for attempt in range(max_retries): + try: + cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"]) + break + except Exception as e: + if "500" in str(e) and attempt < max_retries - 1: + time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s + continue + raise response = cli(["scorecards", "list"]) assert any(scorecard['tag'] == 'cli-test-scorecard' for scorecard in response['scorecards']), "Should find scorecard with tag cli-test-scorecard" @@ -43,7 +55,17 @@ def test_scorecards(): # cli(["scorecards", "scores", "-t", "cli-test-scorecard"]) def test_scorecards_drafts(): - cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"]) + # Retry scorecard create in case there's an active evaluation + max_retries = 3 + for attempt in range(max_retries): + try: + cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"]) + break + except Exception as e: + if "500" in str(e) and attempt < max_retries - 1: + time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s + continue + raise response = cli(["scorecards", "list", "-s"]) assert any(scorecard['tag'] == 'cli-test-draft-scorecard' for scorecard in response['scorecards'])