Commit 93d13bf (initial commit, 0 parents)

Shreyas Srivathsa authored and committed

Initial commit

File tree

80 files changed: +1813 additions, 0 deletions

.gitignore

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+artifacts/
+.venv/
+dump.rdb
+*.tar
+client/__pycache__/
+db/__pycache__/
+grpc_config/out/__pycache__
+processor/__pycache__/
+producer/__pycache__/
+server/__pycache__/

.vscode/launch.json

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+{
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "name": "Producer",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "producer.producer", // use module instead of program to deal with dependencies involving other user-defined modules
+            "console": "integratedTerminal",
+            "cwd": "${workspaceRoot}"
+        },
+        {
+            "name": "Processor",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "processor.processor", // use module instead of program to deal with dependencies involving other user-defined modules
+            "console": "integratedTerminal",
+            "cwd": "${workspaceRoot}"
+        }
+    ]
+}

LICENSE

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2025 Shreyas Srivathsa <srivathsashreyas@gmail.com>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.

README.md

Lines changed: 86 additions & 0 deletions

@@ -0,0 +1,86 @@
+# Overview
+
+This is a personal project (purely for learning) that briefly explores setting up a few components of a simple producer-consumer application. The components include:
+1. Producer -> A simple FastAPI microservice which generates a JSON message representing an action (in this case 'login') performed by a user. The message is sent to a Kafka broker
+2. Kafka Broker -> Stores messages sent by the producer. The processor component streams messages from the broker
+3. Processor -> A Spark-based application which uses structured streaming to pull messages from the Kafka broker. It processes the messages, computes the count of a particular action, and writes the result to Redis
+4. Redis -> Stores the raw data (login information) and the metrics (count of logins)
+5. gRPC Server -> A simple gRPC server which exposes an API to retrieve the raw data and/or metrics from Redis
+6. gRPC Client/Consumer -> A FastAPI microservice which includes a gRPC client that interacts with the gRPC server to retrieve the raw data and/or metrics
+
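The producer described in item 1 emits a JSON 'login' event. As a rough sketch, such a message might look like the following (the field names here are illustrative assumptions, not taken from producer/producer.py):

```python
import json
from datetime import datetime, timezone

def make_login_event(user_id: str) -> dict:
    # Hypothetical event shape; the actual fields in producer/producer.py may differ.
    return {
        "user_id": user_id,
        "action": "login",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

event = make_login_event("u123")
print(json.dumps(event))
```

A message of this shape is what the Kafka broker would buffer and the Spark processor would later parse and count.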
+## Structure
+
+The project is structured as follows:
+1. client/ -> Logic for the gRPC client/consumer component
+2. db/ -> Logic to connect to Redis
+3. grpc_config/ -> The proto file and the generated gRPC code
+4. processor/ -> Logic for the Spark-based processor component
+5. producer/ -> Logic for the producer component
+6. server/ -> Logic for the gRPC server component
+7. deploy/client -> k8s manifest and Dockerfile for the consumer component
+8. deploy/processor -> k8s manifest and Dockerfile for the processor component
+9. deploy/producer -> k8s manifest and Dockerfile for the producer component
+10. deploy/server -> k8s manifest and Dockerfile for the gRPC server component
+11. deploy/kafka -> k8s manifest to set up a Kafka broker
+12. deploy/redis -> k8s manifest to set up a Redis instance
+13. deploy/ingress_config -> Scripts to apply the ingress-nginx controller and set up MetalLB for load balancing. Note: you may need to configure the pool of IPs in the metallb-config.yaml file based on your local setup
+
+## Prerequisites
+
+1. Python 3.12 (it should work with other 3.x versions, but this is untested)
+2. Colima (on macOS). This setup should also work with k3s running directly on a Linux machine (untested)
+3. Podman or Docker (to build container images)
+4. kubectl
+5. Helm (to install the ingress-nginx controller)
+6. Maven (to retrieve the spark-sql-kafka jar and its dependencies; tested with v3.9.11)
+7. Java (to run Maven commands; tested with OpenJDK v17.0.16)
+
+## Deployment Steps (Local with Colima)
+
+1. Ensure you've built all relevant images. For example, to build the producer image, run
+`podman build -f deploy/producer/Dockerfile -t producer .`. Ensure that the tags match those specified in the corresponding k8s manifest files
+2. Save the images as tar files. For example, to save the producer image, run
+`podman save -o producer.tar producer:latest`
+3. Start Colima with Kubernetes enabled (skip this step if you already have k3s running on your system) -> `colima start --kubernetes --runtime containerd --cpu 4 --memory 4`.
+Adjust the CPU and memory based on your system resources (though this could affect the performance of specific components, particularly Spark)
+4. SSH into the Colima VM -> `colima ssh`. Apply the ingress config to set up the ingress-nginx controller and MetalLB -> `bash deploy/ingress_config/apply_config.sh`
+5. Load the images into the k3s cluster running on the Colima VM -> `sudo ctr -n k8s.io images import <image-name>.tar`
+6. Verify the images have been imported into the cluster -> `sudo ctr -n k8s.io images list`
+7. Apply the manifests using the start_cluster_local.sh script -> `./start_cluster_local.sh`
+8. You may need to reapply the processor component if its pod hasn't started -> `kubectl apply -f deploy/processor/processor.yaml`
+9. Verify all pods are running -> `kubectl get pods -A`
+10. Retrieve the ingress IP -> `kubectl get svc -A` and identify the EXTERNAL-IP of the ingress-nginx-controller service in the ingress-nginx namespace
+11. To test, SSH into the Colima VM and run the following:
+    - `curl -H "Host: producer.local" http://<INGRESS_IP>/login`. This generates a login message and passes it through the other components as described in the overview section
+    - `curl -H "Host: grpc-client.local" http://<INGRESS_IP>/metrics` to retrieve the count of logins, or `curl -H "Host: grpc-client.local" http://<INGRESS_IP>/raw-data` to retrieve the last 10 raw login messages
+
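Steps 1 and 2 above repeat for every component image; a dry-run sketch that only prints the build and save commands (component names assumed from the Structure section; tags must still match the k8s manifests):

```shell
# Print (don't run) the podman build/save commands for each component image.
for c in producer processor server client; do
  echo "podman build -f deploy/${c}/Dockerfile -t ${c} ."
  echo "podman save -o ${c}.tar ${c}:latest"
done
```

Piping the output to `sh` would execute the commands, but inspecting them first keeps mistakes cheap.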
+## Useful Commands (Reference)
+
+1. Create a netshoot pod to test connectivity to other pods/resources using standard network tools (curl, ping, nc, etc.) -> `kubectl run netshoot -i --image=nicolaka/netshoot --restart=Never -- bash`
+2. Exec into the netshoot pod -> `kubectl exec -it <pod-name> -- bash`
+3. Set up a Kafka pod with client tools to debug/test the Kafka broker -> `kubectl run kafka-client --restart="Never" --image=confluentinc/cp-kafka:7.6.0 -- sleep infinity`
+4. If the pod is already running: `kubectl exec -it kafka-client -- bash`.
+Run Kafka commands inside the pod:
+    - to list topics -> `kafka-topics --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 --list`
+    - to consume messages from the 'metrics' topic (refer to https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions for a simple description of partitions in Kafka) -> `kafka-console-consumer --topic metrics --from-beginning --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 --partition 0`
+    - to send messages to the 'metrics' topic -> `kafka-console-producer --broker-list kafka-0.kafka.default.svc.cluster.local:9092 --topic metrics`
+5. To send requests to the producer service via ingress-nginx, first get the ingress IP using `kubectl get svc -A` (identify the EXTERNAL-IP of the ingress-nginx-controller service in the ingress-nginx namespace). Then run the following command (replace <INGRESS_IP> with the actual ingress IP):
+`curl -H "Host: producer.local" http://<INGRESS_IP>/login`
+Note: if using Colima, you'll need to run `colima ssh` and then run the curl command from within the Colima VM.
+6. Retrieve the jars associated with spark-sql-kafka-0-10_2.13:4.0.0 using the following Maven command:
+`mvn dependency:copy-dependencies -DoutputDirectory=./jars -DincludeScope=runtime`
+This downloads the jar's dependencies to the ./jars directory.
+Note: pom.xml should be present in the current directory with the appropriate dependency specified. Don't forget to download the jar itself from the Maven repository via https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/4.0.0/spark-sql-kafka-0-10_2.13-4.0.0.jar or with the `mvn dependency:get` command. Place this jar in the same directory (./jars)
+7. If using separate Spark master and worker nodes -> install bitnami/spark on your cluster with 2 worker pods:
+`helm install spark oci://registry-1.docker.io/bitnamicharts/spark --set worker.replicaCount=2`
+8. To upgrade and set resource limits for the Spark configuration:
+`helm upgrade spark oci://registry-1.docker.io/bitnamicharts/spark --set worker.replicaCount=2 --set worker.resources.limits.cpu=2 --set worker.resources.limits.memory=4Gi`
+9. To generate the Python gRPC code from the proto file, run the following command (from the workspace root directory):
+`python -m grpc_tools.protoc -Igrpc_config/out=./grpc_config --python_out=. --grpc_python_out=. --pyi_out=. ./grpc_config/metrics.proto`
+This sets the parent directory for the generated code to ./grpc_config/out.
+--python_out, --grpc_python_out and --pyi_out specify the relative path (w.r.t. the parent) for the generated code (based on the methods and types specified in the .proto file)
+
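The metrics.proto file itself is not shown in this excerpt. Going only by the names used in client/client.py (MetricsServiceStub, GetRawData with RawDataRequest(limit=...), GetMetrics with MetricsRequest), it plausibly looks something like the sketch below; the response message names and all field layouts are guesses, not taken from the repository:

```proto
syntax = "proto3";

package metrics;

service MetricsService {
  rpc GetRawData (RawDataRequest) returns (RawDataResponse);
  rpc GetMetrics (MetricsRequest) returns (MetricsResponse);
}

message RawDataRequest {
  int32 limit = 1; // client.py passes limit=10
}

// Hypothetical response shapes.
message RawDataResponse {
  repeated string entries = 1;
}

message MetricsRequest {}

message MetricsResponse {
  int64 login_count = 1;
}
```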
+## Known Issues
+
+1. You may need to manually re-create specific components if certain associated resources were not configured properly by the startup script. For example, if the producer component's deployment and service were created but not its ingress, delete the producer component -> `kubectl delete -f deploy/producer/producer.yaml` and re-apply the manifest -> `kubectl apply -f deploy/producer/producer.yaml`
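The processor's core job, per the overview, is counting occurrences of an action in the message stream. Setting Spark aside, that aggregation reduces to something like the following stdlib-only sketch over one hypothetical micro-batch (the real processor/processor.py uses structured streaming and writes the result to Redis):

```python
import json
from collections import Counter

# One hypothetical micro-batch of messages as they might arrive from Kafka.
batch = [
    json.dumps({"user_id": "u1", "action": "login"}),
    json.dumps({"user_id": "u2", "action": "login"}),
    json.dumps({"user_id": "u1", "action": "logout"}),  # an invented second action
]

# Count each action across the batch, as the Spark job does for 'login'.
counts = Counter(json.loads(m)["action"] for m in batch)
print(counts["login"])  # -> 2
```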

client/__init__.py

Whitespace-only changes.

client/client.py

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+import grpc
+from grpc_config.out import metrics_pb2_grpc, metrics_pb2
+from google.protobuf.json_format import MessageToDict
+from fastapi import FastAPI
+import os
+
+app = FastAPI()
+
+
+@app.get("/raw-data")
+def retrieve_raw_data():
+    # TODO: add logic to set limit via query params
+    raw_data_response = stub.GetRawData(metrics_pb2.RawDataRequest(limit=10))
+    # convert protobuf message to dictionary before returning
+    return MessageToDict(raw_data_response)
+
+
+@app.get("/metrics")
+def retrieve_metrics():
+    metrics_response = stub.GetMetrics(metrics_pb2.MetricsRequest())
+    # convert protobuf message to dictionary before returning
+    return MessageToDict(metrics_response)
+
+
+grpc_server = (
+    "localhost:50051" if "GRPC_SERVER" not in os.environ else os.environ["GRPC_SERVER"]
+)
+
+## main (initialize channel to connect to grpc server and create stub to
+# call grpc server methods)
+channel = grpc.insecure_channel(grpc_server)
+stub = metrics_pb2_grpc.MetricsServiceStub(channel)

db/__init__.py

Whitespace-only changes.

db/connect.py

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+import redis
+import os
+
+
+def get_redis_conn():
+    host = "localhost" if "REDIS_HOST" not in os.environ else os.environ["REDIS_HOST"]
+    port = "6379" if "REDIS_PORT" not in os.environ else os.environ["REDIS_PORT"]
+    r = redis.Redis(host=host, port=port, decode_responses=True)
+    return r
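The default-if-unset conditionals in get_redis_conn can equivalently be written with dict-style `.get`; a small stdlib-only illustration (a plain dict stands in for os.environ so the example is deterministic):

```python
# Stand-in for os.environ; the committed code reads the real environment.
env = {"REDIS_PORT": "6380"}

host = env.get("REDIS_HOST", "localhost")  # key absent -> default used
port = int(env.get("REDIS_PORT", "6379"))  # key present -> env value used

print(host, port)  # -> localhost 6380
```

os.environ supports the same `.get` interface, so `os.environ.get("REDIS_HOST", "localhost")` expresses each fallback in one line.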

deploy/client/Dockerfile

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+ARG PYTHON_VERSION=3.12
+FROM python:${PYTHON_VERSION}-slim
+
+# Prevents Python from writing pyc files.
+ENV PYTHONDONTWRITEBYTECODE=1
+
+# Keeps Python from buffering stdout and stderr to avoid situations where
+# the application crashes without emitting any logs due to buffering.
+ENV PYTHONUNBUFFERED=1
+
+WORKDIR /app
+
+# Copy the source code into the container.
+COPY client/ .
+COPY grpc_config/ ./grpc_config/
+COPY requirements.txt .
+
+# Download dependencies as a separate step to take advantage of Docker's caching.
+# 1. Leverage a cache mount to /root/.cache/pip to speed up subsequent builds (pip install doesn't need to be re-run each time
+# since packages are cached).
+# 2. install packages
+RUN --mount=type=cache,target=/root/.cache/pip \
+    python -m pip install -r requirements.txt
+
+# Expose the port that the application will run on.
+EXPOSE 5001
+
+# Run the application.
+CMD ["uvicorn", "client:app", "--host", "0.0.0.0", "--port", "5001"]

deploy/client/client.yaml

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: grpc-client
+  labels:
+    app: grpc-client
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: grpc-client
+  template:
+    metadata:
+      labels:
+        app: grpc-client
+    spec:
+      containers:
+        - name: grpc-client
+          image: localhost/grpc-client:latest # replace with your built image
+          imagePullPolicy: IfNotPresent
+          ports:
+            - containerPort: 5001
+          env:
+            - name: GRPC_SERVER
+              value: "grpc-server-service:50051"
+
+---
+# internal service to target the grpc client
+apiVersion: v1
+kind: Service
+metadata:
+  name: grpc-client-service
+  labels:
+    app: grpc-client-service
+spec:
+  selector:
+    app: grpc-client
+  ports:
+    - protocol: TCP
+      port: 5001 # Port exposed by the service
+      targetPort: 5001 # Port on which the grpc client is listening
+  type: ClusterIP
+---
+
+# ingress to expose the grpc-client service
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  name: grpc-client-ingress
+spec:
+  # this tells kubernetes
+  # which ingress controller to use for this ingress resource
+  # (useful in particular if you have multiple ingress controllers and need to
+  # associate a specific ingress resource with a specific ingress controller)
+  ingressClassName: nginx
+  rules:
+    - host: grpc-client.local
+      http:
+        paths:
+          - path: /raw-data
+            pathType: Prefix
+            backend:
+              service:
+                name: grpc-client-service
+                port:
+                  number: 5001 # Change to your service's target port
+          - path: /metrics
+            pathType: Prefix
+            backend:
+              service:
+                name: grpc-client-service
+                port:
+                  number: 5001 # Change to your service's target port