在开发Spring AI数据摄取微服务时,搭建高效的ETL(提取、转换、加载)管道是关键一环。本文将详细介绍如何利用Spring Cloud Function和Spring AI来构建这样的管道,通过一步步的操作指南,带你从环境搭建到功能实现,最后进行演示验证,帮助你轻松掌握这一技术,为实际项目开发提供有力支持。

一、Maven依赖配置

要基于Spring Cloud Function配置ETL管道,首先得添加Spring AI和Spring Cloud Function模块的相关依赖。在项目的pom.xml文件中进行如下配置:

<properties> <!-- 设置Spring Cloud版本 --> <spring.cloud.version>2023.0.1</spring.cloud.version> <!-- 设置Spring Functions Catalog版本 --> <spring.functions.catalog.version>5.0.0-SNAPSHOT</spring.functions.catalog.version> </properties> <dependencies> <!-- Spring Cloud Function上下文依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-context</artifactId> </dependency> <!-- 文件供应商依赖,用于提供文件流 --> <dependency> <groupId>org.springframework.cloud.fn</groupId> <artifactId>spring-file-supplier</artifactId> </dependency> <!-- Spring AI的Tika文档读取器依赖 --> <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-tika-document-reader</artifactId> </dependency> <!-- Spring AI与OpenAI集成的启动器依赖 --> <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-openai-spring-boot-starter</artifactId> </dependency> <!-- Spring AI与Chroma数据库集成的启动器依赖 --> <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-chroma-store-spring-boot-starter</artifactId> </dependency> <!-- Spring Boot与Docker Compose集成的依赖,用于运行时管理容器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-docker-compose</artifactId> <scope>runtime</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <!-- Spring Cloud依赖管理 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring.cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- Spring Functions Catalog的BOM依赖管理 --> <dependency> <groupId>org.springframework.cloud.fn</groupId> <artifactId>spring-functions-catalog-bom</artifactId> <version>${spring.functions.catalog.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> 

通过这些依赖,项目就能引入所需的功能模块,为后续构建ETL管道打下基础。

二、文件供应商与文档读取器

spring-file-supplier模块提供了一个fileSupplier,它可以在其他应用中复用和组合。这个供应商能从指定目录生成一个文件的响应式流,开发者需要订阅这个Flux来获取数据。可以通过配置file.supplier.*相关属性,指定输入目录的位置和支持的文件格式。

# 设置输入目录路径 file.supplier.directory=c:/temp/ingestion-files # 设置文件名匹配正则表达式,只处理符合格式的文件 file.supplier.filename-regex=.*.(pdf|docx|txt|pages|csv) 

当Spring检测到Maven坐标和配置属性后,会自动启用一个函数,该函数读取文件内容并返回Flux<Message<byte[]>>。但在Spring AI的ETL管道中,我们需要的是org.springframework.ai.document.Document对象的Flux。所以,要创建一个新函数documentReader来进行格式转换。

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ByteArrayResource; import org.springframework.ai.document.Document; import org.springframework.ai.reader.tika.TikaDocumentReader; import org.springframework.messaging.Message; import reactor.core.publisher.Flux; import java.util.List; import java.util.stream.Collectors; @Configuration public class CloudFunctionConfig { // 定义一个名为documentReader的Bean,用于将文件流转换为Document对象列表的流 @Bean public Function<Flux<Message<byte[]>>, Flux<List<Document>>> documentReader() { // 输入是包含文件字节数据的消息流,输出是Document对象列表的流 return resourceFlux -> resourceFlux // 对每个消息进行转换操作 .map(message -> { // 使用TikaDocumentReader将字节数据转换为Document对象列表 List<Document> documents = new TikaDocumentReader(new ByteArrayResource(message.getPayload())) .get() .stream() // 为每个Document对象添加文件来源元数据 .peek(document -> { document.getMetadata().put("source", message.getHeaders().get("file_name")); }) .collect(Collectors.toList()); return documents; }); } // 其他可能的配置方法 //... } 

三、文档转换器

文档转换器的功能是使用TokenTextSplitter将读取到的Document对象分割成文本块。

import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.ai.document.Document; import org.springframework.ai.transformer.splitter.TokenTextSplitter; import reactor.core.publisher.Flux; import java.util.List; @Configuration public class CloudFunctionConfig { // 定义一个名为documentTransformer的Bean,用于分割Document对象列表 @Bean public Function<Flux<List<Document>>, Flux<List<Document>>> documentTransformer() { // 输入是Document对象列表的流,输出也是Document对象列表的流 return documentListFlux -> documentListFlux // 对每个Document对象列表进行转换操作 .map(unsplitList -> new TokenTextSplitter().apply(unsplitList)); } // 其他可能的配置方法 //... } 

四、文档写入器

文档写入器负责接收分割后的Document列表,并将其文本和嵌入向量存储到向量数据库中。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.ai.document.Document; import org.springframework.ai.vectorstore.VectorStore; import reactor.core.publisher.Flux; import java.util.List; @Configuration public class CloudFunctionConfig { // 定义日志记录器 private static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfig.class); // 定义一个名为documentWriter的Bean,用于将Document对象列表写入向量数据库 @Bean public Consumer<Flux<List<Document>>> documentWriter(VectorStore vectorStore) { // 输入是Document对象列表的流 return documentFlux -> documentFlux // 对每个Document对象列表进行操作 .doOnNext(documents -> { // 记录写入向量数据库的文档数量 LOGGER.info("Writing {} documents to vector store.", documents.size()); // 将Document对象列表写入向量数据库 vectorStore.accept(documents); // 记录已写入向量数据库的文档数量 LOGGER.info("{} documents have been written to vector store.", documents.size()); }) // 订阅流,开始处理数据 .subscribe(); } // 其他可能的配置方法 //... } 

五、配置ETL管道

当所有的Bean都添加到@Configuration类中后,就可以在属性文件中组合函数,形成ETL管道。在属性文件中添加如下配置:

# 定义ETL管道中函数的执行顺序 spring.cloud.function.definition=fileSupplier|documentReader|documentTransformer|documentWriter 

这样就指定了文件供应商、文档读取器、文档转换器和文档写入器的执行顺序,构建起了完整的ETL管道。

六、执行ETL管道

要执行一个函数,可以通过FunctionCatalog.lookup()方法查找其实例,然后调用run()方法来执行。如果上下文中只有一个组合函数,catalog.lookup(null)就能返回这个组合函数;否则,需要在lookup()方法中指定函数名。

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.function.context.catalog.FunctionCatalog; import org.springframework.stereotype.Service; @Service public class IngestionService { // 注入FunctionCatalog实例 private final FunctionCatalog catalog; // 构造函数,初始化FunctionCatalog实例 public IngestionService(FunctionCatalog catalog) { this.catalog = catalog; } // 定义数据摄取方法 public void ingest() { // 获取组合函数实例 Runnable composedFunction = catalog.lookup(null); // 执行组合函数 composedFunction.run(); } } 

七、演示

完成上述配置后,可以从多种方式调用ingest()方法,比如通过REST端点、批处理定时任务,或者任何支持流处理的应用。在本文示例中,通过REST端点来调用。

import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class IngestionController { // 注入IngestionService实例 @Resource IngestionService ingestionService; // 处理POST请求,执行数据摄取操作 @PostMapping("run-ingestion") public ResponseEntity<?> run() { // 调用IngestionService的ingest方法执行数据摄取 ingestionService.ingest(); // 返回表示请求已被接受的响应 return ResponseEntity.accepted().build(); } } 

在本地运行应用,调用/run-ingestion端点,它会按顺序执行所有函数,并将文件内容的嵌入向量存储到向量数据库中。通过查看控制台日志,可以验证操作是否成功。例如,日志中会显示类似如下信息:

Writing 7 documents to vector store. 7 documents have been written to vector store. 

八、总结

通过这个Spring Cloud Function的教程,我们成功构建了一个适用于Spring AI应用的数据摄取ETL管道。该应用利用内置的供应商函数,从指定的文件系统目录读取多种格式的文档,然后通过自定义的函数将文档内容处理成文本块,并把嵌入向量存储到Chroma向量数据库中。