现代的应用程序,如果可以说的上是一个不错的应用的话,几乎都要和其他程序进行交互,从其他程序获取数据或者发送数据给其他程序,无论是在互联网,还是在一个公司的内网上。

当然程序间的通信,需要事先协商好通信的协议,以让双方都能够理解。Spring Integration是一个Enterpirise Integration Patterns的实现

其中的每一个模式,都被实现成一个组件,这些组件共同组成一个管道进行发送和接收信息。使用Spring 的配置,可以很方便的将这些组件组装成一个数据流通的管道。

所以可以将所有这种与外部服务进行沟通的方式都视作一个管道,成为Integration Flow ,即一条数据的管道连接到其他程序,这就是整合其他服务的本质。

简单的文件系统整合流

大概文件系统是几乎所有应用程序都要与其交互的程序了,在Spring Integration中,有很多组件都是用来读和写文件的。

依然需要引入依赖,如下:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-file</artifactId>
</dependency>

第一个依赖是Spring Boot的 Integration依赖,这个依赖用于创建Spring Integration Flow。第二个是Spring Integration的文件端点模块,是Spring Integration 12个整合其他系统的模块之一。

在之后还会讨论这些模块,现在先要知道文件端点模块是用来和文件系统打交道的,用于从文件系统中读取数据或者向文件系统中写入数据。

创建一个消息网关

在实际创建这条管道之前,需要创建一个gateway interface 消息网关:

package cc.conyli.restlearn.integration;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {

    void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
}

尽管代码很短,但是这几个注解有的解释一番:

  1. @MessagingGateway是用于在运行时生成这个接口的实现类的一个Bean,就像Spring Data在运行时自动生成神奇接口对应的实现类一样。在需要的地方进行依赖注入这个接口就可以了。
  2. (defaultRequestChannel = "textInChannel")是说对于writeToFile的调用,都往名称叫做textInChannel的管道里去放。
  3. writeToFile()这个方法,接受一个@Header注解的文件名,这里表示文件名的信息,实际上会被放到消息的头部中叫做file_name的键中,而不是放在报文(payload)中,后边的data表示需要写入到文件中的内容,这部分是放在消息的报文中的。

消息网关实际上就相当于我们这里的管道的端点,可以往里边扔数据和接收数据。有了端点之后,还需要定义管子是什么样的:

虽然有部分自动配置,但这里还是需要写一些配置以让管道满足应用的需求,有三种方法:

  1. XML配置
  2. Java配置
  3. 使用DSL的Java配置

XML配置

虽然不推荐用XML,但很多时候XML的语义相当清晰,还是值得一看。在resources目录下创建一个filewriterconfig.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file
        http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <int:channel id="textInChannel"/>

    <int:transformer id="upperCase"
                     input-channel="textInChannel"
                     output-channel="fileWriterChannel"
                     expression="payload.toUpperCase()"/>

    <int:channel id="fileWriterChannel"/>

    <int-file:outbound-channel-adapter id="writer"
                                       channel="fileWriterChannel"
                                       directory="/tmp/sia5/files"
                                       mode="APPEND"
                                       append-new-line="true"/>
</beans>
  1. <int:channel id="textInChannel"/>定义了管道的名称,这个名称和消息网关里注解里的管道名称要相符,如果多个管道,名称不能相同。
  2. <int:transformer id="upperCase" ...的部分实际上是一个消息处理器,接受textInChannel传进来的消息,进行大写处理,然后输出给fileWriterChannel
  3. <int:channel id="fileWriterChannel"/>又定义了一个叫做fileWriterChannel的管道,就是上一个配置里接受输出的管道。
  4. <int-file:outbound-channel-adapter id="writer" ...是对刚才的fileWriterChannel管道的详细配置,注意这里的命名空间换成了int-file,这是Spring Integration规定的写文件的管道命名空间。其中定义了管道名称,然后设置了directory,会在这个目录下,按照消息头部的文件名,将消息报文中的数据写入这个文件里。modeappend-new-line的设置表示如果文件存在,则开始新行并向文件的尾部追加。

可以把一个int看做一段管道,XML文件中的配置是 一段管道-对应的配置 这样成对出现,每一段管道还可以指定来源和输出,最后实际上是组装了如下图一样的一个管道:

FileWriterChannel

要使XML配置生效,对于Spring Boot来说,只要随便创建一个空的配置类,然后加入@ImportResource注解,在config目录下创建配置类:

package cc.conyli.restlearn.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;

@Configuration
@ImportResource(locations = "classpath:/filewriterconfig.xml")
public class FileWriterXMLConfig {
}

实际这个时候就可以编写控制器和页面来测试了。自行编写了一个/write路径的控制器,用于接收/write?data=xxx的数据,在前端页面用vue可以绑定input与值,然后动态拼接链接去访问。

