springboot集成rabbitmq

目标

  • 使用springboot 整合rabbitmq
  • 测试 rabbitmq 的simple模式,work模式,发布订阅模式,路由模式,topic模式

创建项目

创建项目springboot_rabbitmq

  • 开发工具:idea
  • Create New Project -> Spring Initializr ->next-> 添加项目信息
  • 添加依赖Web->Spring Web , Messaging->Spring for RabbitMQ

simple 模式

修改配置文件

server.port=8080
#RabbitMQ服务器
spring.rabbitmq.host=192.168.16.166
#默认端口5672
spring.rabbitmq.port=5672
#默认guest,可不写
spring.rabbitmq.password=guest
#默认guest用户,可不写
spring.rabbitmq.username=guest

生产者

新建controller文件夹,并在该目录下新建RabbitmqController

package indi.xzw.springboot_rabbitmq.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitmqController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/queue")
    @ResponseBody
    public String sendMsg(String msg){
        rabbitTemplate.convertAndSend("","queue1",msg );
        return "发送成功";
    }

}

消费者

新建handler目录,并在该目录下新建QueueHandler

package indi.xzw.springboot_rabbitmq.handler;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class QueueHandler {
    @RabbitListener(queuesToDeclare =@Queue("queue1"))
    public  void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel){
        System.out.printf("收到的消息-> deliveryTag:%d\n",deliveryTag);
        System.out.printf("收到的消息-> channel:%s\n",channel.toString());
        System.out.printf("收到的消息-> msg:%s\n",msg);

    }
}

测试

访问:http://127.0.0.1:8080/queue1?msg= my name is xiazhongwei

返回:

新增手动签收

修改配置文件

在配置文件中新增手动签收配置

#配置手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

新增生产者

在RabbitmqController中新增

    @RequestMapping("/queue2")
    @ResponseBody
    public String sendMsg2(String msg){
        rabbitTemplate.convertAndSend("","queue2",msg );
        return "发送成功";
    }

新增消费者

在QueueHandler中新增

 /**
     * 手动签收接受者
     *
     * @param msg         消息
     * @param deliveryTag 消息唯一标识
     * @param channel     通道
     * @throws IOException
     */
    @RabbitListener(
            queuesToDeclare =@Queue("queue2")//使用默认的交换机 @Queue:队列消息配置
    )
    public void receiveMsgToManuallySigned(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            //模拟出现异常
            System.out.printf("收到的消息-> deliveryTag:%d\n",deliveryTag);
            System.out.printf("收到的消息-> channel:%s\n",channel.toString());
            System.out.printf("收到的消息-> msg:%s\n",msg);
            //手动签收  channel.basicAck(消息唯一标识,是否批量签收);
            // 如果不签收,服务器会尝试再次发送
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            e.printStackTrace();
            //channel.basicNack(deliveryTag:消息的唯一标识,multiple:是否批量处理,requeue:是否重新放入队列);
            //消息出现异常时,若requeue=false,则该消息会被放入死信队列,若没有配置死信队列则该消息会丢失。
            channel.basicNack(deliveryTag,false, false);
        }
    }

一般我们在实际的使用中都会把签收方式配置为手动签收的方式。这样更方便于在消息出现异常时进行特殊的处理,如(报警机制、日志记录等)。

测试

访问:http://127.0.0.1:8080/queue2?msg= my name is xiazhongwei

输出结果和上面一致只是增加了手动签收

work 模式

Work模式和simple模式基本一样,区别在于work模式支持多个消费者同时接收消息,通过spring.rabbitmq.listener.simple.prefetch配置消息的预读数量控制消费者一次性从队列中读取多少条消息,做到能者多劳的配置(因为在实际的生产环境中每个服务器的配置不可能完全相同,带来的处理消息的时间也不一样)。

修改配置文件

在原本的基础上添加消息预读数量配置。

#消息预读数量 1表示每次从队列中读取一条消息
spring.rabbitmq.listener.simple.prefetch=1
  • 如果不设置这个,两个消费者会平均消费,即一人处理一半,设置了之后,处理快的会多处理一些。

新增生产者

在RabbitmqController中新增

    @RequestMapping("/queue3")
    @ResponseBody
    public String sendMsg3(String msg){
        for (int i = 0; i < 12; i++) {
            rabbitTemplate.convertAndSend("", "queue3", msg);
        }
        return "发送成功";
    }

新增消费者

