服务端开发

服务端-ch13-反应式编程基础

springboot
目录

# ch13-反应式编程基础

# 两种不同的编程范式

  • 命令式编程,imperative
  • 反应式编程,Reactive Programming

Reactive Programming解决什么问题

# Reactor

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
  • Reactive Streams:Netflix、Lightbend和Pirotal于2013年开始制定的一种规范,旨在提供无阻塞回压的异步 流处理标准
  • Reactor:Spring Pivotal团队提供的响应式编程的Java实现,其它类似实现:RxJava
    • 函数式、声明式,描述数据会流经的管道或流
  • Spring WebFlux:启用基于响应式编程的Web应用程序的开发。提供类似于Spring MVC的编程模型

# Java的stream与反应式的流区别

  • Java的stream通常都是同步的,并且只能处理有限的数据集,本质上来说,它们只是使用函数来 对集合进行迭代的一种方式
  • JDK9中的 Flow API对应反应式流

# 反应式流规范定义的4个接口(重要)

  • org.reactivestreams.*
  • Publisher:数据发布者
  • Subscriber:数据订阅者
  • Processor:处理器
  • Subscription:协调

处理过程是异步的

消费者驱动,消费者去请求发布者才会发布数据

# 反应式流图(Flux)

# 反应式流图(Mono)

# 两个基本概念:Flux 和 Mono

  • Flux:包含 0 到 N 个元素的异步序列
  • Mono:包含 0 或者 1 个元素的异步序列
  • 消息:正常的包含元素的消息、序列结束的消息和序列出错的消息
  • 操作符(Operator):对流上元素的操作

# 操作类型

  • 创建操作
  • 组合操作
  • 转换操作
  • 逻辑操作

# 创建Flux

  • Flux的静态方法
  • 根据对象创建,just方法
  • 根据集成创建,数组、Iterable、Java Stream
  • range
  • interval

package reactorfun;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class FluxCreationTests {

  @Test
  public void createAFlux_just() {
    Flux<String> fruitFlux = Flux
        .just("Apple", "Orange", "Grape", "Banana", "Strawberry");
    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
  }

  @Test
  public void createAFlux_fromArray() {
    String[] fruits = new String[] {
        "Apple", "Orange", "Grape", "Banana", "Strawberry" };

    Flux<String> fruitFlux = Flux.fromArray(fruits);

    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
  }

  @Test
  public void createAFlux_fromIterable() {
    List<String> fruitList = new ArrayList<>();
    fruitList.add("Apple");
    fruitList.add("Orange");
    fruitList.add("Grape");
    fruitList.add("Banana");
    fruitList.add("Strawberry");

    Flux<String> fruitFlux = Flux.fromIterable(fruitList);

    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
  }

  @Test
   public void createAFlux_fromStream() {
     Stream<String> fruitStream =
          Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");

     Flux<String> fruitFlux = Flux.fromStream(fruitStream);

     StepVerifier.create(fruitFlux)
         .expectNext("Apple")
         .expectNext("Orange")
         .expectNext("Grape")
         .expectNext("Banana")
         .expectNext("Strawberry")
         .verifyComplete();
   }

  @Test
  public void createAFlux_interval() {
    Flux<Long> intervalFlux =
        Flux.interval(Duration.ofSeconds(1))
            .take(5);

    StepVerifier.create(intervalFlux)
       .expectNext(0L)
        .expectNext(1L)
        .expectNext(2L)
        .expectNext(3L)
        .expectNext(4L)
        .verifyComplete();
  }

  @Test
  public void createAFlux_range() {
    Flux<Integer> intervalFlux =
        Flux.range(1, 5);

    StepVerifier.create(intervalFlux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectNext(4)
        .expectNext(5)
        .verifyComplete();
  }
}

# 组合Flux 流

  • mergeWith
  • zip 自己决定如何合并
  • zip,提供合并函数
  • first 取两个流里面最先有的数据

package reactorfun;

import java.time.Duration;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

public class FluxMergingTests {

  @Test
  public void mergeFluxes() {
    // delays needed to avoid the first flux from streaming the
    // data through before subscribing to the second flux.

    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250))
        .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

    StepVerifier.create(mergedFlux)
        .expectNext("Garfield")
        .expectNext("Lasagna")
        .expectNext("Kojak")
        .expectNext("Lollipops")
        .expectNext("Barbossa")
        .expectNext("Apples")
        .verifyComplete();
  }

  @Test
  public void zipFluxes() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");

    Flux<Tuple2<String, String>> zippedFlux =
        Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(zippedFlux)
          .expectNextMatches(p ->
              p.getT1().equals("Garfield") &&
              p.getT2().equals("Lasagna"))
          .expectNextMatches(p ->
              p.getT1().equals("Kojak") &&
              p.getT2().equals("Lollipops"))
          .expectNextMatches(p ->
              p.getT1().equals("Barbossa") &&
              p.getT2().equals("Apples"))
          .verifyComplete();
  }

  @Test
  public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");

    Flux<String> zippedFlux =
        Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);

    StepVerifier.create(zippedFlux)
          .expectNext("Garfield eats Lasagna")
          .expectNext("Kojak eats Lollipops")
          .expectNext("Barbossa eats Apples")
          .verifyComplete();
  }

  @Test
  public void firstWithSignalFlux() {
    // delay needed to "slow down" the slow Flux

    Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
          .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

    Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);

    StepVerifier.create(firstFlux)
        .expectNext("hare")
        .expectNext("cheetah")
        .expectNext("squirrel")
        .verifyComplete();
  }
}

