Dev #4

Open: wants to merge 67 commits into main
Changes from all commits
Commits (67)
98a717a
add receivetask and inclusive gateway, update env template
SempaiEcchi Nov 17, 2021
4050dd7
save variables from receive task
SempaiEcchi Nov 17, 2021
f14435f
show variables in log
SempaiEcchi Nov 17, 2021
81dda62
test converging paralel gateway
SempaiEcchi Nov 18, 2021
57d7619
add None check
SempaiEcchi Nov 18, 2021
4213cbf
allow empty receive task
SempaiEcchi Nov 19, 2021
4ad3d97
add gateway
SempaiEcchi Nov 19, 2021
0a74138
update ticketing bpmn
SempaiEcchi Nov 19, 2021
a5d44bc
update bpmn
SempaiEcchi Nov 19, 2021
598e87b
update model
SempaiEcchi Nov 20, 2021
a26bdc5
update models
SempaiEcchi Nov 20, 2021
ff311b4
update model
SempaiEcchi Nov 20, 2021
3295a92
add input output vars for subprocess(callactivity)
SempaiEcchi Nov 21, 2021
13584cc
add variable mapping
SempaiEcchi Nov 21, 2021
f6fdc18
add ran as subprocess in db
SempaiEcchi Nov 21, 2021
f1da8fa
initiator ( parent ) instance id?
SempaiEcchi Nov 21, 2021
a3ac226
add support for nested variables in mapping
SempaiEcchi Nov 21, 2021
624be9b
fix parse
SempaiEcchi Nov 21, 2021
35636cc
replace requests with aio requests
SempaiEcchi Nov 21, 2021
046f19a
downloads test
SempaiEcchi Nov 21, 2021
e933329
parse expression update
SempaiEcchi May 3, 2022
ab1f68d
add procfile
SempaiEcchi May 3, 2022
494f8c7
update pipfile
SempaiEcchi May 3, 2022
47c9ab0
update pipfile
SempaiEcchi May 3, 2022
75eb8b1
update pipfile
SempaiEcchi May 3, 2022
578316c
update pipfile
SempaiEcchi May 3, 2022
3cd75bd
update pipfile
SempaiEcchi May 3, 2022
f7f3a92
update port
SempaiEcchi May 3, 2022
b95efbf
add model
SempaiEcchi May 4, 2022
275cb18
add model
SempaiEcchi May 4, 2022
8724558
remove models
SempaiEcchi May 4, 2022
1662f54
add dockerfile
SempaiEcchi May 4, 2022
eba69d8
add dockerfile
SempaiEcchi May 4, 2022
c0e37ed
cpt
SempaiEcchi May 4, 2022
2b63097
update docker
SempaiEcchi May 4, 2022
83dda52
add yes
SempaiEcchi May 4, 2022
8f14db0
python version
SempaiEcchi May 4, 2022
71a6d16
update file
SempaiEcchi May 4, 2022
b0ec3d4
port
SempaiEcchi May 4, 2022
c5338e1
port
SempaiEcchi May 5, 2022
ddbe859
remove docker
SempaiEcchi May 5, 2022
ef9724d
loop
SempaiEcchi May 5, 2022
9667d38
loop
SempaiEcchi May 5, 2022
2821e43
loop
SempaiEcchi May 5, 2022
c040e4e
keys only
SempaiEcchi May 5, 2022
083256f
add new endpoint
SempaiEcchi May 5, 2022
1c35ca9
websocket state
SempaiEcchi May 5, 2022
3dbdb15
update model
SempaiEcchi May 6, 2022
9da325d
remove middleware
SempaiEcchi May 6, 2022
ae8320e
loop (+13 squashed commits)
SempaiEcchi May 6, 2022
731b1ec
size
SempaiEcchi May 8, 2022
5766880
new endpoint to autocall receive
SempaiEcchi May 9, 2022
5297c6b
fix
SempaiEcchi May 9, 2022
3791dfa
sleep workaround
SempaiEcchi May 9, 2022
de85cb4
sleep workaround
SempaiEcchi May 9, 2022
c6346a8
update model
SempaiEcchi May 9, 2022
2096d2c
try catch
SempaiEcchi May 10, 2022
9f2b4ec
model update
SempaiEcchi May 10, 2022
c3aa141
workaround in model
SempaiEcchi May 10, 2022
320881f
workaround in model
SempaiEcchi May 10, 2022
c10ff9f
rollback model
SempaiEcchi May 10, 2022
0bc6c10
update model
SempaiEcchi May 10, 2022
4924636
uncomment
SempaiEcchi Aug 6, 2022
b2f6995
meta desc model
SempaiEcchi Sep 4, 2022
24608e9
meta desc mdoel
SempaiEcchi Sep 5, 2022
8671442
horizontal scaling work
SempaiEcchi Oct 28, 2022
bd9be68
fix query exception
SempaiEcchi Oct 28, 2022
27 changes: 27 additions & 0 deletions .Dockerfile
@@ -0,0 +1,27 @@
FROM python:3.9-buster

RUN apt-get update \
# dependencies for building Python packages \
&& apt-get install -y libpq-dev python3-dev \
&& apt-get install -y build-essential \
# psycopg2 dependencies
&& apt-get install -y libpq-dev \
# Translations dependencies
&& apt-get install -y gettext \
&& apt-get install -y libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 libffi-dev shared-mime-info \
# cleaning up unused files
&& apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

COPY ./requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt \
&& rm -rf /requirements.txt

COPY . /usr/src/app

EXPOSE 9000

CMD [ "python3", "-m" , "server"]
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,11 +1,12 @@
__pycache__
.vscode
models/diagram_1.bpmn
models/private/diagram_1.bpmn
models/chatbot_model.bpmn
models/test_podproces.bpmn
models/test_call_activity.bpmn
models/test_business_rule.bpmn
models/test_dmn.dmn
models/test_exe_dmn.dmn
py-bpmn-env
database/
database/
env.py
22 changes: 19 additions & 3 deletions Pipfile
@@ -6,8 +6,24 @@ verify_ssl = true
[dev-packages]

[packages]
aiohttp = "==3.7.4.post0"
asyncio = "*"
pipenv = "2022.5.2"
aiohttp = "3.7.4.post0"
aiohttp-cors = "0.7.0"
async-timeout = "3.0.1"
asyncio = "3.4.3"
attrs = "21.2.0"
certifi = "2021.5.30"
chardet = "4.0.0"
charset-normalizer = "2.0.4"
idna = "3.2"
multidict = "5.1.0"
pony = "0.7.14"
requests = "2.26.0"
typing-extensions = "3.10.0.0"
urllib3 = "1.26.6"
yarl = "1.6.3"
psycopg2 = "2.9.1"
gunicorn = "20.1.0"

[requires]
python_version = "3.9"
python_version = "3.8.10"
583 changes: 446 additions & 137 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Procfile
@@ -0,0 +1 @@
web: python server.py
115 changes: 77 additions & 38 deletions bpmn_model.py
@@ -1,3 +1,5 @@
from asyncio import get_running_loop
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace
import xml.etree.ElementTree as ET
from bpmn_types import *
@@ -11,12 +13,10 @@
import os
from uuid import uuid4
import env
from bpmn_types import Task, ServiceTask

instance_models = {}


def get_model_for_instance(iid):
return instance_models.get(iid, None)
# thread_pool = ThreadPoolExecutor(max_workers=100)


class UserFormMessage:
@@ -25,6 +25,12 @@ def __init__(self, task_id, form_data={}):
self.form_data = form_data


class ReceiveMessage:
def __init__(self, task_id, data={}):
self.task_id = task_id
self.data = data or {}


class BpmnModel:
def __init__(self, model_path):
self.pending = []
@@ -77,28 +83,39 @@ def __init__(self, model_path):
break

def to_json(self):
tasks = [x.to_json() for x in self.elements.values() if isinstance(x, UserTask) or isinstance(x, ReceiveTask)]
return {
"model_path": self.model_path,
"main_process": self.main_process.__dict__,
"tasks": [
x.to_json() for x in self.elements.values() if isinstance(x, UserTask)
],
"tasks": tasks,
"instances": [i._id for i in self.instances.values()],
}

async def create_instance(self, _id, variables, process=None):
async def create_or_get_instance(self, _id, variables, process=None, initial=None):
if _id in self.instances:
return self.instances[_id]
instance = await self.create_instance(_id, variables, process=process, initial=initial)
return instance

async def create_instance(self, _id, variables, process=None, initial=None):
queue = asyncio.Queue()
if initial:
queue.put_nowait(initial)
if not process:
if self.main_collaboration_process:
# If Collaboration diagram
process = self.main_collaboration_process
else:
# If Process diagram
process = list(self.process_elements)[0]
instance = await self.instance_obj(_id, process, queue, variables)
self.instances[_id] = instance
return instance

async def instance_obj(self, _id, process, queue, variables):
instance = BpmnInstance(
_id, model=self, variables=variables, in_queue=queue, process=process
)
self.instances[_id] = instance
return instance

# Takes model_path needed for deployed subprocess
@@ -122,7 +139,7 @@ def handle_deployment_subprocesses(self):

class BpmnInstance:
def __init__(self, _id, model, variables, in_queue, process):
instance_models[_id] = model
# instance_models[_id] = model
self._id = _id
self.model = model
self.variables = deepcopy(variables)
@@ -131,6 +148,7 @@ def __init__(self, _id, model, variables, in_queue, process):
self.pending = deepcopy(self.model.process_pending[process])
self.process = process


def to_json(self):
return {
"id": self._id,
@@ -148,12 +166,18 @@ def check_condition(cls, state, condition, log):
if condition:
key = condition.partition(":")[0]
value = condition.partition(":")[2]
if key in state and state[key] == value:

if key in state and str(state[key]).upper() == str(value).upper():
ok = True
log("\t DONE: Result is", ok)
return ok

async def run_from_log(self, log):
async def run_from_log(self, log, state):
if self.state == "running" or self.state == "finished":
return self
if state == "running" or state == "finished":
return self

for l in log:
if l.get("activity_id") in self.model.elements:
pending_elements_list = []
@@ -163,26 +187,19 @@ async def run_from_log(self, log):
self.variables = {**l.get("activity_variables"), **self.variables}
return self

async def run_subprocess(self, process_id):
new_subproces_instance_id = str(uuid4())
if not self.model.subprocesses[process_id]:
new_subprocess_instance = await self.model.create_instance(
new_subproces_instance_id, {}, process_id
)
finished_subprocess = await new_subprocess_instance.run()
else:
subprocess_model = BpmnModel(self.model.subprocesses[process_id])
new_subproces_instance = await subprocess_model.create_instance(
new_subproces_instance_id, {}, process_id
)
finished_subprocess = await new_subproces_instance.run()
return True

async def run(self):
# async def run(self, is_subprocess=False):
# def task():
# await self._run(is_subprocess=is_subprocess)
#
# vars = (get_running_loop().run_in_executor(thread_pool, lambda: task(), ))
# return vars

async def run(self, is_subprocess=False):

self.state = "running"
_id = self._id
prefix = f"\t[{_id}]"

log = partial(print, prefix) # if _id == "2" else lambda *x: x

in_queue = self.in_queue
@@ -220,7 +237,7 @@ async def run(self):
}
current_and_variables_dict[current._id] = new_variables
# Create new running instance
db_connector.add_running_instance(instance_id=self._id)
db_connector.add_running_instance(instance_id=self._id, state=self.state, ran_as_subprocess=is_subprocess)

if isinstance(current, EndEvent):
exit = True
@@ -232,15 +249,15 @@ async def run(self):
activity_id=current._id,
timestamp=datetime.now(),
pending=[],
activity_variables={},
activity_variables=self.variables,
)
break

if isinstance(current, UserTask):
if (
message
and isinstance(message, UserFormMessage)
and message.task_id == current._id
message
and isinstance(message, UserFormMessage)
and message.task_id == current._id
):
user_action = message.form_data

@@ -257,6 +274,7 @@ async def run(self):

elif isinstance(current, ServiceTask):
log("DOING:", current)

can_continue = await current.run(self.variables, _id)
# Helper variables for DB insert
new_variables = {
@@ -274,11 +292,26 @@ async def run(self):
for k in set(self.variables) - set(before_variables)
}
current_and_variables_dict[current._id] = new_variables
elif isinstance(current, ReceiveTask):
if (
message
and isinstance(message, ReceiveMessage)
and message.task_id == current._id
):
log("DOING:", current)
can_continue = current.run(self.variables, message.data)
# Helper variables for DB insert
new_variables = {
k: self.variables[k]
for k in set(self.variables) - set(before_variables)
}
current_and_variables_dict[current._id] = new_variables

elif isinstance(current, CallActivity):
# TODO implement Variables tab CallActivity

log("DOING:", current)
can_continue = await self.run_subprocess(current.called_element)
can_continue = await current.run_subprocess(self.model, current.called_element, self.variables)
# log("SUBPROCESS DONE WITH VARIABLES\n" + "---> " + str(self.variables))
# Helper variables for DB insert
new_variables = {
k: self.variables[k]
@@ -316,9 +349,10 @@ async def run(self):
continue

if sequence.condition:
if self.check_condition(
cond = self.check_condition(
self.variables, sequence.condition, log
):
)
if cond:
next_tasks.append(elements[sequence.target])
else:
next_tasks.append(elements[sequence.target])
@@ -335,7 +369,9 @@ async def run(self):
if isinstance(next_task, ParallelGateway):
next_task.add_token()
else:

log("Waiting for user...", self.pending)

queue.append(await in_queue.get())

# Insert finished events into DB
@@ -349,10 +385,13 @@ async def run(self):
pending=[pending._id for pending in self.pending],
activity_variables=current_and_variables_dict[c],
)
if not is_subprocess:
log("WORKFLOW DONE WITH VARIABLES\n" + "---> ")

log("DONE")
self.state = "finished"
db_connector.change_instance_state(self._id, state=self.state)
self.pending = []
# Running instance finished
db_connector.finish_running_instance(self._id)

return self.variables
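
Note for reviewers: a minimal sketch (not part of this diff) of how the new ReceiveMessage class and the create_or_get_instance / in_queue additions could be exercised from application code. The model path, the "receive_payment" task id, and the payload are illustrative assumptions, not taken from this PR.

import asyncio
from uuid import uuid4

from bpmn_model import BpmnModel, ReceiveMessage


async def main():
    # Hypothetical model path; any deployed BPMN file with a receive task would do.
    model = BpmnModel("models/ticketing.bpmn")

    # create_or_get_instance is idempotent: a second call with the same id
    # returns the instance already registered in model.instances.
    instance_id = str(uuid4())
    instance = await model.create_or_get_instance(instance_id, variables={})

    # Start the engine loop; run() defaults to is_subprocess=False.
    run_task = asyncio.create_task(instance.run())

    # A pending ReceiveTask waits on the instance queue; posting a ReceiveMessage
    # with the matching task id ("receive_payment" is assumed here) lets the loop
    # merge the payload into the instance variables and continue the flow.
    await instance.in_queue.put(ReceiveMessage("receive_payment", {"amount": 42}))

    final_variables = await run_task
    print(final_variables)


asyncio.run(main())

Running this end to end also assumes the env/db_connector configuration used inside run() is in place, since the loop records running instances and finished events in the database.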