Skip to content

Commit 749555b

Browse files
author
xuwenyihust
committed
Added src/kafka_monitor/kafka_monitor.py.
1 parent f2ebcf9 commit 749555b

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed

src/kafka_monitor/kafka_monitor.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from pyspark import SparkContext, SparkConf
2+
from pyspark.streaming import StreamingContext
3+
from pyspark.streaming.kafka import KafkaUtils
4+
5+
#def main():
6+
7+
8+
9+
if __name__=="__main__":
10+
# Define Spark configuration
11+
conf = SparkConf()
12+
conf.setMaster("local[4]")
13+
conf.setAppName("Stream Analysis")
14+
# Initialize a SparkContext
15+
sc = SparkContext(conf=conf)
16+
17+
#batch_interval = 10
18+
#window_time = 10
19+
#process_times = 1
20+
21+
# Compute the whole time for displaying
22+
#total_time = batch_interval * process_times
23+
24+
#main(sc)
25+
26+
ssc = StreamingContext(sc, 10)
27+
28+
#zkQuorum, topic = sys.argv[1:]
29+
zkQuorum = 'localhost:9092'
30+
topic = 'TutorialTopic'
31+
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'localhost:9092'})
32+
kvs.foreachRDD(lambda x: print(x.collect()))
33+
#lines = kvs.map(lambda x: x[1])
34+
#counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
35+
#counts.pprint()
36+
#kvs.pprint()
37+
38+
ssc.start()
39+
ssc.awaitTermination()
40+
41+
42+
43+

0 commit comments

Comments
 (0)