The goal is to build a simple streaming pipeline with Apache Kafka and Spark.
Kaggle data on San Francisco crime incidents is used to simulate producing data to a Kafka server. Spark then ingests the data from Kafka and performs an aggregation and a stream-table join.
Note that the project comes with starter code and files. The main tasks in the project were to fill in key components in the `config`, `kafka_server.py`, `producer_server.py`, and `data_stream.py` files.
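The starter code itself is not reproduced here, but a minimal sketch of the idea behind `producer_server.py` is shown below, assuming the kafka-python client; the input file path, topic name, and sleep interval are placeholders rather than the project's actual values.

```python
import json
import time

from kafka import KafkaProducer  # kafka-python client


class ProducerServer(KafkaProducer):
    """Reads the crime-incident JSON file and sends each record to a Kafka topic."""

    def __init__(self, input_file, topic, **kwargs):
        super().__init__(**kwargs)
        self.input_file = input_file
        self.topic = topic

    def generate_data(self):
        with open(self.input_file) as f:
            records = json.load(f)              # the Kaggle dump is a JSON array
        for record in records:
            message = json.dumps(record).encode("utf-8")
            self.send(self.topic, message)      # produce one record per message
            time.sleep(0.1)                     # throttle to simulate a live feed


if __name__ == "__main__":
    producer = ProducerServer(
        input_file="police-department-calls-for-service.json",  # placeholder path
        topic="sf.crime.police-calls",                           # placeholder topic
        bootstrap_servers="localhost:9092",
        client_id="crime-producer",
    )
    producer.generate_data()
```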
Technology used in this project:
- Apache Kafka
- Spark Structured Streaming
- Install requirements using `./start.sh`
- Start the Zookeeper and Kafka servers using:
`/usr/bin/zookeeper-server-start config/zookeeper.properties`
`/usr/bin/kafka-server-start config/server.properties`
- Run `producer_server.py` and `kafka_server.py` to start producing data to the Kafka server
- Start the Spark Streaming data processing using the command below (output is saved in a `.log` file; a sketch of what `data_stream.py` does follows the command):
`spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 --master local[*] data_stream.py > submit.log`
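For reference, a condensed sketch of the kind of job `data_stream.py` runs is shown below. The schema fields, topic name, and the `radio_code.json` lookup (joined on an assumed `disposition` column) are illustrative, not the starter project's exact values.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema -- only a few of the dataset's fields are shown here
schema = StructType([
    StructField("crime_id", StringType(), True),
    StructField("original_crime_type_name", StringType(), True),
    StructField("disposition", StringType(), True),
    StructField("call_date_time", StringType(), True),
])

spark = (SparkSession.builder
         .master("local[*]")
         .appName("CrimeStatistics")
         .getOrCreate())

# Ingest the Kafka topic as a streaming DataFrame (topic name is a placeholder)
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sf.crime.police-calls")
      .option("startingOffsets", "earliest")
      .load())

# Kafka delivers the payload as bytes; cast to string and unpack the JSON
service_table = (df.selectExpr("CAST(value AS STRING) AS value")
                 .select(psf.from_json(psf.col("value"), schema).alias("DF"))
                 .select("DF.*"))

# Aggregation: running count of calls per crime type
agg_query = (service_table
             .groupBy("original_crime_type_name")
             .count()
             .writeStream
             .outputMode("complete")
             .format("console")
             .trigger(processingTime="10 seconds")
             .start())

# Stream-table join: enrich the stream with a static radio-code lookup table
radio_code_df = spark.read.option("multiLine", "true").json("radio_code.json")
join_query = (service_table
              .join(radio_code_df, "disposition", "left")
              .writeStream
              .outputMode("append")
              .format("console")
              .start())

spark.streams.awaitAnyTermination()
```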
Here is a screenshot of the Kafka consumer console output, showing that data was successfully produced to Kafka:
Here is the output of the aggregation in Spark:
Here is a screenshot of the streaming job in the Spark UI:
- How did changing values on the SparkSession property parameters affect the throughput and latency of the data?
- Processing speed can be improved by assigning more memory to both the driver and the executors via the `spark.driver.memory` and `spark.executor.memory` config options. Additionally, I can increase the number of cores and the parallelism of the job using `spark.driver.cores`, `spark.executor.cores`, and `spark.default.parallelism`. The outcome is higher throughput, although adding more distributed workers also adds network overhead; in this case keeping the number of cores at the default of 1 is sufficient. Furthermore, the `processingTime` parameter of the `trigger` function of `writeStream` can be reduced to increase throughput, and the `maxRatePerPartition` option of `readStream` can be tweaked to achieve the desired throughput.
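As an illustration of the last two knobs, a hedged sketch follows; the topic name and values are placeholders. Note that `maxOffsetsPerTrigger` is the rate-limit option documented for the Structured Streaming Kafka source, while `maxRatePerPartition` is included only because it is referenced above.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("CrimeTuning").getOrCreate()

# Source side: cap how much data each micro-batch pulls from Kafka
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sf.crime.police-calls")   # placeholder topic
      .option("maxOffsetsPerTrigger", 200)            # per-trigger cap (Structured Streaming)
      .option("maxRatePerPartition", 200)             # option named in the answer above
      .load())

# Sink side: a shorter processingTime produces smaller, more frequent micro-batches
query = (df.writeStream
         .format("console")
         .trigger(processingTime="5 seconds")
         .start())
query.awaitTermination()
```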
- What were the 2-3 most efficient SparkSession property key/value pairs? Through testing multiple variations on values, how can you tell these were the most optimal?
- Based on testing, increasing `spark.driver.memory` and `spark.executor.memory` to "2g" (2 gigabytes) improved processing speed. Keeping `spark.driver.cores` and `spark.executor.cores` at their defaults was optimal. Finally, keeping the `maxRatePerPartition` option of `readStream` at roughly 200 was sufficient for the task.
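Expressed as session configuration, the settings described above look roughly like the sketch below. One caveat: `spark.driver.memory` only takes effect if it is set before the driver JVM starts, so in practice it is safer to pass it via `spark-submit --conf` than through `SparkSession.builder`.

```python
from pyspark.sql import SparkSession

# Values reported as sufficient for this workload: 2g of driver/executor memory,
# default core counts (left unset); the ~200 rate cap is applied on readStream
spark = (SparkSession.builder
         .master("local[*]")
         .appName("CrimeStatistics")
         .config("spark.driver.memory", "2g")
         .config("spark.executor.memory", "2g")
         .getOrCreate())
```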