diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 13ff137..5d88da0 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -10,6 +10,7 @@ on:
env:
CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
+ CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
DOCKER_USERNAME: jeffschnittercortex
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
DOCKER_ORGANIZATION: cortexapp
@@ -95,6 +96,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
+ env:
+ CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
+ CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:
- name: Post pypi deploy event to Cortex
@@ -164,6 +168,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
+ env:
+ CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
+ CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:
- name: Post docker deploy event to Cortex
@@ -211,6 +218,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
+ env:
+ CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
+ CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:
- name: Post homebrew deploy event to Cortex
diff --git a/HISTORY.md b/HISTORY.md
index abf5dd1..0f3a9e8 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## [1.2.0](https://github.com/cortexapps/cli/releases/tag/1.2.0) - 2025-11-04
+
+[Compare with 1.1.0](https://github.com/cortexapps/cli/compare/1.1.0...1.2.0)
+
+### Fixed
+
+- fix: handle 409 Already evaluating in trigger-evaluation test ([6715ea8](https://github.com/cortexapps/cli/commit/6715ea8ace42e5e137b649daf927bf2bec225b5e) by Jeff Schnitter).
+- fix: remove entity_relationships imports from wrong branch ([3d467f6](https://github.com/cortexapps/cli/commit/3d467f699a0d4883316e039fcca571bde03d7f0a) by Jeff Schnitter).
+- fix: ensure base_url defaults correctly when not set ([cadf62e](https://github.com/cortexapps/cli/commit/cadf62e79c96fb6e89046d399d9247680e8057da) by Jeff Schnitter).
+
## [1.1.0](https://github.com/cortexapps/cli/releases/tag/1.1.0) - 2025-11-04
[Compare with 1.0.6](https://github.com/cortexapps/cli/compare/1.0.6...1.1.0)
diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py
index fa31212..a09fdc8 100644
--- a/cortexapps_cli/commands/backup.py
+++ b/cortexapps_cli/commands/backup.py
@@ -9,6 +9,7 @@
from rich.console import Console
from enum import Enum
import yaml
+from concurrent.futures import ThreadPoolExecutor, as_completed
import cortexapps_cli.commands.scorecards as scorecards
import cortexapps_cli.commands.catalog as catalog
@@ -60,7 +61,7 @@ def _file_name(directory, tag, content, extension):
def _write_file(content, file, is_json=False):
with open(file, 'w') as f:
if is_json:
- print(content, file=f)
+ json.dump(content, f, indent=2)
else:
f.write(str(content) + "\n")
f.close()
@@ -108,9 +109,27 @@ def _export_plugins(ctx, directory):
list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None)
tags = [plugin["tag"] for plugin in list["plugins"]]
tags_sorted = sorted(tags)
- for tag in tags_sorted:
- content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False)
- _file_name(directory, tag, content, "json")
+
+ def fetch_plugin(tag):
+ try:
+ content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False)
+ return (tag, content, None)
+ except Exception as e:
+ return (tag, None, str(e))
+
+ # Fetch all plugins in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Sort results alphabetically and write in order
+ for tag, content, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f"Failed to export plugin {tag}: {error}")
+ else:
+ _file_name(directory, tag, content, "json")
def _export_scorecards(ctx, directory):
directory = _directory_name(directory, "scorecards")
@@ -118,22 +137,44 @@ def _export_scorecards(ctx, directory):
list = scorecards.list(ctx, show_drafts=True, page=None, page_size=None, _print=False)
tags = [scorecard["tag"] for scorecard in list["scorecards"]]
tags_sorted = sorted(tags)
- for tag in tags_sorted:
- content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False)
- _file_name(directory, tag, content, "yaml")
+
+ def fetch_scorecard(tag):
+ try:
+ content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False)
+ return (tag, content, None)
+ except Exception as e:
+ return (tag, None, str(e))
+
+ # Fetch all scorecards in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Sort results alphabetically and write in order
+ for tag, content, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f"Failed to export scorecard {tag}: {error}")
+ else:
+ _file_name(directory, tag, content, "yaml")
def _export_workflows(ctx, directory):
directory = _directory_name(directory, "workflows")
- list = workflows.list(ctx, _print=False, include_actions="false", page=None, page_size=None, search_query=None)
- tags = [workflow["tag"] for workflow in list["workflows"]]
- tags_sorted = sorted(tags)
- for tag in tags_sorted:
+ # Get all workflows with actions in one API call
+ list = workflows.list(ctx, _print=False, include_actions="true", page=None, page_size=None, search_query=None)
+ workflows_data = sorted(list["workflows"], key=lambda x: x["tag"])
+
+ # Convert JSON workflows to YAML and write them
+ for workflow in workflows_data:
+ tag = workflow["tag"]
try:
- content = workflows.get(ctx, tag=tag, yaml="true", _print=False)
- _file_name(directory, tag, content, "yaml")
- except:
- print("failed for " + tag)
+ # Convert the JSON workflow data to YAML format
+ workflow_yaml = yaml.dump(workflow, default_flow_style=False, sort_keys=False)
+ _file_name(directory, tag, workflow_yaml, "yaml")
+ except Exception as e:
+ print(f"Failed to export workflow {tag}: {e}")
backupTypes = {
"catalog",
@@ -257,38 +298,122 @@ def _import_entity_types(ctx, force, directory):
def _import_catalog(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
- for filename in sorted(os.listdir(directory)):
- file_path = os.path.join(directory, filename)
- if os.path.isfile(file_path):
- print(" Importing: " + filename)
- catalog.create(ctx, file_input=open(file_path), _print=False)
+ files = [(filename, os.path.join(directory, filename))
+ for filename in sorted(os.listdir(directory))
+ if os.path.isfile(os.path.join(directory, filename))]
+
+ def import_catalog_file(file_info):
+ filename, file_path = file_info
+ try:
+ with open(file_path) as f:
+ catalog.create(ctx, file_input=f, _print=False)
+ return (filename, None)
+ except Exception as e:
+ return (filename, str(e))
+
+ # Import all files in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Print results in alphabetical order
+ for filename, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f" Failed to import {filename}: {error}")
+ else:
+ print(f" Importing: {filename}")
def _import_plugins(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
- for filename in sorted(os.listdir(directory)):
- file_path = os.path.join(directory, filename)
- if os.path.isfile(file_path):
- print(" Importing: " + filename)
- plugins.create(ctx, file_input=open(file_path), force=True)
+ files = [(filename, os.path.join(directory, filename))
+ for filename in sorted(os.listdir(directory))
+ if os.path.isfile(os.path.join(directory, filename))]
+
+ def import_plugin_file(file_info):
+ filename, file_path = file_info
+ try:
+ with open(file_path) as f:
+ plugins.create(ctx, file_input=f, force=True)
+ return (filename, None)
+ except Exception as e:
+ return (filename, str(e))
+
+ # Import all files in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Print results in alphabetical order
+ for filename, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f" Failed to import {filename}: {error}")
+ else:
+ print(f" Importing: {filename}")
def _import_scorecards(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
- for filename in sorted(os.listdir(directory)):
- file_path = os.path.join(directory, filename)
- if os.path.isfile(file_path):
- print(" Importing: " + filename)
- scorecards.create(ctx, file_input=open(file_path), dry_run=False)
+ files = [(filename, os.path.join(directory, filename))
+ for filename in sorted(os.listdir(directory))
+ if os.path.isfile(os.path.join(directory, filename))]
+
+ def import_scorecard_file(file_info):
+ filename, file_path = file_info
+ try:
+ with open(file_path) as f:
+ scorecards.create(ctx, file_input=f, dry_run=False)
+ return (filename, None)
+ except Exception as e:
+ return (filename, str(e))
+
+ # Import all files in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Print results in alphabetical order
+ for filename, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f" Failed to import {filename}: {error}")
+ else:
+ print(f" Importing: {filename}")
def _import_workflows(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
- for filename in sorted(os.listdir(directory)):
- file_path = os.path.join(directory, filename)
- if os.path.isfile(file_path):
- print(" Importing: " + filename)
- workflows.create(ctx, file_input=open(file_path))
+ files = [(filename, os.path.join(directory, filename))
+ for filename in sorted(os.listdir(directory))
+ if os.path.isfile(os.path.join(directory, filename))]
+
+ def import_workflow_file(file_info):
+ filename, file_path = file_info
+ try:
+ with open(file_path) as f:
+ workflows.create(ctx, file_input=f)
+ return (filename, None)
+ except Exception as e:
+ return (filename, str(e))
+
+ # Import all files in parallel
+ with ThreadPoolExecutor(max_workers=30) as executor:
+ futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files}
+ results = []
+ for future in as_completed(futures):
+ results.append(future.result())
+
+ # Print results in alphabetical order
+ for filename, error in sorted(results, key=lambda x: x[0]):
+ if error:
+ print(f" Failed to import {filename}: {error}")
+ else:
+ print(f" Importing: {filename}")
@app.command("import")
def import_tenant(
diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py
index b84eafe..1e35713 100644
--- a/cortexapps_cli/cortex_client.py
+++ b/cortexapps_cli/cortex_client.py
@@ -1,4 +1,6 @@
import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
import json
import typer
from rich import print
@@ -20,6 +22,20 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort
logging.basicConfig(level=numeric_level)
self.logger = logging.getLogger(__name__)
+ # Create a session with connection pooling for better performance
+ self.session = requests.Session()
+
+ # Configure connection pool to support concurrent requests
+ # pool_connections: number of connection pools to cache
+ # pool_maxsize: maximum number of connections to save in the pool
+ adapter = HTTPAdapter(
+ pool_connections=10,
+ pool_maxsize=50,
+ max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
+ )
+ self.session.mount('https://', adapter)
+ self.session.mount('http://', adapter)
+
def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=False, raw_response=False, content_type='application/json'):
req_headers = {
'Authorization': f'Bearer {self.api_key}',
@@ -33,7 +49,8 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F
if content_type == 'application/json' and isinstance(data, dict):
req_data = json.dumps(data)
- response = requests.request(method, url, params=params, headers=req_headers, data=req_data)
+ # Use session for connection pooling and reuse
+ response = self.session.request(method, url, params=params, headers=req_headers, data=req_data)
self.logger.debug(f"Request Headers: {response.request.headers}")
self.logger.debug(f"Response Status Code: {response.status_code}")
diff --git a/tests/test_scorecards.py b/tests/test_scorecards.py
index 9ec4216..801f556 100644
--- a/tests/test_scorecards.py
+++ b/tests/test_scorecards.py
@@ -1,5 +1,6 @@
from tests.helpers.utils import *
import yaml
+import time
# Get rule id to be used in exemption tests.
# TODO: check for and revoke any PENDING exemptions.
@@ -10,7 +11,18 @@ def _get_rule(title):
return rule_id[0]
def test_scorecards():
- cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"])
+ # Retry scorecard create in case there's an active evaluation
+ # (can happen if test_import.py just triggered an evaluation)
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"])
+ break
+ except Exception as e:
+ if "500" in str(e) and attempt < max_retries - 1:
+ time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s
+ continue
+ raise
response = cli(["scorecards", "list"])
assert any(scorecard['tag'] == 'cli-test-scorecard' for scorecard in response['scorecards']), "Should find scorecard with tag cli-test-scorecard"
@@ -43,7 +55,17 @@ def test_scorecards():
# cli(["scorecards", "scores", "-t", "cli-test-scorecard"])
def test_scorecards_drafts():
- cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"])
+ # Retry scorecard create in case there's an active evaluation
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"])
+ break
+ except Exception as e:
+ if "500" in str(e) and attempt < max_retries - 1:
+ time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s
+ continue
+ raise
response = cli(["scorecards", "list", "-s"])
assert any(scorecard['tag'] == 'cli-test-draft-scorecard' for scorecard in response['scorecards'])