消息队列 RabbitMQ 模式详解
下载 RabbitMQ
镜像
rabbitmq:3-management 默认安装并启用 rabbitmq_management
1 | docker pull rabbitmq:3.10-management |
创建并运行 RabbitMQ
容器
1 | docker run -d -p 5672:5672 -p 15672:15672 \ |
参数说明:
- -d:表示在后台运行容器;
- -p:将容器的端口 5672(应用访问端口)和 15672 (控制台 Web 端口号)映射到主机中;
- 5672:RabbitMQ 提供给编程语言客户端链接的端口
- 15672:RabbitMQ 管理界面的端口
- 25672:RabbitMQ 集群的端口
- -e:指定环境变量:
- RABBITMQ_DEFAULT_USER:默认的用户名;
- RABBITMQ_DEFAULT_PASS:默认的用户密码;
- –hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
- –name rabbitmq:设置容器名称;
- rabbitmq:3.10-management :容器使用的镜像名称;
查看启动情况:1
2
3➜ bin docker ps -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
21dec23292a9 rabbitmq:3.10-management "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
设置 docker 启动的时候自动启动(可选):1
docker update rabbitmq --restart=always
访问 RabbitMQ
后台管理
浏览器输入地址:
http://ip:15672
即可访问后台管理页面,这里的ip
为运行 RabbitMQ 所在的服务器的 IP 地址;账号密码是你创建容器时指定的账号密码
如果访问失败,请尝试关闭防火墙
创建 RabbitMq 连接
- 创建虚拟主机目录
1 | public class ConnectionUtils { |
输出结果:
amqp://admin@1.14.160.174:5672//likfees
RabbitMQ 模式
简单模式
消息发送者
1 | /** |
运行结果:
消息接收者
1 | public class Receiver { |
运行结果:
消息确认机制 ACK
通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
RabbitMQ 如何得知消息被消费者接收?
- 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是 RabbitMQ 无从得 知,这样消息就丢失了
- 因此,RabbitMQ 有一个 ACK 机制,当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收
- ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 传输类控制字符。表示发来的数据已确认接收无误我们在使用 http 请求时,http 的状态码 200 就是告诉我们服务器执行成功
- 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
- 不过这种回执 ACK 分为两种情况:
- 自动 ACK:消息接收后,消费者立刻自动发送 ACK(快递放在快递柜)
- 手动 ACK:消息接收后,不会发送 ACK,需要手动调用(快递必须本人签收)
- 两种情况如何选择,需要看消息的重要性:
- 如果消息不太重要,丢失也没有影响,自动 ACK 会比较方便
- 如果消息非常重要,最好消费完成手动 ACK,如果自动 ACK 消费后,RabbitMQ 就会把 消息从队列中
1 | // false:手动消息确认 |
完整代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Receiver {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 从信道中获取信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag
* @param envelope
* @param properties 协议
* @param body 消息
* @throws IOException
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("接收 = " + msg);
// 手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列 false手动确定消息
channel.basicConsume("queue1", false, consumer);
}
}
工作队列模式
- 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能 力有限,就会产生消息在队列中堆积(生活中的滞销)
- 一个烧烤师傅,一次烤 50 支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
- 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共 享,但其中某一个消息只会被一个消费者获取(100 支肉串 20 个人吃,但是其中的某支肉串只能被 一个人吃)
消费者
如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分派消息。 这是因为 RabbitMQ 只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它 只是盲目地将每条第 n 个消息分派给第 n 个消费者。
为了克服这个问题,我们可以使用设置为 prefetchCount = 1 的 basicQos 方法。这告诉 RabbitMQ 一次不要给一个 worker 发送一条以上的消息。或者,换句话说,在 worker 处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40/**
* 消息接收者1
*/
public class Receiver2 {
static Integer i = 0; // 统计吃掉羊肉串的数量
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queue", false, false, false, null);
// 可以理解为快递一个一个送,送完在送下一个
channel.basicQos(1);
// 从信道中获取信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
*
* @param consumerTag
* @param envelope
* @param properties 协议
* @param body 消息
* @throws IOException
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.printf("【顾客2】吃掉 = %s!总共吃[%d]串 \n", msg, ++i);
// 模拟网络延迟
try {
Thread.sleep(900);
} catch (Exception e) {
e.printStackTrace();
}
// 手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 手动确定消息
channel.basicConsume("work_queue", false, consumer);
}
}
必须使用 ACK 确认消息才能生效
生产者
1 | /** |
订阅模式
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视 频通知
上图中,X 就是视频主,红色的队列就是粉丝。binding 是绑定的意思(关注)
P 生产者发送信息给 X 路由,X 将信息转发给绑定 X 的队列
X 队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
- 路由在生产者程序中创建
- 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没 有队列,路由并不知道将信息发送给谁
运行程序的顺序:
1. MessageSender
MessageReceiver1 和 MessageReceiver2
MessageSender
生产者
1 | /** |
消费者
1 | /** |
路由模式
路由会根据类型进行定向分发消息给不同的队列,如图所示
可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
生产者
1 | public class Sender { |
消费者 1
1 | public class Receiver1 { |
消费者 2
1 |
|
- 记住运行程序的顺序,先运行一次 sender(创建路由器),
- 有了路由器之后,在创建两个 Recer1 和 Recer2,进行队列绑定
- 再次运行 sender,发出消息
通配符模式 topic
和路由模式 90% 是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
- *:只能匹配一个词(正好一个词,多一个不行,少一个也不行) #:匹配 0 个或更多个词 看一下官网案例: Q1 绑定了路由键 .orange. Q2 绑定了路由键 ..rabbit 和 lazy.#
持久化
- 消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何避免消息丢失?
- 消费者的 ACK 确认机制,可以防止消费者丢失消息
- 万一在消费者消费之前,RabbitMQ 服务器宕机了,那消息也会丢失
- 想要将消息持久化,那么 路由和队列都要持久化 才可以
生产者
1 | public class Sender { |
消费者
1 | public class Receiver { |