配置一个Kafka群集
默认分类
2020-01-15
489
0
最近需要改一个实时程序,数据源订阅的Kafka,虽然之前有看过一阵资料,但遗忘的差不多了,因此建个环境练习一下。本文会创建两个节点的Kafka群集,配置环境为MacOS。
Kafka介绍
这部分可以参考我以前翻译的几篇文章:
Spark Streaming + Kafka集成指南(Kafka broker version 0.8.2.1 or highe)
Cloudera Distribution of Apache Kafka[译]
下载安装Kafka
从Kafka官方下载最新的版本,如果你Kafka要与Scala一起使用,那么请选择相应的Scala版本,否则随意。
解压
tar -xzf ~/Downloads/kafka_2.12-2.0.0.tgz
创建Kafka存储目录
mkdir kafka-log-1
mkdir kafka-log-2
配置kafka
设置第一个broker
编辑kafka_2.12-2.0.0/config/server.properties
配置文件
修改log.dirs
的目录
log.dirs=/Users/haseochen/app/kafka-log-1
设置第二个broker
复制一份server.properties
cp config/server.properties config/server2.properties
编辑kafka_2.12-2.0.0/config/server2.properties
配置文件
修改log.dirs
的目录
log.dirs=/Users/haseochen/app/kafka-log-2
增加port
属性
port=9091
修改broker.id
broker.id=1
测试配置
启用服务
1.启动zookeeper,这里启动是kafka自带的zookeeper(请勿在生产环境使用)
kafka_2.12-2.0.0/bin/zookeeper-server-start.sh kafka_2.12-2.0.0/config/zookeeper.properties
2.启动第一个broker
kafka_2.12-2.0.0/bin/kafka-server-start.sh kafka_2.12-2.0.0/config/server.properties
3.启动第二个broker
kafka_2.12-2.0.0/bin/kafka-server-start.sh kafka_2.12-2.0.0/config/server2.properties
4.查看进程
使用ps -ef | grep kafka
将看到两个kafka进程。
创建topic
通过bin/kafka-topics.sh
可以创建Topic,zookeeper指向刚才启动的本地zookeeper,topic名字为first,分区数2,副本数2(因为我们才两台机器所以最多就是2个副本)
kafka_2.12-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic first --partitions 2 --replication-factor 2
查看topic
kafka_2.12-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
修改分区数
kafka_2.12-2.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 4
验证置producer和consumer的配置
创建数据
kafka_2.12-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
生成测试数据
bin/kafka-producer-perf-test.sh --topic my-topic --num-records 50 --record-size 2 --throughput 10 --producer-props bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
消费数据
kafka_2.12-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
kafka_2.12-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning #--from-beginning表示从头开始读取该topic的数据。
此时在producer界面里面输入数据回车后再consumer中就会获得数据
程序
Java版
生产数据
cd /kafka-examples/SimpleCounter
mvn install
./run_params.sh localhost:9092 v1 new async 500 10
消费数据
cd /kafka-examples/SimpleMovingAvg
mvn install
java -cp target/uber-SimpleMovingAvg-1.0-SNAPSHOT.jar com.shapira.examples.newconsumer.simplemovingavg.SimpleMovingAvgNewConsumer localhost:9092 g1 v1 10
Python版
pip install kafka-python