首页 > 中间件 > spring boot 整合rabbit

Spring Boot 整合 RabbitMQ

作者:bin

首先我们要在 build.gradle 中添加依赖:
git地址

implementation 'org.springframework.boot:spring-boot-starter-amqp'

编写配置类,声明连接、exchange 和 queue,并通过 routing key 将 queue 绑定到 exchange 上:

package com.zengbingo.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the RabbitMQ connection together with the
 * exchange, queues and bindings used by this example.
 *
 * @author mark
 */
@Configuration
@Slf4j
public class MqProducerConfig {

    /**
     * Wraps the raw RabbitMQ client factory in a {@link CachingConnectionFactory}.
     *
     * @param rabbitConnectionFactory the client-level factory produced by
     *                                {@link #rabbitConnectionFactory()}
     * @return the Spring AMQP connection factory bean
     */
    @Bean
    public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory){
        return new CachingConnectionFactory(rabbitConnectionFactory);
    }

    /**
     * Builds the RabbitMQ client connection factory with hard-coded local settings
     * (host {@code localhost}, user/password {@code guest}, virtual host {@code /}).
     *
     * @return the configured client connection factory
     */
    @Bean
    public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        // Turn off the client library's own automatic recovery.
        // NOTE(review): presumably because Spring AMQP handles reconnection itself - confirm.
        connectionFactory.setAutomaticRecoveryEnabled(false);
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    /**
     * Declares the direct exchange named {@code testExchange}.
     *
     * @return the exchange bean
     */
    @Bean(name="testExchange")
    DirectExchange testExchange(){
        return new DirectExchange("testExchange");
    }

    /**
     * Declares the durable queue named {@code testQueue}.
     *
     * @return the queue bean
     */
    @Bean(name="testQueue")
    Queue testQueue(){
        return new Queue("testQueue", true);
    }

    /**
     * Declares the durable queue named {@code testQueue2}.
     *
     * @return the queue bean
     */
    @Bean(name="testQueue2")
    Queue testQueue2(){
        return new Queue("testQueue2", true);
    }

    /**
     * Binds {@code testQueue} to {@code testExchange} with routing key {@code testRouting}.
     *
     * @param testQueue    the queue bean, selected by parameter name
     * @param testExchange the exchange bean
     * @return the binding bean
     */
    @Bean
    Binding binding(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with("testRouting");
    }

    /**
     * Binds {@code testQueue2} to {@code testExchange} with routing key {@code testRouting2}.
     *
     * <p>The parameter is named {@code testQueue2} so that the second queue bean is
     * injected; previously this method bound {@code testQueue} again, which left
     * {@code testQueue2} without any binding.
     *
     * @param testQueue2   the second queue bean, selected by parameter name
     * @param testExchange the exchange bean
     * @return the binding bean
     */
    @Bean
    Binding bindin2(Queue testQueue2, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue2).to(testExchange).with("testRouting2");
    }
}

定义一个消息发布者:

package com.zengbingo.mq.publisher;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Publishes a sample message to the {@code testExchange} exchange.
 */
@Component
public class RabbitPub {

    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * Builds a small JSON document and sends it to {@code testExchange} with
     * routing key {@code testRouting}.
     *
     * @return the literal string {@code "success"} after the send call returns
     */
    public String sendMessage(){
        JSONObject data = new JSONObject();
        data.put("曾彬", "100分");
        data.put("历史成绩", "100分");
        // Send the JSON text instead of the JSONObject instance, so the message body
        // is plain text that a consumer can decode directly as a string rather than
        // a Java-serialized object graph.
        rabbitTemplate.convertAndSend("testExchange","testRouting",data.toJSONString());
        return "success";
    }
}

定义一个消息消费者:

package com.zengbingo.mq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from {@code testQueue}, prints them and acknowledges them manually.
 */
@Component
public class RabbitCon{

    /**
     * Prints the delivery tag and body of each received message, then acks it.
     *
     * <p>{@code ackMode = "MANUAL"} is set on the listener because this method calls
     * {@code basicAck} itself; without it the listener container may acknowledge the
     * same delivery a second time.
     *
     * @param message the received AMQP message
     * @param channel the channel the message was delivered on, used for the ack
     * @throws Exception if acknowledging the delivery fails
     */
    @RabbitListener(queues = "testQueue", ackMode = "MANUAL")
    public void proces2(Message message, Channel channel) throws  Exception{
        System.out.println("p2");
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("p2 tag : " + deliveryTag);
        // Decode with an explicit charset so the output does not depend on the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("p2 CheckReceiver: " + body);
        // multiple=false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}

您必须 [ 登录 ] 才能发表留言!