当AI智能体只是一行SQL查询:流处理引擎的奇幻漂流
在软件工程的浩瀚星空中,我们时常惊叹于那些颠覆传统的思想火花。不久前,Seth Wiesman 在一次演讲中提出了一个振聋发聩的观点:“那个微服务,它本应是一行SQL查询”。这个想法如同一颗投入平静湖面的石子,激起了层层涟漪。它主张使用流处理器上的SQL查询来实现微服务,以求更快的上市时间、更高的一致性和可扩展性。这不禁让我陷入沉思:既然微服务可以,那么,我们这个时代最炙手可热的概念——AI智能体(AI Agent),是否也能被“降维”成一行流式SQL查询呢?
这听起来或许有些天马行空,像是在用一把瑞士军刀去建造一艘航空母舰。但请稍安勿躁,让我们一起踏上这场思想的奇幻漂流,探索如何利用 Apache Flink 这样的流处理平台,将AI智能体的构建带入一个全新的维度。
什么是AI智能体?
我们可以借用谷歌那简洁而不失精准的定义:AI智能体是利用人工智能代表用户追求目标、完成任务的软件系统。它们展现出推理、规划和记忆能力,并拥有一定程度的自主性来做出决策、学习和适应。
更接地气的理解是,可以将它们看作是“打了AI激素”的微服务:接收输入,利用大语言模型(LLM)进行处理,然后输出结果。目前,大多数落地的AI智能体并非科幻电影里那种拥有无限权限的“天网”,而更多是定义明确的“AI辅助工作流”,例如在客户服务、医疗文档处理、自动化销售等领域大放异彩。
🌊 当数据之河倒流:流式SQL的“推”与“拉”
要理解这个看似疯狂的想法,我们首先需要厘清传统数据库查询与流式查询的根本区别。在传统世界里,你向数据库发出一条SQL查询,这是一个“拉”(pull)的动作。数据库会扫描表、查询索引,然后将整个结果集一次性返回给你。这就像你去图书馆借书,告诉管理员你要什么,然后抱着一摞书回家。
而流式查询系统则彻底颠覆了这个模式,它采用的是“推”(push)的方式。查询会持续不断地运行,像一个永不疲倦的哨兵,时刻监控着数据的流动。一旦有新的数据(事件)流入,查询会立即对这部分增量数据进行计算,并将结果的“变化量”推送给下游。这更像是你订阅了一份杂志,每当新的一期出版,邮递员就会主动把它送到你的信箱,你无需每次都去报刊亭询问。
Apache Flink 正是这样一个为事件驱动和数据密集型应用而生的平台,它天生就具备高性能、高可扩展性和高鲁棒性的基因。这些特性,恰好也是构建一个可靠AI智能体所必需的基石。更妙的是,通过SQL,我们不仅为应用程序开发者打开了新世界的大门,也让广大的数据工程师们能够轻松地参与到这场AI的盛宴中来。
🤖 为智能体注入灵魂:当SQL遇见LLM
无论我们如何定义AI智能体,有一点是毋庸置疑的:它们必须与大语言模型(LLM)互动。这标志着一种范式的转变——我们不再是编写固定的规则来处理数据,而是将结构化或非结构化的数据,连同对话历史等上下文信息,以自然语言的形式“喂”给LLM,由它来生成响应。
那么,Flink SQL是如何实现与LLM的“联姻”的呢?答案藏在一个名为 FLIP-437 的提案中,它的目标是“在Flink SQL中支持机器学习模型”,让模型成为流处理应用中的“一等公民”。通过全新的 CREATE MODEL
语句,开发者可以直接在SQL中注册来自OpenAI、Google AI、AWS Bedrock等供应商的AI模型。
让我们来看一个生动的例子。假设我们希望追踪数据库和数据流领域顶级会议(如VLDB)的最新研究论文。阅读所有论文无疑是一项耗时巨大的工程。现在,我们可以构建一个AI智能体来自动为我们总结论文摘要。整个流程在像 Confluent Cloud 这样的全托管流处理平台上可以这样实现:
- 新论文的PDF文件被上传到S3存储桶,通过工具(如Apache Tika)提取纯文本。
- S3源连接器捕获这些文本文件,并将它们作为事件发送到Kafka主题中。
- 我们的AI智能体——一个流式SQL查询——消费这些事件,并调用OpenAI模型为每篇论文生成摘要。
- 结果被写入另一个Kafka主题,最终可以推送到Slack频道,让团队成员即时获取最新研究动态。
首先,我们需要像定义一张表一样,定义我们的AI模型:
CREATE MODEL summarization_model
INPUT (
text STRING
)
OUTPUT (
title STRING,
authors STRING,
year_of_publication INT,
summary STRING
)
COMMENT 'Research paper summarization model'
WITH (
'provider' = 'openai',
'task' = 'text_generation',
'openai.connection' = 'openai-connection',
'openai.model_version' = 'gpt-4.1-mini',
'openai.output_format' = 'json',
'openai.system_prompt' = 'This is a text extract of a research paper in PDF format.
Provide its title, authors, year of publication, and a summary
of 200 to 400 words. Reply with a JSON structure with the fields
"title", "authors", "year_of_publication", "summary". Return
only the JSON itself, no Markdown mark-up.'
);
请注意,这个模型定义中甚至包含了我们将要使用的系统提示(system prompt),它精确地指导了LLM如何工作以及返回何种格式的数据。模型创建后,我们就可以通过 ML_PREDICT()
函数来调用它,就像调用任何一个普通的SQL函数一样:
INSERT INTO papers_summarized
SELECT
fulltext,
p.title,
p.authors,
p.year_of_publication,
p.summary
FROM
research_papers,
LATERAL TABLE(ML_PREDICT('summarization_model', fulltext)) AS p;
一旦这个查询开始运行,每当一篇新论文的文本被推送到 research_papers
主题,papers_summarized
主题就会立刻收到一份结构化的摘要,如下表所示:
fulltext | title | authors | year_of_publication | summary |
Styx: Transactional Stateful Functions on ... | Styx: Transactional ... | Kyriakos Psarakis, George Christodoulou,... | 2025 | This paper introduces Styx, a novel runtime ... |
... | ... | ... | ... | ... |
这个例子仅仅是冰山一角。同样的方法可以应用于情感分析、数据分类、垃圾邮件检测、文本翻译等等,想象空间巨大。
⚡️ 永不眠的哨兵:事件驱动的自主智能
当我们谈论AI智能体时,脑海中浮现的第一个画面往往是基于同步请求-响应模式的对话机器人,比如ChatGPT。然而,在企业环境中,那些默默无闻、由事件驱动的自主智能体往往更具价值。它们基于实时数据流(如网店的用户点击、风力涡轮机的传感器数据、数据库的变更日志)自主采取行动,无需人类干预。
这正是Flink SQL的“甜蜜区”。它拥有庞大的连接器生态系统,可以与几乎所有可以想象到的数据源和目标系统无缝对接。无论是来自Kafka的点击流,还是来自数据库的CDC(Change Data Capture)数据,亦或是通过MQTT传输的传感器读数,Flink都能轻松应对。
将Flink与像Apache Kafka这样的事件流平台结合,更能创造出一个强大的“数字神经网络”。我们可以构建一个由多个专业化、松散耦合的智能体组成的网络。每个智能体完成一项特定任务,其输出可以成为其他智能体的输入,而彼此之间无需关心对方的具体实现细节。Kafka连接并解锁了公司内部的系统、团队和数据库,为智能体提供了做出明智决策所需的所有上下文。
此外,这种架构还有助于克服LLM的一个固有缺陷:知识的“保质期”。LLM的知识受限于其训练数据的截止日期。而通过事件驱动的架构,我们可以实时地将最新的信息注入到后续将要讨论的RAG系统中,从而让智能体的决策总是基于最新、最准确的数据。
什么是事件驱动架构?
这是一种软件设计范式,系统的组件之间通过生产和消费异步的“事件”来进行通信。一个组件(生产者)发布一个事件(例如,“用户下了一个订单”),而其他感兴趣的组件(消费者)会订阅并响应该事件(例如,库存服务减少库存,通知服务发送确认邮件)。这种松散耦合的模式使得系统更具弹性、可扩展性和灵活性。
🧠 赋予智能体记忆与知识:RAG的魔法与SQL的炼金术
通用LLM虽然知识渊博,但它们对你公司内部的“秘密”一无所知。要让AI智能体在企业场景中真正发挥作用,就必须为它们提供访问内部数据、工具和服务的“特权”。这在Flink SQL中如何实现呢?
对于结构化数据,比如存储在外部数据库中的客户信息,SQL是当之无愧的王者。Flink SQL允许你使用强大的 JOIN
语义来丰富发送给LLM的数据。你可以连接来自不同数据源的流,或者使用“查找连接”(look-up joins)来查询那些不经常变化的参考数据(如CRM系统中的客户详情)。Flink会自动处理缓存,以最高效的方式获取数据。
而对于非结构化数据——如内部文档、Wiki页面、客户合同等——检索增强生成(Retrieval-Augmented Generation, RAG)是目前公认的最佳实践。
什么是RAG?
检索增强生成(RAG)是一种将预训练的大语言模型与外部知识库相结合的技术。当需要回答一个问题或生成一段文本时,系统首先从知识库(通常是向量数据库)中检索出最相关的信息片段,然后将这些信息作为额外的上下文,连同原始问题一起提供给LLM。这极大地提高了模型回答的准确性和时效性,因为它能够利用模型训练时未曾见过的最新或专有知识。
让我们回到之前的论文摘要例子,并把它变得更复杂一些。假设我们公司内部也有一个研究Wiki,记录了我们自己的研究成果。我们希望当一篇新的外部论文进来时,智能体不仅能生成摘要,还能识别出它与我们内部研究的关联,从而为未来的研究提供新的视角。
这个任务可以通过一个由两个流式SQL作业组成的智能体系统来完成:
作业一:知识库的实时同步
这个作业负责将我们内部Wiki的知识转化为LLM可以理解的格式。
- 每当内部Wiki页面发生变化,通过CDC或Webhook等机制捕获变更事件。
- 使用
ML_PREDICT()
函数和一个嵌入模型(如OpenAI的 text-embedding-3-small
),将变更的文本内容转换成向量嵌入(A1)。
- 将这些新生成的或更新的向量嵌入存储到一个向量数据库(如Pinecone或Elasticsearch)中,确保知识库始终与原始数据保持同步(A2)。
作业二:智能分析代理
这个作业是真正的分析核心。
- 当一篇新的外部研究论文进入系统,首先像之前一样,使用
ML_PREDICT()
生成其摘要(B1)。
- 接着,再次使用
ML_PREDICT()
和同一个嵌入模型,为这篇论文的摘要创建一个向量表示(B2)。
- 使用这个新生成的向量去查询向量数据库,通过余弦相似度等算法,找出与我们内部研究最相关的文档(B3)。这一步目前需要通过自定义函数(UDF)来实现,比如在Confluent Cloud上有一个现成的
VECTOR_SEARCH()
函数。
- 最后,将检索到的内部研究信息作为增强上下文,连同论文摘要一起,再次调用
ML_PREDICT()
,让LLM分析并阐述外部论文与我们内部研究之间的关系(B4)。
到目前为止,我们讨论的还主要停留在“工作流”的范畴。要构建一个真正的“智能体”,可能需要让LLM自己决定在特定情境下调用哪些工具或数据源。Anthropic的MCP(模型上下文协议)标准正为此而生。虽然Flink SQL目前尚未原生支持,但我们可以通过编写自定义函数(UDF),特别是 Flink 2.1 中引入的一种新型UDF——过程表函数(Process Table Functions, PTF)来弥合这一差距。
💾 状态的艺术:为智能体打造可扩展的记忆宫殿
最后,我们来谈谈“记忆”。一个有用的智能体必须能够记住之前的交互。在我们的例子中,这可能是同一作者之前的论文;在推荐场景中,这可能是某位顾客过去所有的购买记录。
虽然Flink SQL能够为窗口聚合或连接等操作管理状态,但SQL本身无法提供构建AI智能体记忆所需的那种细粒度状态访问。然而,前面提到的过程表函数(PTF)再次成为了破局的关键。当我们将PTF应用于一个分区输入流时(例如,按conversation_id
或customer_id
分区),我们就可以在每个分区的上下文中管理任意的自定义状态。
你可以将与某个特定工作流实例相关的所有事件、消息,甚至之前的LLM响应,都存储在Flink管理的状态中。在构建下一次LLM提示时,再从状态存储中将它们取出。从这个角度看,一个由Flink状态支持的PTF可以被视为一种“持久化执行”(durable execution)的形式,它以一种可恢复、可扩展的方式,追踪着一个长期运行操作的进度。更棒的是,Flink会自动负责在集群中分布和管理这些状态,让你的有状态AI智能体能够轻松扩展到任意数量的计算节点。
🧗♀️ 当SQL遇到极限:超越声明式的智能
那么,我们是否应该用Flink SQL来构建所有的AI智能体呢?当然不是。当我们手中的工具是SQL这把“万能锤”时,确实要警惕把所有问题都看成钉子的风险。
对于智能体消费和产生的结构化数据的预处理和后处理——如过滤、转换、连接、聚合——Flink SQL无疑是最佳选择。但当我们需要构建一个拥有更高“自主性”的智能体时,单纯基于SQL的实现可能会变得力不从心。
社区显然也意识到了这一点。最近宣布的 Flink Agents 子项目 (FLIP-531),一个由Confluent和阿里巴巴的工程师合作提出的项目,旨在为AI智能体创建一个基于Flink的专用运行时。其核心思想是复用Flink久经考验的低延迟连续数据处理基础,并在此之上构建一个易于使用、将AI智能体作为一等公民的框架。该框架计划支持Python,从而让开发者能够利用庞大的Python AI库生态。这不禁让人想起Flink旗下另一个曾经的项目:Stateful Functions (StateFun),或许它将在AI智能体的浪潮中以新的形式迎来复兴。
结语:一场美丽的邂逅
Apache Flink,凭借其强大的流处理能力和不断演进的AI集成,正成为构建事件驱动智能体系统的一个极具吸引力的多功能平台。尽管在通往真正自主AI智能体的道路上还有一些工作要做(例如原生集成MCP),但它已经为我们提供了连接实时事件流、集成LLM、管理上下文和状态所需的核心工具。
对我而言,使用SQL以声明方式构建智能体系统的最大魅力在于其“民主化”的理念。有了正确的构建块(比如预置好的用于调用工具的UDF),任何熟悉SQL的人——而不仅仅是专业的AI工程师——都能够构建、部署并分享自己的AI解决方案。
所以,回到最初的问题:这个AI智能体,它真的应该是一行SQL查询吗?答案是:不总是,但在很多情况下,这绝对是一个值得认真考虑的、坚实可靠的起点。
参考文献
- Morling, G. (2025). This AI Agent Should Have Been a SQL Query. Retrieved from https://www.morling.dev/blog/this-ai-agent-should-have-been-sql-query/
- Carbone, P., et al. (2015). Apache Flink: Stream and Batch Processing in a Single Engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering.
- Lewis, P., et al. (2020). Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks. Advances in Neural Information Processing Systems, 33, 9459-9474.
- The Apache Flink Community. FLIP-531: Flink Agents. Apache Software Foundation. Retrieved from the Flink Improvement Proposals archive.
- Anthropic. (2024). Building effective agents. Retrieved from Anthropic's official documentation.