有两种编程模式:Imperative和Reactive。

Imperative有点像SIA5中提到的一次订阅一年报纸,但是要等到一年结束才能给一年的报纸的方法。

Reactive有点像现实中的报纸订阅,很多任务可以并行工作,每个任务都可以处理数据并把数据交给下一个任务

这一章要了解Reactor这个东西,因为Reactor是Spring 5 reactive programming的核心,也是创建reactive controller和repository的基础。

Spring Reactive Programming的理论

反应式编程和指令式编程各有不同的适应场合。

指令式编程就是老式的写程序,每一条指令让计算机完成一个任务。但是在高I/O操作的时候,这些任务花费的时间很大,即使是为这些高I/O的任务启动一些新线程,这些线程自己也会阻塞。管理大批阻塞的线程也非常麻烦。

反应式编程实际上是声明了一个数据流通的管道,管道中的数据甚至可以是无限的,不声明一步步的操作步骤,而是将数据视为流。

反应式流最初于2013年由Netflix,Light-bend和Pivotal(Spring的开发商)联合创建,目的是为了创建非阻塞的反压力的异步流数据处理的标准。

所谓反压力,指的是数据的消费者不会被海量数据所淹没导致无法处理,而是可以建立消费者消费数据的能力。

Java Stream和Reactive Stream听上去有点像,但其实完全不同。前者是同步的,本质上还是一步步进行工作,而后者对于任意类型的数据都提供了异步支持,包括无限的数据流动,实时处理数据。

Reactive Stream的规范可以归类为四个接口:

  1. Publisher发布者
  2. Subscriber订阅者
  3. Subscription订阅
  4. Processor处理器

Publisher生产数据,并发送给每个SubscriptionSubscriberPublisher接口有一个方法叫做subscribe(),通过这个方法,一个订阅者可以订阅这个发布者:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

这其中的泛型是具体消息的类型,由于订阅者的信息类型可能更加抽象,所以订阅者想要获取的信息类型是发布者的消息类型的相同类型或者父类,这样就肯定可以接受订阅者的消息。

订阅者订阅了发布者之后,就会从发布者那里接收事件,这些事件通过订阅者的接口里的方法来控制:

public interface Subscriber<T> {
    void onSubscribe(Subscription sub);
    void onNext(T item);
    void onError(Throwable ex);
    void onComplete();
}

订阅者接受的第一个事件就是这个订阅关系对象,当发布者调用订阅者的onSubscribe(Subscription sub)方法时,就会传入一个Subscription对象给订阅者。

订阅者通过订阅关系对象接口的方法来管理订阅关系:

public interface Subscription {
    void request(long n);
    void cancel();
}

订阅者可以调用订阅关系里的request(long n)向发布者请求数据,或者调用cancel()表示不再对该发布者感兴趣。

调用request(long n)的时候,订阅者会传入一个long类型参数,表示可以接受多少个数据对象,以防止发布者发来过多数据,这就是可以灵活定制反压力的地方。在发布者发来消息之后,可以重复调用这个方法继续请求数据。

一旦订阅者调用了request(long n),数据就开始在反应流里开始流动了。对于每个数据,通过调用onNext(T item)方法,这个数据就会到达订阅者。如果有异常,onError方法会被调用。

如果发布者没有更多要发送的数据,就会调用onComplete()来通知订阅者发送结束。

对于Processor来说,是一个订阅者和发布者的结合,像这样:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

这个接口先会作为订阅者接受数据然后处理,之后会作为一个发布者,将数据发布给它的订阅者。

这四个接口或者说是组件,就是用来创建一个数据流动的管道,管道从发布者开始,经过0个或者更多的处理器,最后将结果发送给最终的订阅者。(其实和第九章的管道组件的逻辑有点像,只不过组件更加少,也更清晰)。

Project Reactor就是将上边四个组件组成一个实际可用的管道的实现,所以是 Spring 5 反应式编程模型的核心,剩下的时间就来探索这个Reactor。

开始使用Reactor

Reactor实际上是projectreactor.io提供的库,所以需要先添加依赖,因为是测试项目就随便写点代码试验一下先要添加依赖:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
    
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

这实际也是Pivotal开发的库,能够和Spring很好的结合在一起,也可以单独使用,依赖安装的部分来自于:https://projectreactor.io/docs/core/release/reference/中的Maven Installation部分。

可以看到添加依赖后在外部库里增加了io.projectreactor:reactor-core和test两个包,说明添加依赖成功。

