一. 安装
1.Kafka 依赖 Zookeeper,所以要先安装 Zookeeper(Zookeeper 简介、安装与使用),安装后启动。
2.下载官网版本最新版本,解压到你的目录:
tar -zxf kafka_2.12-0.10.2.1.tgz -C <YOUR_DIR>// 为了方便,将目录重命名为kafkamv kafka_2.12-0.10.2.1/ kafka
3.修改服务器配置文件:
由上文描述我们知道,Kafka 天生是集群的即使只有一个 broker,所以我们配置多个 broker 的情况,修改每个机器上的配置文件 /config/server.properties
:
官网配置文件详细说明点这里
############################# Server #############################
# broker id 要全集群唯一,你的每个机器上要配置不一样的(既然注册在zookeeper上也可以叫zookeeper上唯一),我是直接设置的1、2、3
broker.id = 1
# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true
# Kafka的Socket Server监听的地址和端口,这里最好自己显示设定一下,否则值是Java的方法java.net.InetAddress.getCanonicalHostName()的返回值。
## 监听本机所有网络接口(network interfaces)
listeners=PLAINTEXT://0.0.0.0:9092
## 被发布到Zookeeper上,公布给Client让Client使用
advertised.listeners=PLAINTEXT://`**_kafka1.host_**`:9092
############################# Log #############################
# log文件存储目录
log.dir =
# 默认Topic分区数量
num.partitions=3
# log文件在被删除前的保存时间
log.retention.hours=168
############################# Zookeeper #############################
# 你的zookeeper集群的地址
zookeeper.connect=zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
里面的 kafka1.host
的配置见 二 网络部分
4.log4j 配置
在 /config/log4j.properties
中,有各种类型日志的输出配置,需要怎样改变可以自行修改,这部分属于 log4j 部分,就不再详述;
我这里是修改为按 day 分文件、修改日志路径;
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender...
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
# '.'yyyy-MM-dd-HH修改为'.'yyyy-MM-dd
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd
# 可以看到修改日志路径,只需要定义kafka.logs.dir就行
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
...
...
修改 /bin/kafka-run-class.sh
里 LOG_DIR
的赋值即可
# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
# "$base_dir/logs"修改为"/data/logs/kafka"
LOG_DIR="/data/logs/kafka"
fi
5.启动,启动脚本是在 /bin/kafka-server-start.sh
,如果机器内存不够可以先修改下脚本中 Kafka 使用的 JVM 堆内存设置:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx500M -Xms500M"
fi
然后启动:
# 指定后台启动;指定配置文件地址
[root@host kafka]# bin/kafka-server-start.sh -daemon config/server.properties
也可以先不加 -daemon
,用以看是否有正常日志输出,正常的话直接结束进程再使用 -daemon
启动。
二 网络
有可能你的 Linux 服务器或 Kafka 配置不对,会导致各种网络的问题,在这里我专门列出来。(使用 netstat -anp | grep 9092
来查看端口的监听和连接情况。)
1.首先是 Kafka 自身的配置,在 /config/server.properties
中:
advertised.listeners
listeners=PLAINTEXT://0.0.0.0:9092
0.0.0.0
代表监听本机所有网络接口(network interfaces),最好这样做!尤其当你使用的是阿里云等云服务提供商的机器时。因为云服务器一般有内网 IP 和外网 IP(即内网网络接口和外网网络接口),而如果你仅仅监听内网 IP 的话,由于外部网络访问此服务器只能用外网 IP,则访问肯定会被拒绝。
advertised.listeners
这里需要注意
advertised.listeners=PLAINTEXT://<your_hostname>:9092
`` 被发布到 Zookeeper 上,被直接公布给 Client 让 Client 使用:
- 可以直接赋值为服务器的外网 IP,这样无论 broker 或者 Client 都连接你的外网 IP;
- 也可以自己配置一个
hostname
,比如叫kafka1.host
;
- 在服务器的
/etc/hosts
中配置:内网IP kafka1.host
; - 在内网其它服务器
/etc/hosts
中配置:内网IP kafka1.host
; - 在外部所有客户端配置:
外网IP kafka1.host
;
2.其次是系统防火墙
在 CentOS7 中,可以关闭、也可以将 TCP 9092
端口开放(推荐):
systemctl status firewalld // 查看状态systemctl stop firewalld // 停用systemctl start firewalld // 启动firewall-cmd --zone=public --list-ports // 查看所有允许的端口firewall-cmd --zone=public --add-port=9092/tcp --permanent // 添加TCP的9092端口firewall-cmd --reload // 重载
三 使用
Topic
创建 Topic:
bin/kafka-topics.sh —create —topic TestTopic003 —partitions 3 —replication-factor 3 —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
查看所有 Topic:
bin/kafka-topics.sh —list —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
分析具体 Topic:
bin/kafka-topics.sh —describe —topic TestTopic003 —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
生产者-发送消息
发送时重要的是指定要往哪些 broker 上发(broker 可以同属一个集群也可以不是,这样你就可以发到多个集群上)
bin/kafka-console-producer.sh —topic TestTopic003 —broker-list kafka1.host:9092,kafka2.host:9092,kafka3.host:9092
This is a message
This is another message
消费者-消费
官网中说从 0.9.0.0 开始引入了新的配置方式,我看起来最重要的就是取消了 zookeeper,所以官网上的消费实例这样写:
bin/kafka-console-consumer.sh —topic TestTopic003 —from-beginning —bootstrap-server kafka1.host:9092,kafka2.host:9092,kafka3.host:9092
This is a message
This is another message
如果是旧版本的 Kakfa 则使用如下配置去订阅消费:
bin/kafka-console-consumer.sh --topic TestTopic003 --from-beginning --zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
// 会输出如下提示
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于