spring boot 整合rabbit
作者:bin首先我们要加引用:
git地址
implementation 'org.springframework.boot:spring-boot-starter-amqp'
将配置写好,目的是将queue绑定到exchange上:
package com.zengbingo.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author mark */ @Configuration @Slf4j public class MqProducerConfig { @Bean public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory); return connectionFactory; } @Bean public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() { com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); connectionFactory.setAutomaticRecoveryEnabled(false); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean(name="testExchange") DirectExchange testExchange(){ return new DirectExchange("testExchange"); } @Bean(name="testQueue") Queue testQueue(){ return new Queue("testQueue", true); } @Bean(name="testQueue2") Queue testQueue2(){ return new Queue("testQueue2", true); } @Bean Binding binding(Queue testQueue, DirectExchange testExchange){ return BindingBuilder.bind(testQueue).to(testExchange).with("testRouting"); } @Bean Binding bindin2(Queue testQueue, DirectExchange testExchange){ return BindingBuilder.bind(testQueue).to(testExchange).with("testRouting2"); } }
定义一个消息发布者:
package com.zengbingo.mq.publisher; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class RabbitPub { @Resource RabbitTemplate rabbitTemplate; public String sendMessage(){ JSONObject data = new JSONObject(); data.put("曾彬", "100分"); data.put("历史成绩", "100分"); rabbitTemplate.convertAndSend("testExchange","testRouting",data); return "success"; } }
定义一个消息消费者:
package com.zengbingo.mq.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitCon{ @RabbitListener(queues = "testQueue") public void proces2(Message message, Channel channel) throws Exception{ System.out.println("p2"); System.out.println("p2 tag : " + message.getMessageProperties().getDeliveryTag()); System.out.println("p2 CheckReceiver: " + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }