rabbitMQ 路由模式 - 消费者筛选订阅

本贴最后更新于 1861 天前,其中的信息可能已经东海扬尘

生产者代码

import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * mq路由-生产者 * @author pengzai * */ public class RoutingSendUtil { private static final String EXCHANGE_NAME = "direct_pengzai"; //交换机名称 private static final String EXCHANGE_TYPE = "direct"; //交换类型 private static final String CHARSET = "UTF-8"; //字符集 private static final Integer X_MESSAGE_TTL = 180*1000; //消息超时(3分钟) private static final Integer X_EXPIRES = 600*1000; //队列超时(10分钟) private static final Integer X_MAX_LENGTH = 1; //长度限制(1条) private static ConnectionFactory factory = new ConnectionFactory(); static{ factory.setHost("127.0.0.1"); //mq服务ip factory.setPort(5672); //mq服务端口 factory.setVirtualHost("/pengzai"); //mq虚拟主机 factory.setUsername("admin"); //mq账户 factory.setPassword("admin"); //mq密码 } /** * 发送消息 * @param sn --设备sn号 */ public static void execute(String sn) { String queueName = sn; //队列名称 String routingKey = sn; //路由key Connection connection = null; Channel channel = null; try { // 建立TCP连接 connection = factory.newConnection(); // 在TCP连接的基础上创建通道 channel = connection.createChannel(); // 声明一个direct交换机 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); //创建(声明)队列 Map<String,Object> params = new HashMap<String, Object>(); params.put("x-message-ttl", X_MESSAGE_TTL); //消息超时 params.put("x-expires", X_EXPIRES); //队列超时 params.put("x-max-length", X_MAX_LENGTH); //长度限制 channel.queueDeclare(queueName, false, false, true, params); //绑定 队列-交换机-路由 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); String message = "RoutingSend-" + System.currentTimeMillis(); // 发送消息,并配置消息的路由键 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(CHARSET)); System.out.println("发送成功"); }catch (Exception e){ e.printStackTrace(); }finally { try { // 空值判断,为了代码简洁略 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { execute("1212415450"); } }

消费者代码

import java.io.IOException; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; /** * mq路由-消费者 * @author pengzai * */ public class RoutingRecvUtil { private static final String EXCHANGE_NAME = "direct_pengzai"; //交换机名称 private static final String EXCHANGE_TYPE = "direct"; //交换类型 private static final String CHARSET = "UTF-8"; //字符集 private static final Integer X_MESSAGE_TTL = 180*1000; //消息超时(3分钟) private static final Integer X_EXPIRES = 600*1000; //队列超时(10分钟) private static final Integer X_MAX_LENGTH = 1; //长度限制(1条) private static ConnectionFactory factory = new ConnectionFactory(); static{ factory.setHost("127.0.0.1"); //mq服务ip factory.setPort(5672); //mq服务端口 factory.setVirtualHost("/pengzai"); //mq虚拟主机 factory.setUsername("pengzai"); //mq账户 factory.setPassword("pengzai"); //mq密码 } /** * 接收mq消息 * @param sn --设备sn号 */ public static void execute(String sn){ String queueName = sn; //队列名称 String routingKey = sn; //路由key Connection connection = null; Channel channel = null; try { // 建立TCP连接 connection = factory.newConnection(); // 在TCP连接的基础上创建通道 channel = connection.createChannel(); // 声明一个direct交换机 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); //创建(声明)队列 Map<String,Object> params = new HashMap<String, Object>(); params.put("x-message-ttl", X_MESSAGE_TTL); //消息超时 params.put("x-expires", X_EXPIRES); //队列超时 params.put("x-max-length", X_MAX_LENGTH); //长度限制 channel.queueDeclare(queueName, false, false, true, params); // 绑定队列,路由 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); // 定义消息的回调处理类 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); // 接收消息 channel.basicConsume(queueName, true, queueingConsumer); System.out.println("等待接收消息......"); while (true) { Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody(),CHARSET); System.out.println("消息内容:"+msg); } }catch (Exception e){ e.printStackTrace(); }finally { try { channel.close(); connection.close(); } catch (Exception x) { } } } public static void main(String[] args) throws IOException { execute("1212415450"); } }
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 345 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    7 引用 • 27 回帖 • 1 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 613 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 181 关注
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • 工具

    子曰:“工欲善其事,必先利其器。”

    298 引用 • 763 回帖
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 345 关注
  • ZeroNet

    ZeroNet 是一个基于比特币加密技术和 BT 网络技术的去中心化的、开放开源的网络和交流系统。

    1 引用 • 21 回帖 • 647 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 4 关注
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖 • 3 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 113 关注
  • 服务

    提供一个服务绝不仅仅是简单的把硬件和软件累加在一起,它包括了服务的可靠性、服务的标准化、以及对服务的监控、维护、技术支持等。

    41 引用 • 24 回帖 • 1 关注
  • 音乐

    你听到信仰的声音了么?

    62 引用 • 512 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    107 引用 • 153 回帖
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    181 引用 • 821 回帖
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖
  • JWT

    JWT(JSON Web Token)是一种用于双方之间传递信息的简洁的、安全的表述性声明规范。JWT 作为一个开放的标准(RFC 7519),定义了一种简洁的,自包含的方法用于通信双方之间以 JSON 的形式安全的传递信息。

    20 引用 • 15 回帖 • 22 关注
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    6 引用 • 15 回帖 • 12 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖
  • JetBrains

    JetBrains 是一家捷克的软件开发公司,该公司位于捷克的布拉格,并在俄国的圣彼得堡及美国麻州波士顿都设有办公室,该公司最为人所熟知的产品是 Java 编程语言开发撰写时所用的集成开发环境:IntelliJ IDEA

    18 引用 • 54 回帖 • 1 关注
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    7 引用 • 69 回帖 • 1 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 18 关注
  • 倾城之链
    23 引用 • 66 回帖 • 168 关注
  • 导航

    各种网址链接、内容导航。

    44 引用 • 177 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    16 引用 • 236 回帖 • 262 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 116 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 1 关注