Spring 整合 Disruptor3

本贴最后更新于 1802 天前,其中的信息可能已经渤澥桑田

什么是 Disruptor

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。

我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。

这些都是 Disruptor 能做到的,与之不同的是,Disruptor 能做更多:

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

以上虽然简单地描述了 Disruptor 是什么,但对于它"能做什么",还不是那么明白。简而言之,当你需要在两个独立的处理过程之间交换数据时,就可以使用 Disruptor 。当然使用队列也可以,只不过 Disruptor 的性能更好。

实战

本文先不具体去阐述 Disruptor 的工作具体原理,只是简单地将 Spring 与其整合。整合过程很简单,具体步骤如下:

1、在 pom 文件中引入 disruptor

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2、创建事件

@Data 
public class NotifyEvent { 
    private String message;
}

3、创建消息工厂用于生产消息

public class NotifyEventFactory implements EventFactory {
    @Override 
    public Object newInstance() { 
       return new NotifyEvent();
    }
}

4、创建消费者,此处用于处理业务逻辑

public class NotifyEventHandler implements EventHandler<NotifyEvent>,WorkHandler<NotifyEvent> {

    @Override public void onEvent(NotifyEvent notifyEvent, long l, boolean b) throws Exception {
        System.out.println("接收到消息"); this.onEvent(notifyEvent);

    }

    @Override public void onEvent(NotifyEvent notifyEvent) throws Exception {
        System.out.println(notifyEvent+">>>"+UUID.randomUUID().toString());
    }
}

5、自定义异常

@Log4j2 
public class NotifyEventHandlerException implements ExceptionHandler {
    @Override 
	public void handleEventException(Throwable throwable, long sequence, Object event) {
        throwable.fillInStackTrace();
        log.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), throwable.getMessage());
    }

    @Override 
	public void handleOnStartException(Throwable throwable) {
        log.error("start disruptor error ==[{}]!", throwable.getMessage());
    }

    @Override 
	public void handleOnShutdownException(Throwable throwable) {
        log.error("shutdown disruptor error ==[{}]!", throwable.getMessage());
    }
}

6、整合 Spring,对 Disruptor 进行初始化

@Service
public class NotifyServiceImpl implements INotifyService, DisposableBean,InitializingBean {
    private Disruptor<NotifyEvent> disruptor;
    private static final int RING_BUFFER_SIZE = 1024 * 1024;

    @Override
    public void destroy() throws Exception {
        disruptor.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        disruptor = new Disruptor<NotifyEvent>(new NotifyEventFactory(),RING_BUFFER_SIZE, Executors.defaultThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
        disruptor.setDefaultExceptionHandler(new NotifyEventHandlerException());
        disruptor.handleEventsWith(new NotifyEventHandler());
        disruptor.start();
    }


    @Override
    public void sendNotify(String message) {
        RingBuffer<NotifyEvent> ringBuffer = disruptor.getRingBuffer();
//        ringBuffer.publishEvent(new EventTranslatorOneArg<NotifyEvent,  String>() {
//            @Override
//            public void translateTo(NotifyEvent event, long sequence, String data) {
//                event.setMessage(data);
//            }
//        }, message);
        ringBuffer.publishEvent((event, sequence, data) -> event.setMessage(data), message); //lambda式写法,如果是用jdk1.8以下版本使用以上注释的一段
     
    }
}

7、消息生产接口

public interface INotifyService { /** *  发送消息
     * @author jianzhang11
     * @date 2018/4/13 16:52
     * @param message */
    void sendNotify(String message);

}

8、在需要调用的地方注入 INotifyService 并调用 sendNotify 方法

@GetMapping("test")
@ResponseBody public String testLog() {
    log.info("=============");
    notifyService.sendNotify("Hello,World!"); return "hello,world";
}

  • 代码
    467 引用 • 586 回帖 • 9 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    288 引用 • 734 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • danl
    146 关注
  • 微软

    微软是一家美国跨国科技公司,也是世界 PC 软件开发的先导,由比尔·盖茨与保罗·艾伦创办于 1975 年,公司总部设立在华盛顿州的雷德蒙德(Redmond,邻近西雅图)。以研发、制造、授权和提供广泛的电脑软件服务业务为主。

    8 引用 • 44 回帖
  • CSDN

    CSDN (Chinese Software Developer Network) 创立于 1999 年,是中国的 IT 社区和服务平台,为中国的软件开发者和 IT 从业者提供知识传播、职业发展、软件开发等全生命周期服务,满足他们在职业发展中学习及共享知识和信息、建立职业发展社交圈、通过软件开发实现技术商业化等刚性需求。

    14 引用 • 155 回帖
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖 • 1 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 52 关注
  • Vim

    Vim 是类 UNIX 系统文本编辑器 Vi 的加强版本,加入了更多特性来帮助编辑源代码。Vim 的部分增强功能包括文件比较(vimdiff)、语法高亮、全面的帮助系统、本地脚本(Vimscript)和便于选择的可视化模式。

    29 引用 • 66 回帖 • 2 关注
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    75 引用 • 258 回帖 • 623 关注
  • 导航

    各种网址链接、内容导航。

    42 引用 • 175 回帖
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 612 关注
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 592 关注
  • PWA

    PWA(Progressive Web App)是 Google 在 2015 年提出、2016 年 6 月开始推广的项目。它结合了一系列现代 Web 技术,在网页应用中实现和原生应用相近的用户体验。

    14 引用 • 69 回帖 • 159 关注
  • MySQL

    MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一。

    692 引用 • 535 回帖
  • 服务器

    服务器,也称伺服器,是提供计算服务的设备。由于服务器需要响应服务请求,并进行处理,因此一般来说服务器应具备承担服务并且保障服务的能力。

    125 引用 • 588 回帖
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    66 引用 • 114 回帖 • 223 关注
  • Openfire

    Openfire 是开源的、基于可拓展通讯和表示协议 (XMPP)、采用 Java 编程语言开发的实时协作服务器。Openfire 的效率很高,单台服务器可支持上万并发用户。

    6 引用 • 7 回帖 • 101 关注
  • InfluxDB

    InfluxDB 是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及实时分析。

    2 引用 • 76 关注
  • 又拍云

    又拍云是国内领先的 CDN 服务提供商,国家工信部认证通过的“可信云”,乌云众测平台认证的“安全云”,为移动时代的创业者提供新一代的 CDN 加速服务。

    21 引用 • 37 回帖 • 548 关注
  • Ruby

    Ruby 是一种开源的面向对象程序设计的服务器端脚本语言,在 20 世纪 90 年代中期由日本的松本行弘(まつもとゆきひろ/Yukihiro Matsumoto)设计并开发。在 Ruby 社区,松本也被称为马茨(Matz)。

    7 引用 • 31 回帖 • 216 关注
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 465 关注
  • sts
    2 引用 • 2 回帖 • 197 关注
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1706 回帖
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 44 关注
  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    71 引用 • 535 回帖 • 789 关注
  • PWL

    组织简介

    用爱发电 (Programming With Love) 是一个以开源精神为核心的民间开源爱好者技术组织,“用爱发电”象征开源与贡献精神,加入组织,代表你将遵守组织的“个人开源爱好者”的各项条款。申请加入:用爱发电组织邀请帖
    用爱发电组织官网:https://programmingwithlove.stackoverflow.wiki/

    用爱发电组织的核心驱动力:

    • 遵守开源守则,体现开源&贡献精神:以分享为目的,拒绝非法牟利。
    • 自我保护:使用适当的 License 保护自己的原创作品。
    • 尊重他人:不以各种理由、各种漏洞进行未经允许的抄袭、散播、洩露;以礼相待,尊重所有对社区做出贡献的开发者;通过他人的分享习得知识,要留下足迹,表示感谢。
    • 热爱编程、热爱学习:加入组织,热爱编程是首当其要的。我们欢迎热爱讨论、分享、提问的朋友,也同样欢迎默默成就的朋友。
    • 倾听:正确并恳切对待、处理问题与建议,及时修复开源项目的 Bug ,及时与反馈者沟通。不抬杠、不无视、不辱骂。
    • 平视:不诋毁、轻视、嘲讽其他开发者,主动提出建议、施以帮助,以和谐为本。只要他人肯努力,你也可能会被昔日小看的人所超越,所以请保持谦虚。
    • 乐观且活跃:你的努力决定了你的高度。不要放弃,多年后回头俯瞰,才会发现自己已经成就往日所仰望的水平。积极地将项目开源,帮助他人学习、改进,自己也会获得相应的提升、成就与成就感。
    1 引用 • 487 回帖 • 2 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 172 关注
  • OkHttp

    OkHttp 是一款 HTTP & HTTP/2 客户端库,专为 Android 和 Java 应用打造。

    16 引用 • 6 回帖 • 75 关注
  • Markdown

    Markdown 是一种轻量级标记语言,用户可使用纯文本编辑器来排版文档,最终通过 Markdown 引擎将文档转换为所需格式(比如 HTML、PDF 等)。

    167 引用 • 1520 回帖