Kafka的安装和准备工作
- 在安装的时候,要注意,到Kafka官网下载安装文件时,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是2.1.0,scala版本号是2.11,所以,一定要选择Kafka版本号是2.11开头的。比如,到Kafka官网中,可以下载安装文件Kafka_2.11-0.10.2.0.tgz,前面的2.11就是支持的scala版本号,后面的0.10.2.0是Kafka自身的版本号。
- 这里假设你已经根据这篇博客文章安装成功了Kafka。下面,我们启动Kafka。
- 登录Linux系统(本教程统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:
1 | cd /usr/local/kafka./bin/zookeeper-server-start.sh config/zookeeper.properties |
- 注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,没有回到shell命令提示符状态,这时,千万不要错误认为死机了,而是Zookeeper服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了,所以,不能关闭这个终端窗口。
- 另外打开第二个终端,然后输入下面命令启动Kafka服务:
1 | cd /usr/local/kafkabin/kafka-server-start.sh config/server.properties |
- 这样,Kafka就会在后台运行,即使你关闭了这个终端,Kafka也会一直在后台运行。不过,这样做,有时候我们往往就忘记了还有Kafa在后台运行,所以,建议暂时不要用&。
- 下面先测试一下Kafka是否可以正常使用。再另外打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的topic
1 | cd /usr/local/kafka./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest//这个topic叫wordsendertest,2181是zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在kafka集群中使用,这里单机版就不用备份了//可以用list列出所有创建的topics,来查看上面创建的topic是否存在./bin/kafka-topics.sh --list --zookeeper localhost:2181 |
- 这个名称为“wordsendertest”的topic,就是专门负责采集发送一些单词的。
下面,我们需要用producer来产生一些数据,请在当前终端内继续输入下面命令:
1 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest |
- 上面命令执行后,你就可以在当前终端内用键盘输入一些英文单词,比如我们可以输入:
1 | hello hadoophello spark |
- 这些单词就是数据源,这些单词会被Kafka捕捉到以后发送给消费者。我们现在可以启动一个消费者,来查看刚才producer产生的数据。请另外打开第四个终端,输入下面命令:
1 | cd /usr/local/kafka./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning |
- 可以看到,屏幕上会显示出如下结果,也就是刚才你在另外一个终端里面输入的内容:
1 | hello hadoop |
- 到这里,与Kafka相关的准备工作就顺利结束了。注意,现在可以把第四个终端关闭掉,第一个终端(正在运行Zookeeper服务)、第二个终端(正在运行Kafka服务)和第三个终端不要关闭,继续留着后面使用。如果记不住是哪个终端,那么所有这些终端窗口都不要关闭,要继续留着后面使用。
Spark准备工作
- Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本,这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,然后启动pyspark:
1 | cd /usr/local/spark./bin/pyspark |
- 根据Spark官网的说明,对于Spark2.1.0版本,如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包。
- 在Linux系统中,打开一个火狐浏览器,请点击这里访问Maven Repository,里面有提供spark-streaming-kafka-0-8_2.11-2.1.0.jar文件的下载,其中,2.11表示scala的版本,2.1.0表示Spark版本号。下载后的文件会被默认保存在当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的jars目录下。请新打开一个终端,输入下面命令:
1 | cd /usr/local/spark/jarsmkdir kafkacd ~cd 下载cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka |
- 这样,我们就把spark-streaming-kafka-0-8_2.11-2.1.0.jar文件拷贝到了“/usr/local/spark/jars/kafka”目录下。
同时,我们还要修改spark目录下conf/spark-env.sh文件,修改该文件下面的SPARK_DIST_CLASSPATH变量
1 | export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*` |
编写Spark程序使用Kafka数据源
- 下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:
1 | cd /usr/local/spark/mycodemkdir kafka && cd kafkavim KafkaWordCount.py |
- 使用vim编辑器新建了KafkaWordCount.py,让它去进行词频统计。请在KafkaWordCount.py中输入以下代码:
1 | from __future__ import print_function |
- 然后执行如下命令:
1 | python3 ./KafkaWordCount.py localhost:2181 wordsendertest |
- 这里我们继续使用上面第三个终端的topic。请继续在第三个终端上输入信息,就能看到当前python执行终端下显示你刚才新输入的结果。
- 运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:
1 | ------------------------------------------- |
转载于:子雨大数据