spring boot 整合rabbit
作者:bin首先我们要加引用:
git地址
implementation 'org.springframework.boot:spring-boot-starter-amqp'
将配置写好,目的是将queue绑定到exchange上:
package com.zengbingo.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author mark
*/
@Configuration
@Slf4j
public class MqProducerConfig {
@Bean
public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);
return connectionFactory;
}
@Bean
public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setAutomaticRecoveryEnabled(false);
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean(name="testExchange")
DirectExchange testExchange(){
return new DirectExchange("testExchange");
}
@Bean(name="testQueue")
Queue testQueue(){
return new Queue("testQueue", true);
}
@Bean(name="testQueue2")
Queue testQueue2(){
return new Queue("testQueue2", true);
}
@Bean
Binding binding(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("testRouting");
}
@Bean
Binding bindin2(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("testRouting2");
}
}
定义一个消息发布者:
package com.zengbingo.mq.publisher;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RabbitPub {
@Resource
RabbitTemplate rabbitTemplate;
public String sendMessage(){
JSONObject data = new JSONObject();
data.put("曾彬", "100分");
data.put("历史成绩", "100分");
rabbitTemplate.convertAndSend("testExchange","testRouting",data);
return "success";
}
}
定义一个消息消费者:
package com.zengbingo.mq.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitCon{
@RabbitListener(queues = "testQueue")
public void proces2(Message message, Channel channel) throws Exception{
System.out.println("p2");
System.out.println("p2 tag : " + message.getMessageProperties().getDeliveryTag());
System.out.println("p2 CheckReceiver: " + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}