rabbitMQ 路由模式 - 消费者筛选订阅

本贴最后更新于 1723 天前,其中的信息可能已经东海扬尘

生产者代码



import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * mq路由-生产者
 * @author pengzai
 *
 */
public class RoutingSendUtil {

	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("admin");		//mq账户
        factory.setPassword("admin");		//mq密码
    }
  
    /**
     * 发送消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn) {
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            //绑定 队列-交换机-路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            String message = "RoutingSend-" + System.currentTimeMillis();
            // 发送消息,并配置消息的路由键
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(CHARSET));
            System.out.println("发送成功");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                // 空值判断,为了代码简洁略
                channel.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  
    public static void main(String[] args) {
    	execute("1212415450");
	}
}

消费者代码


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
 * mq路由-消费者
 * @author pengzai
 *
 */
public class RoutingRecvUtil {
	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("pengzai");			//mq账户
        factory.setPassword("pengzai");			//mq密码
    }
  
    /**
     * 接收mq消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn){
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            // 绑定队列,路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            // 定义消息的回调处理类
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  
            // 接收消息
            channel.basicConsume(queueName, true, queueingConsumer);
  
            System.out.println("等待接收消息......");
            while (true) {
            	Delivery delivery = queueingConsumer.nextDelivery();
            	String msg = new String(delivery.getBody(),CHARSET);
            	System.out.println("消息内容:"+msg);
           }
        }catch (Exception e){
        	e.printStackTrace();
        }finally {
        	try {
                channel.close();
                connection.close();
            } catch (Exception x) {
  
            }
		}
    }
  
    public static void main(String[] args) throws IOException {
    	execute("1212415450");
	}
}

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 361 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    8 引用 • 26 回帖 • 1 关注
  • Windows

    Microsoft Windows 是美国微软公司研发的一套操作系统,它问世于 1985 年,起初仅仅是 Microsoft-DOS 模拟环境,后续的系统版本由于微软不断的更新升级,不但易用,也慢慢的成为家家户户人们最喜爱的操作系统。

    223 引用 • 474 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    943 引用 • 1460 回帖 • 3 关注
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1706 回帖
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    21 引用 • 140 回帖 • 3 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 418 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖
  • Android

    Android 是一种以 Linux 为基础的开放源码操作系统,主要使用于便携设备。2005 年由 Google 收购注资,并拉拢多家制造商组成开放手机联盟开发改良,逐渐扩展到到平板电脑及其他领域上。

    334 引用 • 323 回帖 • 3 关注
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    5 引用 • 15 回帖 • 102 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 26 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    52 引用 • 190 回帖 • 1 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 604 关注
  • abitmean

    有点意思就行了

    27 关注
  • 服务器

    服务器,也称伺服器,是提供计算服务的设备。由于服务器需要响应服务请求,并进行处理,因此一般来说服务器应具备承担服务并且保障服务的能力。

    125 引用 • 588 回帖
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 745 关注
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    265 引用 • 666 回帖 • 1 关注
  • 持续集成

    持续集成(Continuous Integration)是一种软件开发实践,即团队开发成员经常集成他们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。

    15 引用 • 7 回帖
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    77 引用 • 430 回帖 • 1 关注
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    36 引用 • 37 回帖 • 535 关注
  • CodeMirror
    1 引用 • 2 回帖 • 129 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    492 引用 • 926 回帖
  • etcd

    etcd 是一个分布式、高可用的 key-value 数据存储,专门用于在分布式系统中保存关键数据。

    5 引用 • 26 回帖 • 528 关注
  • IDEA

    IDEA 全称 IntelliJ IDEA,是一款 Java 语言开发的集成环境,在业界被公认为最好的 Java 开发工具之一。IDEA 是 JetBrains 公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨著称的东欧程序员为主。

    181 引用 • 400 回帖
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3454 回帖 • 189 关注