# 过滤Flux流2

  • skip:指定个数/时间
  • take:指定个数/时间
  • filter,需要提供Predicate
  • distinct,只发布源Flux中尚未发布过的数据项

# 转换Flux流1

  • map
    • 同步
    • 返回具体值
  • flatMap
    • 异步
    • 转换出来的返回结果还是一个流(Mono/Flux)
    • 可以并发处理,指定用哪个并发模型处理
      • 多个流并发处理结果合并成一个流,但结果顺序不可控+
    • 扁平化
  • 并发模型(Schedulers方法)
    • .immediate()
    • .single()
    • .newSingle()
    • .elastic()
    • .parallel()

# 转换Flux流2

  • buffer,缓冲数据,bufferAndFlatMap
  • collectList,同:buffer不带参数则缓冲所有数据到列表
  • collectMap,需要提供生成key的函数

package reactorfun;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;

import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class FluxTransformingTests {

  @Test
  public void skipAFew() {
    Flux<String> countFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);

    StepVerifier.create(countFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
  }

  @Test
  public void skipAFewSeconds() {
    Flux<String> countFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1))
        .skip(Duration.ofSeconds(4));

    StepVerifier.create(countFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
  }

  @Test
  public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
        .take(3);

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
  }

  @Test
  public void takeForAwhile() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
  }

  @Test
  public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Zion")
        .verifyComplete();
  }

  @Test
  public void distinct() {
    Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();

    StepVerifier.create(animalFlux)
        .expectNext("dog", "cat", "bird", "anteater")
        .verifyComplete();
  }

  @Test
  public void map() {
    Flux<Player> playerFlux = Flux
      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
      .map(n -> {
        String[] split = n.split("\\s");
        return new Player(split[0], split[1]);
      });

    StepVerifier.create(playerFlux)
        .expectNext(new Player("Michael", "Jordan"))
        .expectNext(new Player("Scottie", "Pippen"))
        .expectNext(new Player("Steve", "Kerr"))
        .verifyComplete();
  }

  @Test
  public void flatMap() {
    Flux<Player> playerFlux = Flux
      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
      .flatMap(n -> Mono.just(n)
          .map(p -> {
              String[] split = p.split("\\s");
              return new Player(split[0], split[1]);
            })
          .subscribeOn(Schedulers.parallel())
        );

    List<Player> playerList = Arrays.asList(
        new Player("Michael", "Jordan"),
        new Player("Scottie", "Pippen"),
        new Player("Steve", "Kerr"));

    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
  }

  @Data
  private static class Player {
    private final String firstName;
    private final String lastName;
  }
}
package reactorfun;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class FluxBufferingTests {

  @Test
  public void buffer() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier
        .create(bufferedFlux)
        .expectNext(Arrays.asList("apple", "orange", "banana"))
        .expectNext(Arrays.asList("kiwi", "strawberry"))
        .verifyComplete();
  }

  @Test
  public void bufferAndFlatMap() throws Exception {
    Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry")
        .buffer(3)
        .flatMap(x ->
          Flux.fromIterable(x)
            .map(y -> y.toUpperCase())
            .subscribeOn(Schedulers.parallel())
            .log()
                 //map的话返回的是flux<string>类型
                 //fluxmap返回string类型
        ).subscribe();
  }

  @Test
  public void collectList() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");

    Mono<List<String>> fruitListMono = fruitFlux.collectList();

    StepVerifier
        .create(fruitListMono)
        .expectNext(Arrays.asList(
            "apple", "orange", "banana", "kiwi", "strawberry"))
        .verifyComplete();
  }

  @Test
  public void collectMap() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Map<Character, String>> animalMapMono =
        animalFlux.collectMap(a -> a.charAt(0));

    StepVerifier
        .create(animalMapMono)
        .expectNextMatches(map -> {
          return
              map.size() == 3 &&
              map.get('a').equals("aardvark") &&
              map.get('e').equals("eagle") &&
              map.get('k').equals("kangaroo");
        })
        .verifyComplete();
  }

  @Test
  public void all() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
    StepVerifier.create(hasAMono)
      .expectNext(true)
      .verifyComplete();

    Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
    StepVerifier.create(hasKMono)
      .expectNext(false)
      .verifyComplete();
  }

  @Test
  public void any() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));

    StepVerifier.create(hasAMono)
      .expectNext(true)
      .verifyComplete();

    Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
    StepVerifier.create(hasZMono)
      .expectNext(false)
      .verifyComplete();
  }  
}

# 对流执行逻辑操作

  • all,需要提供Predicate函数,注意返回类型Mono
  • any,需要提供Predicate函数,注意返回类型Mono

# 缓冲

  • buffer,缓冲数据,bufferAndFlatMap
  • collectList,同:buffer不带参数则缓冲所有数据到列表
  • collectMap,需要提供生成key的函数