1. 集群部署的基本流程

下载安装包、解压安装包、修改配置文件、分发安装包、启动集群

2.集群部署的基础环境准备

安装前的准备工作(zk集群已经部署完毕)

关闭防火墙 
 chkconfig iptables off && service iptables stop 

3.解压安装包

tar -zxvf kafka_2.11-1.0.0.tgz -C  /export/servers/
cd /export/servers/
mv kafka_2.11-1.0.0 kafka

4.修改配置文件

进入到kafka目录

cp   config/server.properties config/server.properties.bak
vi  config/server.properties

输入以下内容:

#broker的全局唯一编号,不能重复
broker.id=0

#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092

#处理网络请求的线程数量
num.network.threads=3

#用来处理磁盘IO的现成数量
num.io.threads=8

#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

#kafka运行日志存放的路径
log.dirs=/export/data/kafka/

#topic在当前broker上的分片个数
num.partitions=2

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除
log.retention.hours=1

#滚动生成新的segment文件的最大时间
log.roll.hours=1

#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#周期性检查文件大小的时间
log.retention.check.interval.ms=300000

#日志清理是否打开
log.cleaner.enable=true

#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01

advertised.host.name=192.168.175.211(注意配置的是各台机器的ip)

4.分发安装包

scp -r /export/servers/kafka  kafka02:/export/servers
scp -r /export/servers/kafka  kafka03:/export/servers

5.再次修改配置文件(重要)

依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。在各个服务器上建立 log.dirs=/export/data/kafka/

6.启动集群

依次在各节点上启动kafka

nohup /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &

注解:输出日志到黑洞
command >/dev/null 2>&1 & 

Kafka常用操作命令

查看当前服务器中的所有topic
bin/kafka-topics.sh --list --zookeeper  zk01:2181
创建topic
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
删除topic
bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
通过shell命令发送消息
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test
通过shell消费消息
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test
查看消费位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
查看某个Topic的详情
bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
对分区数进行修改
bin/kafka-topics.sh --zookeeper  zk01 --alter --partitions 2 --topic test

评论已关闭

暂无评论