快速使用 Kafka 生产者 API

本贴最后更新于 2305 天前,其中的信息可能已经斗转星移

Kafka 生产者 API

测试 kakfa 生产者 Api 的 demo

pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

样例代码

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.Future;

public class Demo1 {
    private static final Logger logger = LoggerFactory.getLogger(Demo1.class);
    Properties kafkaProps;
    KafkaProducer<String, String> producer;
    @Before
    public void initProperties(){
        kafkaProps = new Properties();
        // 指定broker地址,格式: host:port.可以多个地址,用逗号拼接
        kafkaProps.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        // 指定key的序列化方式
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // value的序列化方式
        kafkaProps.put("value.serializer"
                , "org.apache.kafka.common.serialization.StringSerializer");
        // 创建producer
        producer = new KafkaProducer<>(kafkaProps);
    }


    /**
     * 同步发送消息,不获取future对象
     */
    @Test
    public void demo1(){

        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "java", "ok");
        try {
            // 该方法会返回一个Future对象,如果不关心结果,那可以注释掉
            /*Future<RecordMetadata> send = */
            producer.send(record);
        } catch (Exception e){
            // producer可能发生异常,比如SerializationException,BufferExhaustedException
            // 或TimeoutException
        }


    }

    /**
     * 同步发送消息,获取future对象
     */
    @Test
    public void demo2(){
        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "scala", "not bad");
        // producer一般有两类错误,一类是可重试错误,这类错误可以通过重发消息解决.
        // 另一种错误,消息太大,这类错误不会重试
        try{
            // 获取future中的recordMetadata对象,里面包含了偏移量offset
            RecordMetadata recordMetadata = producer.send(record).get();
            logger.info("recordMetadata的offset = {}",recordMetadata.offset());
        } catch (Exception e){
            // 异常如果发生错误,比如broker返回不允许重发消息的异常或已经超过了重发次数会抛异常
            e.printStackTrace();
        }

    }

    /**
     * 异步发送消息
     */
    @Test
    public void demo3(){
        // 实现org.apache.kafka.clients.producer.Callback接口
        class ProducerCallBack implements Callback{
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // 如果onCompletion的异常不为空,说明服务端有异常
                if(e!=null){
                    e.printStackTrace();
                }
            }
        }
        // 创建消息record,三个参数,topic,key,value
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ProgramingLanguage", "python", "not bad");
        // send重载方法,第二个参数传回调
        producer.send(record,new ProducerCallBack());
        // lambda表达式写法
//        producer.send(record,
//                (recordMetadata,e)->{ if(e!=null) e.printStackTrace();});

    }

}

异常

  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...