Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/axon-git/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.venv
__pycache__
*.pyc
15 changes: 15 additions & 0 deletions examples/axon-git/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM ghcr.io/cortexapps/cortex-axon-agent:latest
WORKDIR /project
COPY requirements.txt .
ENV VIRTUAL_ENV=/project/.venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN echo "python main.py" > /project/run.sh
RUN chmod +x /project/run.sh
ENV PYTHONUNBUFFERED=1

ENTRYPOINT [ "/app/app_entrypoint.sh", "/project/run.sh" ]
11 changes: 11 additions & 0 deletions examples/axon-git/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
IMAGE_NAME ?= ghcr.io/cortexapps/cortex-axon-agent:latest

docker:
docker build --build-arg "IMAGE_NAME=$(IMAGE_NAME)" -t axon-git:local .

dryrun: docker
docker run -e "DRYRUN=1" --rm axon-git:local


run: docker
docker run -e "CORTEX_API_TOKEN=$(CORTEX_API_TOKEN)" --rm axon-git:local
89 changes: 89 additions & 0 deletions examples/axon-git/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Cortex Axon SDK for Python

This is the official Cortex Axon SDK for Python. It provides a simple way to interact with and extend your Cortex instance.

## Getting started

To run the Cortex SDK you need to get the Axon Agent via Docker:

```
docker pull ghcr.io/cortexapps/cortex-axon-agent:latest
```

Then to scaffold a Python project:

```
docker run -it --rm -v "$(pwd):/src" ghcr.io/cortexapps/cortex-axon-agent:latest init --language python my-python-axon project
```

This will create a new directory `my-python-axon` with a Go project scaffolded in the current directory.

## Running locally

To run your project, first start the agent Docker container like:

```
docker run -it --rm -p "50051:50051" -e "DRYRUN=true" ghcr.io/cortexapps/cortex-axon-agent:latest serve
```

This is `DRYRUN` mode that prints what it would have called, to run against the Cortex API remove the `DRYRUN` environment variable and add `-e "CORTEX_API_TOKEN=$CORTEX_API_TOKEN`. Be sure to export your token first, e.g. `export CORTEX_API_TOKEN=your-token`.


## Adding handlers

To add a handler, open `main.py` and create a function:

```python

@cortex_scheduled(interval="5s")
def my_handler(ctx: HandlerContext):

payload = {
"values": {
"my-service": [
{
"key": "exampleKey1",
"value": "exampleValue1",
},
{
"key": "exampleKey2",
"value": "exampleValue2",
},
]
}
}

json_payload = json.dumps(payload)

response = ctx.api.Call(
cortex_api_pb2.CallRequest(
method="PUT",
path="/api/v1/catalog/custom-data",
body=json_payload,
)
)

if response.status_code >= 400:
ctx.log(f"SetCustomTags error: {response.body}", level="ERROR")
exit(1)

ctx.log("CortexApi PUT custom-data called successfully!")

```

Now start the agent in a separate terminal:
```
make run-agent
```

And run your project:
```
python main.py
```

This will begin executing your handler every 5 seconds.





95 changes: 95 additions & 0 deletions examples/axon-git/git_manager/access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import threading
from queue import Queue
from readerwriterlock import rwlock
from git_manager.manager import GitRepository


class TaskManager:
def __init__(self, parallel_limit: int):
"""
Initialize the TaskManager.

:param parallel_limit: Maximum number of tasks to run in parallel.
"""
self.parallel_limit = parallel_limit
self.task_queue = Queue()
self.locks = {} # Dictionary to store ReaderWriterLocks for each task type
self.lock = threading.Lock() # Lock to protect access to the locks dictionary
self.threads = []

def _get_lock(self, task_type: str):
"""
Get or create a ReaderWriterLock for the given task type.

:param task_type: The type of task.
:return: A ReaderWriterLock object.
"""
with self.lock:
if task_type not in self.locks:
self.locks[task_type] = rwlock.RWLockFair()
return self.locks[task_type]

