0x00 概要ActionTask 是 Action 执行的基本单位代表一个可执行的任务块。一个完整的 Action 可能会被切分成多个 ActionTask 来执行。ActionTask 在整体流程的位置如下Action Code → Agent → AgentPlan → ActionExecutionOperator → ActionTask → Flink Runtime0x01 基础知识ActionTask 是 Action 执行过程中的一个片段用于支持复杂的执行逻辑如异步处理对应关系如下一个 Action 对应一个函数在 AgentPlan 中每个 Action 包含一个执行函数 (exec)通常是 PythonFunction 或 JavaFunction例如在 tool_call_action.py 中TOOL_CALL_ACTION Action ( nametool_call_action, execPythonFunction.from_callable (process_tool_request), // 一个函数 listen_event_types[...] )一个 Action 可能产生多个 ActionTaskActionTask 是 Action 的执行时表示可以看作是 Action 的 “执行片段”一个 Action 可能在执行过程中被拆分为多个 ActionTask特别是在处理异步操作时1.1 相关组件ActionTask 概念的相关组件如下组件核心功能JavaActionTask执行 Java 函数PythonActionTask执行 Python 函数支持异步 / 生成器模式桥接 Java 与 Python 生态LocalRunnerContext本地执行上下文模拟 Flink 分布式状态管理事件队列、key 隔离状态、资源访问ActionTaskResult动作执行结果载体包含是否完成、输出事件、下一个待执行任务若有PythonGeneratorActionTask处理 Python 生成器的异步任务持续执行直到完成所有异步操作Tool 相关机制支持装饰器tool、add_resource 等方式注册工具通过 TOOL_CALL_ACTION 触发执行在系统中的架构如下1.2 ActionTask我们接下来看看 ActionTask 的具体实现。ActionTask 是基类。/** * This class represents a task related to the execution of an action in {link * ActionExecutionOperator}. * * pAn action is split into multiple code blocks, and each code block is represented by an {code * ActionTask}. You can call {link #invoke()} to execute a code block and obtain invoke result * {link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next * {code ActionTask} via {link ActionTaskResult#getGeneratedActionTask()} and continue executing * it. */ public abstract class ActionTask { protected final Object key; protected final Event event; protected final Action action; /** * Since RunnerContextImpl contains references to the Operator and state, it should not be * serialized and included in the state with ActionTask. Instead, we should check if a valid * RunnerContext exists before each ActionTask invocation and create a new one if necessary. */ protected transient RunnerContextImpl runnerContext; public ActionTask(Object key, Event event, Action action) { this.key key; this.event event; this.action action; } public RunnerContextImpl getRunnerContext() { return runnerContext; } public void setRunnerContext(RunnerContextImpl runnerContext) { this.runnerContext runnerContext; } public Object getKey() { return key; } /** Invokes the action task. */ public abstract ActionTaskResult invoke() throws Exception; public class ActionTaskResult { private final boolean finished; private final ListEvent outputEvents; private final OptionalActionTask generatedActionTaskOpt; public ActionTaskResult( boolean finished, ListEvent outputEvents, Nullable ActionTask generatedActionTask) { this.finished finished; this.outputEvents outputEvents; this.generatedActionTaskOpt Optional.ofNullable(generatedActionTask); } public boolean isFinished() { return finished; } public ListEvent getOutputEvents() { return outputEvents; } public OptionalActionTask getGeneratedActionTask() { return generatedActionTaskOpt; } } }1.3 PythonActionTaskPythonActionTask 是一个专门用于执行 Python 动作任务的特殊 ActionTask 实现。它的主要作用包括执行 Python 函数调用 Python 函数来处理事件处理异步操作支持 Python 中的异步操作通过生成器机制实现桥接 Java 和 Python作为 Java 端和 Python 端之间的桥梁协调两者间的交互1.3.1 定义PythonActionTask 对应一个 Python 函数更准确地说是一个 PythonFunction 对象这个函数是在创建 Action 时定义的存储在 action.getExec() 中。但PythonActionTask 不仅仅是简单的函数封装而是使其能够在 Flink Agents 框架中正确执行并支持框架所需的高级特性。它提供了以下附加价值复杂逻辑PythonActionTask 不仅仅是执行函数还负责处理复杂的交互逻辑执行环境管理为函数提供合适的执行上下文异步支持通过生成器机制支持长时间运行的操作事件处理管理和传递执行过程中产生的事件状态维护在整个执行过程中维护必要的状态信息PythonActionTask 在系统架构中的位置和交互关系如下代码如下public class PythonActionTask extends ActionTask { public ActionTaskResult invoke() throws Exception { PythonActionExecutor pythonActionExecutor getPythonActionExecutor(); // 这里执行实际的 Python 函数 String pythonGeneratorRef pythonActionExecutor.executePythonFunction( (PythonFunction) action.getExec(), // -- 这就是对应的函数 (PythonEvent) event, runnerContext); // 处理异步情况 if (pythonGeneratorRef ! null) { // 如果函数返回了生成器则创建新的任务继续执行 ActionTask tempGeneratedActionTask new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef); tempGeneratedActionTask.setRunnerContext(runnerContext); if (pythonGeneratorRef ! null) { // 如果函数返回了生成器则创建新的任务继续执行 ActionTask tempGeneratedActionTask new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef); tempGeneratedActionTask.setRunnerContext(runnerContext); return tempGeneratedActionTask.invoke(); } // 否则表示函数已执行完毕 return new ActionTaskResult( true, runnerContext.drainEvents(event.getSourceTimestamp()), null);1.3.2 PythonActionTask 与 Function 的关系PythonActionTask 与 Function 的关系的如下1.3.3 与其他组件的关系PythonActionTask 与其他组件的关系如下图所示。1.3.4 调用流程PythonActionTask.invoke() 流程如下图所示其关键特点为异步支持通过 Python 生成器机制支持长时间运行的操作状态管理与 ActionExecutionOperator 协作管理执行状态错误处理适当地处理 Python 执行过程中可能出现的异常内存管理与 RunnerContext 集成管理短期记忆和其他状态PythonActionTask 在整个系统中起到了至关重要的作用它使得 Flink Agents 能够无缝集成 Python 生态系统中的各种 AI 工具和库同时保持与 Flink 流处理引擎的良好集成。1.4 PythonGeneratorActionTaskPythonGeneratorActionTask 是 PythonActionTask 的派生类。当 Python 函数中使用了 yield 或异步操作时Python 执行器会检测到这种情况并返回一个 Generator 引用系统会创建 PythonGeneratorActionTask 来继续执行/** An {link ActionTask} wrapper a Python Generator to represent a code block in Python action. */ public class PythonGeneratorActionTask extends PythonActionTask { private final String pythonGeneratorRef; public PythonGeneratorActionTask( Object key, Event event, Action action, String pythonGeneratorRef) { super(key, event, action); this.pythonGeneratorRef pythonGeneratorRef; } Override public ActionTaskResult invoke() { boolean finished getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef); ActionTask generatedActionTask finished ? null : this; return new ActionTaskResult( finished, runnerContext.drainEvents(event.getSourceTimestamp()), generatedActionTask); } }1.5 JavaActionTaskJavaActionTask 执行 Java ActionTask。/** * A special {link ActionTask} designed to execute a Java action task. * * pNote that Java action currently do not support asynchronous execution. As a result, a Java * action task will be invoked only once. */ public class JavaActionTask extends ActionTask { private final ClassLoader userCodeClassLoader; public JavaActionTask(Object key, Event event, Action action, ClassLoader userCodeClassLoader) { super(key, event, action); checkState(action.getExec() instanceof JavaFunction); this.userCodeClassLoader userCodeClassLoader; } Override public ActionTaskResult invoke() throws Exception { runnerContext.checkNoPendingEvents(); ClassLoader cl Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(userCodeClassLoader); action.getExec().call(event, runnerContext); } finally { Thread.currentThread().setContextClassLoader(cl); } return new ActionTaskResult( true, runnerContext.drainEvents(event.getSourceTimestamp()), null); } }1.6 ActionTaskResult 结构每次执行 ActionTask 后会返回一个 ActionTaskResult 对象public class ActionTaskResult { private final boolean finished; // 是否已完成 private final ListEvent outputEvents; // 输出事件 private final OptionalActionTask generatedActionTaskOpt; // 下一个 ActionTask如果有 // ... }0x02 ActionTask 切分机制在 Flink Agents 框架中ActionTask 的切分是为了支持长时间运行的操作和异步执行。切分的好处如下避免阻塞长时间运行的操作不会阻塞整个操作符提高并发性允许其他 key 的任务同时执行容错能力每个 ActionTask 可以单独失败和恢复资源管理更好地管理内存和其他资源这种切分机制使得 Flink Agents 能够高效地处理复杂的长时间运行的 AI Agent 任务同时保持系统的响应的稳定性。2.1 切分方式主要切分方式如下概念上的拆分一个 Action 在概念上可被拆成多个顺序执行的代码块每个代码块成为一个 ActionTask 实例。设计目的在于细粒度控制尤其适用于异步操作。创建与流程初始触发ActionExecutionOperator 通过 createActionTask() 为每个动作生成首个 ActionTask。执行过程若动作产生生成器异步可再实例化新的 ActionTask 继续后续代码块。/** * Processes an incoming event for the given key and may submit a new mail * tryProcessActionTaskForKey to continue processing. */ private void processEvent(Object key, Event event) throws Exception { notifyEventProcessed(event); boolean isInputEvent EventUtil.isInputEvent(event); if (EventUtil.isOutputEvent(event)) { } else { // We then obtain the triggered action and add ActionTasks to the waiting processing // queue. ListAction triggerActions getActionsTriggeredBy(event); if (triggerActions ! null !triggerActions.isEmpty()) { for (Action triggerAction : triggerActions) { actionTasksKState.add(createActionTask(key, triggerAction, event)); } } } }createActionTask 代码如下。private ActionTask createActionTask(Object key, Action action, Event event) { if (action.getExec() instanceof JavaFunction) { return new JavaActionTask( key, event, action, getRuntimeContext().getUserCodeClassLoader()); } else if (action.getExec() instanceof PythonFunction) { return new PythonActionTask(key, event, action); } else { throw new IllegalStateException( Unsupported action type: action.getExec().getClass()); } }2.2 实现细节ActionTask.java 中有如下这意味着:一个 Action 可能被拆分成多个代码块每个代码块由一个 ActionTask 表示通过调用 invoke () 执行代码块并获得结果如果 Action 包含更多代码块可以从 ActionTaskResult 中获取下一个 ActionTask/** * 此类表示在 ActionExecutionOperator 中执行的动作任务。 * * p一个动作会被拆分为多个代码块每个代码块都由一个 ActionTask 表示。 * 可以调用 #invoke() 来执行一个代码块并获得执行结果ActionTaskResult。 * 如果动作包含额外的代码块可通过 ActionTaskResult#getGeneratedActionTask() * 获取下一个 ActionTask 并继续执行它。 */Python 集成PythonActionTask.java当 Python 函数中使用了 yield 或异步操作时Python 执行器会检测到这种情况并返回一个 Generator 引用系统会创建 PythonGeneratorActionTask 来继续执行这说明在处理异步 Python 函数时一个 Action 可能会产生多个 ActionTask初始的 PythonActionTask后续的 PythonGeneratorActionTask如果需要/** * 专门用于执行 Python 动作任务的特殊 ActionTask。 * * p在 Python 中进行异步执行期间PythonActionTask 可以生成一个 * PythonGeneratorActionTask 来代表后续需要的代码块。 */ public class PythonActionTask extends ActionTask { public ActionTaskResult invoke() throws Exception { ... String pythonGeneratorRef pythonActionExecutor.executePythonFunction( (PythonFunction)action.getExec(), (PythonEvent)event, runnerContext); // 如果用户定义的动作使用接口提交了异步任务 // 它将在第一次执行后返回一个 Python 生成器对象实例。 // 否则意味着没有提交异步任务且动作已经完成。 if (pythonGeneratorRef ! null) { // Python 动作生成了一个生成器。我们需要执行一次它 // 这将提交一个异步任务并返回动作是否已完成。 ActionTask tempGeneratedActionTask new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef); tempGeneratedActionTask.setRunnerContext(runnerContext); return tempGeneratedActionTask.invoke(); } return new ActionTaskResult( true, runnerContext.drainEvents(event.getSourceTimestamp()), null); } }2.3 关键点ActionTask 拆分的关键点如下顺序执行对于给定的键任务按顺序执行以保持顺序性和一致性异步处理当动作涉及异步操作时初始任务执行到异步操作为止返回一个新的 ActionTask 来表示后续的操作后续任务处理异步结果并继续执行状态管理每个 ActionTask 通过 RunnerContext 维护其状态确保拆分之间的连续性基于邮箱的处理使用 Flink 的邮箱执行器来调度后续任务的处理// 提交新邮件以继续处理 mailboxExecutor.submit(() - tryProcessActionTaskForKey(key), process action task);