现代的应用程序,如果可以说的上是一个不错的应用的话,几乎都要和其他程序进行交互,从其他程序获取数据或者发送数据给其他程序,无论是在互联网,还是在一个公司的内网上。
当然程序间的通信,需要事先协商好通信的协议,以让双方都能够理解。Spring Integration是一个Enterpirise Integration Patterns
的实现
其中的每一个模式,都被实现成一个组件,这些组件共同组成一个管道进行发送和接收信息。使用Spring 的配置,可以很方便的将这些组件组装成一个数据流通的管道。
所以可以将所有这种与外部服务进行沟通的方式都视作一个管道,成为Integration Flow ,即一条数据的管道连接到其他程序,这就是整合其他服务的本质。
简单的文件系统整合流
大概文件系统是几乎所有应用程序都要与其交互的程序了,在Spring Integration中,有很多组件都是用来读和写文件的。
依然需要引入依赖,如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>
第一个依赖是Spring Boot的 Integration依赖,这个依赖用于创建Spring Integration Flow。第二个是Spring Integration的文件端点模块,是Spring Integration 12个整合其他系统的模块之一。
在之后还会讨论这些模块,现在先要知道文件端点模块是用来和文件系统打交道的,用于从文件系统中读取数据或者向文件系统中写入数据。
创建一个消息网关
在实际创建这条管道之前,需要创建一个gateway interface 消息网关:
package cc.conyli.restlearn.integration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.file.FileHeaders; import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "textInChannel") public interface FileWriterGateway { void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data); }
尽管代码很短,但是这几个注解有的解释一番:
@MessagingGateway
是用于在运行时生成这个接口的实现类的一个Bean,就像Spring Data在运行时自动生成神奇接口对应的实现类一样。在需要的地方进行依赖注入这个接口就可以了。(defaultRequestChannel = "textInChannel")
是说对于writeToFile的调用,都往名称叫做textInChannel的管道里去放。writeToFile()
这个方法,接受一个@Header注解的文件名,这里表示文件名的信息,实际上会被放到消息的头部中叫做file_name的键中,而不是放在报文(payload)中,后边的data表示需要写入到文件中的内容,这部分是放在消息的报文中的。
消息网关实际上就相当于我们这里的管道的端点,可以往里边扔数据和接收数据。有了端点之后,还需要定义管子是什么样的:
虽然有部分自动配置,但这里还是需要写一些配置以让管道满足应用的需求,有三种方法:
- XML配置
- Java配置
- 使用DSL的Java配置
XML配置
虽然不推荐用XML,但很多时候XML的语义相当清晰,还是值得一看。在resources目录下创建一个filewriterconfig.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd"> <int:channel id="textInChannel"/> <int:transformer id="upperCase" input-channel="textInChannel" output-channel="fileWriterChannel" expression="payload.toUpperCase()"/> <int:channel id="fileWriterChannel"/> <int-file:outbound-channel-adapter id="writer" channel="fileWriterChannel" directory="/tmp/sia5/files" mode="APPEND" append-new-line="true"/> </beans>
<int:channel id="textInChannel"/>
定义了管道的名称,这个名称和消息网关里注解里的管道名称要相符,如果多个管道,名称不能相同。<int:transformer id="upperCase" ...
的部分实际上是一个消息处理器,接受textInChannel
传进来的消息,进行大写处理,然后输出给fileWriterChannel
<int:channel id="fileWriterChannel"/>
又定义了一个叫做fileWriterChannel
的管道,就是上一个配置里接受输出的管道。<int-file:outbound-channel-adapter id="writer" ...
是对刚才的fileWriterChannel
管道的详细配置,注意这里的命名空间换成了int-file
,这是Spring Integration规定的写文件的管道命名空间。其中定义了管道名称,然后设置了directory
,会在这个目录下,按照消息头部的文件名,将消息报文中的数据写入这个文件里。mode
和append-new-line
的设置表示如果文件存在,则开始新行并向文件的尾部追加。
可以把一个int看做一段管道,XML文件中的配置是 一段管道-对应的配置 这样成对出现,每一段管道还可以指定来源和输出,最后实际上是组装了如下图一样的一个管道:
要使XML配置生效,对于Spring Boot来说,只要随便创建一个空的配置类,然后加入@ImportResource
注解,在config目录下创建配置类:
package cc.conyli.restlearn.config; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.ImportResource; @Configuration @ImportResource(locations = "classpath:/filewriterconfig.xml") public class FileWriterXMLConfig { }
实际这个时候就可以编写控制器和页面来测试了。自行编写了一个/write路径的控制器,用于接收/write?data=xxx的数据,在前端页面用vue可以绑定input与值,然后动态拼接链接去访问。
经过试验,发现每次发送请求,都可以写入一行大写字母到程序所在的硬盘的根目录\tmp\sia5\files下的自定义文件名的文件中。还可以发现写入一个字符的时候文件是三字节,说明是UTF-8格式,非常方便。
这里IDE会提示,找不到FileWriterGateway
类型可用的Bean,但实际上无需在接口上添加@Component
,也无需理会IDE,运行时一切正常。
接下来可以去掉配置类上的注解,使用配置类来编写Java配置。
Java配置
熟悉XML的配置的话,其实知道刚才的配置里把每一段管道配置为一个Bean。在Java配置里,也需要配置这两段管道的Bean:
package cc.conyli.restlearn.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; import org.springframework.integration.file.FileWritingMessageHandler; import org.springframework.integration.file.support.FileExistsMode; import org.springframework.integration.transformer.GenericTransformer; import java.io.File; @Configuration public class FileWriterJavaConfig { @Bean @Transformer(inputChannel = "textInChannel", outputChannel = "fileWriterChannel") public GenericTransformer<String, String> upperCaseTransformer() { return text -> text.toUpperCase(); } @Bean @ServiceActivator(inputChannel = "fileWriterChannel") public FileWritingMessageHandler fileWriter() { FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/sia5/files")); handler.setExpectReply(false); handler.setFileExistsMode(FileExistsMode.APPEND); handler.setAppendNewLine(true); return handler; } }
这里定义了两个Bean:
第一个Bean定义了一个Transformer,GenericTransformer是一个接口,因此靠lambda方式传入了一个实现类,用于转换大小写。
第二个Bean的注解ServiceActivator
表示这个Bean会接受从fileWriterChannel传入的数据,然后将数据交给FileWritingMessageHandler进行操作。Bean内部的配置显然一看就明白。其中有一个新增的就是handler.setExpectReply(false);
,表示无需返回一个响应。由于我们没有接收端,如果这里设置为true,每次会收到一个错误信息。
这里一个很明显的与XML不同的是,没有显式的定义管道的名称,这是因为在注解中指定了管道的名称,如果管道存在就会使用对应的管道,如果不存在就会自动创建。如果想要进一步自定义管道,可以使用与管道名同名的方法,通过返回MessageChannel对象来创建对应的Bean:
@Bean public MessageChannel textInChannel() { return new DirectChannel(); } ... @Bean public MessageChannel fileWriterChannel() { return new DirectChannel(); }
可以看到,Java配置的逻辑和XML是相反的,XML基于管道,给管道附加上功能。Java配置基于功能,给功能指定使用的管道。
再次启动程序,可以发现依然可以写文件了。
Spring Integration DSL
DSL是 Domain Specific Language 的缩写,中文翻译为领域特定语言,还可以用Spring Integration的特定方法来进行配置,用链式调用的方式,这个先看看如何写配置类吧:
package cc.conyli.restlearn.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.support.FileExistsMode; import java.io.File; @Configuration public class FileWriterDSLConfig { @Bean public IntegrationFlow fileWriterFlow() { return IntegrationFlows.from(MessageChannels.direct("textInChannel")) .<String, String>transform(t -> t.toUpperCase()) .handle(Files.outboundAdapter(new File("/tmp/sia5/files")) .fileExistsMode(FileExistsMode.APPEND) .appendNewLine(true)) .get(); } }
这段代码简洁不少,IntegrationFlows
用于初始化一个流对象,然后在后边以链式调用的方式进行各种配置,最后用get()生成配置好的IntegrationFlow
对象。
相比Java配置,连文件输出的管道都无需显式生面了,只要直接给一个文件绑定的适配器进行写入就行了。当然,这个DSL与普通的Java配置相比不容易掌握,需要专门研究。
如果一定要显式配置第二段管道的名称,可以修改如下:
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
现在就用三种方式都配置完成了一个整合了文件系统操作的应用。
Spring Integration 管道组件
之前用了两个管道组件和一个Transformer组件整合了一条通往文件系统的数据管道。现在来看一看整个家族里还有那些组件加在管道上。
Channels
,把消息从一个元素发到另一个元素Filters
,有条件的允许消息通过Transformers
,根据设置好的条件修改消息Routers
,通常根据消息的头部信息,将消息发到不同的channel里Splitters
,将消息拆成两个或者更多,发送到不同的channel里Aggregators
,与Splitters相反,将来自不同channel的消息组装成一个消息Service activators
,将消息交给一个Java方法进行处理,然后把结果输出到channel中Channel adapters
,将管道连接到外部系统,可以接受或者发送数据Gateways
,将消息发送到一个整合流中
可以把channel想象成是管道,剩下的每一个都是一个处理节点,然后共同构成一个流,信息从一段扔进去,经过各种处理,从另外一端出来。这其中的每一个东西,在Spring里边都会被做成一个Bean。
Message Channels
Spring Integration提供了几种实现:
PublishSubscribeChannel
,发布订阅模式管道,将消息发给一个或者多个组件QueueChannel
,队列模式,发布的消息会存在一个队列里,直到有人来拿走,如果有多个消费者,只有一个消费者可以拿到某一个数据PriorityChannel
,优先级管道,和队列模式相似,区别是依据消息头部的优先级来决定哪些消息先被送出去RendezvousChannel
,与队列模式相似,但是发送者会阻塞发送直到消息被拿走,实际是一个同步的方式,有一个消息就必须被消费掉才能发下一个DirectChannel
,像发布订阅管道,但是会发送一个单独的消息给在同一个线程里的消费者,可以让数据传输跨channel(没看懂)ExecutorChannel
,像DirectChannel,但是消息由一个TaskExecutor分发。FluxMessageChannel
,一个基于Project Reactor’s Flux的响应式管道,将在第十章来讲Spring 5新增的Reactive系列玩意。
如果不做任何设置,就像刚才我们直接配置管道的话,默认生成的管道都是DirectChannel
类型。如果想使用不同的实现,必须在Bean里显式指定返回的具体类型:
@Bean public MessageChannel orderChannel() { return new PublishSubscribeChannel(); }
方法的名称就是管道的名称。对于input来说,是不区分管道类型的。
这里要特别注意的是,如果是QueueChannel
类型,在使用@ServiceActivator
注解的时候,必须加上一个poller
属性:
@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))
这表示轮询的频率是每秒一次。
Filters
Filter对应的Bean的方法必须返回一个布尔值,其中传入要处理的值,如果返回true,就会通过;返回false,就会阻止该消息。
@Filter(inputChannel = "firstChannel", outputChannel = "textInChannel") public boolean isContentH(String string) { return string.contains("H"); }
给原来的程序加上一个新的管道,进来的叫做firstChannel
,出去的叫做textInChannel
,然后把Gateway中的管道改成firstChannel
,如此修改之后,只有包含有大写字母H的内容才会被写入文件。
DSL的了解一下即可:
.<Integer>filter((p) -> p % 2 == 0)
Transformers
这是一个对消息进行处理的组件,实际上处理之后的消息已经不是原来的消息,而是原来消息的处理结果。所以业务逻辑可以很简单,比如我们编写的基于原来消息把小写改成大写;也可能是很复杂的业务逻辑,比如根据字符串去查找对应的其他数据,然后再返回找到的数据中的一个字段,处理后的消息和原来的消息差异就很大。
GenericTransformer的泛型第一个指的是传入的类型,第二个指的是处理之后传出的类型。注意GenericTransformer只是一个接口,所以需要返回匿名对象或者lambda表达式。
Routers
Routers用于把一个消息根据条件发送到不同的管道中。
按照SIA5的例子,把奇数和偶数发送到不同的管道中,可以编写如下的代码:
@Bean @Router(inputChannel = "numberChannel") public AbstractMessageRouter evenOddRouter() { return new AbstractMessageRouter() { @Override protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { Integer number = (Integer) message.getPayload(); if (number % 2 == 0) { return Collections.singleton(evenChannel()); } return Collections.singleton(oddChannel()); } }; } @Bean public MessageChannel evenChannel() { return new DirectChannel(); } @Bean public MessageChannel oddChannel() { return new DirectChannel(); }
所有的Routers需要返回AbstractMessageRouter
的对象,其中需要重写determineTargetChannels
方法,从消息中取出数字,然后进行判断。
返回值则是所有管道集合中取单独的一个渠道(在下边被声明为一个Bean)。这只是一个示例,估计具体使用还是得看文档。
Splitters
将一个消息分割成多个消息,发给不同的Channel。这个组件用途很广泛,比如:
- 一个信息体中是一个相同类型数据的集合,将其分拆成多个单独的数据,或者只想处理其中的一个。
- 一个信息体中包含多个相关的信息,但是可以拆分成不同的不同类型的对象。
使用这个组件的时候,一般需要针对要分拆的消息定义好一个POJO,然后还需要定义拆分出来的POJO,然后编写代码,先分拆,将分拆后的东西都扔到一个管道里,然后在接上一个Router,根据消息类型来分发即可。
如果消息包含的是一个相同类型的数据集合,可以直接这么写:
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel") public List<LineItem> lineItemSplitter(List<LineItem> lineItems) { return lineItems; }
这一块东西SIA5介绍的比较少,还需要自己在用到的时候钻研了。
Service activators
Service activators接受一个消息,然后将其委托给MessageHandler的一个实现类去进行处理,把处理后的结果继续往下传输,也可以结束数据流通。
虽然Spring对此也有实现,但肯定看编写自定义的处理方法,否则没有什么实际用处。
@Bean @ServiceActivator(inputChannel="someChannel") public MessageHandler sysoutHandler() { return message -> { System.out.println("Message payload: " + message.getPayload()); }; }
这个方法只有一个inputChannel,和我们编写的写入文件的方法很类似,但是是以打印结束,没有继续流通的管道。
如果要让数据在处理后继续流动,就必须使用GenericHandler
:
@Bean @ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder") public GenericHandler<Order> orderHandler(OrderRepository orderRepo) { return (payload, headers) -> { return orderRepo.save(payload); }; }
这段代码是根据header信息来保存payload中的信息,返回的结果是一个数据对象,这个对象就会被继续扔到completeOrder
管道里
Gateway
可以把Gateway认为是应用程序开始往管道里扔数据的地方。我们已经用过了FileWriterGateway这个神奇接口,这个接口是一个单向的Gateway,只有一个接受字符串然后写入的方法,返回void。
当然也有双向的Gateway,让其方法返回值即可,比如:
@Component @MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel") public interface UpperCaseGateway { String uppercase(String in); }
同样,这个Gateway也无需编写实现类。当数据到达注解中的outChannel的时候,数据就会从uppercase方法中返回。
Channel adapters
Channel adapters是整合流开始和结束的地方。数据通过一个inbound adapter进入流,从一个outbound adapter离开流。
inbound流的具体形式可以有很多,根据具体的数据形式而定,比如:
@Bean @InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel") public MessageSource<Integer> numberSource(AtomicInteger source) { return () -> { return new GenericMessage<>(source.getAndIncrement()); }; }
这个Bean使用了@InboundChannelAdapter
注解,表示这是一个起点,管道无需规定input和output,因为就是起点。
轮询时间设置为1秒,有一个依赖注入是AtomicInteger,这个类的意思是,每一秒钟,传入AtomicInteger的一个新增的数字。
这个类要求返回的对象是MessageSource的泛型,可以用匿名类也可以用lambda表达式。
很多Channel Adapters都由Spring Integration的很多端点模块提供,比如有FileReadingMessageSource类等。
Endpoint modules
Spring Integration提供的端点模块相当于预先写好的针对某种外部服务的端点模块,当然也可以自定义,但是内置模块基本上已经涵盖了大部分的内容。
从前边可以知道,实际上端点模块提的都是adapter。
所有端点的Group ID都是org.springframework.integration
,端点列表可以查看原书230-231页,基本上涵盖了各种内容。
用一个整合邮件功能的流来做个例子。
整合邮件流
原书的例子是让Taco cloud可以通过邮件来下订单,本质就是不断检测新邮件,然后解析邮件中的内容,然后根据内容生成订单。
用整合流的术语来描述,就是需要一个email端点的inbound adapter,然后flow中的处理器解析邮件内容,生成订单对象,然后把订单对象交给Taco Cloud的REST API即可。
这里可以模仿这个程序,写一个解析学生对象的程序,然后调用我们的REST API去添加一个学生。
先添加依赖:
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mail</artifactId> </dependency>
首先需要针对电子邮件创建一个配置属性类,用于生成IMAP的url地址,用于接收邮件:
package cc.conyli.restlearn.entity; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @ConfigurationProperties(prefix = "mymock.email") @Component public class EmailProperties { private String username; private String password; private String host; private String mailbox; private long pollRate = 30000; public String getImapUrl() { return String.format("imaps://%s:%s@%s/%s", this.username, this.password, this.host, this.mailbox); } }
首先要注意的是@ConfigurationProperties(prefix = "mymock.email")
,这表示这个类是一个属性类,使用这个注解首先需要添加如下依赖来开启自定义注解声明属性:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
prefix
中的前缀表示这个类对应application.properties
中的配置以mymock.email
开始。
之后就可以配置邮箱连接的具体属性:
mymock.email.host=imap.vip.sina.com mymock.email.mailbox="" mymock.email.username=lee0709@vip.sina.com mymock.email.password=******** mymock.email.poll-rate=30000
这里邮箱名到新浪邮箱看了,没有给出,先空着,不知道能不能连接成功。
之后我们要创建整合流,这个流是这样的:
Email inbound adapter –> 管道 –> 邮件转换成数据对象的Transformer –> 管道 –> 提交数据到REST API的outbound adapter
如何创建这个流,一般有两种做法:
- 将流创建在当前应用的内部
- 将流创建为一个独立的应用
由于这个流要使用Student对象,比较方便的做法是创建在当前应用的内部。然而创建为一个独立的应用也有好处,方便调试,更加灵活,耦合程度低。
创建流可以使用XML,Java或者DSL,现在用DSL来创建一下流:
后边的源代码基本看懂了,就是组装Transformer和其他的内容。待我后边详细看看如何自己写一个Email程序。
使用Java发送和接收邮件的相关内容也要看看,不过不管怎样,基本上对于Spring Integration知道是怎么回事了。
柚子叔,这些东西您都记住吗,我学着学着很容易把前面的知识点全都丢了=-=
主要是理解结构,而不是记住具体的写法。好比这篇东西,我会知道需要与其他程序集成的时候可以通过Spring Integration搭一段管道,管道有各个节点可以对数据进行处理。这样就可以了,在具体用到的时候,当然还需要翻文档,查阅代码然后来编写,但是这时候和完全不知道该怎么做是不同的。可以看看我这篇翻译http://www.conyli.cc/archives/2022
谢谢你啊,柚子叔!