rabbitMQ 路由模式 - 消费者筛选订阅

本贴最后更新于 1551 天前,其中的信息可能已经东海扬尘

生产者代码



import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * mq路由-生产者
 * @author pengzai
 *
 */
public class RoutingSendUtil {

	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("admin");		//mq账户
        factory.setPassword("admin");		//mq密码
    }
  
    /**
     * 发送消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn) {
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            //绑定 队列-交换机-路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            String message = "RoutingSend-" + System.currentTimeMillis();
            // 发送消息,并配置消息的路由键
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(CHARSET));
            System.out.println("发送成功");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                // 空值判断,为了代码简洁略
                channel.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  
    public static void main(String[] args) {
    	execute("1212415450");
	}
}

消费者代码


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
 * mq路由-消费者
 * @author pengzai
 *
 */
public class RoutingRecvUtil {
	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("pengzai");			//mq账户
        factory.setPassword("pengzai");			//mq密码
    }
  
    /**
     * 接收mq消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn){
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            // 绑定队列,路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            // 定义消息的回调处理类
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  
            // 接收消息
            channel.basicConsume(queueName, true, queueingConsumer);
  
            System.out.println("等待接收消息......");
            while (true) {
            	Delivery delivery = queueingConsumer.nextDelivery();
            	String msg = new String(delivery.getBody(),CHARSET);
            	System.out.println("消息内容:"+msg);
           }
        }catch (Exception e){
        	e.printStackTrace();
        }finally {
        	try {
                channel.close();
                connection.close();
            } catch (Exception x) {
  
            }
		}
    }
  
    public static void main(String[] args) throws IOException {
    	execute("1212415450");
	}
}

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 400 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    76 引用 • 37 回帖
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    34 引用 • 37 回帖 • 507 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    58 引用 • 22 回帖 • 3 关注
  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 50 关注
  • BAE

    百度应用引擎(Baidu App Engine)提供了 PHP、Java、Python 的执行环境,以及云存储、消息服务、云数据库等全面的云服务。它可以让开发者实现自动地部署和管理应用,并且提供动态扩容和负载均衡的运行环境,让开发者不用考虑高成本的运维工作,只需专注于业务逻辑,大大降低了开发者学习和迁移的成本。

    19 引用 • 75 回帖 • 618 关注
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 644 关注
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    9 引用 • 32 回帖 • 152 关注
  • sts
    2 引用 • 2 回帖 • 162 关注
  • 一些有用的避坑指南。

    69 引用 • 93 回帖
  • Kotlin

    Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,由 JetBrains 设计开发并开源。Kotlin 可以编译成 Java 字节码,也可以编译成 JavaScript,方便在没有 JVM 的设备上运行。在 Google I/O 2017 中,Google 宣布 Kotlin 成为 Android 官方开发语言。

    19 引用 • 33 回帖 • 43 关注
  • 服务器

    服务器,也称伺服器,是提供计算服务的设备。由于服务器需要响应服务请求,并进行处理,因此一般来说服务器应具备承担服务并且保障服务的能力。

    124 引用 • 580 回帖
  • HTML

    HTML5 是 HTML 下一个的主要修订版本,现在仍处于发展阶段。广义论及 HTML5 时,实际指的是包括 HTML、CSS 和 JavaScript 在内的一套技术组合。

    103 引用 • 294 回帖
  • gRpc
    10 引用 • 8 回帖 • 51 关注
  • 尊园地产

    昆明尊园房地产经纪有限公司,即:Kunming Zunyuan Property Agency Company Limited(简称“尊园地产”)于 2007 年 6 月开始筹备,2007 年 8 月 18 日正式成立,注册资本 200 万元,公司性质为股份经纪有限公司,主营业务为:代租、代售、代办产权过户、办理银行按揭、担保、抵押、评估等。

    1 引用 • 22 回帖 • 703 关注
  • 生活

    生活是指人类生存过程中的各项活动的总和,范畴较广,一般指为幸福的意义而存在。生活实际上是对人生的一种诠释。生活包括人类在社会中与自己息息相关的日常活动和心理影射。

    229 引用 • 1450 回帖
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 65 关注
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    34 引用 • 467 回帖 • 711 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 247 回帖 • 148 关注
  • 博客

    记录并分享人生的经历。

    272 引用 • 2386 回帖
  • ReactiveX

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的 API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

    1 引用 • 2 回帖 • 140 关注
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4599 回帖 • 689 关注
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 608 关注
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    51 引用 • 226 回帖
  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 646 关注