原书247页的图很好,其实我个人理解就是多个队列,处理完一个放到下一个队列里,没有的话就直接放到输出队列中..不知道底层是不是这样,因为都说和JS的异步一样,所以推测应该也是事件驱动的消息队列异步方式。

FluxMono是Reactor包提供的核心的用于创建管道的类,Flux可以表示一个管子里有0个,1个直到无限个数据,Mono表示管道里只有不多于1个的数据。

Flux 和 Mono 有500多个操作,可以分为四类:

  1. 创建操作 Creation
  2. 合并操作 Combination
  3. 转换操作 Transformation
  4. 逻辑操作 Logic

来看一下最常用的操作,剩下的具体用途就要看文档了。

创建反应式类型

很多时候如果使用了也采用反应式编程的程序,会提供建立好的数据流管道,但很多时候也需要自己创建一个。从不同的数据来源可以创建不同的管道。主要有如下几个方面:

  1. 从对象创建
  2. 从数组创建
  3. 从Iterable创建
  4. 创建连续的数字
  5. 创建interval数字

常见的是从一个或多个对象创建一个管道,管道用来传输对象数据。

public void createFlux_just() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
    fruitFlux.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    });
}

Flux的泛型就是管道中传输的数据类型。.just方法表示基于其中的参数内容创建管道。这里基于了连续的5个字符串类型的水果名称。

订阅对象必须是一个Consumer类,然后重写accept方法,用表示接收到数据如何处理。

可以编写对应的测试:

StepVerifier.create(fruitFlux)
    .expectNext("Apple")
    .expectNext("Orange")
    .expectNext("Grape")
    .expectNext("Banana")
    .expectNext("Strawberry")
    .verifyComplete();

还可以从数组创建Flux:

@Test
public void createFlux_just() {
    String[] fruits = new String[]{"Apple", "Orange", "Grape", "Banana", "Strawberry"};
    Flux<String> fruitFlux = Flux.fromArray(fruits);
    fruitFlux.subscribe(s -> System.out.println(s));
    
    StepVerifier.create(fruitFlux)
            .expectNext("Apple")
            .expectNext("Orange")
            .expectNext("Grape")
            .expectNext("Banana")
            .expectNext("Strawberry")
            .verifyComplete();

}

使用的方法是.fromArray,传入的Consumer对象改成lambda表示,依然可以通过测试。

既然可以从数组,能够想到,其实可以从所有的Iterable对象来创建Flux:

@Test
public void createFluxFromIterable() {
    ArrayList<String> fruits = new ArrayList<>();
    fruits.add("Apple");
    fruits.add("Orange");
    fruits.add("Grape");
    fruits.add("Banana");
    fruits.add("Strawberry");
    Flux<String> fruitFlux = Flux.fromIterable(fruits);
    fruitFlux.subscribe(s -> System.out.println(s));

    StepVerifier.create(fruitFlux)
            .expectNext("Apple")
            .expectNext("Orange")
            .expectNext("Grape")
            .expectNext("Banana")
            .expectNext("Strawberry")
            .verifyComplete();

}

使用的方法是fromIterable

还有的时候,可能不需要具体的数字,而是把Flux当成一个计数器,不断的生成新的数字,这个时候可以使用.range()方法:

@Test
public void createFluxFromRange() {
    Flux<Integer> intervalCount = Flux.range(1, 5);
    intervalCount.subscribe(s -> System.out.println(s));

    StepVerifier.create(intervalCount)
            .expectNext(1)
            .expectNext(2)
            .expectNext(3)
            .expectNext(4)
            .expectNext(5)
            .verifyComplete();

}

这会打印出1-5,这个区间范围包含两端的数据。与此类似,还可以创建间隔一定时间的数字:

@Test
public void createFluxOfInterval() {
    Flux<Long> intervalCount = Flux.interval(Duration.ofSeconds(1)).take(5);
    intervalCount.subscribe(s -> System.out.println(s));

    StepVerifier.create(intervalCount)
            .expectNext(1L)
            .expectNext(2L)
            .expectNext(3L)
            .expectNext(4L)
            .expectNext(5L)
            .verifyComplete();
}

这里先指定间隔时间,再指定数字,注意.take的参数是Long类型,故要相应修改代码。结果的区间也不再相同,而是0-4,所以这个测试会失败。

Combination

说是合并,其实这里涉及到将两个Flux合并以及将一个FLux或者Mono来拆分。有如下几种:

  1. 合并流输出结果
  2. 将两个Flux各一个结果打包成一个对象进行数据
  3. 将两个Flux的结果进行处理,以处理后的结果输出
  4. 选择速度较快的Flux

SIA5 253页的图很好的解释了合并的过程,将两个Flux的结果合并成一个结果流输出,需要使用.mergeWith()方法:

@Test
public void mergeFluxes() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").delayElements(Duration.ofMillis(500));

    Flux<String> zi = Flux.just("master", "queen", "slave", "guardian").delaySubscription(Duration.ofMillis(250)).delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = names.mergeWith(zi);

    mergedFlux.subscribe(s -> System.out.println(s));

    StepVerifier.create(mergedFlux)
            .expectNext("jenny")
            .expectNext("master")
            .expectNext("cony")
            .expectNext("queen")
            .expectNext("minko")
            .expectNext("slave")
            .expectNext("owl")
            .expectNext("guardian")
            .verifyComplete();
}

这里创建了两个Flux,delayElements表示第一个元素立刻投递,第二个元素开始间隔的时间。delaySubscription则表示从订阅开始延迟一定时间才向管道内放数据。

如果画一条时间线的话,第一个放到管道里的是”jenny”,之后250毫秒之后第二个Flux被订阅,然后”master”被放到管道里,之后是”cony”。。。

被merge的两个Flux,生产和投放数据的控制属性依然是其原来的,但是数据投放到了同一个输出的地方。

实际的顺序就像是测试结果里的一样。但是这里因为时间间隔设置的比较大,实际上并不能依赖于这种用时间代码来控制的排序。

如果要组合两个Flux,还有一种办法就是zip(),用zip()组合之后的Flux,必定会从两个Flux中各取一个,组装成一个Tuple2<?, ?>对象:

@Test
public void mergeFluxesToTuple() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl");

    Flux<String> zi = Flux.just("master", "queen", "slave", "guardian");

    Flux<Tuple2<String, String>> mergedFlux = names.zipWith(zi);

    mergedFlux.subscribe(s -> System.out.println(s));

    StepVerifier.create(mergedFlux)
            .expectNextMatches(p -> p.getT1().equals("jenny") && p.getT2().equals("master"))
            .expectNextMatches(p -> p.getT1().equals("cony") && p.getT2().equals("queen"))
            .expectNextMatches(p -> p.getT1().equals("minko") && p.getT2().equals("slave"))
            .expectNextMatches(p -> p.getT1().equals("owl") && p.getT2().equals("guardian"))
            .verifyComplete();
}

代码比较好懂,这里注意的是顺序,names.zipWith(zi)表示在测试的时候.getT1()拿到的是name,getT2对应是zi里的数据,如果写成zi.zipWith(names)测试就会失败。

除了上边的打包,还可以调用Flux.zip()方法传入一个接受两个参数的方法,然后就会把这个方法的返回值输出到流里,这也是一种combination。

@Test
public void mergeFluxesProcessed() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl");

    Flux<String> zi = Flux.just("master", "queen", "slave", "guardian");

    Flux<String> mergedFlux = Flux.zip(names, zi, (a, b) -> a + "|" + b);


    mergedFlux.subscribe(s -> System.out.println(s));

    StepVerifier.create(mergedFlux)
            .expectNext("jenny|master")
            .expectNext("cony|queen")
            .expectNext("minko|slave")
            .expectNext("owl|guardian")
            .verifyComplete();
}

还可以从两个Flux中选择一个来输出,使用.first()方法,就可以只选择那个被标记为“快速”的Flux进行输出:

@Test
public void mergeFluxesFirst() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").delaySubscription(Duration.ofMillis(100));

    Flux<String> zi = Flux.just("master", "queen", "slave", "guardian");

    Flux<String> mergedFlux = Flux.first(names, zi);


    mergedFlux.subscribe(s -> System.out.println(s));

    StepVerifier.create(mergedFlux)
            .expectNext("master")
            .expectNext("queen")
            .expectNext("slave")
            .expectNext("guardian")
            .verifyComplete();
}

这里的第一个names设置了延迟100毫秒,就被认为是一个慢Flux,于是mergedFlux就只输出了“快”Flux,而完全忽略了慢Flux

Transformer和Filter

这里也有很多功能,先列表如下:

  1. 过滤信息-跳过一定个数 – .skip(n)
  2. 过滤信息-跳过一定时间 – .skip(time)
  3. 过滤信息-只取前几个数据 – .take(n)
  4. 过滤信息-只取指定时间之前的数据 – .take(time)
  5. 通用过滤器 – .filter()
  6. 去重 – .ditinct
  7. 转换数据类型 – .map
  8. 异步转换数据类型 – .flatmap
  9. 缓冲大小 – .buffer(n)
  10. 组成List – .buffer()无参
  11. 把Flux重组成一个Mono – .collectList()无参
  12. 把Flux重组成一个Mono – .collectMap(),传入一个方法用于生成key。