经过试验,发现每次发送请求,都可以写入一行大写字母到程序所在的硬盘的根目录\tmp\sia5\files下的自定义文件名的文件中。还可以发现写入一个字符的时候文件是三字节,说明是UTF-8格式,非常方便。

这里IDE会提示,找不到FileWriterGateway类型可用的Bean,但实际上无需在接口上添加@Component,也无需理会IDE,运行时一切正常。

接下来可以去掉配置类上的注解,使用配置类来编写Java配置。

Java配置

熟悉XML的配置的话,其实知道刚才的配置里把每一段管道配置为一个Bean。在Java配置里,也需要配置这两段管道的Bean:

package cc.conyli.restlearn.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;

import java.io.File;

@Configuration
public class FileWriterJavaConfig {

    @Bean
    @Transformer(inputChannel = "textInChannel", outputChannel = "fileWriterChannel")
    public GenericTransformer<String, String> upperCaseTransformer() {
        return text -> text.toUpperCase();
    }

    @Bean
    @ServiceActivator(inputChannel = "fileWriterChannel")
    public FileWritingMessageHandler fileWriter() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/sia5/files"));
        handler.setExpectReply(false);
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setAppendNewLine(true);
        return handler;
    }
}

这里定义了两个Bean:

第一个Bean定义了一个Transformer,GenericTransformer是一个接口,因此靠lambda方式传入了一个实现类,用于转换大小写。

第二个Bean的注解ServiceActivator表示这个Bean会接受从fileWriterChannel传入的数据,然后将数据交给FileWritingMessageHandler进行操作。Bean内部的配置显然一看就明白。其中有一个新增的就是handler.setExpectReply(false);,表示无需返回一个响应。由于我们没有接收端,如果这里设置为true,每次会收到一个错误信息。

这里一个很明显的与XML不同的是,没有显式的定义管道的名称,这是因为在注解中指定了管道的名称,如果管道存在就会使用对应的管道,如果不存在就会自动创建。如果想要进一步自定义管道,可以使用与管道名同名的方法,通过返回MessageChannel对象来创建对应的Bean:

@Bean
public MessageChannel textInChannel() {
    return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
    return new DirectChannel();
}

可以看到,Java配置的逻辑和XML是相反的,XML基于管道,给管道附加上功能。Java配置基于功能,给功能指定使用的管道。

再次启动程序,可以发现依然可以写文件了。

Spring Integration DSL

DSL是 Domain Specific Language 的缩写,中文翻译为领域特定语言,还可以用Spring Integration的特定方法来进行配置,用链式调用的方式,这个先看看如何写配置类吧:

package cc.conyli.restlearn.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;

import java.io.File;

@Configuration
public class FileWriterDSLConfig {

    @Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlows.from(MessageChannels.direct("textInChannel"))
                .<String, String>transform(t -> t.toUpperCase())
                .handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true))
                .get();
    }
}

这段代码简洁不少,IntegrationFlows用于初始化一个流对象,然后在后边以链式调用的方式进行各种配置,最后用get()生成配置好的IntegrationFlow对象。

相比Java配置,连文件输出的管道都无需显式生面了,只要直接给一个文件绑定的适配器进行写入就行了。当然,这个DSL与普通的Java配置相比不容易掌握,需要专门研究。

如果一定要显式配置第二段管道的名称,可以修改如下:

@Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlows
                .from(MessageChannels.direct("textInChannel"))
                .<String, String>transform(t -> t.toUpperCase())
                .channel(MessageChannels.direct("fileWriterChannel"))
                .handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true))
                .get();
    }

现在就用三种方式都配置完成了一个整合了文件系统操作的应用。

Spring Integration 管道组件

之前用了两个管道组件和一个Transformer组件整合了一条通往文件系统的数据管道。现在来看一看整个家族里还有那些组件加在管道上。

  1. Channels,把消息从一个元素发到另一个元素
  2. Filters,有条件的允许消息通过
  3. Transformers,根据设置好的条件修改消息
  4. Routers,通常根据消息的头部信息,将消息发到不同的channel里
  5. Splitters,将消息拆成两个或者更多,发送到不同的channel里
  6. Aggregators,与Splitters相反,将来自不同channel的消息组装成一个消息
  7. Service activators,将消息交给一个Java方法进行处理,然后把结果输出到channel中
  8. Channel adapters,将管道连接到外部系统,可以接受或者发送数据
  9. Gateways,将消息发送到一个整合流中

可以把channel想象成是管道,剩下的每一个都是一个处理节点,然后共同构成一个流,信息从一段扔进去,经过各种处理,从另外一端出来。这其中的每一个东西,在Spring里边都会被做成一个Bean。

