0%

Spark之Kafka作为DStream的数据源(Python版)

Kafka的安装和准备工作

  • 在安装的时候,要注意,到Kafka官网下载安装文件时,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是2.1.0,scala版本号是2.11,所以,一定要选择Kafka版本号是2.11开头的。比如,到Kafka官网中,可以下载安装文件Kafka_2.11-0.10.2.0.tgz,前面的2.11就是支持的scala版本号,后面的0.10.2.0是Kafka自身的版本号。
  • 这里假设你已经根据这篇博客文章安装成功了Kafka。下面,我们启动Kafka。
  • 登录Linux系统(本教程统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:
1
cd /usr/local/kafka./bin/zookeeper-server-start.sh config/zookeeper.properties
  • 注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到shell命令提示符状态,这时,千万不要错误认为死机了,而是Zookeeper服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了,所以,不能关闭这个终端窗口。
  • 另外打开第二个终端,然后输入下面命令启动Kafka服务:
1
cd /usr/local/kafkabin/kafka-server-start.sh config/server.properties
  • 这样,Kafka就会在后台运行,即使你关闭了这个终端,Kafka也会一直在后台运行。不过,这样做,有时候我们往往就忘记了还有Kafa在后台运行,所以,建议暂时不要用&。
  • 下面先测试一下Kafka是否可以正常使用。再另外打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的topic
1
cd /usr/local/kafka./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest//这个topic叫wordsendertest,2181是zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在kafka集群中使用,这里单机版就不用备份了//可以用list列出所有创建的topics,来查看上面创建的topic是否存在./bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 这个名称为“wordsendertest”的topic,就是专门负责采集发送一些单词的。
    下面,我们需要用producer来产生一些数据,请在当前终端内继续输入下面命令:
1
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
  • 上面命令执行后,你就可以在当前终端内用键盘输入一些英文单词,比如我们可以输入:
1
hello hadoophello spark
  • 这些单词就是数据源,这些单词会被Kafka捕捉到以后发送给消费者。我们现在可以启动一个消费者,来查看刚才producer产生的数据。请另外打开第四个终端,输入下面命令:
1
cd /usr/local/kafka./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning
  • 可以看到,屏幕上会显示出如下结果,也就是刚才你在另外一个终端里面输入的内容:
1
2
hello hadoop
hello spark
  • 到这里,与Kafka相关的准备工作就顺利结束了。注意,现在可以把第四个终端关闭掉,第一个终端(正在运行Zookeeper服务)、第二个终端(正在运行Kafka服务)和第三个终端不要关闭,继续留着后面使用。如果记不住是哪个终端,那么所有这些终端窗口都不要关闭,要继续留着后面使用。

Spark准备工作

  • Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,然后启动pyspark:
1
cd /usr/local/spark./bin/pyspark
  • 根据Spark官网的说明,对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包。
  • 在Linux系统中,打开一个火狐浏览器,请点击这里访问Maven Repository,里面有提供spark-streaming-kafka-0-8_2.11-2.1.0.jar文件的下载,其中,2.11表示scala的版本,2.1.0表示Spark版本号。下载后的文件会被默认保存在当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的jars目录下。请新打开一个终端,输入下面命令:
1
cd /usr/local/spark/jarsmkdir kafkacd ~cd 下载cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka
  • 这样,我们就把spark-streaming-kafka-0-8_2.11-2.1.0.jar文件拷贝到了“/usr/local/spark/jars/kafka”目录下。
    同时,我们还要修改spark目录下conf/spark-env.sh文件,修改该文件下面的SPARK_DIST_CLASSPATH变量
1
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*`

编写Spark程序使用Kafka数据源

  • 下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:
1
cd /usr/local/spark/mycodemkdir kafka && cd kafkavim KafkaWordCount.py
  • 使用vim编辑器新建了KafkaWordCount.py,让它去进行词频统计。请在KafkaWordCount.py中输入以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from __future__ import print_function 
import sys
from pyspark import SparkContextfrom pyspark.streaming
import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
  • 然后执行如下命令:
1
python3 ./KafkaWordCount.py localhost:2181 wordsendertest
  • 这里我们继续使用上面第三个终端的topic。请继续在第三个终端上输入信息,就能看到当前python执行终端下显示你刚才新输入的结果。
  • 运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:
1
2
3
4
5
6
7
8
-------------------------------------------
Time: 2017-12-12 10:57:46
-------------------------------------------
('ts', 1)

-------------------------------------------
Time: 2017-12-12 10:57:47
-------------------------------------------

转载于:子雨大数据

-------------本文结束感谢您的阅读-------------