yml
rabbitmq: host: localhost port: 7672 username: guest password: guest publisher-confirms: true #开启发送确认 publisher-returns: true #开启发送失败回退 #开启ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #采取手动应答 #concurrency: 1 # 指定最小的消费者数量 #max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true # 是否支持重试
配置文件
package com.yss.screenmonitor.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author tanglonglong \(--)/
* @date 2021/2/22 17:29
*/
@Configuration
public class MqConfig {
@Bean
public Queue helloQueue(){
Queue hello = new Queue("hello");
return hello;
}
@Bean(name="getDirectExchangeTx")
public DirectExchange getDirectExchangeTx(){
return new DirectExchange("directExchangeTx", true, false);
}
@Bean(name="getQueueTx")
public Queue getQueueTx(){
return new Queue("directQueueTx", true, false, false);
}
//每个queue绑定exchange,routing key,然后发送消息时候指定routing key,指定exchange.
//三种类型的Exchange:direct ,fanout和topic
@Bean
public Binding getDirectExchangeQueueTx(
@Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
@org.springframework.beans.factory.annotation.Qualifier(value="getQueueTx") Queue getQueueTx){
return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
}
}
消费者
/** * @author tanglonglong \(--)/ * @date 2021/2/22 17:27 */ @Component @RabbitListener(queues = "hello") public class WatchHello { @RabbitHandler public void get(String str, Channel chennel, Message message) throws InterruptedException { try { chennel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // chennel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch ( IOException e) { e.printStackTrace(); } // Thread.sleep(10); System.out.println(str+"1"); } }
生产者
package com.yss.screenmonitor.mq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Date; /** * @author tanglonglong \(--)/ * @date 2021/2/22 17:22 */ @Service public class MQUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { // @Autowired // private AmqpTemplate rabbitmqTemplate; @Autowired RabbitTemplate rabbitTemplate; /** * PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化. */ @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(this); //指定 ReturnCallback rabbitTemplate.setReturnCallback(this); } public void send(String msg) throws InterruptedException { String content = msg+ new Date(); for (int i = 0; i < 1000; i++) { Thread.sleep(20); this.rabbitTemplate.convertAndSend("hello", (Object)content,new CorrelationData(Integer.valueOf(i).toString())); this.rabbitTemplate.convertAndSend("directExchangeTx", "directQueueTxRoutingKey", (Object)content,new CorrelationData(Integer.valueOf(i).toString())); } } /** * Confirmation callback. * 有没有到交换机 * * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData.getId(); System.out.println(id+ack+cause); } /** * Returned message callback. * 有没有从交换机到queue,到了不调用 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().toString()+routingKey+replyText+exchange+routingKey); } }
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于