服务端开发

服务端-ch11-消息中间件(ActiveMQ、RabbitMQ)

springboot
目录

# ch11-消息中间件(ActiveMQ、RabbitMQ)

# ActiveMQ

# 同步与异步

# broker

  • 好处:A与B解耦,broker是消息代理

# 消息中间件

  • 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然
  • 常用的消息中间件有:ActiveMQ、RabbitMQ、kafka
  • 大的系统中有很多子系统,子系统之间的解耦,使用消息中间件

# 本节课例子

  • postman触发api调用,生成订单

# JMS

  • Java 消息服务(Java Message Servcie)
  • JMS是一个Java标准,定义了使用消息代理(message broker)的通用API
  • Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate

# 消息代理(broker)

  • Apache ActiveMQ
  • Apache ActiveMQ Artemis,重新实现的下一代ActiveMQ

# IntelliJ IDEA不能下载源代码的问题

# 直接使用JMS接口发送与接收消息

  • JMS规范:jakarta.jms-api-2.0.3.jar
  • artemis客户端:artemis-jms-client-2.17.0.jar
  • ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(“queue.example”);

# 关键概念

  • javax.jms.Message:TextMessage、ObjectMessage
  • javax.jms.Destination,队列或主题,如:org.apache.activemq.artemis.jms.client.ActiveMQQueue
    • 3种指定方式:
      • application.yml(default-destination)
      • @Bean(Destination对象)
      • 直接String指定

# 配置

spring:
    jms:
        template:
        	default-destination: tacocloud.order.queue
    artemis:
    host: localhost
    port: 61616
    user: artemis
    password: artemis
    embedded:
    	enabled: false

# 使用JmsTemplate

  • JmsTemplate是Spring对JMS集成支持的核心
  • 发送的两个方法:send、convertAndSend

# ActiveMQ Artemis

# Docker运行

  • $ docker run --detach --name mycontainer -p 61616:61616 -p 8161:8161 apache/activemq-artemis:latest-alpine
  • docker logs -f mycontainer
  • $ docker exec -it mycontainer /var/lib/artemis-instance/bin/artemis shell --user artemis --password artemis
  • 管理控制台:http://localhost:8161 artemis/artemis

# 控制台

  • 需要先建立个会话的session
  • 然后消息会被放入队列queue:先进先出

# 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

# 代码

# 发送方:不适用springboot直接使用java

ArtemisExampleSend

package tacos.messaging;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.JMSException;
import javax.jms.*;

public class ArtemisExampleSend {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String USERNAME = "artemis";
    private static final String PASSWORD = "artemis";

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue.example");

        MessageProducer messageProducer = session.createProducer(destination);
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("Hello, Artemis!");

        messageProducer.send(textMessage);
        System.out.println("Message sent successfully!");

        connection.close();
    }
}
  • import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; :jar包

  • 运行结果:

    • 终端console输出:Message sent successfully!
    • artemis终端输出:
  • destination:jms中所定义的目的地,无论消息是订阅还是队列

# 接收端

ArtemisExampleReceive

package tacos.messaging;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;

public class ArtemisExampleReceive {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String USERNAME = "artemis";
    private static final String PASSWORD = "artemis";

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL, USERNAME, PASSWORD);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue.example");

        MessageConsumer messageConsumer = session.createConsumer(destination);
        Message message = messageConsumer.receive();

        if (message instanceof TextMessage) {
            System.out.println("Message received: " + ((TextMessage) message).getText());
        }

        connection.close();
    }
}
  • message instanceof TextMessage:还有对象类型的消息
  • 运行结果:

# 订单信息

JmsOrderMessagingService

package tacos.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import tacos.TacoOrder;

@Service
public class JmsOrderMessagingService implements OrderMessagingService {

  private JmsTemplate jms;

  @Autowired
  public JmsOrderMessagingService(JmsTemplate jms) {
    this.jms = jms;
  }

  @Override
  public void sendOrder(TacoOrder order) {
    jms.convertAndSend("tacocloud.order.queue", order,
        this::addOrderSource);
//    jms.send(new MessageCreator(){
//      @Override
//      public Message createMessage(Session session) throws JMSException {
//        Message message = session.createObjectMessage(order);
//        message.setStringProperty("X_ORDER_SOURCE", "WEB");
//        return message;
//      }
//    });
  }
  
  private Message addOrderSource(Message message) throws JMSException {
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
    return message;
  }
}
  • 添加依赖之后就可以直接注入JmsTemplate

# 消息转换器(MessageConverter)

  • SimpleMessageConverter:实现String与TextMessage的相互转换、字节数组与BytesMessage的相互转换、 Map与MapMessage的相互转换,以及Serializable对象与ObjectMessage的相互转换
  • MappingJackson2MessageConverter:使用Jackson 2 JSON库实现消息与JSON格式的相互转换
  • TypeId
  • Message.setStringProperty

配置类

package tacos.messaging;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Destination;

import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;

import tacos.TacoOrder;

@Configuration
public class MessagingConfig {

  @Bean
  public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter =
                            new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");

    Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
    typeIdMappings.put("order", TacoOrder.class);
    messageConverter.setTypeIdMappings(typeIdMappings);

    return messageConverter;
  }

  @Bean
  public Destination orderQueue() {
    return new ActiveMQQueue("tacocloud.order.queue");
  }
}

# kitchen代码

加上artimes的依赖

# 接收消息:拉取模式(pull model)

接收消息:拉取模式(pull model),JmsTemplate支持

package tacos.kitchen;

import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;

import lombok.RequiredArgsConstructor;
import tacos.TacoOrder;

@Profile({"jms-template", "rabbitmq-template"})
@Controller
@RequestMapping("/orders")
@RequiredArgsConstructor
public class OrderReceiverController {

  private final OrderReceiver orderReceiver;

  @GetMapping("/receive")
  public String receiveOrder(Model model) {
    TacoOrder order = orderReceiver.receiveOrder();
    if (order != null) {
      model.addAttribute("order", order);
      return "receiveOrder";
    }
    return "noOrder";
  }
}

# 接收消息:推送模式(push model)

需要定义消息监听器

  • @JmsListener

# RabbitMQ

RabbitMQ

# RabbitMQ概念

  • ConnectionFactory、Connection、Channel
  • Exchange:Default、Direct、Topic、Fanout、Headers、Dead letter
  • Queue
  • routing key
  • Binding key

# 依赖

代码中在api删除jms依赖,增加rabbitmq依赖

在对应模块中加上amqp依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

# 配置

spring:
    rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        template:
        	exchange: tacocloud.orde

# 接收消息:拉取模式(pull model)

# 接收消息:推送模式(push model)

  • 需要定义消息监听器

  • @RabbitListener