Flink 消费 kafka 消息实战

本贴最后更新于 2047 天前,其中的信息可能已经事过景迁

根据下文可以把 Flink 消费 kafka 消息完整流程走通,后面可以做个简单的 nginx 日志异常监控。
参考来源 https://blog.csdn.net/boling_cavalry/article/details/85549434

环境情况

本次实战用到了三台机器,它们的 IP 地址和身份如下表所示:

IP 地址 身份 备注
192.168.1.104 http 请求发起者 此机器上安装了 Apache Bench,可以发起大量 http 请求到 192.168.1.101
192.168.1.101 Docker server 此机器上安装了 Docker,并且运行了三个容器:zookeeper、kafka、消息生产者(接收 http 请求时生产一条消息)
192.168.1.102 Flink 应用 此机器部署了 Flink,运行着我们开发的 Flink 应用,接收 kafka 消息做实时处理
注意:

本文的重点是 Flink,所以在 192.168.1.101 这台机器上通过 Docker 快速搭建了 kafka server 和消息生产者,只要向这台机器的消息生产者容器发起 http 请求,就能生产一条消息到 kafka;
192.168.1.104 这台机器安装了 Apache Bench,可以通过简单的命令,向 192.168.1.101 发起大量 http 请求,这样就能产生大量 kafka 消息;
整体架构如下图:

操作步骤

在机器 192.168.1.101 上部署三个容器(消息生产者、zookeeper、kafka);
在机器 192.168.1.104 上安装 Apache Bench;
在机器 192.168.1.102 上配置 kafak 相关的 host;
开发 Flink 应用,部署到机器 192.168.1.102;
在机器 192.168.1.104 上发起压力测试,请求地址是消息生产者的 http 接口地址,产生大量消息;
观察 Flink 应用的处理情况;

版本信息

操作系统:Centos7
docker:17.03.2-ce
docker-compose:1.23.2
kafka:0.11.0.3
zookeeper:3.4.9
JDK:1.8.0_191
spring boot:1.5.9.RELEASE
spring-kafka:1.3.8.RELEASE
Flink:1.7

异常解决记录

1 需要增加类型提示

Flink 的数据类型和序列化 https://www.jianshu.com/p/e8bc484fa4c5

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#type-hints-in-the-java-api

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(StreamingJob.java:84)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
	at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
	at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:316)
	at com.bolingcavalry.StreamingJob.main(StreamingJob.java:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	... 10 more

java8 lambda 表达式

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/java_lambdas/

Java 8 introduced several new language features designed for faster and clearer coding. With the most important feature, the so-called “Lambda Expressions”, it opened the door to functional programming. Lambda expressions allow for implementing and passing functions in a straightforward way without having to declare additional (anonymous) classes.

Attention Flink supports the usage of lambda expressions for all operators of the Java API, however, whenever a lambda expression uses Java generics you need to declare type information explicitly.

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:350)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:176)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:196)
	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:609)
	at com.bolingcavalry.StreamingJob.main(StreamingJob.java:84)
	... 15 more

不用 lambda 解决数据类型异常

针对上面的 2 个异常,最后通过改写代码,不用 lambda 的方式解决异常
具体代码可参考 https://github.com/whitespur/blog_demos/blob/master/flinkkafkademo/src/main/java/com/bolingcavalry/StreamingJob.java

  • Flink
    9 引用 • 1 回帖
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    35 引用 • 35 回帖 • 7 关注
  • NGINX

    NGINX 是一个高性能的 HTTP 和反向代理服务器,也是一个 IMAP/POP3/SMTP 代理服务器。 NGINX 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。

    311 引用 • 546 回帖 • 4 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...