rabbitMQ 路由模式 - 消费者筛选订阅

本贴最后更新于 1692 天前,其中的信息可能已经东海扬尘

生产者代码



import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * mq路由-生产者
 * @author pengzai
 *
 */
public class RoutingSendUtil {

	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("admin");		//mq账户
        factory.setPassword("admin");		//mq密码
    }
  
    /**
     * 发送消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn) {
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            //绑定 队列-交换机-路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            String message = "RoutingSend-" + System.currentTimeMillis();
            // 发送消息,并配置消息的路由键
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(CHARSET));
            System.out.println("发送成功");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                // 空值判断,为了代码简洁略
                channel.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  
    public static void main(String[] args) {
    	execute("1212415450");
	}
}

消费者代码


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
 * mq路由-消费者
 * @author pengzai
 *
 */
public class RoutingRecvUtil {
	private static final String EXCHANGE_NAME = "direct_pengzai";		//交换机名称
	private static final String EXCHANGE_TYPE = "direct";			//交换类型
	private static final String CHARSET = "UTF-8";					//字符集
	private static final Integer X_MESSAGE_TTL = 180*1000;			//消息超时(3分钟)
    private static final Integer X_EXPIRES = 600*1000;				//队列超时(10分钟)
    private static final Integer X_MAX_LENGTH = 1;					//长度限制(1条)
  
    private static ConnectionFactory factory = new ConnectionFactory();
    static{
    	factory.setHost("127.0.0.1");	//mq服务ip
    	factory.setPort(5672);				//mq服务端口
    	factory.setVirtualHost("/pengzai");	//mq虚拟主机
        factory.setUsername("pengzai");			//mq账户
        factory.setPassword("pengzai");			//mq密码
    }
  
    /**
     * 接收mq消息
     * @param sn  --设备sn号
     */
    public static void execute(String sn){
  
    	String queueName = sn;	//队列名称
  
    	String routingKey = sn;	//路由key
  
        Connection connection = null;
        Channel channel = null;
        try {
            // 建立TCP连接
            connection = factory.newConnection();
            // 在TCP连接的基础上创建通道
            channel = connection.createChannel();
            // 声明一个direct交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
  
            //创建(声明)队列
            Map<String,Object> params = new HashMap<String, Object>();
            params.put("x-message-ttl", X_MESSAGE_TTL);	//消息超时
            params.put("x-expires", X_EXPIRES);			//队列超时
            params.put("x-max-length", X_MAX_LENGTH);	//长度限制
            channel.queueDeclare(queueName, false, false, true, params);
  
            // 绑定队列,路由
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
  
            // 定义消息的回调处理类
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  
            // 接收消息
            channel.basicConsume(queueName, true, queueingConsumer);
  
            System.out.println("等待接收消息......");
            while (true) {
            	Delivery delivery = queueingConsumer.nextDelivery();
            	String msg = new String(delivery.getBody(),CHARSET);
            	System.out.println("消息内容:"+msg);
           }
        }catch (Exception e){
        	e.printStackTrace();
        }finally {
        	try {
                channel.close();
                connection.close();
            } catch (Exception x) {
  
            }
		}
    }
  
    public static void main(String[] args) throws IOException {
    	execute("1212415450");
	}
}

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 362 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Laravel

    Laravel 是一套简洁、优雅的 PHP Web 开发框架。它采用 MVC 设计,是一款崇尚开发效率的全栈框架。

    20 引用 • 23 回帖 • 721 关注
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    340 引用 • 708 回帖
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖 • 1 关注
  • sts
    2 引用 • 2 回帖 • 196 关注
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    107 引用 • 153 回帖 • 1 关注
  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    83 引用 • 37 回帖
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 610 关注
  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1434 引用 • 10054 回帖 • 490 关注
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 510 关注
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 4 关注
  • 人工智能

    人工智能(Artificial Intelligence)是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门技术科学。

    132 引用 • 189 回帖
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖 • 1 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    53 引用 • 37 回帖
  • CongSec

    本标签主要用于分享网络空间安全专业的学习笔记

    1 引用 • 1 回帖 • 9 关注
  • NGINX

    NGINX 是一个高性能的 HTTP 和反向代理服务器,也是一个 IMAP/POP3/SMTP 代理服务器。 NGINX 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。

    311 引用 • 546 回帖
  • 创业

    你比 99% 的人都优秀么?

    84 引用 • 1399 回帖 • 1 关注
  • 分享

    有什么新发现就分享给大家吧!

    248 引用 • 1792 回帖
  • jsDelivr

    jsDelivr 是一个开源的 CDN 服务,可为 npm 包、GitHub 仓库提供免费、快速并且可靠的全球 CDN 加速服务。

    5 引用 • 31 回帖 • 57 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖 • 2 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    287 引用 • 4484 回帖 • 667 关注
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    12 引用 • 54 回帖 • 166 关注
  • CSS

    CSS(Cascading Style Sheet)“层叠样式表”是用于控制网页样式并允许将样式信息与网页内容分离的一种标记性语言。

    198 引用 • 550 回帖
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 625 关注
  • CAP

    CAP 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。

    11 引用 • 5 回帖 • 607 关注