Kafka 入门安装

本贴最后更新于 2698 天前,其中的信息可能已经时移俗易

背景

Kafka 和 RabbitMQ 均为消息队列产品,各自特点如下:

  • 吞吐:Kafka >> RabbitMQ
  • 可靠性:RabbitMQ > Kafka,RabbitMQ 有消息确认机制,支持事务
  • 可用性:Kafka 支持主备,RabbitMQ 支持 queue 的 mirror

安装

参考自官方的 quickStart

启动 Server

  • 启动单机版 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动 Kafka Server
bin/kafka-server-start.sh config/server.properties

topic

创建 topic

创建一下 test 的 topic,包含一个分区和一个分片

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".

查看 topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181 test

发送消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >This is a message >This is another message

订阅消息

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message

如果发送和订阅在两个终端,在发送消息时,可以在订阅终端实时看到消息

删除 topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

多 broker

略,详见 官方 quickstart

使用 kafka connect 来导入导出数据

导入导出数据

生成测试数据

echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

三个配置文件,分别为

  • 集群配置文件
  • source connector: 从 input 文件 test.txt 读入数据,到 kafka
  • sink connector: 从 kafka 读出数据,写入文件 test.sink.txt

重新订阅数据

上面的测试数据保存在 topic: connect-test 中,可以从头再订阅数据

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}

继续写数据

$ echo "Another Line" >> test.txt

使用 kafka stream 来数据处理

Java Client

pom

pom.xml

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.2.1</version> </dependency>

producer

import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class MainKafkaProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); for(int i = 0; i < 100; i++) { producer.send(new ProducerRecord("fw-blink-test", i % 1, Integer.toString(i), Integer.toString(i))); Thread.sleep(1000L); } producer.flush(); producer.close(); } }

Consumer

import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class MainKafkaConsumer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "group1"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); Map topicCountMap = new HashMap(); String topic = "fw-blink-test"; // 一次从主题中获取一个数据 topicCountMap.put(topic, 1); Mapbyte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 获取每次接收到的这个数据 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while(iterator.hasNext()){ String message = new String(iterator.next().message()); System.out.println("接收到: " + message); } } }
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3198 引用 • 8215 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...