ch11-消息中间件(ActiveMQ、RabbitMQ)

ActiveMQ

同步与异步

image-20231119172721004

broker

image-20231119172732660

  • 好处:A与B解耦,broker是消息代理

消息中间件

  • 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然
  • 常用的消息中间件有:ActiveMQ、RabbitMQ、kafka
  • 大的系统中有很多子系统,子系统之间的解耦,使用消息中间件

本节课例子

image-20231119172822031

  • postman触发api调用,生成订单

image-20231122150446072

JMS

  • Java 消息服务(Java Message Servcie)
  • JMS是一个Java标准,定义了使用消息代理(message broker)的通用API
  • Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate

消息代理(broker)

  • Apache ActiveMQ
  • Apache ActiveMQ Artemis,重新实现的下一代ActiveMQ

IntelliJ IDEA不能下载源代码的问题

直接使用JMS接口发送与接收消息

  • JMS规范:jakarta.jms-api-2.0.3.jar
  • artemis客户端:artemis-jms-client-2.17.0.jar
  • ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(“queue.example”);

关键概念

  • javax.jms.Message:TextMessage、ObjectMessage
  • javax.jms.Destination,队列或主题,如:org.apache.activemq.artemis.jms.client.ActiveMQQueue
    • 3种指定方式:
      • application.yml(default-destination)
      • @Bean(Destination对象)
      • 直接String指定

配置

1
2
3
4
5
6
7
8
9
10
11
spring:
jms:
template:
default-destination: tacocloud.order.queue
artemis:
host: localhost
port: 61616
user: artemis
password: artemis
embedded:
enabled: false

使用JmsTemplate

  • JmsTemplate是Spring对JMS集成支持的核心
  • 发送的两个方法:send、convertAndSend

ActiveMQ Artemis

Docker运行

  • $ docker run --detach --name mycontainer -p 61616:61616 -p 8161:8161 apache/activemq-artemis:latest-alpine
  • docker logs -f mycontainer
  • $ docker exec -it mycontainer /var/lib/artemis-instance/bin/artemis shell --user artemis --password artemis
  • 管理控制台:http://localhost:8161 artemis/artemis

控制台

image-20231119174236452

  • 需要先建立个会话的session
  • 然后消息会被放入队列queue:先进先出

依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

代码

发送方:不适用springboot直接使用java

ArtemisExampleSend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package tacos.messaging;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.JMSException;
import javax.jms.*;

public class ArtemisExampleSend {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String USERNAME = "artemis";
private static final String PASSWORD = "artemis";

public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue.example");

MessageProducer messageProducer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello, Artemis!");

messageProducer.send(textMessage);
System.out.println("Message sent successfully!");

connection.close();
}
}
  • import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; :jar包

  • 运行结果:

    • 终端console输出:Message sent successfully!
    • artemis终端输出:image-20231119175040257image-20231119175110771
  • destination:jms中所定义的目的地,无论消息是订阅还是队列

接收端

ArtemisExampleReceive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tacos.messaging;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;

public class ArtemisExampleReceive {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String USERNAME = "artemis";
private static final String PASSWORD = "artemis";

public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue.example");

MessageConsumer messageConsumer = session.createConsumer(destination);
Message message = messageConsumer.receive();

if (message instanceof TextMessage) {
System.out.println("Message received: " + ((TextMessage) message).getText());
}

connection.close();
}
}
  • message instanceof TextMessage:还有对象类型的消息
  • 运行结果:image-20231119175807286image-20231119175826498

订单信息

JmsOrderMessagingService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package tacos.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import tacos.TacoOrder;

@Service
public class JmsOrderMessagingService implements OrderMessagingService {

private JmsTemplate jms;

@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}

@Override
public void sendOrder(TacoOrder order) {
jms.convertAndSend("tacocloud.order.queue", order,
this::addOrderSource);
// jms.send(new MessageCreator(){
// @Override
// public Message createMessage(Session session) throws JMSException {
// Message message = session.createObjectMessage(order);
// message.setStringProperty("X_ORDER_SOURCE", "WEB");
// return message;
// }
// });
}

private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
}
  • 添加依赖之后就可以直接注入JmsTemplate

消息转换器(MessageConverter)

  • SimpleMessageConverter:实现String与TextMessage的相互转换、字节数组与BytesMessage的相互转换、 Map与MapMessage的相互转换,以及Serializable对象与ObjectMessage的相互转换
  • MappingJackson2MessageConverter:使用Jackson 2 JSON库实现消息与JSON格式的相互转换
  • TypeId
  • Message.setStringProperty

image-20231123131552636

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package tacos.messaging;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Destination;

import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;

import tacos.TacoOrder;

@Configuration
public class MessagingConfig {

@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");

Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("order", TacoOrder.class);
messageConverter.setTypeIdMappings(typeIdMappings);

return messageConverter;
}

@Bean
public Destination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}
}

kitchen代码

加上artimes的依赖

接收消息:拉取模式(pull model)

接收消息:拉取模式(pull model),JmsTemplate支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tacos.kitchen;

import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;

import lombok.RequiredArgsConstructor;
import tacos.TacoOrder;

@Profile({"jms-template", "rabbitmq-template"})
@Controller
@RequestMapping("/orders")
@RequiredArgsConstructor
public class OrderReceiverController {

private final OrderReceiver orderReceiver;

@GetMapping("/receive")
public String receiveOrder(Model model) {
TacoOrder order = orderReceiver.receiveOrder();
if (order != null) {
model.addAttribute("order", order);
return "receiveOrder";
}
return "noOrder";
}
}

接收消息:推送模式(push model)

需要定义消息监听器

  • @JmsListener

RabbitMQ

RabbitMQ

RabbitMQ概念

  • ConnectionFactory、Connection、Channel
  • Exchange:Default、Direct、Topic、Fanout、Headers、Dead letter
  • Queue
  • routing key
  • Binding key

image-20231123131758138

依赖

代码中在api删除jms依赖,增加rabbitmq依赖

在对应模块中加上amqp依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

1
2
3
4
5
6
7
8
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
template:
exchange: tacocloud.orde

接收消息:拉取模式(pull model)

image-20231123174046590

接收消息:推送模式(push model)

  • 需要定义消息监听器

  • @RabbitListener