开源推荐
推荐一款一站式性能监控工具(开源项目)
Pepper-Metrics 是跟一位同事一起开发的开源组件,主要功能是通过比较轻量的方式与常用开源组件(jedis/mybatis/motan/dubbo/servlet)集成,收集并计算 metrics,并支持输出到日志及转换成多种时序数据库兼容数据格式,配套的 grafana dashboard 友好的进行展示。项目当中原理文档齐全,且全部基于 SPI 设计的可扩展式架构,方便的开发新插件。另有一个基于 docker-compose 的独立 demo 项目可以快速启动一套 demo 示例查看效果 https://github.com/zrbcool/pepper-metrics-demo。如果大家觉得有用的话,麻烦给个 star,也欢迎大家参与开发,谢谢:)
进入正题...
Motan 系列文章
0 @MotanReferer 注解是个啥
被 @MotanReferer
标注的 setter 方法或 field 会被 motan 在启动时扫描,并为其创建动态代理,并将动态代理的实例赋值给这个 field。远程服务的调用都是在这个代理中实现的。
下面以注解在 field 的情况为例,说明 @MotanReferer
的解析以及创建动态代理的过程:
@MotanReferer(basicReferer = "ad-commonBasicRefererConfigBean", application = "ad-filter", version = "1.1.0")
private AdCommonRPC adCommonRPC;
关于如何扫描,在 Motan 如何完成与 Spring 的集成 一文中有详细说明,这里不再赘述。
1 @MotanReferer 注解的解析
这个过程始于 AnnotationBean
中对扫描到的 bean 的 field 的解析。Motan 会解析出带有 @MotanReferer
的 field,应调用 AnnotationBean 的 refer
方法初始化并创建代理。field 的解析如下:
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
try {
if (!field.isAccessible()) {
field.setAccessible(true);
}
MotanReferer reference = field.getAnnotation(MotanReferer.class);
if (reference != null) {
// 调用 refer 方法初始化并创建动态代理
// 并将field的引用指向这个代理对象
Object value = refer(reference, field.getType());
if (value != null) {
field.set(bean, value);
}
}
} catch (Throwable t) {
throw new BeanInitializationException("Failed to init remote service reference at filed " + field.getName()
+ " in class " + bean.getClass().getName(), t);
}
}
2 @MotanReferer 的初始化
类似于服务注册的过程,MotanReferer 是通过 RefererConfigBean
类来管理配置、注册中心、URL、HA、LoadBalance、Proxy 等资源的。
还是先来个 RefererConfigBean
的 UML,熟悉一下整个体系都有啥东西。
其中,注册中心、URL、Protocol、HA、LoadBalance 策略等都是在 RefererConfig
的 clusterSupports
中管理的。
来继续看这个 refer 方法,这个方法中首先将 @MotanReferer
注解中的配置信息解析到 RefererConfigBean
中,然后依然是调用 afterPropertiesSet()
方法做一些校验,最后调用 RefererConfigBean
的 getRef()
方法,各个组件的初始化以及 Proxy 都在这里创建。
private <T> Object refer(MotanReferer reference, Class<?> referenceClass) {
// 解析接口名
String interfaceName;
if (!void.class.equals(reference.interfaceClass())) {
interfaceName = reference.interfaceClass().getName();
} else if (referenceClass.isInterface()) {
interfaceName = referenceClass.getName();
} else {
throw new IllegalStateException("The @Reference undefined interfaceClass or interfaceName, and the property type "
+ referenceClass.getName() + " is not a interface.");
}
String key = reference.group() + "/" + interfaceName + ":" + reference.version();
RefererConfigBean<T> referenceConfig = referenceConfigs.get(key);
if (referenceConfig == null) {
referenceConfig = new RefererConfigBean<T>();
referenceConfig.setBeanFactory(beanFactory);
if (void.class.equals(reference.interfaceClass())
&& referenceClass.isInterface()) {
referenceConfig.setInterface((Class<T>) referenceClass);
} else if (!void.class.equals(reference.interfaceClass())) {
referenceConfig.setInterface((Class<T>) reference.interfaceClass());
}
if (beanFactory != null) {
// ... 省略,初始化 @MotanReferer的配置信息
try {
// 校验basicReferer配置、Protocol、Registry配置,与服务注册过程相似
referenceConfig.afterPropertiesSet();
} catch (RuntimeException e) {
throw (RuntimeException) e;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
referenceConfigs.putIfAbsent(key, referenceConfig);
referenceConfig = referenceConfigs.get(key);
}
// 创建Proxy
return referenceConfig.getRef();
}
getRef()
方法中实际是调用 initRef()
方法来创建 Proxy 的。
public synchronized void initRef() {
// ... 校验 interface 和 protocols 是否非空
checkInterfaceAndMethods(interfaceClass, methods);
clusterSupports = new ArrayList<>(protocols.size());
List<Cluster<T>> clusters = new ArrayList<>(protocols.size());
String proxy = null;
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
// 解析注册中心地址
List<URL> registryUrls = loadRegistryUrls();
// 解析本机IP
String localIp = getLocalHostAddress(registryUrls);
for (ProtocolConfig protocol : protocols) {
Map<String, String> params = new HashMap<>();
params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
params.put(URLParamType.version.getName(), URLParamType.version.getValue());
params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
collectConfigParams(params, protocol, basicReferer, extConfig, this);
collectMethodConfigParams(params, this.getMethods());
String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);
// 初始化ClusterSupport
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);
clusterSupports.add(clusterSupport);
clusters.add(clusterSupport.getCluster());
if (proxy == null) {
// 获取创建proxy的方式,默认是JDK动态代理
String defaultValue = StringUtils.isBlank(serviceInterface) ? URLParamType.proxy.getValue() : MotanConstants.PROXY_COMMON;
proxy = refUrl.getParameter(URLParamType.proxy.getName(), defaultValue);
}
}
// 创建代理
ref = configHandler.refer(interfaceClass, clusters, proxy);
initialized.set(true);
}
可以发现,又调用了 configHandler.refer
方法,默认情况下,这个 proxy 参数的值是"jdk",即使用 JDK 自身的动态代理功能创建代理。
另外一个比较重要的类是 ClusterSupport
,这个类封装了下面这些信息:
private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
// 集群支持
private Cluster<T> cluster;
// 注册中心URL
private List<URL> registryUrls;
// 远程调用URL
private URL url;
private Class<T> interfaceClass;
// 使用的协议
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();
其中,cluster
在 motan 的具体实现实际上是 ClusterSpi
这个类,他封装了以下信息:
public class ClusterSpi<T> implements Cluster<T> {
// 高可用策略
private HaStrategy<T> haStrategy;
// 负载均衡策略
private LoadBalance<T> loadBalance;
private List<Referer<T>> referers;
private AtomicBoolean available = new AtomicBoolean(false);
private URL url;
}
这些东西在上面的 createClusterSupport
方法中生成好了,这里先不关注 Cluster、Ha、LoadBalance 等,本文主要关注代理的创建及 RPC 调用。继续看 configHandler.refer
这个方法。
public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass, clusters);
}
又是一个 SPI 扩展,默认使用的下面这个 JDK 的 ProxyFactory。
@SpiMeta(name = "jdk")
public class JdkProxyFactory implements ProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clz, List<Cluster<T>> clusters) {
return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
}
}
OK,到这里代理就创建好了。这个代理的实例最终会被 @MotanReferer 注解的 field 引用。初始化过程结束。
这里要明确,最终是通过 RefererInvocationHandler
这个类创建的代理。
3 RPC 调用
既然 RefererInvocationHandler
代理了我们的目标接口,那么接口的每个方法调用都会走到这个代理类中。所以接下来主要关注代理是咋完成 RPC 调用的。
这里只给出关键代码:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 省略 local method 部分
DefaultRequest request = new DefaultRequest();
request.setRequestId(RequestIdGenerator.getRequestId());
request.setArguments(args);
String methodName = method.getName();
boolean async = false; // 异步调用支持,暂不关注
if (methodName.endsWith(MotanConstants.ASYNC_SUFFIX) && method.getReturnType().equals(ResponseFuture.class)) {
methodName = MotanFrameworkUtil.removeAsyncSuffix(methodName);
async = true;
}
request.setMethodName(methodName);
request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
request.setInterfaceName(interfaceName);
return invokeRequest(request, getRealReturnType(async, this.clz, method, methodName), async);
}
这个方法先将 RPC 的相关信息封装到 DefaultRequest
中,然后调用 invokeRequest
方法。
Object invokeRequest(Request request, Class returnType, boolean async) throws Throwable {
RpcContext curContext = RpcContext.getContext();
// 省略 初始化 RpcContext
// 当 referer配置多个protocol的时候,比如A,B,C,
// 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
for (Cluster<T> cluster : clusters) {
// 如果开关处于关闭状态,不会去调用这个远程机器
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
if (switcher != null && !switcher.isOn()) {
continue;
}
request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
// 带上client的application和module
request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());
Response response = null;
boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(), URLParamType.throwException.getValue()));
try {
MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_INVOKE);
// 执行调用
response = cluster.call(request);
if (async) {
// 省略异步调用的支持
} else {
Object value = response.getValue();
if (value != null && value instanceof DeserializableObject) {
try {
value = ((DeserializableObject) value).deserialize(returnType);
} catch (IOException e) {
LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
}
}
return value;
}
} catch (RuntimeException e) {
// 异常处理,包括处理是否向上游服务抛出
}
}
throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + interfaceName + " " + MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
}
cluster.call()
public Response call(Request request) {
if (available.get()) {
try {
// haStrategy是通过SPI来管理的,默认的HA策略是 failover
// 即调用失败时,自动尝试其他服务器
return haStrategy.call(request, loadBalance);
} catch (Exception e) {
return callFalse(request, e);
}
}
return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
haStrategy.call()
public Response call(Request request, LoadBalance<T> loadBalance) {
// refer列表
List<Referer<T>> referers = selectReferers(request, loadBalance);
if (referers.isEmpty()) {
throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request,
loadBalance));
}
URL refUrl = referers.get(0).getUrl();
// 这里是配置中配置的 retries 重试次数,默认:0
int tryCount =
refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
URLParamType.retries.getIntValue());
// 如果有问题,则设置为不重试
if (tryCount < 0) {
tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
Referer<T> refer = referers.get(i % referers.size());
try {
request.setRetries(i);
return refer.call(request); // RPC
} catch (RuntimeException e) {
// 对于业务异常,直接抛出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
}
throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
}
然后,refer.call()
public Response call(Request request) {
if (!isAvailable()) {
throw new MotanFrameworkException(this.getClass().getSimpleName() + " call Error: node is not available, url=" + url.getUri()
+ " " + MotanFrameworkUtil.toString(request));
}
// 增加目标server的连接数,用于loadBalance
incrActiveCount(request);
Response response = null;
try {
response = doCall(request); // do rpc
return response;
} finally {
// 调用完要将目标server的连接数-1
decrActiveCount(request, response);
}
}
到这里,doCall
方法就是通过 Netty 调用 RPC 了。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于