def run_task(self, repo: GitRepository, action: callable):
done_event = threading.Event()
task = GitTask(repo, self._get_lock(repo.target_dir), action)
def task_wrapper():
try:
task.run()

finally:
done_event.set() # Signal that the task is complete

self.task_queue.put(task_wrapper)
done_event.wait()
return task.result

def _worker(self):
"""
Worker thread to process tasks from the queue.
"""
while True:
task = self.task_queue.get()
try:
task()
except Exception as e:
print(f"Error executing task: {e}")
self.task_queue.task_done()

def run(self):
"""
Start processing tasks with the specified parallelization limit.
"""
for _ in range(self.parallel_limit):
thread = threading.Thread(target=self._worker)
thread.start()
self.threads.append(thread)

class GitTask:
def __init__(self, repo: GitRepository, rwlock: rwlock, action: callable):
self.repo = repo
self.action = action
self.rwlock = rwlock
self.result = None

def key(self):
return self.repo.target_dir

def run(self):
if self.repo.needs_update():
wlock = self.rwlock.gen_wlock()
with wlock:
try:
self.repo.update(force=True)
except Exception as e:
print(f"Error executing task {task.key()}: {e}")
raise e

with self.rwlock.gen_rlock():
try:
self.result = self.action()
return self.result
except Exception as e:
print(f"Error executing task {task.key()}: {e}")
raise e


69 changes: 69 additions & 0 deletions examples/axon-git/git_manager/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import jinja2


GIT_REPO_URL_TEMPLATE_KEY = "git_repo_url"
GIT_COMMIT_URL_TEMPLATE_KEY = "git_commit_url"
GIT_BLOB_URL_TEMPLATE_KEY = "git_blob_url"
GIT_BRANCH_URL_TEMPLATE_KEY = "git_branch_url"

default_templates = {
GIT_REPO_URL_TEMPLATE_KEY: "https://{{ git_host }}/{{ repo_name }}",
GIT_COMMIT_URL_TEMPLATE_KEY: "https://{{ git_host }}/{{ repo_name }}/commit/{{ sha }}",
GIT_BLOB_URL_TEMPLATE_KEY: "https://{{ git_host }}/{{ repo_name }}/blob/{{ sha }}/{{ path }}",
GIT_BRANCH_URL_TEMPLATE_KEY: "https://{{ git_host }}/{{ repo_name }}/tree/{{ ref }}"
}

class GitFormatter:
_global_formatter = None
@staticmethod
def set_global_formatter(formatter: 'GitFormtter'):
GitFormatter._global_formatter = formatter

@staticmethod
def get_global_formatter() -> 'GitFormtter':
if GitFormtter._global_formatter is None:
raise ValueError("Global formatter not set")
return GitFormtter._global_formatter

def __init__(self, git_host: str, templates: dict):
self.git_host = git_host
self.templates = default_templates.copy()
self.templates.update(templates)

def __render(self, template_key: str, data: dict) -> str:
template = self.templates.get(template_key)
if not template:
raise ValueError(f"Template {template_key} not found")
return jinja2.Template(template).render(data)

def repo_url(self, repo_name: str) -> str:
data = {
"git_host": self.git_host,
"repo_name": repo_name
}
return self.__render(GIT_REPO_URL_TEMPLATE_KEY, data)

def commit_url(self, repo_name: str, sha: str) -> str:
data = {
"git_host": self.git_host,
"repo_name": repo_name,
"sha": sha
}
return self.__render(GIT_COMMIT_URL_TEMPLATE_KEY, data)

def blob_url(self, repo_name: str, sha: str, path: str) -> str:
data = {
"git_host": self.git_host,
"repo_name": repo_name,
"sha": sha,
"path": path
}
return self.__render(GIT_BLOB_URL_TEMPLATE_KEY, data)

def branch_url(self, repo_name: str, ref: str) -> str:
data = {
"git_host": self.git_host,
"repo_name": repo_name,
"ref": ref
}
return self.__render(GIT_BRANCH_URL_TEMPLATE_KEY, data)
Loading
Loading