Kafka 入门安装

本贴最后更新于 2545 天前,其中的信息可能已经时移俗易

背景

Kafka 和 RabbitMQ 均为消息队列产品,各自特点如下:

  • 吞吐:Kafka >> RabbitMQ
  • 可靠性:RabbitMQ > Kafka,RabbitMQ 有消息确认机制,支持事务
  • 可用性:Kafka 支持主备,RabbitMQ 支持 queue 的 mirror

安装

参考自官方的 quickStart

启动 Server

  • 启动单机版 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动 Kafka Server
bin/kafka-server-start.sh config/server.properties

topic

创建 topic

创建一下 test 的 topic,包含一个分区和一个分片

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic "test".

查看 topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

test

发送消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message

订阅消息

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果发送和订阅在两个终端,在发送消息时,可以在订阅终端实时看到消息

删除 topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

多 broker

略,详见 官方 quickstart

使用 kafka connect 来导入导出数据

导入导出数据

生成测试数据

echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

三个配置文件,分别为

  • 集群配置文件
  • source connector: 从 input 文件 test.txt 读入数据,到 kafka
  • sink connector: 从 kafka 读出数据,写入文件 test.sink.txt

重新订阅数据

上面的测试数据保存在 topic: connect-test 中,可以从头再订阅数据

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

继续写数据

$ echo "Another Line" >> test.txt

使用 kafka stream 来数据处理

Java Client

pom

pom.xml

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.10.2.1</version>
		</dependency>

producer

import  java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MainKafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer(props);

        for(int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord("fw-blink-test", i % 1, Integer.toString(i), Integer.toString(i)));
            Thread.sleep(1000L);
        }
        producer.flush();
        producer.close();
    }
}

Consumer

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MainKafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "group1");

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map topicCountMap = new HashMap();

        String topic = "fw-blink-test";

        // 一次从主题中获取一个数据
        topicCountMap.put(topic, 1);

        Mapbyte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);

        // 获取每次接收到的这个数据
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("接收到: " + message);
        }
    }
}
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...