Kafka 生产者 API
测试 kakfa
生产者 Api 的 demo
pom.xml
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>compile</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency> </dependencies>
样例代码
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.Future; public class Demo1 { private static final Logger logger = LoggerFactory.getLogger(Demo1.class); Properties kafkaProps; KafkaProducer<String, String> producer; @Before public void initProperties(){ kafkaProps = new Properties(); // 指定broker地址,格式: host:port.可以多个地址,用逗号拼接 kafkaProps.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); // 指定key的序列化方式 kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value的序列化方式 kafkaProps.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); // 创建producer producer = new KafkaProducer<>(kafkaProps); } /** * 同步发送消息,不获取future对象 */ @Test public void demo1(){ // 创建消息record,三个参数,topic,key,value ProducerRecord<String, String> record = new ProducerRecord<>("ProgramingLanguage", "java", "ok"); try { // 该方法会返回一个Future对象,如果不关心结果,那可以注释掉 /*Future<RecordMetadata> send = */ producer.send(record); } catch (Exception e){ // producer可能发生异常,比如SerializationException,BufferExhaustedException // 或TimeoutException } } /** * 同步发送消息,获取future对象 */ @Test public void demo2(){ // 创建消息record,三个参数,topic,key,value ProducerRecord<String, String> record = new ProducerRecord<>("ProgramingLanguage", "scala", "not bad"); // producer一般有两类错误,一类是可重试错误,这类错误可以通过重发消息解决. // 另一种错误,消息太大,这类错误不会重试 try{ // 获取future中的recordMetadata对象,里面包含了偏移量offset RecordMetadata recordMetadata = producer.send(record).get(); logger.info("recordMetadata的offset = {}",recordMetadata.offset()); } catch (Exception e){ // 异常如果发生错误,比如broker返回不允许重发消息的异常或已经超过了重发次数会抛异常 e.printStackTrace(); } } /** * 异步发送消息 */ @Test public void demo3(){ // 实现org.apache.kafka.clients.producer.Callback接口 class ProducerCallBack implements Callback{ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { // 如果onCompletion的异常不为空,说明服务端有异常 if(e!=null){ e.printStackTrace(); } } } // 创建消息record,三个参数,topic,key,value ProducerRecord<String, String> record = new ProducerRecord<>("ProgramingLanguage", "python", "not bad"); // send重载方法,第二个参数传回调 producer.send(record,new ProducerCallBack()); // lambda表达式写法 // producer.send(record, // (recordMetadata,e)->{ if(e!=null) e.printStackTrace();}); } }
异常
WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 23 : {Programing Language=INVALID_TOPIC_EXCEPTION}
原因:topic 的名称不能包含空格.
生产者配置的参数
acks
acks 参数指定了 必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 。
acks=0
, 生产者在成功写入悄息之前不会等待任何来自服务器的响应。消息丢了就丢了,可靠性差,吞吐量高
如果 acks=1
,只要集群的首领节点收到消息,会收到一个来自服务器的成功响应。同步消息延迟会低一些,异步会高一些,也有可能丢消息
- 如果
acks=all
,只有当所有参与复制的节点全部收到消息时,才会收到一个来自
服务器的成功响应。可靠性高,但是延迟也很高.
-
buffer.me mory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息 -
compression .type
默认不会被压缩。可以设置为snappy
、gzip
或lz4
,它指定了
消息发送前的压缩类型。snappy
的 cpu 使用少,gzip
cpu 使用多,压缩效果好. -
retries
producer 重发消息的次数,达到这个次数,producer 放弃重试并返回错误。默认情况下,producer 会在每次重试之间等待 100ms ,retry.backoff.mx
参数设置这个间隔。 -
batch.size
同一批次消息的数量,单位是字节数, 不会一直等到足够,配合下面那个参数使用. -
linger.ms
producer 发送批次等待时间,到达这个时间就发送. -
client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源。 -
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用
越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务
器的,即使发生了重试。 -
timeout.ms,request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms
为 producer 等待服务器响应的时间,metadata.fetch.timeout.ms
为 producer 获取 RecordMetadata 时等待服务器响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调).timeout.ms
指定了 broker 等待同步副本返回消息确认的时间,与
asks
的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回
一个错误 。
-
max.block.ms
send()
方法或partitionsFor()
方法获取元数据时 producer 的阻塞时间.超时会抛出异常. -
max.request.size
producer 发送的请求大小.可以是单个消息大小也可以是单个批次消息大小.
receive.buffer.bytes和send.buffer.bytes
这两个参数分别指定了 TCPsocket
接收和发送数据包的缓冲区大小。 如果它们被设为 - 1 ,
就使用操作系统的默认值。
参考:
<<Kafka权威指南>>
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于