过滤信息的一个基础功能是直接忽略从头开始的指定数量的数据:

@Test
public void skipFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").skip(2);

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("minko")
            .expectNext("owl")
            .verifyComplete();
}

.skip(2)表示跳过最开头的2个数据,这里的链式调用之后产生的Flux并不是4个元素的Flux,而是一个新的只包含后边2个数据的Flux。

有时候可能不需要按个数,而是忽略从头开始一定时间的数据,到了一定时间后才开始输出数据,可以给.skip传入一个Duration对象:

@Test
public void skipTimeFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").delayElements(Duration.ofMillis(100)).skip(Duration.ofMillis(300));

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("minko")
            .expectNext("owl")
            .verifyComplete();
}

程序运行时会在100毫秒后投放”jenny”,200毫秒后投放”cony”,不忽略的时间从300毫秒开始,所以会显示出minko和owl。如果将延迟时间改成99毫秒,就只会显示owl了。

之前看过了intervaltake的连用,实际上.take(n)表示只取前几个数据:

@Test
public void takeFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").take(3);

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("jenny")
            .expectNext("cony")
            .expectNext("minko")
            .verifyComplete();
}

这就是仅取前三个数据的Flux。如果给take()传入时间对象,表示只取指定时间之前的结果:

@Test
public void takeTimeFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl").delayElements(Duration.ofSeconds(1)).take(Duration.ofMillis(3500));

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("jenny")
            .expectNext("cony")
            .expectNext("minko")
            .verifyComplete();
}

第三秒结束的时候投放完三个数据,3.5秒之后的数据都不再获取,于是就只有前三个数据。当然,这里的时间片如果分的太短,测试不一定通过,比如把3500改成3001。

以上都是基于个数或者是时间的,还有通用的过滤器方法.filter(),只会过滤满足条件的数据:

@Test
public void filterFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl", "gege", "dazhuan").filter(s -> s.length() == 4);

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("jenny")
            .expectNext("minko")
            .verifyComplete();
}

.filter()方法里传入一个返回值为boolean的方法,只要运算结果是true,这个数据就会出现在结果流里,是false则会被抛弃。

.distinct方法可以去掉一个Flux中重复发布的内容,只留下不重复的部分:

@Test
public void distinctFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl", "gege", "dazhuan","owl","minko").distinct();

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext("jenny")
            .expectNext("cony")
            .expectNext("minko")
            .expectNext("owl")
            .expectNext("gege")
            .expectNext("dazhuan")
            .verifyComplete();
}

后边的owl和minko会被忽略,只保留第一个不相同的数据对象。

之前都是各种filter,当然也可以对数据进行处理,转换成其他类型的数据,可以使用.map()方法,把一个Flux的数据转换成另一个Flux上的不同类型的数据或者处理过的数据:

@Test
public void mappedFlux() {
    Flux<Employee> names = Flux.just("jenny", "cony", "minko", "owl")
            .map(s -> new Employee(s, "", ""));

    names.subscribe(s -> System.out.println(s));

    StepVerifier.create(names)
            .expectNext(new Employee("jenny","",""))
            .expectNext(new Employee("cony","",""))
            .expectNext(new Employee("minko","",""))
            .expectNext(new Employee("owl","",""))
            .verifyComplete();
}

这个就是把一个字符串Flux转换成了一个Employee类型的Flux,注意开始指定Flux泛型的时候要注意与返回值一致。

注意,.map()方法是同步的。可以考虑使用异步的.flatmap()方法,这个方法实际上是利用一个中间Flu进行中转,所以可以实现异步效果:

@Test
public void flatmappedFlux() {
    Flux<Employee> names = Flux.just("jenny", "cony", "minko", "owl", "gege")
            .flatMap(s -> Mono.just(s).map(n -> new Employee(n, "", ""))).subscribeOn(Schedulers.parallel());
}

每一个字符串,实际上被.flatMap映射到了一个Mono对象上,这个Mono对象的内容来自于map的处理结果,然后会同时订阅这些Mono。

这里有一个新方法.subscribeOn,并不是订阅方法,而是一个表示如何通过并发手段来处理订阅的方法。这里加在最后,表示这些转换成Mono的工作和订阅Mono的工作有多线程并行执行。

Reactor并不强行指定具体的并发模型,而是通过.subscribeOn这个方法进行指定,常用的方法解释如下:

