使用注解实现“接口事务”

本贴最后更新于 534 天前,其中的信息可能已经时异事殊

前言

在之前处理接口的事务的时候,可能都是直接使用 TransactionManager 这一种“显式定义类”的方式,将接口传递给它,然后保证事务的实现。

其实可以进一步将“事务调用”这一个方法抽离出来,让其成为一个“注解”,只要接口被“事务注解”所标记,那么它就会被事务处理

使用示范

/** * 发送可靠消息,在事务提交后保证发送成功 * * @param topic * @param body */ @SecureInvoke public void sendSecureMsg(String topic, Object body, Object key) { Message<Object> build = MessageBuilder .withPayload(body) .setHeader("KEYS", key) .build(); rocketMQTemplate.send(topic, build); }
  • 被 @SecureInvoke 注解后,执行 sendSecureMsg 方法会自动启用事务

代码实现

自定义注解 @​​SecureInvoke​

/** * 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。 */ @Retention(RetentionPolicy.RUNTIME)//运行时生效 @Target(ElementType.METHOD)//作用在方法上 public @interface SecureInvoke { /** * 默认3次 * * @return 最大重试次数(包括第一次正常执行) */ int maxRetryTimes() default 3; /** * 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。 * 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。 * * @return 是否异步执行 */ boolean async() default true; }

注解处理切片 ​SecureInvokeAspect

@Slf4j @Aspect @Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行 @Component public class SecureInvokeAspect { @Autowired private SecureInvokeService secureInvokeService; @Around("@annotation(secureInvoke)") public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable { boolean async = secureInvoke.async(); boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive(); //非事务状态,直接执行,不做任何保证。 if (SecureInvokeHolder.isInvoking() || !inTransaction) { return joinPoint.proceed(); } Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList()); SecureInvokeDTO dto = SecureInvokeDTO.builder() .args(JsonUtils.toStr(joinPoint.getArgs())) .className(method.getDeclaringClass().getName()) .methodName(method.getName()) .parameterTypes(JsonUtils.toStr(parameters)) .build(); SecureInvokeRecord record = SecureInvokeRecord.builder() .secureInvokeDTO(dto) .maxRetryTimes(secureInvoke.maxRetryTimes()) .nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES)) .build(); secureInvokeService.invoke(record, async); return null; } }

将切片配置 TransactionAutoConfiguration

@Configuration @EnableScheduling @MapperScan(basePackageClasses = SecureInvokeRecordMapper.class) @Import({SecureInvokeAspect.class, SecureInvokeRecordDao.class}) public class TransactionAutoConfiguration { @Nullable protected Executor executor; /** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired void setConfigurers(ObjectProvider<SecureInvokeConfigurer> configurers) { Supplier<SecureInvokeConfigurer> configurer = SingletonSupplier.of(() -> { List<SecureInvokeConfigurer> candidates = configurers.stream().collect(Collectors.toList()); if (CollectionUtils.isEmpty(candidates)) { return null; } if (candidates.size() > 1) { throw new IllegalStateException("Only one SecureInvokeConfigurer may exist"); } return candidates.get(0); }); executor = Optional.ofNullable(configurer.get()).map(SecureInvokeConfigurer::getSecureInvokeExecutor).orElse(ForkJoinPool.commonPool()); } @Bean public SecureInvokeService getSecureInvokeService(SecureInvokeRecordDao dao) { return new SecureInvokeService(dao, executor); } @Bean public MQProducer getMQProducer() { return new MQProducer(); } }
  • 使切片生效

事务处理类 ​SecureInvokeService

@Slf4j @AllArgsConstructor public class SecureInvokeService { public static final double RETRY_INTERVAL_MINUTES = 2D; private final SecureInvokeRecordDao secureInvokeRecordDao; private final Executor executor; @Scheduled(cron = "*/5 * * * * ?") public void retry() { List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords(); for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) { doAsyncInvoke(secureInvokeRecord); } } public void save(SecureInvokeRecord record) { secureInvokeRecordDao.save(record); } private void retryRecord(SecureInvokeRecord record, String errorMsg) { Integer retryTimes = record.getRetryTimes() + 1; SecureInvokeRecord update = new SecureInvokeRecord(); update.setId(record.getId()); update.setFailReason(errorMsg); update.setNextRetryTime(getNextRetryTime(retryTimes)); if (retryTimes > record.getMaxRetryTimes()) { update.setStatus(SecureInvokeRecord.STATUS_FAIL); } else { update.setRetryTimes(retryTimes); } secureInvokeRecordDao.updateById(update); } private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法 double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m return DateUtil.offsetMinute(new Date(), (int) waitMinutes); } private void removeRecord(Long id) { secureInvokeRecordDao.removeById(id); } public void invoke(SecureInvokeRecord record, boolean async) { boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive(); //非事务状态,直接执行,不做任何保证。 if (!inTransaction) { return; } //保存执行数据 save(record); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @SneakyThrows @Override public void afterCommit() { //事务后执行 if (async) { doAsyncInvoke(record); } else { doInvoke(record); } } }); } public void doAsyncInvoke(SecureInvokeRecord record) { executor.execute(() -> { System.out.println(Thread.currentThread().getName()); doInvoke(record); }); } public void doInvoke(SecureInvokeRecord record) { SecureInvokeDTO secureInvokeDTO = record.getSecureInvokeDTO(); try { SecureInvokeHolder.setInvoking(); Class<?> beanClass = Class.forName(secureInvokeDTO.getClassName()); Object bean = SpringUtil.getBean(beanClass); List<String> parameterStrings = JsonUtils.toList(secureInvokeDTO.getParameterTypes(), String.class); List<Class<?>> parameterClasses = getParameters(parameterStrings); Method method = ReflectUtil.getMethod(beanClass, secureInvokeDTO.getMethodName(), parameterClasses.toArray(new Class[]{})); Object[] args = getArgs(secureInvokeDTO, parameterClasses); //执行方法 method.invoke(bean, args); //执行成功更新状态 removeRecord(record.getId()); } catch (Throwable e) { log.error("SecureInvokeService invoke fail", e); //执行失败,等待下次执行 retryRecord(record, e.getMessage()); } finally { SecureInvokeHolder.invoked(); } } @NotNull private Object[] getArgs(SecureInvokeDTO secureInvokeDTO, List<Class<?>> parameterClasses) { JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeDTO.getArgs()); Object[] args = new Object[jsonNode.size()]; for (int i = 0; i < jsonNode.size(); i++) { Class<?> aClass = parameterClasses.get(i); args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass); } return args; } @NotNull private List<Class<?>> getParameters(List<String> parameterStrings) { return parameterStrings.stream().map(name -> { try { return Class.forName(name); } catch (ClassNotFoundException e) { log.error("SecureInvokeService class not fund", e); } return null; }).collect(Collectors.toList()); } }

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3194 引用 • 8214 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...