
Commit 244f0ac

Merge branch 'main' into update/typesafe-1.4.3
2 parents: 9d7934d + 81dad91

12 files changed · +163 −88 lines

.git-blame-ignore-revs

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 3.8.3
+51ae86836c4b4dae7b615a16f3380fe0459f83fa
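The ignore list only takes effect when `git blame` is pointed at it. GitHub's blame view picks up a root-level `.git-blame-ignore-revs` automatically; for local blames, one way to wire it up (not part of this commit) is:

```shell
# Tell git blame to skip the scalafmt reformat commit when attributing lines,
# so it doesn't show up as the last author of every reformatted line.
git config blame.ignoreRevsFile .git-blame-ignore-revs
```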

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-version = 3.8.2
+version = 3.8.3
 runner.dialect = scala213
 style = default
 maxColumn = 120

README.md

Lines changed: 23 additions & 66 deletions
@@ -1,66 +1,23 @@
-# ✨ spark-kafka_rabbitmq_sqs-course
-
-[![License](https://img.shields.io/github/license/com.codely/spark-kafka_rabbitmq_sqs-course?style=flat-square)](/LICENSE)
-
-[![GitHub Repo stars](https://img.shields.io/github/stars/com.codely/spark-kafka_rabbitmq_sqs-course?style=flat-square)](https://github.com/com.codely/spark-kafka_rabbitmq_sqs-course/stargazers)
-
-[![Continuous Integration status](https://img.shields.io/github/actions/workflow/status/com.codely/spark-kafka_rabbitmq_sqs-course/ci.yml?style=flat-square)](https://github.com/com.codely/spark-kafka_rabbitmq_sqs-course/actions/workflows/ci.yml)
-
-## 🚀 Environment setup
-
-You only need the common tooling used for developing Scala applications:
-
-- [JDK](https://www.oracle.com/java/technologies/downloads/)
-- [SBT](https://www.scala-sbt.org/download)
-
-<details>
-  <summary>Installing instructions for macOS with SDKMAN!</summary>
-
-If you use macOS, we would recommend using SDKMAN! to manage different JDK versions and tooling:
-
-1. [Install SDKMAN with homebrew](https://github.com/sdkman/homebrew-tap?tab=readme-ov-file#installation)
-2. Install the JDK you prefer. If you ask us, we will opt for:
-   1. [Check the latest Java LTS JDK version](https://endoflife.date/oracle-jdk)
-   2. Check the latest Zulu distribution version for that LTS with:
-      ```shell
-      sdk list java
-      ```
-   3. Install it:
-      ```shell
-      sdk install java XX.YY.ZZ-zulu
-      ```
-3. Install the latest SBT:
-   ```shell
-   sdk install sbt
-   ```
-</details>
-
-## ✅ Tests
-
-Just run:
-
-```shell
-sbt test
-```
-
-There is also the `sbt t` alias 😊
-
-## 🤽‍ Pre-push Git hook
-
-There's one Git hook included. It's inside the `doc/hooks` folder, and it will run the `prep` SBT task before pushing to any remote.
-
-This `prep` task is intended to run all the checks you consider before pushing.
-At this very moment, it tries to compile and check the code style rules with ScalaFmt.
-
-You can define what this task does by modifying the `prep` task in the `build.sbt` file.
-We like the approach of just having to run 1 single SBT task instead of multiple tasks because it's more efficient (the hook doesn't have to create multiple SBT sessions), and also because this way we can control the pre push tasks with the SBT alias defined at the `build.sbt` without altering the hooks.
-
-If you want to install this hook, just `cd doc/hooks` and run `./install-hooks.sh`.
-
-## ⚖️ License
-
-See [`LICENSE`](LICENSE).
-
-## 🤔 About
-
-Project developed by [com.codely, Inc.](https://com.codely) and bootstrapped with the [Codely Scala Spark Skeleton (powered by Giter 8)](https://github.com/CodelyTV/scala-spark-skeleton.g8).
+<p align="center">
+  <a href="https://codely.com">
+    <img src="https://user-images.githubusercontent.com/10558907/170513882-a09eee57-7765-4ca4-b2dd-3c2e061fdad0.png" width="300px" height="92px" alt="Codely logo"/>
+  </a>
+</p>
+
+<h1 align="center">
+  🐰 Spark: Integration with Kafka, RabbitMQ and AWS SQS
+</h1>
+
+<p align="center">
+  <a href="https://github.com/CodelyTV"><img src="https://img.shields.io/badge/Codely-OS-green.svg?style=flat-square" alt="Codely Open Source projects"/></a>
+  <a href="https://pro.codely.com"><img src="https://img.shields.io/badge/Codely-Pro-black.svg?style=flat-square" alt="Codely Pro courses"/></a>
+</p>
+
+<p align="center">
+  Learn Apache Spark from scratch with a practical approach
+</p>
+
+<p align="center">
+  <a href="https://github.com/CodelyTV/spark_for_devs-course/stargazers">Stars are welcome 😊</a><br><br>
+  Course (Spanish): <a href="https://pro.codely.com/library/spark-integracion-con-kafka-rabbitmq-y-aws-sqs-230966">Spark: Integración con Kafka, RabbitMQ y AWS SQS</a>
+</p>

project/Dependencies.scala

Lines changed: 6 additions & 3 deletions
@@ -2,18 +2,21 @@ import sbt._
 
 object Dependencies {
   private val prod = Seq(
-    "com.github.nscala-time" %% "nscala-time" % "2.32.0",
+    "com.github.nscala-time" %% "nscala-time" % "2.34.0",
     "com.lihaoyi" %% "pprint" % "0.9.0",
     "org.apache.spark" %% "spark-core" % "3.5.0" % Provided,
     "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided,
     "org.apache.spark" %% "spark-streaming" % "3.5.0",
     "org.apache.spark" %% "spark-hive" % "3.5.0",
     "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
     "org.apache.hadoop" % "hadoop-aws" % "3.2.2",
-    "com.rabbitmq" % "amqp-client" % "5.21.0",
     "com.typesafe" % "config" % "1.4.3",
+    "com.rabbitmq" % "amqp-client" % "5.23.0",
     "io.delta" %% "delta-spark" % "3.1.0",
-    "io.spray" %% "spray-json" % "1.3.6"
+    "io.spray" %% "spray-json" % "1.3.6",
+    "io.circe" %% "circe-core" % "0.14.10",
+    "io.circe" %% "circe-generic" % "0.14.10",
+    "io.circe" %% "circe-parser" % "0.14.10"
   )
   private val test = Seq(
     "org.scalatest" %% "scalatest" % "3.2.19",

project/build.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-sbt.version=1.10.1
+sbt.version=1.10.5

project/sbt-scalafmt.sbt

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.0")
Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+aws events put-events \
+  --endpoint-url http://localhost:4566 \
+  --region us-east-1 \
+  --entries '[{
+    "EventBusName": "codely.domain_events",
+    "Source": "codely",
+    "DetailType": "user.registered",
+    "Detail": "{ \"detail_type\": \"userRegistered\", \"user_id\": \"123\", \"email\": \"javi@hola.com\", \"timestamp\": \"2023-07-21T10:00:00Z\" }"
+  }, {
+    "EventBusName": "codely.domain_events",
+    "Source": "codely",
+    "DetailType": "user.registered",
+    "Detail": "{ \"detail_type\": \"userRegistered\", \"user_id\": \"124\", \"email\": \"ana@hola.com\", \"timestamp\": \"2023-07-21T10:01:00Z\" }"
+  }, {
+    "EventBusName": "codely.domain_events",
+    "Source": "codely",
+    "DetailType": "user.registered",
+    "Detail": "{ \"detail_type\": \"other\", \"user_id\": \"125\", \"email\": \"pep@hola.com\", \"timestamp\": \"2023-07-21T10:02:00Z\" }"
+  }]'
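The script above publishes three `user.registered` events to the `codely.domain_events` bus in LocalStack. Assuming an EventBridge rule (not part of this diff) routes them to the queue consumed by the Spark exercise below, a quick way to check what actually landed there is:

```shell
# Peek at the queue the SQS receiver polls; the queue URL is the one
# hard-coded in SQSReceiverSparkApp.
aws sqs receive-message \
  --endpoint-url http://localhost:4566 \
  --region us-east-1 \
  --queue-url http://localhost:4566/000000000000/send_welcome_email_on_user_registered \
  --max-number-of-messages 10
```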
Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+package com.codely.lesson_03_spark_streaming_sqs.z_practical_exercise
+
+import io.circe.generic.auto._
+import io.circe.parser._
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.DStream
+
+object SQSReceiverSparkApp extends App {
+  private val sqsEndpoint = "http://localhost:4566"
+  private val region      = "us-east-1"
+  private val queueUrl    = "http://localhost:4566/000000000000/send_welcome_email_on_user_registered"
+
+  val conf = new SparkConf().setAppName("SQSReceiverSparkApp").setMaster("local[*]")
+
+  val ssc = new StreamingContext(conf, Seconds(5))
+
+  val receiver = new SQSSparkReceiver(sqsEndpoint, region, queueUrl)
+
+  val messages: DStream[String] = ssc.receiverStream(receiver)
+
+  case class Detail(detail_type: String, user_id: String, email: String, timestamp: String)
+  case class Event(detail: Detail)
+
+  val filteredMessages = messages.flatMap { message =>
+    decode[Event](message) match {
+      case Right(event) if event.detail.detail_type == "userRegistered" => Some(event)
+      case _                                                            => None
+    }
+  }
+
+  val eventsDStream = filteredMessages.map { event =>
+    (event.detail.user_id, event.detail.timestamp)
+  }
+
+  val windowedCounts = eventsDStream
+    .map { case (userId, _) => (userId, 1) }
+    .reduceByKeyAndWindow(_ + _, Seconds(300))
+
+  windowedCounts.print()
+
+  ssc.start()
+  ssc.awaitTermination()
+}
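With a 5-second batch interval, `reduceByKeyAndWindow(_ + _, Seconds(300))` recomputes the per-user counts over the full 5-minute window on every batch. A sketch of the incremental variant (not in the commit; it reuses `ssc` and `eventsDStream` from the app above, and the checkpoint path is made up):

```scala
// Enable checkpointing, required by the inverse-reduce form of reduceByKeyAndWindow.
ssc.checkpoint("/tmp/sqs-checkpoint") // hypothetical path

val windowedCountsIncremental = eventsDStream
  .map { case (userId, _) => (userId, 1) }
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // add counts for batches entering the window
    (a: Int, b: Int) => a - b, // subtract counts for batches leaving the window
    Seconds(300),              // window length
    Seconds(5)                 // slide interval (every batch)
  )
```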
Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+package com.codely.lesson_03_spark_streaming_sqs.z_practical_exercise
+
+import com.amazonaws.client.builder.AwsClientBuilder
+import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
+import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+import scala.collection.JavaConverters._
+
+class SQSSparkReceiver(endpoint: String, region: String, queueUrl: String)
+    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
+
+  private var sqsClient: AmazonSQS = _
+
+  override def onStart(): Unit = {
+
+    sqsClient = AmazonSQSClientBuilder
+      .standard()
+      .withEndpointConfiguration(
+        new AwsClientBuilder.EndpointConfiguration(
+          endpoint,
+          region
+        )
+      )
+      .build()
+
+    new Thread("SQS Receiver") {
+      override def run() {
+        receive()
+      }
+    }.start()
+  }
+
+  override def onStop(): Unit = {
+    // Any necessary cleanup
+  }
+
+  private def receive(): Unit = {
+    while (!isStopped()) {
+      val request = new ReceiveMessageRequest(queueUrl)
+        .withMaxNumberOfMessages(10)
+        .withWaitTimeSeconds(20)
+
+      val messages = sqsClient.receiveMessage(request).getMessages.asScala
+
+      for (message <- messages) {
+        store(message.getBody)
+        val deleteRequest =
+          new DeleteMessageRequest(queueUrl, message.getReceiptHandle)
+        sqsClient.deleteMessage(deleteRequest)
+      }
+    }
+  }
+}
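This receiver builds its client with `AmazonSQSClientBuilder` from the AWS SDK v1, which the diff does not add as a direct dependency; the classes may already be on the classpath transitively (for example via `hadoop-aws`). If they aren't, a plausible explicit addition to `project/Dependencies.scala` would be (version illustrative):

```scala
"com.amazonaws" % "aws-java-sdk-sqs" % "1.12.770" // hypothetical: only if the SQS classes aren't already provided transitively
```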

src/main/com/codely/lesson_04__spark_streaming_rabbitmq/video_01__rabbitmq_receiver/RabbitMQReceiver.scala

Lines changed: 7 additions & 7 deletions (whitespace-only changes from the scalafmt 3.8.3 reformat)

@@ -5,10 +5,10 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
 
 class RabbitMQReceiver(queueName: String, host: String, port: Int, username: String, password: String)
-  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
+    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
 
   @transient var connection: Connection = _
-  @transient var channel: Channel = _
+  @transient var channel: Channel       = _
 
   override def onStart(): Unit = {
 
@@ -33,11 +33,11 @@ class RabbitMQReceiver(queueName: String, host: String, port: Int, username: Str
     try {
      val consumer = new DefaultConsumer(channel) {
        override def handleDelivery(
-          consumerTag: String,
-          envelope: Envelope,
-          properties: AMQP.BasicProperties,
-          body: Array[Byte]
-        ): Unit = {
+            consumerTag: String,
+            envelope: Envelope,
+            properties: AMQP.BasicProperties,
+            body: Array[Byte]
+        ): Unit = {
          val message = new String(body, "UTF-8")
          store(message)
        }
