Experiment Environment Preparation: Installing Kafka
Visit the official Kafka download page and download the stable release used by the tutorial. The installation package already bundles ZooKeeper, so no extra installation is needed. Execute the following commands in order:
cd ~/下載
sudo tar -zxf kafka_2.11-2.4.1.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-2.4.1/ ./kafka
sudo chown -R hadoop ./kafka
However, the tutorial's version produces bugs, so we choose a newer release instead:
cd ~/下載
sudo tar -zxf kafka_2.12-3.2.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-3.2.0/ ./kafka
sudo chown -R hadoop ./kafka
Testing a Simple Example
Next, test a simple example in this environment. On macOS, adjust the commands to match your own installation location. Execute the following commands in order:
# 進(jìn)入kafka所在的目錄
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
After this command runs it does not return to the shell prompt; it keeps serving with the default configuration file. Do not close this terminal. Open a new terminal and enter:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
The Kafka server is now running; do not close this terminal either. Open another terminal and enter:
cd /usr/local/kafka
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dblab
A topic is where messages are published; the command above creates a topic named dblab with a single-node configuration. You can list all existing topics to check that the one just created is there (note that in Kafka 3.x the old --zookeeper flag has been replaced by --bootstrap-server):
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
You should see dblab in the output. Next, produce some data with the console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
and try typing the following messages:
hello hadoop
hello xmu
hadoop world
Then open a new terminal (or press CTRL+C to exit the producer), and use the console consumer to receive the data:
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
You should see the three messages produced a moment ago, which means Kafka is installed successfully.
Data Processing and Kafka Operations: Data Preprocessing
The dataset is as follows:
user_log.csv, a user-behavior log whose fields are defined as:
|buyer ID|item ID|item category|merchant|brand|transaction month|transaction day|action, in {0,1,2,3}: 0 = click, 1 = add to cart, 2 = purchase, 3 = follow item|buyer age bracket, from 1 (under 18) up to the top bracket (50 and over); 0 and NULL mean unknown|gender: 0 = female, 1 = male, 2 and NULL mean unknown (this matches the processing code below)|shipping address province
The detailed data format is as follows:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
,,833,2882,2661,08,29,0,0,1,內(nèi)蒙
,,1271,2882,2661,08,29,0,1,1,廣東
,,1271,2882,2661,08,29,0,2,1,廣東
,,1271,2882,2661,08,29,0,1,1,內(nèi)蒙
,,1271,1253,1049,08,29,0,0,2,廣東
,,1271,2882,2661,08,29,0,0,2,廣東
,,1467,2882,2661,08,29,0,5,2,廣東
,,1095,883,1647,08,29,0,7,1,廣東
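For example, the first row above records a click (action 0) on an item of category 833 from merchant 2882 under brand 2661 on August 29, made by a male buyer (gender 1) of unknown age bracket and shipped to 內(nèi)蒙.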
This case study counts the numbers of male and female shoppers per second in real time, so from each shopping log record we only need the gender field; we send it to Kafka and then receive and process it downstream.
Run the following shell command to install the Python library for operating Kafka:
conda install kafka-python
The producer script (a .py file) is as follows:
# coding: utf-8
import csv
import time
from kafka import KafkaProducer

# Instantiate a KafkaProducer for delivering messages to Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Open the data file
csvfile = open("../data/user_log.csv", "r")
# Create a reader for the csv file
reader = csv.reader(csvfile)
for line in reader:
    gender = line[9]  # gender is the field at index 9 of each log line
    if gender == 'gender':
        continue  # skip the header row
    time.sleep(0.1)  # send one line every 0.1 seconds
    # Send the gender value to the 'sex' topic
    producer.send('sex', line[9].encode('utf8'))
The code above is straightforward: it first instantiates a Kafka producer, then reads the user log file one line at a time, sending a line to Kafka every 0.1 seconds, i.e. ten shopping log records per second. The Kafka topic used here is 'sex'.
Operating Kafka
We can write a consumer to check whether the data is delivered successfully. The code, saved as another .py file, is as follows:
from kafka import KafkaConsumer

consumer = KafkaConsumer('sex')
for msg in consumer:
    print((msg.value).decode('utf8'))
Before starting the producer and consumer scripts above, Kafka itself must be running. First start ZooKeeper:
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Open a new terminal window and enter the following:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
Once Kafka is up, start the producer and the consumer scripts.
At this point you will see line after line of digits printed in the consumer terminal; these are the gender values relayed through Kafka.
Real-Time Data Processing: Configuring the Spark-Kafka Development Environment
Download the libraries that connect Spark to Kafka, then put them under the /usr/local/spark/jars directory:
sudo mv ~/下載/spark-streaming_2.12-3.2.0.jar /usr/local/spark/jars
sudo mv ~/下載/spark-streaming-kafka-0-10_2.12-3.2.0.jar /usr/local/spark/jars
Then create a kafka directory under /usr/local/spark/jars and copy all the libraries from /usr/local/kafka/libs into it:
cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .
After that, modify the Spark configuration file:
cd /usr/local/spark/conf
sudo vim spark-env.sh
Add the path information of the Kafka-related jars to spark-env.sh; after the change it should look like this:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
Since I am using an environment created with conda, here is how to point Spark at that environment.
You also need to modify spark-env.sh in the conf directory, adding this at the top of the file:
export PYSPARK_PYTHON=/home/hadoop/anaconda3/envs/py37/bin/python
#here, py37 should be the name of the environment you created
Note: the /usr/local//bin/ portion in the author's original code should not be written; copying it verbatim causes an error, and the error goes away once it is removed.
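To confirm that Spark actually picks up the conda interpreter, here is a minimal check script (run it with spark-submit; the env path above is only this tutorial's example, and the file name is arbitrary):
# check_python.py: print the interpreter PySpark runs with
import sys
print(sys.executable)  # should point into the conda env, e.g. .../envs/py37/bin/python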
Implementing the Project
First create the project directory under /usr/local/spark (the mycode directory may already exist from earlier exercises):
cd /usr/local/spark/mycode
mkdir kafka
Then create a file named kafka_test.py in this kafka directory (the name the run script below expects):
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql.functions import col, column, expr

def KafkaWordCount(zkQuorum, group, topics, numThreads):
    spark = SparkSession \
        .builder \
        .appName("KafkaWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    topicAry = topics.split(",")
    # Convert the topics into a hashmap; a Python dict is exactly that
    topicMap = {}
    for topic in topicAry:
        topicMap[topic] = numThreads
    #lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sex") \
        .load()
    # Cast the Kafka timestamp and binary value into usable columns
    df = df.selectExpr("CAST(timestamp AS timestamp)", "CAST(value AS STRING)")
    windowedCounts = df \
        .withWatermark("timestamp", "1 seconds") \
        .groupBy(
            window(col("timestamp"), "1 seconds", "1 seconds"),
            col("value")) \
        .count()
    wind = windowedCounts.selectExpr("CAST(value AS STRING)", "CAST(count AS STRING)")
    query = wind.writeStream.option("checkpointLocation", "/check").outputMode("append").foreach(sendmsg).start()
    query.awaitTermination()
    query.stop()

# Format conversion: turn a (value, count) row into e.g. [{1: 3}]
def Get_dic(row):
    res = []
    tmp = {row[0]: row[1]}
    res.append(tmp)
    print(res)
    return json.dumps(res)

def sendmsg(row):
    print(row)
    if row['count'] != 0:  # 'count' was cast to a string above, so this just skips empty rows
        msg = Get_dic(row)
        # Instantiate a KafkaProducer for delivering messages to Kafka
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        producer.send("result", msg.encode('utf8'))
        # Important: without flush() the message may never actually be sent
        producer.flush()

if __name__ == '__main__':
    # The four command-line arguments are:
    # 1. zkQuorum: the zookeeper address
    # 2. group: the consumer group
    # 3. topics: the topics this consumer consumes
    # 4. numThreads: the number of consumer threads per topic
    if (len(sys.argv) < 5):
        print("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
        exit(1)
    zkQuorum = sys.argv[1]
    group = sys.argv[2]
    topics = sys.argv[3]
    numThreads = int(sys.argv[4])
    KafkaWordCount(zkQuorum, group, topics, numThreads)

What the code does: it reads the messages arriving from Kafka second by second; for each one-second window it counts how many 0s, 1s and 2s appear; finally it packages the counts as JSON and sends them back to Kafka on the result topic.
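Before wiring up the web front end, you can sanity-check the pipeline by consuming the result topic directly. A minimal sketch with kafka-python, assuming the defaults used above (the file name is arbitrary):
# check_result.py: print the aggregated messages on the 'result' topic
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('result', bootstrap_servers='localhost:9092')
for msg in consumer:
    # each message is a JSON list such as [{"0": "3"}] or [{"1": "5"}]
    print(json.loads(msg.value.decode('utf8')))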
Before running the code, start HDFS first:
cd /usr/local/hadoop #這是hadoop的安裝目錄
./sbin/start-dfs.sh
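Note that the job writes its streaming checkpoints to /check on HDFS (the checkpointLocation option above), so you may need to create that directory first, for example with hdfs dfs -mkdir /check.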
Running the Project
With the program written, next create a run script: in the /usr/local/spark/mycode/kafka directory create a file named startup.sh with the following content:
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /usr/local/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1
The last four items are input arguments, with the following meanings:
127.0.0.1:2181 is the ZooKeeper address, 1 is the group label, sex is the topic the consumer receives, and the final 1 is the number of consumer threads.
Finally, from the /usr/local/spark/mycode/kafka directory, run the following command to execute the program just written:
sh startup.sh
Displaying the Results
We use Flask to create the web application, Flask-SocketIO to push data in real time, socket.io.js to receive data in real time, and highcharts.js to render it.
Pushing Data in Real Time with Flask-SocketIO
First create the app.py file. Its job is to act as a simple server that handles connection requests, processes the data received from Kafka, and pushes it to the browser in real time. The code of app.py is as follows:
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

# flask was installed in step one, so it can be imported here
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# Instantiate a consumer that receives messages on the 'result' topic
consumer = KafkaConsumer('result')

# A background thread that keeps receiving Kafka messages
# and forwards them to the client browser
def background_thread():
    girl = 0
    boy = 0
    for msg in consumer:
        data_json = msg.value.decode('utf8')
        data_list = json.loads(data_json)
        for data in data_list:
            if '0' in data.keys():
                girl = data['0']
            elif '1' in data.keys():
                boy = data['1']
            else:
                continue
        result = str(girl) + ',' + str(boy)
        print(result)
        socketio.emit('test_message', {'data': result})

# Handler invoked when the client sends the connect event
@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        # Start a dedicated thread that sends data to the client
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'Connected'})

# index.html is served at http://127.0.0.1:5000/
@app.route("/")
def handle_mes():
    return render_template("index.html")

# main
if __name__ == '__main__':
    socketio.run(app, debug=True)
This code is fairly simple; the most important part is the background_thread function, which receives messages from Kafka, processes them to obtain the per-second counts of female and male shoppers, and pushes the result to the browser in real time via socketio.emit.
Receiving and Displaying the Data in the Browser
The index.html file is responsible for receiving the data and displaying it; its code is as follows:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DashBoard</title>
<script src="static/js/socket.io.js"></script>
<script src="static/js/jquery-3.1.1.min.js"></script>
<script src="static/js/highcharts.js"></script>
<script src="static/js/exporting.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.emit('test_connect', {data: 'I\'m connected!'});
});
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
</head>
<body>
<div>
<b>Girl: </b><b id="girl"></b>
<b>Boy: </b><b id="boy"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // don't animate in old IE
marginRight: 10,
events: {
load: function () {
// set up the updating of the chart each second
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime(), // current time
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: {
text: 'Real-Time Shopper Count by Gender'
},
xAxis: {
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: 'Count'
},
plotLines: [{
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
formatter: function () {
return '<b>' + this.series.name + '</b><br/>' +
    Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
    Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: 'Female shoppers',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: 'Male shoppers',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
</script>
</body>
</html>
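Note that index.html loads four local libraries from static/js (socket.io.js, jquery-3.1.1.min.js, highcharts.js and exporting.js); place these files under the Flask project's static/js directory so the page can find them.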
socket.io.js
index.html contains the following snippet, which loads the two library files socket.io.js and socket.io.js.map:
<script type="text/javascript" charset="utf-8">
// create the connection to the server
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {// callback once connected to the server
socket.emit('test_connect', {data: 'I\'m connected!'});
});
// receive the data the server pushes in real time
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
// display the boy and girl counts in the html tags
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
// callback when the connection drops
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
highcharts.js
index.html also contains the following snippet, which loads the highcharts.js library to read the data from the html tags in real time and display it on the page.
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // may not be supported in old IE
marginRight: 10,
events: {
load: function () {
// make the chart update once per second
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime();// get the current time
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: { // chart title
text: 'Real-Time Shopper Count by Gender'
},
xAxis: { // the x axis shows real time
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: 'Count'
},
plotLines: [{ // color and width of the axis line
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
// format the displayed time
formatter: function () {
return '<b>' + this.series.name + '</b><br/>' +
    Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
    Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: 'Female shoppers',
data: (function () {
// seed the chart with random initial values
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: 'Male shoppers',
data: (function () {
// seed the chart with random initial values
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
exporting.js
The exporting.js library implements the chart's export function.
Showing the Effect
Start the web application:
python app.py
Then visit http://127.0.0.1:5000/ in a browser to watch the chart update in real time.
Problems and Solutions
Problem 1: error in the code added to spark-env.sh
When adding the Kafka jar paths to spark-env.sh, the original code was:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
Note that the /usr/local//bin/ portion given by the author should not be written; copying it verbatim causes an error, and the error goes away once it is removed.
Problem 2: required packages not installed
pip install gevent-websocket -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
Install the packages reported missing in the error message, as shown above.
Problem 3: mismatched Java versions
Check the versions with:
java -version
javac -version
Here the check turned up no problem.
Problem 4: error on startup
Investigation showed the cause was a problem that arose when reproducing step three.