Spring 嵌入式轻量消息队列

本贴最后更新于 1661 天前,其中的信息可能已经时移世异

为 Spring-boot 提供消息队列能力的 starter, 并提供了 VM 线程的轻量级实现。
项目地址: https://github.com/wangyuheng/embedded-mq-spring-boot-starter

什么是消息队列

消息队列是用于存放消息的容器,可供消费者取出消息进行消费。

观察者模式

观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。

Observer 本来的意思是观察者,但具体的实现中并不是主动去观察,而是被动的接收来自 Subject 的通知,所以更合适的名字应该是"消息投递"。
而且通知的模式还存在一个弊端: 通知及多个 ConcreteObserver 的消费程序仍在一个同步线程内,所以只是代码结构层面的解耦,底层还是一个事务内。
为了解决这个弊端,将消息的发送及 N 个消费程序拆分为 N+1 个事务,所以引入消息队列用于存储 Subject

领域模型设计

  • 罗列领域概念

  • 梳理交互关系

代码实现

  1. LinkedBlockingQueue 作为存储 Message 的容器。
  2. Store 用于存储消息。为了兼容多个 Consumer,每个 Consumer 指定一个唯一标识作为 Partition Key,对应唯一的一个 LinkedBlockingQueue。 e.g. Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();
  3. Producer 通过 Transport 将消息发送只多个 Partition KeyLinkedBlockingQueue 队列中
  4. 每个 Consumer 开启一个线程,通过轮询方式从 LinkedBlockingQueue 队列中消费消息。

代码片段

  • VmStore
    private Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();

    @Override
    public void append(Message message, Partition partition) {
        initQueue(partition);
        messageQueueMap.get(partition).add(message);
    }

    @Override
    public LinkedBlockingQueue<Message> findByPartition(Partition partition) {
        initQueue(partition);
        return messageQueueMap.get(partition);
    }

    private void initQueue(Partition partition) {
        if (!messageQueueMap.containsKey(partition)) {
            synchronized (this) {
                if (!messageQueueMap.containsKey(partition)) {
                    messageQueueMap.put(partition, new LinkedBlockingQueue<>());
                }
            }
        }
    }
  • Transport
public void transfer(Message message) {
    final String topic = message.getTopic();
    topicClientIdMap.get(topic).forEach(clientId -> {
        Partition partition = new Partition(clientId, topic);
        store.append(message, partition);
    });
}
  • ConsumerCluster
/**
 * 开启消费线程
 *  只能开启一次
 */
public synchronized void start(Store store) {
    if (!initialized.get()) {
        synchronized (this) {
            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
            taskExecutor.setDaemon(true);
            taskExecutor.execute(new ConsumerListener(this.getMessageHandler(), store.findByPartition(this.generatePartition())));
            initialized.set(true);
        }
    }
}

/**
 * 关闭消费线程
 */
public void shutdown() {
    liveToggle.set(false);
}

/**
 * 暂停消费
  */
public void pause() {
    runToggle.set(false);
}

/**
 * 重启暂停的消费线程
 */
public void restart() {
    runToggle.set(true);
}

class ConsumerListener implements Runnable {

    private MessageHandler handler;
    private LinkedBlockingQueue<Message> queue;

    ConsumerListener(MessageHandler handler, LinkedBlockingQueue<Message> queue) {
        this.handler = handler;
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (!liveToggle.get()) {
                    break;
                }
                if (runToggle.get()) {
                    Message message = queue.poll();
                    if (null == message) {
                        Thread.sleep(100);
                    } else {
                        handler.handle(message);
                    }
                } else {
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

使用 LinkedBlockingQueue 却未使用 take 方法的原因是为了灵活控制消费线程的启停。

spring 集成

为了方便使用,通过 annotation 的形式与 spring 框架进行集成。

示例

  • Consumer
@Consumer(topic = CONSUMER_TOPIC, id = CUSTOM_CONSUMER_ID)
public void consumerMessage(Message message) {
    consumerRecordMap.get(CUSTOM_CONSUMER_ID).add(message);
}
  • Producer
@Autowired
private DefaultProducer<String> producer;

public void sendMessage(){
    producer.send(new Message<>(CUSTOM_TOPIC, "This is a message!"));
}

代码实现

/**
 * 注册消费者bean
 *
 * @see Consumer
 * @see MessageHandler
 * @see Store
 */
public class ConsumerBeanDefinitionRegistryPostProcessor implements BeanPostProcessor, ApplicationContextAware {

    private ConfigurableApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(targetClass);
        for (Method method : methods) {
            if (AnnotatedElementUtils.hasAnnotation(method, Consumer.class)) {
                final String topic = method.getAnnotation(Consumer.class).topic();
                final String id = StringUtils.isEmpty(method.getAnnotation(Consumer.class).id()) ? beanName + method.getName() : method.getAnnotation(Consumer.class).id();
                final BeanFactory beanFactory = applicationContext.getBeanFactory();
                final Store store = beanFactory.getBean(Store.class);

                final MessageHandler messageHandler = message -> ReflectionUtils.invokeMethod(method, bean, message);

                final BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerCluster.class, () -> {
                    ConsumerCluster consumerCluster = new ConsumerCluster();
                    consumerCluster.setId(id);
                    consumerCluster.setTopic(topic);
                    consumerCluster.setMessageHandler(messageHandler);
                    consumerCluster.start(store);
                    return consumerCluster;
                });
                BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(beanName + method.getName() + "Listener", beanDefinition);
            }
        }
        return bean;
    }
}

其他

  1. 如何跨应用消费?通过 Mysql、Redis 等公共存储替换 Store 及 Transport 实现。Mysql 需要考虑行锁。
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    944 引用 • 1459 回帖 • 17 关注
  • 消息队列
    40 引用 • 52 回帖 • 2 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...