前言
在之前处理接口的事务的时候,可能都是直接使用 TransactionManager 这一种“显式定义类”的方式,将接口传递给它,然后保证事务的实现。
其实可以进一步将“事务调用”这一个方法抽离出来,让其成为一个“注解”,只要接口被“事务注解”所标记,那么它就会被事务处理
使用示范
/**
* 发送可靠消息,在事务提交后保证发送成功
*
* @param topic
* @param body
*/
@SecureInvoke
public void sendSecureMsg(String topic, Object body, Object key) {
Message<Object> build = MessageBuilder
.withPayload(body)
.setHeader("KEYS", key)
.build();
rocketMQTemplate.send(topic, build);
}
- 被 @SecureInvoke 注解后,执行 sendSecureMsg 方法会自动启用事务
代码实现
自定义注解 @SecureInvoke
/**
* 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。
*/
@Retention(RetentionPolicy.RUNTIME)//运行时生效
@Target(ElementType.METHOD)//作用在方法上
public @interface SecureInvoke {
/**
* 默认3次
*
* @return 最大重试次数(包括第一次正常执行)
*/
int maxRetryTimes() default 3;
/**
* 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。
* 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。
*
* @return 是否异步执行
*/
boolean async() default true;
}
注解处理切片 SecureInvokeAspect
@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行
@Component
public class SecureInvokeAspect {
@Autowired
private SecureInvokeService secureInvokeService;
@Around("@annotation(secureInvoke)")
public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable {
boolean async = secureInvoke.async();
boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
//非事务状态,直接执行,不做任何保证。
if (SecureInvokeHolder.isInvoking() || !inTransaction) {
return joinPoint.proceed();
}
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList());
SecureInvokeDTO dto = SecureInvokeDTO.builder()
.args(JsonUtils.toStr(joinPoint.getArgs()))
.className(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(JsonUtils.toStr(parameters))
.build();
SecureInvokeRecord record = SecureInvokeRecord.builder()
.secureInvokeDTO(dto)
.maxRetryTimes(secureInvoke.maxRetryTimes())
.nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES))
.build();
secureInvokeService.invoke(record, async);
return null;
}
}
将切片配置 TransactionAutoConfiguration
@Configuration
@EnableScheduling
@MapperScan(basePackageClasses = SecureInvokeRecordMapper.class)
@Import({SecureInvokeAspect.class, SecureInvokeRecordDao.class})
public class TransactionAutoConfiguration {
@Nullable
protected Executor executor;
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired
void setConfigurers(ObjectProvider<SecureInvokeConfigurer> configurers) {
Supplier<SecureInvokeConfigurer> configurer = SingletonSupplier.of(() -> {
List<SecureInvokeConfigurer> candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
if (candidates.size() > 1) {
throw new IllegalStateException("Only one SecureInvokeConfigurer may exist");
}
return candidates.get(0);
});
executor = Optional.ofNullable(configurer.get()).map(SecureInvokeConfigurer::getSecureInvokeExecutor).orElse(ForkJoinPool.commonPool());
}
@Bean
public SecureInvokeService getSecureInvokeService(SecureInvokeRecordDao dao) {
return new SecureInvokeService(dao, executor);
}
@Bean
public MQProducer getMQProducer() {
return new MQProducer();
}
}
- 使切片生效
事务处理类 SecureInvokeService
@Slf4j
@AllArgsConstructor
public class SecureInvokeService {
public static final double RETRY_INTERVAL_MINUTES = 2D;
private final SecureInvokeRecordDao secureInvokeRecordDao;
private final Executor executor;
@Scheduled(cron = "*/5 * * * * ?")
public void retry() {
List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords();
for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) {
doAsyncInvoke(secureInvokeRecord);
}
}
public void save(SecureInvokeRecord record) {
secureInvokeRecordDao.save(record);
}
private void retryRecord(SecureInvokeRecord record, String errorMsg) {
Integer retryTimes = record.getRetryTimes() + 1;
SecureInvokeRecord update = new SecureInvokeRecord();
update.setId(record.getId());
update.setFailReason(errorMsg);
update.setNextRetryTime(getNextRetryTime(retryTimes));
if (retryTimes > record.getMaxRetryTimes()) {
update.setStatus(SecureInvokeRecord.STATUS_FAIL);
} else {
update.setRetryTimes(retryTimes);
}
secureInvokeRecordDao.updateById(update);
}
private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法
double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m
return DateUtil.offsetMinute(new Date(), (int) waitMinutes);
}
private void removeRecord(Long id) {
secureInvokeRecordDao.removeById(id);
}
public void invoke(SecureInvokeRecord record, boolean async) {
boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
//非事务状态,直接执行,不做任何保证。
if (!inTransaction) {
return;
}
//保存执行数据
save(record);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@SneakyThrows
@Override
public void afterCommit() {
//事务后执行
if (async) {
doAsyncInvoke(record);
} else {
doInvoke(record);
}
}
});
}
public void doAsyncInvoke(SecureInvokeRecord record) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
doInvoke(record);
});
}
public void doInvoke(SecureInvokeRecord record) {
SecureInvokeDTO secureInvokeDTO = record.getSecureInvokeDTO();
try {
SecureInvokeHolder.setInvoking();
Class<?> beanClass = Class.forName(secureInvokeDTO.getClassName());
Object bean = SpringUtil.getBean(beanClass);
List<String> parameterStrings = JsonUtils.toList(secureInvokeDTO.getParameterTypes(), String.class);
List<Class<?>> parameterClasses = getParameters(parameterStrings);
Method method = ReflectUtil.getMethod(beanClass, secureInvokeDTO.getMethodName(), parameterClasses.toArray(new Class[]{}));
Object[] args = getArgs(secureInvokeDTO, parameterClasses);
//执行方法
method.invoke(bean, args);
//执行成功更新状态
removeRecord(record.getId());
} catch (Throwable e) {
log.error("SecureInvokeService invoke fail", e);
//执行失败,等待下次执行
retryRecord(record, e.getMessage());
} finally {
SecureInvokeHolder.invoked();
}
}
@NotNull
private Object[] getArgs(SecureInvokeDTO secureInvokeDTO, List<Class<?>> parameterClasses) {
JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeDTO.getArgs());
Object[] args = new Object[jsonNode.size()];
for (int i = 0; i < jsonNode.size(); i++) {
Class<?> aClass = parameterClasses.get(i);
args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass);
}
return args;
}
@NotNull
private List<Class<?>> getParameters(List<String> parameterStrings) {
return parameterStrings.stream().map(name -> {
try {
return Class.forName(name);
} catch (ClassNotFoundException e) {
log.error("SecureInvokeService class not fund", e);
}
return null;
}).collect(Collectors.toList());
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于