RabbitMq 从零开始

yml

rabbitmq:
    host: localhost
    port: 7672
    username: guest
    password: guest
    publisher-confirms: true   #开启发送确认
    publisher-returns: true  #开启发送失败回退

    #开启ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

配置文件

package com.yss.screenmonitor.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author tanglonglong \(--)/
 * @date 2021/2/22 17:29
 */
@Configuration
public class MqConfig {
    @Bean
    public Queue helloQueue(){
        Queue hello = new Queue("hello");
        return hello;
    }

    @Bean(name="getDirectExchangeTx")
    public DirectExchange getDirectExchangeTx(){
        return new DirectExchange("directExchangeTx", true, false);
    }

    @Bean(name="getQueueTx")
    public Queue getQueueTx(){
        return new Queue("directQueueTx", true, false, false);
    }
//每个queue绑定exchange,routing key,然后发送消息时候指定routing key,指定exchange.
//三种类型的Exchange:direct ,fanout和topic
    @Bean
    public Binding getDirectExchangeQueueTx(
            @Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
            @org.springframework.beans.factory.annotation.Qualifier(value="getQueueTx") Queue getQueueTx){
        return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
    }
}

消费者

/**
 * @author tanglonglong \(--)/
 * @date 2021/2/22 17:27
 */
@Component
@RabbitListener(queues = "hello")
public class WatchHello {
    @RabbitHandler
    public void get(String str, Channel chennel, Message message) throws InterruptedException {
        try {
            chennel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//            chennel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (
    IOException e) {
            e.printStackTrace();
        }
//        Thread.sleep(10);
        System.out.println(str+"1");
    }
}

生产者

package com.yss.screenmonitor.mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Date;

/**
 * @author tanglonglong \(--)/
 * @date 2021/2/22 17:22
 */
@Service
public class MQUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
//    @Autowired
//    private AmqpTemplate rabbitmqTemplate;
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
     */
    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }
    public void send(String msg) throws InterruptedException {
        String content = msg+ new Date();
        for (int i = 0; i < 1000; i++) {
        Thread.sleep(20);
            this.rabbitTemplate.convertAndSend("hello", (Object)content,new CorrelationData(Integer.valueOf(i).toString()));
            this.rabbitTemplate.convertAndSend("directExchangeTx", "directQueueTxRoutingKey", (Object)content,new CorrelationData(Integer.valueOf(i).toString()));
        }
    }

    /**
     * Confirmation callback.
     * 有没有到交换机
     *
     * @param correlationData correlation data for the callback.
     * @param ack true for ack, false for nack
     * @param cause An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId();
        System.out.println(id+ack+cause);
    }

    /**
     * Returned message callback.
     * 有没有从交换机到queue,到了不调用
     * @param message the returned message.
     * @param replyCode the reply code.
     * @param replyText the reply text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message.getMessageProperties().toString()+routingKey+replyText+exchange+routingKey);
    }
}
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    47 引用 • 60 回帖 • 505 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2884 引用 • 8096 回帖 • 691 关注

赞助商 我要投放

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...