Skip to content

Commit 5b38e39

Browse files
William ChrispJoshArmi
authored andcommitted
Simplify incoming event creation
1 parent cb5c775 commit 5b38e39

31 files changed

+234
-340
lines changed

publisher/drivers/checkov.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ def get_events(self, file_data: dict, repo_name: str) -> Union[Exception, Tuple[
1818
events.append(GuardrailPassed(
1919
aggregate_id = repo_name + "." + result["resource"],
2020
guardrail_id = result["check_id"],
21-
time = current_time,
21+
timestamp = current_time,
2222
))
2323
for result in file_data["results"]["failed_checks"]:
2424
events.append(GuardrailActivated(
2525
aggregate_id = repo_name + "." + result["resource"],
2626
guardrail_id = result["check_id"],
27-
time = current_time,
27+
timestamp = current_time,
2828
))
2929
return tuple(events)
3030
return Exception(f"Unable to read Checkov results from file")

publisher/drivers/open_policy_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ def get_events(self, file_data: dict, repo_name: str) -> Union[Exception, Tuple[
1919
events.append(GuardrailPassed(
2020
aggregate_id = repo_name + "." + result["input"]["metadata"]["namespace"] + "." + result["input"]["metadata"]["name"],
2121
guardrail_id = result["query"],
22-
time = current_time,
22+
timestamp = current_time,
2323
))
2424
else:
2525
events.append(GuardrailActivated(
2626
aggregate_id = repo_name + "." + result["input"]["metadata"]["namespace"] + "." + result["input"]["metadata"]["name"],
2727
guardrail_id = result["query"],
28-
time = current_time,
28+
timestamp = current_time,
2929
))
3030
return tuple(events)
3131
return Exception(f"Unable to read Open Policy Agent results from file")

publisher/entities/guardrail.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
@dataclass
88
class GuardrailActivated(BaseEvent):
99
guardrail_id: str
10-
time: int
10+
timestamp: int
1111
event_type: Literal["guardrail_activated"] = "guardrail_activated"
1212

1313

1414
@dataclass
1515
class GuardrailPassed(BaseEvent):
1616
guardrail_id: str
17-
time: int
17+
timestamp: int
1818
event_type: Literal["guardrail_passed"] = "guardrail_passed"

src/adapters/controller.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,43 @@
1-
from typing import Any, List, Tuple, Union
1+
from typing import Dict, List, Tuple, Union
2+
from uuid import uuid4
23

3-
from src.entities.events import Event
4+
from src.entities.events import Event, EVENT_CLASSES
45
from src.entities.metrics import Metric
56
from src.usecases.event_functions import EVENT_FUNCTIONS
67

8+
def _convert_payload_to_event(
9+
payload: Dict, aggregate_version: int
10+
) -> Event:
11+
try:
12+
event_type = payload["event_type"]
13+
aggregate_id = payload["aggregate_id"]
14+
event_payload = payload.copy()
15+
for key in ["event_type", "aggregate_id"]:
16+
event_payload.pop(key, None)
17+
event_class, payload_class = EVENT_CLASSES[event_type]
18+
return event_class(
19+
aggregate_id=aggregate_id,
20+
aggregate_version=aggregate_version + 1,
21+
event_id=str(uuid4()),
22+
payload=payload_class(**event_payload)
23+
)
24+
except Exception as err:
25+
return err
726

827
def handle_event(
9-
payload: Any, aggregate_events: List[Event]
28+
payload: Dict, aggregate_events: List[Event]
1029
) -> Union[Exception, Tuple[Event, List[Metric]]]:
1130
if "event_type" in payload:
12-
if payload["event_type"] in EVENT_FUNCTIONS:
13-
event, metrics = EVENT_FUNCTIONS[payload["event_type"]](
14-
payload, aggregate_events
15-
)
16-
return (event, metrics)
31+
event_type = payload["event_type"]
32+
if event_type in EVENT_CLASSES:
33+
event = _convert_payload_to_event(payload, len(aggregate_events))
34+
if event_type in EVENT_FUNCTIONS:
35+
event, metrics = EVENT_FUNCTIONS[event_type](
36+
event, aggregate_events
37+
)
38+
return (event, metrics)
39+
else:
40+
return Exception(f"No event type function defined: {event_type}")
1741
else:
18-
return Exception(f"Unknown event type {payload['event_type']}")
42+
return Exception(f"Unknown event type {event_type}")
1943
return Exception("Malformed event with no event_type")

src/drivers/dynamo_event_sink_source.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ def store_events(self, events: List[Event]) -> Optional[Exception]:
4141
}
4242
batch.put_item(Item=value)
4343
except Exception as err:
44-
logger.error(
45-
f"Couldn't add to table type of error is {type(err)} AND reason is {err}"
46-
)
4744
return err
4845

4946

@@ -56,28 +53,30 @@ def _sort_events(self, event: Event) -> int:
5653
return event.aggregate_version
5754

5855
def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
59-
response = self.dynamo_db_table.query(
60-
KeyConditionExpression=Key("aggregate_id").eq(aggregate_id)
61-
)
62-
logger.msg(f"responses {response}")
56+
try:
57+
response = self.dynamo_db_table.query(
58+
KeyConditionExpression=Key("aggregate_id").eq(aggregate_id)
59+
)
6360

64-
events: List[Event] = []
61+
events: List[Event] = []
6562

66-
for item in response["Items"]:
67-
event_type = item["event_type"]
68-
if event_type in EVENT_CLASSES:
69-
event_class, payload_class = EVENT_CLASSES[event_type]
70-
payload = payload_class(**json.loads(item["payload"]))
71-
event = event_class(
72-
aggregate_id=item["aggregate_id"],
73-
aggregate_type=item["aggregate_type"],
74-
aggregate_version=int(item["aggregate_version"]),
75-
event_id=UUID(item["event_id"]),
76-
event_version=int(item["event_version"]),
77-
payload=payload,
78-
)
79-
events.append(event)
63+
for item in response["Items"]:
64+
event_type = item["event_type"]
65+
if event_type in EVENT_CLASSES:
66+
event_class, payload_class = EVENT_CLASSES[event_type]
67+
payload = payload_class(**json.loads(item["payload"]))
68+
event = event_class(
69+
aggregate_id=item["aggregate_id"],
70+
aggregate_type=item["aggregate_type"],
71+
aggregate_version=int(item["aggregate_version"]),
72+
event_id=UUID(item["event_id"]),
73+
event_version=int(item["event_version"]),
74+
payload=payload,
75+
)
76+
events.append(event)
8077

81-
events.sort(key=self._sort_events)
78+
events.sort(key=self._sort_events)
8279

83-
return events
80+
return events
81+
except Exception as err:
82+
return err

src/drivers/timestream_metric_sink.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ def __init__(self) -> None:
1818
self.timestream_client = boto3.client("timestream-write")
1919

2020
def store_metrics(self, metrics: List[Metric]) -> Optional[Exception]:
21-
logger.msg(f"Writing records for metrics {metrics}")
2221
current_time = round(time.time() * 1000)
2322
records = []
2423
for metric in metrics:
@@ -48,9 +47,5 @@ def store_metrics(self, metrics: List[Metric]) -> Optional[Exception]:
4847
Records=records,
4948
CommonAttributes={},
5049
)
51-
logger.msg(
52-
f"WriteRecords Status: {result['ResponseMetadata']['HTTPStatusCode']}"
53-
)
5450
except Exception as err:
55-
logger.error(f"Error: {err}")
5651
return err

src/entities/accounts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class AccountRequested(BaseEvent):
1313
aggregate_type = "Account"
1414
event_version = 1
1515
payload: AccountRequestedPayload
16-
event_type = "account_requested"
16+
event_type: Literal["account_requested"] = "account_requested"
1717

1818

1919
class AccountCreatedPayload(BaseModel):

src/entities/patch.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
# Events
99
class PatchRunSummaryPayload(BaseModel):
10+
timestamp: float
1011
failed_instances: str
1112
successful_instances: str
1213

src/entrypoints/aws_lambda.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,24 @@
1212

1313

1414
def lambda_handler(event, _) -> None:
15-
print(f"event is {event}")
15+
logger.msg("Incoming event", incoming_event=event)
1616
full_event = event["detail"]
17-
events = event_source.get_events_for_aggregate(full_event["aggregate_id"])
18-
logger.msg("Returned aggregate events", events=events)
19-
response = handle_event(full_event, events)
17+
aggregate_events = event_source.get_events_for_aggregate(full_event["aggregate_id"])
18+
if isinstance(aggregate_events, Exception):
19+
logger.error("Exception retrieving aggregate events", exception=str(aggregate_events))
20+
logger.msg("Aggregate event/s retrieved", aggregate_count=len(aggregate_events))
21+
response = handle_event(full_event, aggregate_events)
2022
if isinstance(response, Exception):
21-
logger.msg("Exception handling event", exception=str(response))
23+
logger.error("Exception handling event", exception=str(response))
2224
else:
2325
event, metrics = response
24-
logger.msg("Received from handling event", response=str(response))
25-
event_sink.store_events([event])
26-
metric_sink.store_metrics(metrics)
26+
logger.msg("Received from handling event", outgoing_event=str(event), outgoing_metrics=str(metrics))
27+
response = event_sink.store_events([event])
28+
if isinstance(response, Exception):
29+
logger.error("Exception storing event", exception=str(response))
30+
logger.info("Event stored in DynamoDB", stored_event=str(event))
31+
response = metric_sink.store_metrics(metrics)
32+
if isinstance(response, Exception):
33+
logger.error("Exception storing metric", exception=str(response))
34+
logger.info("Metrics stored in Timestream", stored_metrics=str(metrics))
35+

src/usecases/accounts.py

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,16 @@
1-
from typing import List, Dict, Tuple, Union
2-
from uuid import uuid4
1+
from typing import List, Tuple
32

43
from src.entities.accounts import (
54
AccountCreated,
6-
AccountCreatedPayload,
75
AccountLeadTime,
86
AccountRequested,
9-
AccountRequestedPayload,
107
)
118
from src.entities.events import Event
129

1310

14-
def _convert_payload_to_event(
15-
event: Dict, aggregate_version: int
16-
) -> Union[AccountCreated, AccountRequested]:
17-
if event["event_type"] == "account_requested":
18-
return AccountRequested(
19-
aggregate_id=event["aggregate_id"],
20-
aggregate_version=aggregate_version + 1,
21-
event_id=str(uuid4()),
22-
event_version=1,
23-
payload=AccountRequestedPayload(timestamp=int(event["time"])),
24-
)
25-
elif event["event_type"] == "account_created":
26-
return AccountCreated(
27-
aggregate_id=event["aggregate_id"],
28-
aggregate_version=aggregate_version + 1,
29-
event_id=str(uuid4()),
30-
event_version=1,
31-
payload=AccountCreatedPayload(timestamp=int(event["time"])),
32-
)
33-
34-
3511
def handle_account_created(
36-
event: Dict, aggregate_events: List[Event]
12+
event: Event, aggregate_events: List[Event]
3713
) -> Tuple[AccountCreated, List[AccountLeadTime]]:
38-
event = _convert_payload_to_event(event, len(aggregate_events))
3914
if aggregate_events:
4015
return (
4116
event,
@@ -52,7 +27,6 @@ def handle_account_created(
5227

5328

5429
def handle_account_requested(
55-
event: Dict, aggregate_events: List[Event]
30+
event: Event, aggregate_events: List[Event]
5631
) -> Tuple[AccountRequested, List]:
57-
event = _convert_payload_to_event(event, len(aggregate_events))
5832
return (event, [])

0 commit comments

Comments
 (0)