kafka 配置和使用

本贴最后更新于 2513 天前,其中的信息可能已经沧海桑田

一. 安装

1.Kafka 依赖 Zookeeper,所以要先安装 Zookeeper(Zookeeper 简介、安装与使用),安装后启动。

2.下载官网版本最新版本,解压到你的目录:

tar -zxf kafka_2.12-0.10.2.1.tgz -C <YOUR_DIR>// 为了方便,将目录重命名为kafkamv kafka_2.12-0.10.2.1/ kafka

3.修改服务器配置文件:
由上文描述我们知道,Kafka 天生是集群的即使只有一个 broker,所以我们配置多个 broker 的情况,修改每个机器上的配置文件 /config/server.properties
官网配置文件详细说明点这里

############################# Server #############################

# broker id 要全集群唯一,你的每个机器上要配置不一样的(既然注册在zookeeper上也可以叫zookeeper上唯一),我是直接设置的1、2、3
broker.id = 1

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

# Kafka的Socket Server监听的地址和端口,这里最好自己显示设定一下,否则值是Java的方法java.net.InetAddress.getCanonicalHostName()的返回值。

## 监听本机所有网络接口(network interfaces)
listeners=PLAINTEXT://0.0.0.0:9092

## 被发布到Zookeeper上,公布给Client让Client使用
advertised.listeners=PLAINTEXT://`**_kafka1.host_**`:9092

############################# Log #############################

# log文件存储目录
log.dir =  

# 默认Topic分区数量
num.partitions=3

# log文件在被删除前的保存时间
log.retention.hours=168

############################# Zookeeper #############################

# 你的zookeeper集群的地址
  zookeeper.connect=zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181

里面的 kafka1.host 的配置见 二 网络部分

4.log4j 配置
/config/log4j.properties 中,有各种类型日志的输出配置,需要怎样改变可以自行修改,这部分属于 log4j 部分,就不再详述;
我这里是修改为按 day 分文件、修改日志路径;

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender...

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
# '.'yyyy-MM-dd-HH修改为'.'yyyy-MM-dd

log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd
# 可以看到修改日志路径,只需要定义kafka.logs.dir就行

log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
...
...

修改 /bin/kafka-run-class.shLOG_DIR 的赋值即可

# Log directory to use

if [ "x$LOG_DIR" = "x" ]; then
    # "$base_dir/logs"修改为"/data/logs/kafka"
    LOG_DIR="/data/logs/kafka"
fi

5.启动,启动脚本是在 /bin/kafka-server-start.sh,如果机器内存不够可以先修改下脚本中 Kafka 使用的 JVM 堆内存设置:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx500M -Xms500M"
fi

然后启动:

# 指定后台启动;指定配置文件地址
[root@host kafka]# bin/kafka-server-start.sh -daemon config/server.properties

也可以先不加 -daemon,用以看是否有正常日志输出,正常的话直接结束进程再使用 -daemon 启动。

二 网络

有可能你的 Linux 服务器或 Kafka 配置不对,会导致各种网络的问题,在这里我专门列出来。(使用 netstat -anp | grep 9092 来查看端口的监听和连接情况。)

1.首先是 Kafka 自身的配置,在 /config/server.properties 中:
advertised.listeners

listeners=PLAINTEXT://0.0.0.0:9092

0.0.0.0 代表监听本机所有网络接口(network interfaces),最好这样做!尤其当你使用的是阿里云等云服务提供商的机器时。因为云服务器一般有内网 IP 和外网 IP(即内网网络接口和外网网络接口),而如果你仅仅监听内网 IP 的话,由于外部网络访问此服务器只能用外网 IP,则访问肯定会被拒绝。
advertised.listeners

这里需要注意

advertised.listeners=PLAINTEXT://<your_hostname>:9092

`` 被发布到 Zookeeper 上,被直接公布给 Client 让 Client 使用:

  • 可以直接赋值为服务器的外网 IP,这样无论 broker 或者 Client 都连接你的外网 IP;
  • 也可以自己配置一个 hostname,比如叫 kafka1.host
  1. 在服务器的 /etc/hosts 中配置:内网IP kafka1.host
  2. 在内网其它服务器 /etc/hosts 中配置:内网IP kafka1.host
  3. 在外部所有客户端配置:外网IP kafka1.host

2.其次是系统防火墙
在 CentOS7 中,可以关闭、也可以将 TCP 9092 端口开放(推荐):

systemctl status firewalld // 查看状态systemctl stop firewalld // 停用systemctl start firewalld // 启动firewall-cmd --zone=public --list-ports // 查看所有允许的端口firewall-cmd --zone=public --add-port=9092/tcp --permanent // 添加TCP的9092端口firewall-cmd --reload // 重载

三 使用

可以看官方的快速开始:http://kafka.apache.org/quickstart,有简单的创建 Topic、生产消息、消费消息的过程;(直接执行脚本不加参数可以看到 help,如果使用—help 有些脚本是不支持的)

Topic

创建 Topic:

bin/kafka-topics.sh —create —topic TestTopic003 —partitions 3 —replication-factor 3 —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181

查看所有 Topic:

bin/kafka-topics.sh —list —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181

分析具体 Topic:

bin/kafka-topics.sh —describe —topic TestTopic003 —zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181

生产者-发送消息

发送时重要的是指定要往哪些 broker 上发(broker 可以同属一个集群也可以不是,这样你就可以发到多个集群上)

bin/kafka-console-producer.sh —topic TestTopic003 —broker-list kafka1.host:9092,kafka2.host:9092,kafka3.host:9092
This is a message
This is another message

消费者-消费

官网中说从 0.9.0.0 开始引入了新的配置方式,我看起来最重要的就是取消了 zookeeper,所以官网上的消费实例这样写:

bin/kafka-console-consumer.sh —topic TestTopic003 —from-beginning —bootstrap-server kafka1.host:9092,kafka2.host:9092,kafka3.host:9092
This is a message
This is another message

如果是旧版本的 Kakfa 则使用如下配置去订阅消费:

bin/kafka-console-consumer.sh --topic TestTopic003 --from-beginning --zookeeper zookeeper1.host:2181,zookeeper2.host:2181,zookeeper3.host:2181
// 会输出如下提示
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

原文连接

  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...