Streams 如何在几秒内生成日志管道
作者来自 Elastic Luca WintergerstStreams 只需一次点击就能生成一个完整、经过测试的日志处理 pipeline。其背后的机制分为两个阶段确定性指纹匹配 deterministic fingerprinting 以及一个基于真实数据反复迭代的 reasoning agent同时通过严格的验证阈值在结果展示前保证质量。只需点击 Kibana Processing 标签中的 Suggest pipeline 按钮几秒钟内你就会看到一个完整的 pipelineGrok 模式、日期标准化、类型转换并附带你的真实日志文档在该 pipeline 中的解析预览。另一种方式则是手动完成这一切编写 Grok pattern、进行测试、修复边界情况、发现字段名不符合 ECS、重命名字段、再添加 date processor。而这些工作仅仅只是针对一个服务的量。每个日志 pipeline 要做的三件事每个日志处理 pipeline 都在做同样的三件事从原始日志消息中提取字段将其标准化为一致的 schema并清理不需要的数据。大多数团队过去都是手动构建和维护这些流程但随着日志格式不断变化这会变得很困难尤其是当你发现编写 Grok pattern 的人已经换了团队而除了 pattern 本身之外几乎没有任何文档时。每增加一个新服务就意味着要从头再来一遍不同的格式、不同的边界情况最终由另一个人去维护一份他自己并没有写过的 pattern。在初始 pipeline 阶段Streams 会自动处理这三件事并在任何数据进入生产环境之前对结果进行验证。点击 “Suggest pipeline” 会发生什么在 Kibana 中打开某个 stream 的 Processing 标签页点击按钮。几秒钟后面板会生成一个建议的 pipeline通常包含解析步骤、日期标准化、类型转换以及字段清理同时提供一个实时预览展示最近的文档在经过该 pipeline 处理后的结果。在这个视图中你可以看到将被提取的具体字段、它们的类型以及有多少示例文档被成功解析。如果某个字段名不正确你也可以直接内联编辑如果某个步骤引入了噪声可以直接删除。如果解析率需要提升也可以轻松调整并重新生成。在你明确确认之前不会有任何内容写入 stream。目前来看让人参与这一环节仍然很重要随着这类系统的不断成熟未来可能不再需要这一人工步骤。接下来我们更详细地看看整个流程。阶段 1日志分组与模式提取第一阶段不涉及 reasoning model。这是一个确定性过程相同的输入始终产生相同的输出不存在模型带来的波动。同时它也缩小了第二阶段需要处理的范围。在任何提取开始之前Streams 会根据日志格式指纹对消息进行聚类。这个算法其实很简单数字映射为 0字母映射为 a而标点符号保持不变。生成相同指纹的两条消息会被归入同一组。# two entries from the same nginx stream 2026-03-30 14:22:31 192.168.1.100 - james GET /api/v1/health 200 2026-03-30 08:01:05 10.0.0.5 - alice GET /api/v2/status 404 # fingerprint 0-0-0 0:0:0 0.0.0.0 - a a /a/a0/a 0 0-0-0 0:0:0 0.0.0.0 - a a /a/a0/a 0一个包含多种日志格式的 stream 会生成多个分组每种格式对应一个分组。这是一种非常简单但非常有效的方法可以将相似日志聚类在一起从而使后续所有步骤更加可靠。Grok 和 Dissect 都在相同的输入上运行但它们的工作方式不同。Grok 按分组运行因为它支持多个模式并能分别处理每种不同格式。Dissect 只使用一个模式因此只针对批次中最大的分组。对于每个候选方案一个启发式算法会分析日志消息并识别字段边界哪些是固定文本哪些是变化部分。它会生成一个带有位置占位符名称的模式。随后一个 LLM 会基于最多 10 条示例消息审查这些字段位置并将占位符重命名为更具可读性、符合 schema 规范的名称。# grok heuristic output (positional placeholders) %{IPV4:field_0} - %{USER:field_1} \[%{HTTPDATE:field_2}\] %{WORD:field_3} %{URIPATHPARAM:field_4}... # after LLM field naming (ECS-aligned) %{IPV4:source.ip} - %{USER:user.name} \[%{HTTPDATE:timestamp}\] %{WORD:http.request.method} %{URIPATHPARAM:url.path}... # dissect heuristic output (positional placeholders) %{field_0} - %{field_1} [%{field_2}] %{field_3} %{field_4} %{?field_5} %{field_6} %{field_7} # after LLM field naming (ECS-aligned) %{source.ip} - %{user.name} [%{timestamp}] %{http.request.method} %{url.path} %{?http_version} %{http.response.status_code} %{http.response.body.bytes}生成的处理器会在你提交的文档上进行模拟以衡量其解析率。Grok 表达能力更强支持类型化字段、命名捕获以及多个子模式但最大的缺点是速度较慢。相比之下Dissect 更快但仅限于基于固定位置的拆分。简单的日志格式通常可以用 dissect 干净地解析复杂的则需要 grok。解析率更高的候选方案会成为该分组的解析处理器。这个过程会对批次中的每个分组执行。阶段 1 会为阶段 2 提供每个分组对应的一个解析处理器。对于一批 nginx 访问日志提取过程会为当前唯一的格式分组生成两个候选方案# input (sampled from 300 submitted documents) 192.168.1.100 - james [30/Mar/2026:14:22:31 0000] GET /api/v1/health HTTP/1.1 200 1234 # grok candidate → parse rate 94% (282/300) %{IPV4:source.ip} - %{USER:user.name} \[%{HTTPDATE:timestamp}\] %{WORD:http.request.method} %{URIPATHPARAM:url.path} HTTP/%{NUMBER:http.version} %{NUMBER:http.response.status_code:int} %{NUMBER:http.response.body.bytes:int} # dissect candidate → parse rate 71% (213/300) %{source.ip} - %{user.name} [%{timestamp}] %{http.request.method} %{url.path} %{?http_version} %{http.response.status_code} %{http.response.body.bytes} # winner: grok这里 Grok 胜出因为 %{HTTPDATE} 可以处理带方括号的时间戳格式Dissect 尝试基于固定位置进行拆分但在处理外围的方括号时失败。两者是并行运行的对结果进行比较几乎不会增加额外时间因为这一步初始模拟只是在一部分样本文档上完成。阶段 2推理 agent阶段 1 会生成一个解析处理器阶段 2 则将其转化为一个完整且经过验证的 pipeline。这个阶段使用一个推理 agent在一个包含两个工具的循环中迭代执行最多运行六次。循环过程代理会接收阶段 1 生成的解析处理器并提出额外的步骤日期规范化、类型转换、字段清理以及对其识别为敏感的字段进行 PII 脱敏处理。随后它会在你的原始文档原始数据而非预处理后的数据上运行完整的候选 pipeline并返回验证结果。如果模拟失败代理会读取错误信息并进行调整。这些失败信息非常具体我们充分利用 LLM 的能力来理解它们哪个处理器失败了、影响了多少比例的文档、以及错误类型是什么。当解析率低于 80% 时工具会返回Parse rate is too low: 67.00% (minimum required: 80%). The pipeline is not extracting fields from enough documents. Review the processors and ensure they handle the document structure correctly. Processor grok[0] has a failure rate of 33.00% (maximum allowed: 20%). This processor is failing on too many documents.代理现在会读取处理器名称、失败率以及阈值然后在下一次迭代中调整模式。在错误被解决之前它无法提交。这一过程会不断重复直到 pipeline 通过验证然后才会提交并在 UI 中发送给用户进行审批。为了保证质量我们在工具层面而不是依赖代理自身判断强制执行两个硬性阈值如果成功解析的文档少于 80%模拟将返回错误agent 必须先修复这个问题才能继续。如果任何单个处理器在超过 20% 的文档上失败则该模拟被判定为无效。验证机制也嵌入在工具中模型会看到错误信息并且必须在继续之前解决这些问题。它无法提交一个未通过这些检查的 pipeline。在底层我们在引导代理朝一个特定方向工作。这里的系统提示包括“优先简化。与其添加变通方案不如移除有问题的处理器。一个能够完美处理 95% 文档的 pipeline要优于一个试图覆盖 100% 但表现不可预测的方案。”如果你的数据已经是结构化良好的例如已有规范的 timestamp、正确的字段类型、且没有需要解析的原始文本代理会检测到这一点并提交一个空的 pipeline而不会为了增加步骤而增加处理器。输出是 StreamlangAgent 会编写 Streamlang DSL这是 Elastic 用于流处理的语言在底层会编译为 ingest pipeline。字段 schema、处理器类型以及步骤格式全部都通过 Streamlang 表达。下面是针对上述 nginx 示例、面向 ECS 数据流的用户批准后的 pipelinesteps: - action: grok from: message patterns: - %{IPV4:source.ip} - %{USER:user.name} \\[%{HTTPDATE:timestamp}\\] \%{WORD:http.request.method} %{URIPATHPARAM:url.path} HTTP/%{NUMBER:http.version}\ %{NUMBER:http.response.status_code:int} %{NUMBER:http.response.body.bytes:int} - action: date from: timestamp formats: - dd/MMM/yyyy:HH:mm:ss Z - action: convert from: http.response.status_code type: integer - action: remove from: message两种 schema一个生成器并不是所有人落库日志的方式都相同Elastic 需要支持多种格式。使用 OpenTelemetry collector 的团队希望数据采用 OTel 原生字段而使用 Elastic 传统技术栈的团队则期望使用 ECS。这两种方式都是合理的如果强制所有人使用同一种 schema就意味着一半用户在开始之前就必须先重构自己的 pipeline。因此Streams 同时支持这两种方式生成器也会同时处理。我们会自动判断应该使用 OTel 还是 ECS。具体来说主要是查看 stream 的名称中是否包含 “otel”因为这是当前技术栈中的默认命名方式。由于规范字段名称不同两种情况下生成的 pipeline 也会有所不同OTelECSLog bodybody.textmessageLog levelseverity_textlog.levelService nameresource.attributes.service.nameservice.nameHost nameresource.attributes.host.namehost.nameOTel 数据流会使用一个 grok 处理器从body.text字段中读取数据{ action: grok, from: body.text, patterns: [...] }ECS 数据流则从message字段读取{ action: grok, from: message, patterns: [...] }OTel 数据流会将 ECS 字段名映射为对应的 OTel 字段别名。例如log.level会作为severity_text的别名message会作为body.text的别名。这样一来用 ECS 编写的查询可以直接在 OTel 数据流上运行无需修改因为别名层会自动完成字段转换。{ message: { path: body.text, type: alias }, log.level: { path: severity_text, type: alias } }代理会识别当前属于哪一种 schema并据此调整行为。它不会在 OTel stream 中额外添加severity_text → log.level的重命名步骤因为别名层已经提供了这种映射关系。而在 ECS stream 中它会显式地执行这种标准化处理。模式规范化 / schema 规范化字段抽取只是最显而易见的一部分但更重要的是字段对齐。如果两个服务都在记录 HTTP 请求但使用了不同的字段名例如一个用response_status另一个用http_code那么针对http.response.status_code: 5*的查询在两者中都会得不到结果。Schema normalization 的作用就是将这些不同字段统一映射到标准名称# before: extracted field names from two different services { response_status: 500 } # service A { http_code: 500 } # service B # after: ECS normalization { http.response.status_code: 500 }现在每个服务都使用http.response.status_code因此该查询可以在所有服务中正常工作。在模拟过程中代理会为其生成的每个字段检查 ECS 和 OTel 的元数据。对于已经是标准名称的字段会保持不变对于能够映射到已知 ECS 字段的会进行重命名。模拟指标会明确展示这一点结果中的每个字段都会带有其 ECS 或 OTel 类型标识因此你可以一眼看出哪些字段已经被规范化。代理必须达到的标准系统提示为用户批准的 pipeline 设置了明确的验收条件99% 的文档必须包含有效的timestamp所有字段必须符合目标 schema 的类型要求整体失败率必须低于 0.5%如果代理无法在 6 次迭代内满足所有条件则生成失败。总结pipeline 生成只需要几秒而手动流程可能需要数小时。时间节省来自于自动化了原本需要手动执行的验证循环编写 pattern、在真实数据上测试、阅读失败结果、调整再试。代理会在最多 6 个周期内基于 stream 最近实际接收的文档完成这一过程。Streams 与 processing 的后续发展当前最面向用户的改进是 refinement loop细化循环。现在如果建议 “接近正确但不完全正确”你需要手动编辑步骤并结束下一版本允许你直接调整生成的 pipeline并带着修改反馈回传给代理使其在你的基础上继续优化而不是从头开始。另外两个正在开发的方向是生成过程异步化目前会阻塞 UI 几秒未来将在后台运行以及支持已有 pipeline 的 streams目前只支持没有任何 processing steps 的 stream。这些能力也正在以可调用工具的形式暴露在 Streams agent builder 中并通过 API 提供给第三方 agent 框架使用。代理可以在更大的 onboarding 工作流中直接调用完整的 pipeline 生成而不依赖 UI。原文https://www.elastic.co/observability-labs/blog/elastic-streams-ai-pipeline-generation