快速使用 Kafka 生产者 API

本贴最后更新于 2067 天前,其中的信息可能已经斗转星移

Kafka 生产者 API

测试 kakfa 生产者 Api 的 demo

pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

样例代码

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.Future;

public class Demo1 {
    private static final Logger logger = LoggerFactory.getLogger(Demo1.class);
    Properties kafkaProps;
    KafkaProducer<String, String> producer;
    @Before
    public void initProperties(){
        kafkaProps = new Properties();
        // 指定broker地址,格式: host:port.可以多个地址,用逗号拼接
        kafkaProps.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        // 指定key的序列化方式
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // value的序列化方式
        kafkaProps.put("value.serializer"
                , "org.apache.kafka.common.serialization.StringSerializer");
        // 创建producer
        producer = new KafkaProducer<>(kafkaProps);
    }


    /**
     * 同步发送消息,不获取future对象
     */
    @Test
    public void demo1(){

        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "java", "ok");
        try {
            // 该方法会返回一个Future对象,如果不关心结果,那可以注释掉
            /*Future<RecordMetadata> send = */
            producer.send(record);
        } catch (Exception e){
            // producer可能发生异常,比如SerializationException,BufferExhaustedException
            // 或TimeoutException
        }


    }

    /**
     * 同步发送消息,获取future对象
     */
    @Test
    public void demo2(){
        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "scala", "not bad");
        // producer一般有两类错误,一类是可重试错误,这类错误可以通过重发消息解决.
        // 另一种错误,消息太大,这类错误不会重试
        try{
            // 获取future中的recordMetadata对象,里面包含了偏移量offset
            RecordMetadata recordMetadata = producer.send(record).get();
            logger.info("recordMetadata的offset = {}",recordMetadata.offset());
        } catch (Exception e){
            // 异常如果发生错误,比如broker返回不允许重发消息的异常或已经超过了重发次数会抛异常
            e.printStackTrace();
        }

    }

    /**
     * 异步发送消息
     */
    @Test
    public void demo3(){
        // 实现org.apache.kafka.clients.producer.Callback接口
        class ProducerCallBack implements Callback{
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // 如果onCompletion的异常不为空,说明服务端有异常
                if(e!=null){
                    e.printStackTrace();
                }
            }
        }
        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "python", "not bad");
        // send重载方法,第二个参数传回调
        producer.send(record,new ProducerCallBack());
        // lambda表达式写法
//        producer.send(record,
//                (recordMetadata,e)->{ if(e!=null) e.printStackTrace();});

    }

}

异常

WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 23 : {Programing Language=INVALID_TOPIC_EXCEPTION}

原因:topic 的名称不能包含空格.

生产者配置的参数

  1. acks
    acks 参数指定了 必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 。
  • acks=0 , 生产者在成功写入悄息之前不会等待任何来自服务器的响应。消息丢了就丢了,可靠性差,吞吐量高

如果 acks=1 ,只要集群的首领节点收到消息,会收到一个来自服务器的成功响应。同步消息延迟会低一些,异步会高一些,也有可能丢消息

  • 如果 acks=all ,只有当所有参与复制的节点全部收到消息时,才会收到一个来自

服务器的成功响应。可靠性高,但是延迟也很高.

  1. buffer.me mory
    该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息

  2. compression .type
    默认不会被压缩。可以设置为 snappygziplz4,它指定了
    消息发送前的压缩类型。 snappy 的 cpu 使用少,gzip cpu 使用多,压缩效果好.

  3. retries
    producer 重发消息的次数,达到这个次数,producer 放弃重试并返回错误。默认情况下,producer 会在每次重试之间等待 100ms ,retry.backoff.mx 参数设置这个间隔。

  4. batch.size
    同一批次消息的数量,单位是字节数, 不会一直等到足够,配合下面那个参数使用.

  5. linger.ms
    producer 发送批次等待时间,到达这个时间就发送.

  6. client.id
    该参数可以是任意的字符串,服务器会用它来识别消息的来源。

  7. max.in.flight.requests.per.connection
    该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用
    越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务
    器的,即使发生了重试。

  8. timeout.ms,request.timeout.ms 和 metadata.fetch.timeout.ms
    request.timeout.ms 为 producer 等待服务器响应的时间,metadata.fetch.timeout.ms 为 producer 获取 RecordMetadata 时等待服务器响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调). timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与

asks 的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回
一个错误 。

  1. max.block.ms
    send() 方法或 partitionsFor() 方法获取元数据时 producer 的阻塞时间.超时会抛出异常.

  2. max.request.size

producer 发送的请求大小.可以是单个消息大小也可以是单个批次消息大小.

  1. receive.buffer.bytes和send.buffer.bytes
    这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。 如果它们被设为 - 1 ,
    就使用操作系统的默认值。

参考:<<Kafka权威指南>>

  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    35 引用 • 35 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...