Message Channels

Spring Integration提供了几种实现:

  1. PublishSubscribeChannel,发布订阅模式管道,将消息发给一个或者多个组件
  2. QueueChannel,队列模式,发布的消息会存在一个队列里,直到有人来拿走,如果有多个消费者,只有一个消费者可以拿到某一个数据
  3. PriorityChannel,优先级管道,和队列模式相似,区别是依据消息头部的优先级来决定哪些消息先被送出去
  4. RendezvousChannel,与队列模式相似,但是发送者会阻塞发送直到消息被拿走,实际是一个同步的方式,有一个消息就必须被消费掉才能发下一个
  5. DirectChannel,像发布订阅管道,但是会发送一个单独的消息给在同一个线程里的消费者,可以让数据传输跨channel(没看懂)
  6. ExecutorChannel,像DirectChannel,但是消息由一个TaskExecutor分发。
  7. FluxMessageChannel,一个基于Project Reactor’s Flux的响应式管道,将在第十章来讲Spring 5新增的Reactive系列玩意。

如果不做任何设置,就像刚才我们直接配置管道的话,默认生成的管道都是DirectChannel类型。如果想使用不同的实现,必须在Bean里显式指定返回的具体类型:

@Bean
public MessageChannel orderChannel() {
    return new PublishSubscribeChannel();
}

方法的名称就是管道的名称。对于input来说,是不区分管道类型的。

这里要特别注意的是,如果是QueueChannel类型,在使用@ServiceActivator注解的时候,必须加上一个poller属性:

@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))

这表示轮询的频率是每秒一次。

Filters

Filter对应的Bean的方法必须返回一个布尔值,其中传入要处理的值,如果返回true,就会通过;返回false,就会阻止该消息。

@Filter(inputChannel = "firstChannel", outputChannel = "textInChannel")
public boolean isContentH(String string) {
    return string.contains("H");
}

给原来的程序加上一个新的管道,进来的叫做firstChannel,出去的叫做textInChannel,然后把Gateway中的管道改成firstChannel,如此修改之后,只有包含有大写字母H的内容才会被写入文件。

DSL的了解一下即可:

.<Integer>filter((p) -> p % 2 == 0)

Transformers

这是一个对消息进行处理的组件,实际上处理之后的消息已经不是原来的消息,而是原来消息的处理结果。所以业务逻辑可以很简单,比如我们编写的基于原来消息把小写改成大写;也可能是很复杂的业务逻辑,比如根据字符串去查找对应的其他数据,然后再返回找到的数据中的一个字段,处理后的消息和原来的消息差异就很大。

GenericTransformer的泛型第一个指的是传入的类型,第二个指的是处理之后传出的类型。注意GenericTransformer只是一个接口,所以需要返回匿名对象或者lambda表达式。

Routers

Routers用于把一个消息根据条件发送到不同的管道中。

按照SIA5的例子,把奇数和偶数发送到不同的管道中,可以编写如下的代码:

@Bean
@Router(inputChannel = "numberChannel")
public AbstractMessageRouter evenOddRouter() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            Integer number = (Integer) message.getPayload();
            if (number % 2 == 0) {
                return Collections.singleton(evenChannel());
            }
            return Collections.singleton(oddChannel());
        }
    };
}

@Bean
public MessageChannel evenChannel() {
    return new DirectChannel();
}

@Bean
public MessageChannel oddChannel() {
    return new DirectChannel();
}

所有的Routers需要返回AbstractMessageRouter的对象,其中需要重写determineTargetChannels方法,从消息中取出数字,然后进行判断。

返回值则是所有管道集合中取单独的一个渠道(在下边被声明为一个Bean)。这只是一个示例,估计具体使用还是得看文档。

Splitters

将一个消息分割成多个消息,发给不同的Channel。这个组件用途很广泛,比如:

  1. 一个信息体中是一个相同类型数据的集合,将其分拆成多个单独的数据,或者只想处理其中的一个。
  2. 一个信息体中包含多个相关的信息,但是可以拆分成不同的不同类型的对象。

使用这个组件的时候,一般需要针对要分拆的消息定义好一个POJO,然后还需要定义拆分出来的POJO,然后编写代码,先分拆,将分拆后的东西都扔到一个管道里,然后在接上一个Router,根据消息类型来分发即可。

如果消息包含的是一个相同类型的数据集合,可以直接这么写:

@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
    return lineItems;
}

这一块东西SIA5介绍的比较少,还需要自己在用到的时候钻研了。

Service activators

Service activators接受一个消息,然后将其委托给MessageHandler的一个实现类去进行处理,把处理后的结果继续往下传输,也可以结束数据流通。

虽然Spring对此也有实现,但肯定看编写自定义的处理方法,否则没有什么实际用处。