在QueueHandler中新增

 @RabbitListener(queuesToDeclare =@Queue("queue3"))
    public  void receiveMsgWork1(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            //模拟服务器性能
            Thread.sleep(1);
            System.out.printf("01 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }
    @RabbitListener(queuesToDeclare =@Queue("queue3"))
    public  void receiveMsgWork2(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            //模拟服务器性能
            Thread.sleep(5);
            System.out.printf("02 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }

测试

访问:http://127.0.0.1:8080/queue3?msg= my name is xiazhongwei

因为01延时小,所以处理的比02多,但是总共处理的条数和发送的一致

发布订阅模式fanout

新增发布者

注意填写的参数是和上面有区别的,要填交换机

    @RequestMapping("/queue4")
    @ResponseBody
    public String sendMsg4(String msg){
        rabbitTemplate.convertAndSend("queue4","",msg );
        return "发送成功";
    }

新增消费者

和之前区别就是定义交换机

    @RabbitListener(bindings = @QueueBinding(//创建队列和交换机的绑定关系
            value = @Queue(),//创建个匿名队列
            exchange = @Exchange(name ="queue4", type = "fanout")))//定义交换机,指定交换机名称和类型
    public  void receiveMsgSub1(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            System.out.printf("01 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }
    @RabbitListener(bindings = @QueueBinding(//创建队列和交换机的绑定关系
            value = @Queue(),//创建个匿名队列
            exchange = @Exchange(name ="queue4", type = "fanout")))//定义交换机,指定交换机名称和类型
    public  void receiveMsgSub2(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            System.out.printf("02 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }

测试

访问:http://127.0.0.1:8080/queue4?msg= my name is xiazhongwei

返回:

02 收到的消息-> msg: my name is xiazhongwei
01 收到的消息-> msg: my name is xiazhongwei

路由模式

在同一个消息,根据定义的消息路由key,指定消息只发送给固定的路由key对应的消息队列。达到分别的控制。

新增发布者

新增路由key

    @RequestMapping("/queue5")
    @ResponseBody
    public String sendMsg5(String msg,String routingKey){
        rabbitTemplate.convertAndSend("queue5",routingKey,msg );
        return "发送成功";
    }

新增消费者

新增路由key

 @RabbitListener(bindings = @QueueBinding(//创建队列和交换机的绑定关系
            value = @Queue(),//创建个匿名队列
            exchange = @Exchange(name ="queue5", type = "direct"),//定义交换机,指定交换机名称和类型
            key ={"xzw"}))//定义routingKey
    public  void receiveMsgRoute1(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            System.out.printf("01 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }
    @RabbitListener(bindings = @QueueBinding(//创建队列和交换机的绑定关系
            value = @Queue(),//创建个匿名队列
            exchange = @Exchange(name ="queue5", type = "direct"),//定义交换机,指定交换机名称和类型
            key ={"xzw", "xia"}))//定义routingKey
    public  void receiveMsgRoute2(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            System.out.printf("02 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }

测试

访问:http://127.0.0.1:8080/queue5?msg=%20my%20name%20is%20xiazhongwei&routingKey=xzw

返回:

02 收到的消息-> msg: my name is xiazhongwei
01 收到的消息-> msg: my name is xiazhongwei

访问:http://127.0.0.1:8080/queue5?msg=%20my%20name%20is%20xiazhongwei&routingKey=xia

返回:

02 收到的消息-> msg: my name is xiazhongwei

可以看到我们改变路由key只有02搜到消息

Topic模式

Topics模式与Routing模式基本一样,他们两个的区别在于Topics模式支持通配,而Routing模式不支持通配。

新增生产者

    @RequestMapping("/queue6")
    @ResponseBody
    public String sendMsg6(String msg,String routingKey){
        rabbitTemplate.convertAndSend("queue6",routingKey,msg );
        return "发送成功";
    }

新增消费者

@RabbitListener(bindings = @QueueBinding(//创建队列和交换机的绑定关系
            value = @Queue(),//创建个匿名队列
            exchange = @Exchange(name ="queue6", type = "topic"),//定义交换机,指定交换机名称和类型
            key ={"xzw.*"}))//定义routingKey
    public  void receiveMsgTopic(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
        try {
            System.out.printf(" 收到的消息-> msg:%s\n", msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag,false, false);
        }
    }

测试

访问:http://127.0.0.1:8080/queue6?msg=%20my%20name%20is%20xiazhongwei&routingKey=xzw.log

发现支持通配符访问。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×