Skip to content

Commit c7c17c8

Browse files
author
xuwenyihust
committed
Some updates.
1 parent ea1e430 commit c7c17c8

File tree

6 files changed

+26
-39
lines changed

6 files changed

+26
-39
lines changed

README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ python3.4 $Visor_HOME/src/socket/fake_log_stream.py -m access
115115
python3.4 $Visor_HOME/src/socket/fake_log_stream.py -m error
116116
```
117117

118-
## Requirements
119-
* Python 3.4
120-
121118
## Resources
122119
* [Apache Log Files](https://httpd.apache.org/docs/1.3/logs.html)
123120

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pymongo==2.6.3
22
numpy==1.8.2
33
matplotlib==1.5.1
4+
kafka-python==1.3.2

src/fake_log_gen/fake_log_gen.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Fake log files generator
22

3-
3+
import os
44
import random
55
import json
66
import logging
@@ -181,7 +181,7 @@ def main():
181181
'''
182182

183183
# Load the configure json file to a dict
184-
with open("../config/fake_log_gen.json") as config_file:
184+
with open(os.environ['VISORHOME']+"/config/fake_log_gen.json") as config_file:
185185
config = json.load(config_file)
186186

187187
# Instantiate a fake log generator

src/fake_log_gen/fake_log_producer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from kafka import KafkaProducer
44
from kafka.errors import KafkaError
55

6+
import os
67
import random
78
import json
89
import logging
@@ -128,7 +129,7 @@ def main():
128129
log.addHandler(out)
129130

130131
# Load the configure json file to a dict
131-
with open("../../config/fake_log_gen.json") as config_file:
132+
with open(os.environ['VISORHOME']+"/config/fake_log_gen.json") as config_file:
132133
config = json.load(config_file)
133134

134135
# Instantiate a fake log generator

src/fake_log_gen/fake_log_stream.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from src.fake_log_gen import fake_log_gen
22
import socket
33

4+
import os
45
import random
56
import json
67
import logging
@@ -163,7 +164,7 @@ def main():
163164
log.addHandler(out)
164165

165166
# Load the configure json file to a dict
166-
with open("../../config/fake_log_gen.json") as config_file:
167+
with open(os.environ['VISORHOME']+"/config/fake_log_gen.json") as config_file:
167168
config = json.load(config_file)
168169

169170
# Instantiate a fake log generator

src/kafka_monitor/kafka_monitor.py

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,29 @@
22
from pyspark.streaming import StreamingContext
33
from pyspark.streaming.kafka import KafkaUtils
44

5-
#def main():
65

6+
class kafka_monitor(object):
77

8+
def __init__(self):
9+
# Define Spark configuration
10+
conf = SparkConf()
11+
conf.setMaster("local[4]")
12+
conf.setAppName("Kafka Monitor")
13+
# Initialize a SparkContext
14+
sc = SparkContext(conf=conf)
15+
self.ssc = StreamingContext(sc, 10)
816

9-
if __name__=="__main__":
10-
# Define Spark configuration
11-
conf = SparkConf()
12-
conf.setMaster("local[4]")
13-
conf.setAppName("Stream Analysis")
14-
# Initialize a SparkContext
15-
sc = SparkContext(conf=conf)
16-
17-
#batch_interval = 10
18-
#window_time = 10
19-
#process_times = 1
20-
21-
# Compute the whole time for displaying
22-
#total_time = batch_interval * process_times
23-
24-
#main(sc)
25-
26-
ssc = StreamingContext(sc, 10)
27-
28-
#zkQuorum, topic = sys.argv[1:]
29-
zkQuorum = 'localhost:9092'
30-
topic = 'TutorialTopic'
31-
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'localhost:9092'})
32-
kvs.foreachRDD(lambda x: print(x.collect()))
33-
#lines = kvs.map(lambda x: x[1])
34-
#counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
35-
#counts.pprint()
36-
#kvs.pprint()
37-
38-
ssc.start()
39-
ssc.awaitTermination()
17+
self.addr = 'localhost:9092'
18+
self.topic = 'TutorialTopic'
4019

20+
def run(self):
21+
lines = KafkaUtils.createDirectStream(self.ssc, [self.topic], {"metadata.broker.list": self.addr})
22+
lines.foreachRDD(lambda x: print(x.collect()))
4123

24+
self.ssc.start()
25+
self.ssc.awaitTermination()
4226

27+
if __name__=="__main__":
28+
monitor = kafka_monitor()
29+
monitor.run()
4330

0 commit comments

Comments
 (0)