概述
之前几篇文章,我们着重介绍了在对 SkyWalking
进行二次开发之前的环境搭建问题,因此本篇文章将基于 SkyWalking-8.1.0 版本,以开发 webflux-webclent
插件为例,分享一下对 SkyWalking
插件开发以及贡献 PR 的过程(PR 地址),以其能为大家了解 SkyWalking java agent
插件的开发有所帮助。
概念
span
Span
应该是分布式链路追踪系统一个非常重要而且常见的一个概念。最早源自于 Google Dapper 的论文--Dapper, a Large-Scale Distributed Systems Tracing Infrastructure,此处给出论文地址,感兴趣的小伙伴可以深入学习。简单来说,Span 可以简单理解成一次服务的调用。只要是一个具有完整时间周期的程序访问,都可以简单看做是一个 span。
当然 SkyWalking
中的 span
与论文中的 span
类似,但同时也进行了一些扩展,具体来说,在 SkyWalking
中 span 分成以下三种:
- EntrySapn:代表服务提供者,也就是服务器的端点。我们可以简单理解成服务的提供方,比如对外提供服务的
Webflux
服务或者 MQ 的消费则都是 EntrySpan。 - ExitSpan:代表服务的消费者,比如一个服务的客户端或者消息队列的生产者都可以理解成一个 ExitSpan。
- LocalSpan:与前边的 EntrySpan 和 ExitSpan 相比,LocalSpan 的概念就比较特殊了,它其实本身与远程服务调用没有任何关系,它更多的可能指代的的本地的 java 方法。它的出现可能是为了解决
SkyWalking
监控本地方法调用的问题。比如说,我们想知道某个本地方法的调用请求,我们便可以将该方法定义成一个LocalSpan
,然后OAP
端便可以收集到对应的 span 信息,然后在 web 端清晰的展示该方法的调用情况。
上下文载体(ContextCarrier)
因为分布式追踪,大部分情况下都是跨进程的,因此为了解决跨进程的链路绑定问题,SkyWalking
引入了 ContextCarrier
的概念。
以下是有关如何在 A -> B 分布式调用中使用 ContextCarrier
的步骤.
- 在客户端, 创建一个新的空的 ContextCarrier.
- 通过
ContextManager#createExitSpan
创建一个ExitSpan
或者使用ContextManager#inject
来初始化ContextCarrier
. - 将
ContextCarrier
所有信息放到请求头 (如 HTTP HEAD), 附件(如 Dubbo RPC 框架), 或者消息 (如 Kafka) 中 - 通过服务调用, 将
ContextCarrier
传递到服务端.在服务端, 在对应组件的头部, 附件或消息中获取ContextCarrier
所有内容. - 通过
ContestManager#createEntrySpan
创建EntrySpan
或者使用ContextManager#extract
来绑定服务端和客户端.
异步 API
因为官方关于插件具体的开发是给了比较详细的开发文档的(戳这里)👈,因此我在此时针对 API 部分就不详细来说了,我会重点介绍几个自己在开发 webflux webclient
的过程中用到的异步 API。
因为此次是对 webflux WebClient
来开发插件,许多方法的调用都需要时跨线程的因此,我们需要使用异步 API。
简单来说异步 API 的使用步骤如下:
- 在原始上下文中调用
AsyncSpan#PrepareForAsync
; - 将该 Span 传递到其他线程,并江湾城相关属性比如 tag、log、status code 等属性进行设置;
- 全部操作就绪之后,可在任意线程中调用
#asyncFinish
结束调用 - 当所有的
#prepareForAsync
完成之后,追踪上下文就会结束,并一起被会传到后端服务(根据 API 的执行次数来进行判断)。
插件编写
确定拦截点
插件本身的开发肯定有一定的业务的逻辑,因此我们在开发之前需要根据插件的业务逻辑的确定合适的插入点位置。以 webflux-webclient-plugin
为例,因为该插件本质上是为了获取 webclient 在发起请求时的调用信息,因此在确定插入点之前我们首先要分析,它整个的调用过程是怎么的。
因此我对 WebClient 从发起请求到获得相应整个过程进行了分析,画出了如下的:
分析整个过程,我发现,无论 WebClient 调用的是 retrieve()
方法还是调用的 exchange()方法,最终在发起请求的时候都是通过 org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction
的 exchange()
方法实际执行异步请求,并且返回一个 Mono<ClientResponse>
类型的响应结果。
因此我们考虑使用 DefalutExchangeFunction#exchange()
方法作为插入点方法,但仅仅使用这一个插入点是否是足够的哪?
这里我们先留下一个小小的悬念,在业务代码开发部分,我会详细讲解自己在开发过程中所遇到的坑!!
拦截与业务代码开发
在插入点进行确定之后,我们便可以结合业务逻辑开始代码部分的开发。
定义拦截点
前边我们已经确定出了具体的拦截点,下边我们需要在插件目录中定义出该拦截点。
在创建的插件目录的 Resourse
目录,定义一个 skywalking-plugin.def
文件,添加插件定义:
spring-webflux-5.x-webclient=org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.BodyInserterRequestInstrumentation
在 define
目录下创建 Instrumentation
类,以 webflux-webclient 插件为例,我创建了一个 WebFluxWebClientInterceptor
类,用来指定拦截点的具体方法。
具体代码如下所示:
public class WebFluxWebClientInstrumentation extends ClassEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction";
private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("exchange");
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
实现对应的 Interceptor 类
有了插入点之后,我们还需要通过一个类来对插入点方法做具体增强的工作,因此我们定义了一个 WebFluxWebClientInstrumentation
类用来做具体的方法增强工作。
具体来说,在该类中做了如下操作:
- 获取请求参数,收集链路信息
- 创建 ContextCarrier,为进程的数据管理做准备。
- 创建 ExitSpan
- 设置 span 相关信息,比如请求方法的类型、访问的 url 等内容
- 将 ContextCarrier 对象进行动态传递,传递给第二个插入点增强类
- 将当前 span 进行传递,便于后续对响应信息进行判断和设置
具体代码如下(org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient
包下 WebFluxWebClientInterceptor 类)。
同时,我在后续调试的过程中发现,只定义一个拦截点是不够的,因为 request 只有在初始化的过程中才能被操作,也就是是说,在该位置违法将 span 的相关信息放置到 request 的头文件中,进行跨链传输。
因此我在 org.springframework.http.client.reactive.ClientHttpRequest
的构造方法处也设置了一个拦截点,负责讲 span 信息放置到 request 中进行跨链传输。
具体实现如下所示:
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ClientHttpRequest clientHttpRequest = (ClientHttpRequest) allArguments[0];
ContextCarrier contextCarrier = (ContextCarrier) objInst.getSkyWalkingDynamicField();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
clientHttpRequest.getHeaders().set(next.getHeadKey(), next.getHeadValue());
编写测试用例
在插件编写完成之后,我们还需要编写一个测试用例用来做 CI 测试。插件开发的详细文档可以参考戳一下👈
此处我就简单说一下用例的编写流程。
用例工程是一个独立的 Maven 工程。该工程能将工程打包镜像, 并要求提供一个外部能够访问的 Web 服务用例测试调用链追踪。
用例工程的目录图如下所示:
[plugin_testcase]
|__ [config]
| |__ docker-compse.yml
| |__ expectedData.yaml
|__ [src]
| |__ [main]
| | ...
| |__ [resources]
| | ...
|__ pom.xml
|__ testcase.yml
[] = directory
文件用途说明
以下是用例工程中配置文件的说明:
文件 | 用途 |
---|---|
docker-compose.xml | 定义用例的 docker 运行容器环境 |
expectedData.yaml | 定义用例期望生成的 Segment 的数据 |
testcase.yml | 定义用例的基本信息,如: 被测试框架名称、版本号 |
测试用例编写流程
- 编写用例代码
- 打包并测试用例镜像,确保在没有加载探针时的用例镜像能够正常运行
- 编写期望数据文件
- 编写用例配置文件
- 测试用例
Pull Request
提交前的检查
- 在正式提交以前一定要保证集成测试在本地通过
- 更新插件文档
- 插件文档需要更新:Supported-list.md 相关插件信息的支持。
- 插件如果为可选插件需要在 agent-optional-plugins 可选插件文档中增加对应的描述。
提交 PR
在提交 PR 时,一定要简要描述个人对插件的设计思路,这样有助于社区贡献者讨论完成 codereview。
申请自动化测试
测试用例编写完成后,可以申请自动化测试,了解插件的兼容性等问题
在自动化测试完成之后,会有社区成员进行代码审查,审查通过后,不出意外最终会被合并到主分支上。
自己在开发过程中遇到的问题
-
在搭建开发环境,完成项目的导入工作之后,maven 总报错。
解决方法:增加了国内的多个 maven 源之后该问题被解决
-
在确定插入点 exchange()方法之后,在调试过程中无法被拦截。
解决方法由于选择的增强类属于内部类,因此在
DefaultExchangeFunction
,因此在选择该类作为内部类的时候应该使用#
进行连接,而不是通过.
。即应该写成org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction
的形式。 -
在插件基本功能编写完成后,OAP 端却无法收集到链路信息。
解决方法: 使用最新的 OAP 收集端程序来进行接收。之前一直使用的本地直接编译的 OAP 端,发现不能工作,使用编译好的 OAP 端代码版本过低时也不能使用。
-
同一服务的两个 span 不能够串联。
原因分析: 经过分析出现该问题的原因主要是关闭 span 的时机不对。由于使用的是异步接口,因此在关闭 span 的时候必须在
doFinally()
方法体内进行关闭。防治 span 提前关闭,从而出现同一服务的 span 不能串联的情况发成解决方法: 修改 span 的关闭时机,在
doFinally()
方法体中执行span.asyncFinish()
方法 -
在本地跑集成测试时,遇到无法启动 docker 的问题。
原因分析: 根据保存内容发现是测试脚本在启动 docker 的过程中出现权限不足的问题,可能是 docker 的使用需要用的 root 的权限。
解决方法: 将当前用户增加到 docker 的用户组中,从而使得当前的用户具有操作 docker 的权限。
-
在集成测试阶段出现 SegementNotFoundException 问题
原因分析:该问题的出现主要是在对 Segment 进行验证的过程中,发现 Segement 丢失的情况发生
解决方法: 该问题在经过深入分析之后发现,实际上就是因为在编写插件的时候,插入点选择不充分导致的。
exchange
()这个插入点可以用来收集信息,但却无法用来进行链路信息绑定。因此后续重新设计了插件的插入点,增加了第二个插入点,并且在第二个插入点位置进行链路的绑定,至此问题解决。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于