服务端-ch11-消息中间件(ActiveMQ、RabbitMQ)
ch11-消息中间件(ActiveMQ、RabbitMQ)
ActiveMQ
同步与异步

broker

消息中间件
- 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然
- 常用的消息中间件有:ActiveMQ、RabbitMQ、kafka
- 大的系统中有很多子系统,子系统之间的解耦,使用消息中间件
本节课例子


JMS
- Java 消息服务(Java Message Servcie)
- JMS是一个Java标准,定义了使用消息代理(message broker)的通用API
- Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate
消息代理(broker)
- Apache ActiveMQ
- Apache ActiveMQ Artemis,重新实现的下一代ActiveMQ
IntelliJ IDEA不能下载源代码的问题
直接使用JMS接口发送与接收消息
- JMS规范:jakarta.jms-api-2.0.3.jar
- artemis客户端:artemis-jms-client-2.17.0.jar
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(“queue.example”);
关键概念
- javax.jms.Message:TextMessage、ObjectMessage
- javax.jms.Destination,队列或主题,如:org.apache.activemq.artemis.jms.client.ActiveMQQueue
- 3种指定方式:
- application.yml(default-destination)
- @Bean(Destination对象)
- 直接String指定
配置
1 2 3 4 5 6 7 8 9 10 11
| spring: jms: template: default-destination: tacocloud.order.queue artemis: host: localhost port: 61616 user: artemis password: artemis embedded: enabled: false
|
使用JmsTemplate
- JmsTemplate是Spring对JMS集成支持的核心
- 发送的两个方法:send、convertAndSend
ActiveMQ Artemis
Docker运行
- $ docker run --detach --name mycontainer -p 61616:61616 -p 8161:8161 apache/activemq-artemis:latest-alpine
- docker logs -f mycontainer
- $ docker exec -it mycontainer /var/lib/artemis-instance/bin/artemis shell --user artemis --password artemis
- 管理控制台:http://localhost:8161 artemis/artemis
控制台

- 需要先建立个会话的session
- 然后消息会被放入队列queue:先进先出
依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency>
|
代码
发送方:不适用springboot直接使用java
ArtemisExampleSend
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package tacos.messaging;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.JMSException; import javax.jms.*;
public class ArtemisExampleSend { private static final String BROKER_URL = "tcp://localhost:61616"; private static final String USERNAME = "artemis"; private static final String PASSWORD = "artemis";
public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD); Connection connection = connectionFactory.createConnection(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue.example");
MessageProducer messageProducer = session.createProducer(destination); TextMessage textMessage = session.createTextMessage(); textMessage.setText("Hello, Artemis!");
messageProducer.send(textMessage); System.out.println("Message sent successfully!");
connection.close(); } }
|
接收端
ArtemisExampleReceive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package tacos.messaging;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
public class ArtemisExampleReceive { private static final String BROKER_URL = "tcp://localhost:61616"; private static final String USERNAME = "artemis"; private static final String PASSWORD = "artemis";
public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD); Connection connection = connectionFactory.createConnection(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue.example");
MessageConsumer messageConsumer = session.createConsumer(destination); Message message = messageConsumer.receive();
if (message instanceof TextMessage) { System.out.println("Message received: " + ((TextMessage) message).getText()); }
connection.close(); } }
|
- message instanceof TextMessage:还有对象类型的消息
- 运行结果:


订单信息
JmsOrderMessagingService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package tacos.messaging;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;
import tacos.TacoOrder;
@Service public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
@Autowired public JmsOrderMessagingService(JmsTemplate jms) { this.jms = jms; }
@Override public void sendOrder(TacoOrder order) { jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);
} private Message addOrderSource(Message message) throws JMSException { message.setStringProperty("X_ORDER_SOURCE", "WEB"); return message; } }
|
消息转换器(MessageConverter)
- SimpleMessageConverter:实现String与TextMessage的相互转换、字节数组与BytesMessage的相互转换、 Map与MapMessage的相互转换,以及Serializable对象与ObjectMessage的相互转换
- MappingJackson2MessageConverter:使用Jackson 2 JSON库实现消息与JSON格式的相互转换
- TypeId
- Message.setStringProperty

配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package tacos.messaging;
import java.util.HashMap; import java.util.Map;
import javax.jms.Destination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import tacos.TacoOrder;
@Configuration public class MessagingConfig {
@Bean public MappingJackson2MessageConverter messageConverter() { MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); messageConverter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>(); typeIdMappings.put("order", TacoOrder.class); messageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter; }
@Bean public Destination orderQueue() { return new ActiveMQQueue("tacocloud.order.queue"); } }
|
kitchen代码
加上artimes的依赖
接收消息:拉取模式(pull model)
接收消息:拉取模式(pull model),JmsTemplate支持
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package tacos.kitchen;
import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping;
import lombok.RequiredArgsConstructor; import tacos.TacoOrder;
@Profile({"jms-template", "rabbitmq-template"}) @Controller @RequestMapping("/orders") @RequiredArgsConstructor public class OrderReceiverController {
private final OrderReceiver orderReceiver;
@GetMapping("/receive") public String receiveOrder(Model model) { TacoOrder order = orderReceiver.receiveOrder(); if (order != null) { model.addAttribute("order", order); return "receiveOrder"; } return "noOrder"; } }
|
接收消息:推送模式(push model)
需要定义消息监听器
RabbitMQ
RabbitMQ
RabbitMQ概念
- ConnectionFactory、Connection、Channel
- Exchange:Default、Direct、Topic、Fanout、Headers、Dead letter
- Queue
- routing key
- Binding key

依赖
代码中在api删除jms依赖,增加rabbitmq依赖
在对应模块中加上amqp依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置
1 2 3 4 5 6 7 8
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest template: exchange: tacocloud.orde
|
接收消息:拉取模式(pull model)

接收消息:推送模式(push model)
-
需要定义消息监听器
-
@RabbitListener