# ch13-反应式编程基础
# 两种不同的编程范式
- 命令式编程,imperative
- 反应式编程,Reactive Programming
Reactive Programming解决什么问题
- IO密集型场景
- 同步阻塞模型,阻塞线程多,CPU利用率不高,性能下降
- 管理多线程,意味着更高的复杂性
- 学习文档
- 剖析Reactor 模型: https://mp.weixin.qq.com/s?__biz=MzIwNDAyOTI2Nw==&mid=2247483716&idx=1&sn=91e7c3f7a46b6d054b8a938cefd3120d&chksm=96c72d78a1b0a46e6f3058c6c895496caab199184376d817a310fbd73620d55dd2bbc434b8d1&token= 1026451003&lang=zh_CN#rd
- Java NIO 底层原理:https://www.toutiao.com/article/6887439886178058759/
- Netty介绍:https://www.zhihu.com/question/607575828/answer/3157903032

# Reactor
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
- Reactive Streams:Netflix、Lightbend和Pirotal于2013年开始制定的一种规范,旨在提供无阻塞回压的异步 流处理标准
- Reactor:Spring Pivotal团队提供的响应式编程的Java实现,其它类似实现:RxJava
- 函数式、声明式,描述数据会流经的管道或流
- Spring WebFlux:启用基于响应式编程的Web应用程序的开发。提供类似于Spring MVC的编程模型
# Java的stream与反应式的流区别
- Java的stream通常都是同步的,并且只能处理有限的数据集,本质上来说,它们只是使用函数来 对集合进行迭代的一种方式
- JDK9中的 Flow API对应反应式流

# 反应式流规范定义的4个接口(重要)
- org.reactivestreams.*
- Publisher:数据发布者
- Subscriber:数据订阅者
- Processor:处理器
- Subscription:协调
处理过程是异步的
消费者驱动,消费者去请求发布者才会发布数据


# 反应式流图(Flux)

# 反应式流图(Mono)

# 两个基本概念:Flux 和 Mono
- Flux:包含 0 到 N 个元素的异步序列
- Mono:包含 0 或者 1 个元素的异步序列
- 消息:正常的包含元素的消息、序列结束的消息和序列出错的消息
- 操作符(Operator):对流上元素的操作
# 操作类型
- 创建操作
- 组合操作
- 转换操作
- 逻辑操作
# 创建Flux
- Flux的静态方法
- 根据对象创建,just方法
- 根据集成创建,数组、Iterable、Java Stream
- range
- interval

package reactorfun;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class FluxCreationTests {
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
}
# 组合Flux 流
- mergeWith
- zip 自己决定如何合并
- zip,提供合并函数
- first 取两个流里面最先有的数据


package reactorfun;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
public class FluxMergingTests {
@Test
public void mergeFluxes() {
// delays needed to avoid the first flux from streaming the
// data through before subscribing to the second flux.
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}
@Test
public void firstWithSignalFlux() {
// delay needed to "slow down" the slow Flux
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
}
# 过滤Flux流2
- skip:指定个数/时间
- take:指定个数/时间
- filter,需要提供Predicate
- distinct,只发布源Flux中尚未发布过的数据项

# 转换Flux流1
- map
- 同步
- 返回具体值
- flatMap
- 异步
- 转换出来的返回结果还是一个流(Mono/Flux)
- 可以并发处理,指定用哪个并发模型处理
- 多个流并发处理结果合并成一个流,但结果顺序不可控+
- 扁平化
- 并发模型(Schedulers方法)
- .immediate()
- .single()
- .newSingle()
- .elastic()
- .parallel()

# 转换Flux流2
- buffer,缓冲数据,bufferAndFlatMap
- collectList,同:buffer不带参数则缓冲所有数据到列表
- collectMap,需要提供生成key的函数

package reactorfun;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;
import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class FluxTransformingTests {
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
@Test
public void takeForAwhile() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
@Test
public void filter() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}
@Data
private static class Player {
private final String firstName;
private final String lastName;
}
}
package reactorfun;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class FluxBufferingTests {
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
//map的话返回的是flux<string>类型
//fluxmap返回string类型
).subscribe();
}
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}
}
# 对流执行逻辑操作
- all,需要提供Predicate函数,注意返回类型Mono
- any,需要提供Predicate函数,注意返回类型Mono

# 缓冲
- buffer,缓冲数据,bufferAndFlatMap
- collectList,同:buffer不带参数则缓冲所有数据到列表
- collectMap,需要提供生成key的函数
