Java整合rabbitMQ实现死信队列
这里依旧使用 springboot
框架,同时使用spring提供的rabbitmq-starter来实现rabbitMQ死信队列。
pom依赖
在项目依赖文件 pom.xml
添加下面的依赖
1 2 3 4 5 6
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>3.0.1</version> </dependency>
|
配置rabbitmq信息
1 2 3 4 5 6 7 8
| spring: rabbitmq: password: admin publisher-returns: true publisher-confirm-type: correlated host: 127.0.0.1 port: 5672 username: admin
|
消息可靠性保障
消息可靠性需要在生产者与消费者两个地方进行配置,这里仅配置了生产者确认,后面单独开文章写一下消费者手动确认与重试机制的问题。
这里将 publisher-returns
和 publisher-confirm-type
配置了一下,保证在生产者产生消息之后消息中间件可以给程序反馈,确保生产者不丢消息(或者说,在生产者丢消息之后能够进行处理)。
在配置类里面配置回调逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } else { log.error("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } } });
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override public void returnedMessage(ReturnedMessage returned) {
log.info("ReturnsCallback returned : {}", returned); } });
return rabbitTemplate; }
|
通过 @Bean
注解把 RabbitTemplate
交给spring容器管理,配置确认逻辑。
绑定消息交换机和消息队列
在绑定之前,我们需要首先进行创建,创建若干个“业务”消息队列,同时配置私信队列,便于消息中间件在处理失败的过程中能够可以进行“兜底”处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Bean public Exchange deadLetterExchange() { return ExchangeBuilder .topicExchange("dead-letter-exchange") .durable(true) .build(); }
@Bean public Queue deadLetterQueue() { return QueueBuilder .durable("dead-letter-queue") .build(); }
|
这里同样使用 @Bean
注解交给Spring容器管理,然后我们对上述内容执行绑定。
1 2 3 4 5 6 7 8 9
| @Bean public Binding userDeadLetterBinding() { return BindingBuilder .bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dead-letter-routing-key") .noargs(); }
|
with中的参数为routingKey,可以进行自定义,在转入死信队列的时候会携带这个参数。
这里使用了topic类型的交换机,具体信息暂时挖个坑,后面更新文章填坑吧。
1 2 3 4
| @Bean TopicExchange exchange() { return new TopicExchange("topic-ex"); }
|
1 2 3 4 5 6 7 8 9
| @Bean public Queue firstQueue() { return QueueBuilder.durable("first-queue") .withArgument("x-dead-letter-exchange", "dead-letter-exchange") .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") .build(); }
|
1 2 3 4
| @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with("first-queue"); }
|
这里注意,由于是topic交换机,需要保证routingKey与消息队列的一致,才能正确把消息投入到指定的队列。
生产者发送消息
1 2 3 4 5 6 7 8
| @RequestMapping("/test") public String test() { String messageData = "message: rabbitmq message"; Map<String, Object> manMap = new HashMap<>(); manMap.put("messageData", messageData); rabbitTemplate.convertAndSend("topic-ex", "first-queue", manMap, new CorrelationData("1001")); return "ok"; }
|
访问对应的地址,即可将消息投入到指定的消息队列了。
消费者获取消息
1 2 3 4 5 6 7
| @RabbitListener(queues = "first-queue") @RabbitHandler public void receiver(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException, InterruptedException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); log.info("消费者 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
}
|
将消费者在程序初始化的时候保持后台运行,即可正常消费消息。