下载 RabbitMQ
镜像 rabbitmq:3-management 默认安装并启用 rabbitmq_management
1 docker pull rabbitmq:3.10-management
创建并运行 RabbitMQ
容器 1 2 3 4 5 6 docker run -d -p 5672:5672 -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --hostname myRabbit \ --name rabbitmq \ rabbitmq:3.10-management
参数说明:
-d:表示在后台运行容器; -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;5672:RabbitMQ 提供给编程语言客户端链接的端口 15672:RabbitMQ 管理界面的端口 25672:RabbitMQ 集群的端口 -e:指定环境变量:RABBITMQ_DEFAULT_USER:默认的用户名; RABBITMQ_DEFAULT_PASS:默认的用户密码; –hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名); –name rabbitmq:设置容器名称; rabbitmq:3.10-management :容器使用的镜像名称; 查看启动情况:
1 2 3 ➜ bin docker ps -l CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 21dec23292a9 rabbitmq:3.10-management "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
设置 docker 启动的时候自动启动(可选 ):
1 docker update rabbitmq --restart=always
访问 RabbitMQ
后台管理 创建 RabbitMq 连接 创建虚拟主机目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ConnectionUtils { public static Connection getConnection () throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("dev.fungs.cn" ); factory.setPort(5672 ); factory.setVirtualHost("/likfees" ); factory.setUsername("admin" ); factory.setPassword("admin" ); return factory.newConnection(); } public static void main (String[] args) throws Exception { Connection connection = getConnection(); System.out.println(connection); connection.close(); } }
输出结果:
amqp://admin@1.14.160.174 :5672//likfees
RabbitMQ模式 简单模式
消息发送者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class Sender { public static void main (String[] args) throws Exception { String msg = "hello 你好" ; Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue1" , false , false , false , null ); channel.basicPublish("" , "queue1" , null , msg.getBytes()); channel.close(); connection.close(); } }
运行结果:
消息接收者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class Receiver { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue1" , false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body); System.out.println("接收 = " + msg); } }; channel.basicConsume("queue1" ,true ,consumer); } }
运行结果:
消息确认机制 ACK 通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
RabbitMQ如何得知消息被消费者接收?
如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得 知,这样消息就丢失了 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收 ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执 不过这种回执ACK分为两种情况:自动ACK :消息接收后,消费者立刻自动发送ACK(快递放在快递柜)手动ACK :消息接收后,不会发送ACK,需要手动调用(快递必须本人签收) 两种情况如何选择,需要看消息的重要性: 如果消息不太重要,丢失也没有影响,自动ACK会比较方便 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把 消息从队列中 1 2 channel.basicConsume("queue1" , false , consumer);
完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Receiver { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body); System.out.println("接收 = " + msg); channel.basicAck(envelope.getDeliveryTag(),false ); } }; channel.basicConsume("queue1" , false , consumer); } }
工作队列模式
之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能 力有限,就会产生消息在队列中堆积(生活中的滞销) 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理? 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共 享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被 一个人吃) 消费者 如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。 这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它 只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Receiver2 { static Integer i = 0 ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue" , false , false , false , null ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body); System.out.printf("【顾客2】吃掉 = %s!总共吃[%d]串 \n" , msg, ++i); try { Thread.sleep(900 ); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false ); } }; channel.basicConsume("work_queue" , false , consumer); } }
必须使用 ACK 确认消息才能生效
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Sender { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue" , false , false , false , null ); for (int i = 0 ; i < 100 ; i++) { String msg = "羊肉串 --> " + i; channel.basicPublish("" , "work_queue" , null , msg.getBytes()); System.out.println("新鲜出炉:" + msg); } channel.close(); connection.close(); } }
订阅模式 生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视 频通知
X队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
路由在生产者程序中创建 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没 有队列,路由并不知道将信息发送给谁 运行程序的顺序:
1. MessageSender
MessageReceiver1和MessageReceiver2
MessageSender
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class Sender { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("exchange_fanout" , "fanout" ); String msg = "Hello Java!" ; channel.basicPublish("exchange_fanout" , "" , null , msg.getBytes()); System.out.println("生产者:" + msg); channel.close(); connection.close(); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class Receiver1 { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_fanout_queue_1" , false , false , false , null ); channel.queueBind("test_exchange_fanout_queue_1" , "exchange_fanout" , "" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body); System.out.printf("【消费者1】 %s \n" , msg); } }; channel.basicConsume("test_exchange_fanout_queue_1" , true , consumer); } }
路由模式 路由会根据类型进行定向分发消息给不同的队列,如图所示
可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Sender { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test_exchange_direct" , "direct" ); String msg = "用户注册,【userid=S101】" ; channel.basicPublish("test_exchange_direct" , "insert" , null , msg.getBytes()); System.out.println("[用户系统]:" + msg); channel.close(); connection.close(); } }
消费者1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Receiver1 { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_direct_queue_1" , false , false , false , null ); channel.queueBind("test_exchange_direct_queue_1" , "test_exchange_direct" , "insert" ); channel.queueBind("test_exchange_direct_queue_1" , "test_exchange_direct" , "update" ); channel.queueBind("test_exchange_direct_queue_1" , "test_exchange_direct" , "delete" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String s = new String (body); System.out.println("【消费者1】 = " + s); } }; channel.basicConsume("test_exchange_direct_queue_1" , true , consumer); } }
消费者2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Receiver2 { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_direct_queue_2" , false , false , false , null ); channel.queueBind("test_exchange_direct_queue_2" , "test_exchange_direct" , "select" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String s = new String (body); System.out.println("【消费者2】 = " + s); } }; channel.basicConsume("test_exchange_direct_queue_2" , true , consumer); } }
记住运行程序的顺序,先运行一次sender(创建路由器), 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定 再次运行sender,发出消息 通配符模式 topic
和路由模式90%是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
*:只能匹配一个词(正好一个词,多一个不行,少一个也不行) #:匹配0个或更多个词 看一下官网案例: Q1绑定了路由键 .orange. Q2绑定了路由键 . .rabbit 和 lazy.# 持久化 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?消费者的ACK确认机制,可以防止消费者丢失消息 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失 想要将消息持久化,那么 路由和队列都要持久化 才可以 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Sender { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test_exchange_topic" , "topic" , true ); String msg = "商品降价" ; channel.basicPublish("test_exchange_topic" , "product.price" , MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); System.out.println("[用户系统]:" + msg); channel.close(); connection.close(); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Receiver { public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_topic_queue_1" , true , false , false , null ); channel.queueBind("test_exchange_topic_queue_1" , "test_exchange_topic" , "user.#" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String s = new String (body); System.out.println("【消费者1】 = " + s); } }; channel.basicConsume("test_exchange_topic_queue_1" , true , consumer); } }