方法 解释
.immediate() 在当前线程中执行订阅
.single() 在单独的一个新线程中执行,所有调用都用这同一个线程
.newSingle() 每个订阅用一个新线程执行
.elastic() 用一个弹性线程池
.parallel() 用一个固定数量的线程池,一般等于CPU的核心数

使用.flatMap搭配subscribeOn

对于大量的数据,可以将其拆分成一个一个最大数量的包来进行处理,使用.buffer()方法:

@Test
public void bufferedFlux() {
    Flux<String> names = Flux.just("jenny", "cony", "minko", "owl", "gege");

    Flux<List<String>> buffered = names.buffer(2);

    StepVerifier.create(buffered)
            .expectNext(Arrays.asList("jenny", "cony"))
            .expectNext(Arrays.asList("minko", "owl"))
            .expectNext(Arrays.asList("gege"))
            .verifyComplete();
}

这里先创建一个字符串Flux,然后使用buffer的时候,可以发现泛型要修改为一个List泛型,buffer(2)表示返回最大长度为2的列表,如果装不满,列表中就只有部分元素。

这里要注意的是,组装List的过程也是同步的,如果要改成异步,可以搭配flatMap来使用:

@Test
public void bufferedFlatMapFlux() {
    Flux.just("jenny", "cony", "minko", "owl", "gege")
            .buffer(3).
            flatMap(x -> Flux.fromIterable(x)
                    .map(y -> y.toUpperCase())
                    .subscribeOn(Schedulers.parallel())
                    .log()
            ).subscribe();
}

这段程序的意思是,依然先创建了一个缓冲区大小为3的List<String>的Flux,但是对于其中每一个元素,创建了一个对应的Flux,然后将其字符串变成大写,使用并行处理,最后再一起订阅这些新生成的小的Flux。这里添加的.log()是可以打印出每次进行的步骤。

如果buffer()不添加参数,那就是将全部结果组装为一个List。这个代码就省略了。

由于全部结果就是一个List,所以可以用一个Mono对象来装,就可以使用.collectList()方法。

@Test
public void collectMonoFlux() {
    Flux<String> flux = Flux.just("jenny", "cony", "minko", "owl", "gege");
    Mono<List<String>> mono = flux.collectList();

    StepVerifier
            .create(mono)
            .expectNext(Arrays.asList("jenny", "cony", "minko", "owl", "gege"))
            .verifyComplete();
}

Flux<String>类型会变成一个Mono<List<String>>类型。

有趣的是,.collectMap()方法会把Flux组成一个Mono的Map对象,每一个对象的key可以用传入的方法来生成:

@Test
public void collectMapMonoFlux() {
    Flux<String> flux = Flux.just("jenny", "cony", "minko", "owl", "gege");
    Mono<Map<String, String>> mono = flux.collectMap(s -> s.substring(0, 2));

    StepVerifier
            .create(mono)
            .expectNextMatches(map->{
                return map.size() == 5 &&
                        map.get("je").equals("jenny") &&
                        map.get("co").equals("cony") &&
                        map.get("mi").equals("minko") &&
                        map.get("ow").equals("owl") &&
                        map.get("ge").equals("gege");})
            .verifyComplete();
}

将字符串Flux转换成了一个Mono,里边有Map对象,键为传入的函数生成的每个字符串的前两个字符,值为对应的值。

以上是常用的Transformers类型,有通用的,也有适合转换和封装的。

Logic

有时候仅仅只需要知道流内的数据对象是否满足某些条件,而不是进行某些复杂处理。可以使用all()any()方法来进行一些逻辑操作。

all()用来测试所有的数据是否满足某些条件。而any()用来测试所有的数据中至少有一个满足某些条件。

如果我们测试字符串里是否含有y和z:

@Test
public void collectMapMonoFlux() {
    Flux<String> flux = Flux.just("jenny", "cony", "minko", "owl", "gege");
    Mono<Boolean> hasY = flux.all(s -> s.contains("y"));

    StepVerifier.create(hasY).expectNext(false).verifyComplete();

    Mono<Boolean> hasZ = flux.any(s -> s.contains("y"));
    StepVerifier.create(hasZ).expectNext(true).verifyComplete();
}

第一个测试表示是否Flux中的每一个数据都包含”y”,结果显然是false,第二个测试表示至少有一个数据含有”y”,肯定为真了。

总结

其实Reactor和之前学习的管道有点像,都是一个抽象的数据流,只不过集成的过程更注重分发,而这个过程更注重不会阻塞。

这里的很多Operation其实是一个模式,即在程序中需要什么样的处理方式,就可以找找SIA5书中这些常用模型看看有没有对应的内容。再不行就要翻文档了。