@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
    return message -> {
        System.out.println("Message payload: " + message.getPayload());
    };
}

这个方法只有一个inputChannel,和我们编写的写入文件的方法很类似,但是是以打印结束,没有继续流通的管道。

如果要让数据在处理后继续流动,就必须使用GenericHandler

@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
    return (payload, headers) -> {
        return orderRepo.save(payload);
    };
}

这段代码是根据header信息来保存payload中的信息,返回的结果是一个数据对象,这个对象就会被继续扔到completeOrder管道里

Gateway

可以把Gateway认为是应用程序开始往管道里扔数据的地方。我们已经用过了FileWriterGateway这个神奇接口,这个接口是一个单向的Gateway,只有一个接受字符串然后写入的方法,返回void。

当然也有双向的Gateway,让其方法返回值即可,比如:

@Component
@MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
    String uppercase(String in);
}

同样,这个Gateway也无需编写实现类。当数据到达注解中的outChannel的时候,数据就会从uppercase方法中返回。

Channel adapters

Channel adapters是整合流开始和结束的地方。数据通过一个inbound adapter进入流,从一个outbound adapter离开流。

inbound流的具体形式可以有很多,根据具体的数据形式而定,比如:

@Bean
@InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
    return () -> {
        return new GenericMessage<>(source.getAndIncrement());
    };
}

这个Bean使用了@InboundChannelAdapter注解,表示这是一个起点,管道无需规定input和output,因为就是起点。

轮询时间设置为1秒,有一个依赖注入是AtomicInteger,这个类的意思是,每一秒钟,传入AtomicInteger的一个新增的数字。

这个类要求返回的对象是MessageSource的泛型,可以用匿名类也可以用lambda表达式。

很多Channel Adapters都由Spring Integration的很多端点模块提供,比如有FileReadingMessageSource类等。

Endpoint modules

Spring Integration提供的端点模块相当于预先写好的针对某种外部服务的端点模块,当然也可以自定义,但是内置模块基本上已经涵盖了大部分的内容。

从前边可以知道,实际上端点模块提的都是adapter。

所有端点的Group ID都是org.springframework.integration,端点列表可以查看原书230-231页,基本上涵盖了各种内容。

用一个整合邮件功能的流来做个例子。

整合邮件流

原书的例子是让Taco cloud可以通过邮件来下订单,本质就是不断检测新邮件,然后解析邮件中的内容,然后根据内容生成订单。

用整合流的术语来描述,就是需要一个email端点的inbound adapter,然后flow中的处理器解析邮件内容,生成订单对象,然后把订单对象交给Taco Cloud的REST API即可。

这里可以模仿这个程序,写一个解析学生对象的程序,然后调用我们的REST API去添加一个学生。

先添加依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mail</artifactId>
</dependency>

首先需要针对电子邮件创建一个配置属性类,用于生成IMAP的url地址,用于接收邮件:

package cc.conyli.restlearn.entity;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@ConfigurationProperties(prefix = "mymock.email")
@Component
public class EmailProperties {
    private String username;
    private String password;
    private String host;
    private String mailbox;
    private long pollRate = 30000;

    public String getImapUrl() {
        return String.format("imaps://%s:%s@%s/%s", this.username, this.password, this.host, this.mailbox);
    }
}

首先要注意的是@ConfigurationProperties(prefix = "mymock.email"),这表示这个类是一个属性类,使用这个注解首先需要添加如下依赖来开启自定义注解声明属性:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

prefix中的前缀表示这个类对应application.properties中的配置以mymock.email开始。

之后就可以配置邮箱连接的具体属性:

mymock.email.host=imap.vip.sina.com
mymock.email.mailbox=""
mymock.email.username=lee0709@vip.sina.com
mymock.email.password=********
mymock.email.poll-rate=30000

这里邮箱名到新浪邮箱看了,没有给出,先空着,不知道能不能连接成功。

之后我们要创建整合流,这个流是这样的:

Email inbound adapter –> 管道 –> 邮件转换成数据对象的Transformer –> 管道 –> 提交数据到REST API的outbound adapter

如何创建这个流,一般有两种做法:

  1. 将流创建在当前应用的内部
  2. 将流创建为一个独立的应用

由于这个流要使用Student对象,比较方便的做法是创建在当前应用的内部。然而创建为一个独立的应用也有好处,方便调试,更加灵活,耦合程度低。

创建流可以使用XML,Java或者DSL,现在用DSL来创建一下流:

后边的源代码基本看懂了,就是组装Transformer和其他的内容。待我后边详细看看如何自己写一个Email程序。

使用Java发送和接收邮件的相关内容也要看看,不过不管怎样,基本上对于Spring Integration知道是怎么回事了。