一个 OTLP 端点,三个团队,零路由规则:Elasticsearch Streams AI 分区
作者来自 Elastic Aleksandar Panov停止提前编写日志路由规则。看看 Streams AI Partitioning 如何读取你的数据提出子 streams并让你在几分钟内为每个团队设置保留策略。将三个团队的日志发送到同一个 Elastic OTLP 端点Streams AI 分区 会将其路由到按团队划分的子 stream无需提前编写任何路由规则。在这篇文章中你生成 115 条多团队日志记录让 AI 分析接收到的数据并提出分区建议用自然语言进行优化然后为每个团队分别设置独立保留策略支付 90 天结账 30 天通知 7 天。整个流程在 Elastic 可观测性Observability内部运行无需接触 index templates 或 ILM policies。为什么多团队日志路由需要结构化设计在多团队的 Elasticsearch 部署中系统通常会逐渐收敛到一个共享索引这在初期是可行的但当各团队开始需要不同的保留周期、分片配置或处理 pipeline 时这种模式就会开始出现问题。在 Streams 出现之前你必须在数据写入前就配置好 ingestion 脚本把数据发送到不同的索引或 data streams或者使用 reroute processor通过某个字段来定义数据的目标去向。而在 AI Partitioning 模式下你让数据先进入系统。然后 AI 会分析已到达的数据提出分区建议你可以用自然语言对这些建议进行调整并应用它们。最终会生成一组 wired child streams它们继承父 stream 的保留策略、processors 和 schema同时仍然允许在子 stream 层级进行独立覆盖。使用 Streams AI Partitioning 之前的准备条件在运行示例之前需要满足以下三个条件启用 Wired Streams在 Elastic Cloud Serverless 以及 Elastic Cloud Hosted 9.4 中wired streams 默认已开启。如果你是从更早版本升级而来请打开 Streams 应用并在 Settings 中确认该开关已启用。Elastic Managed LLM 连接器前往Stack Management Connectors Create connector Elastic Managed LLM。需要注意以下几点该连接器已预先配置无需外部账号或 API key。任意 生成式 AI 连接器 都可用于该功能。注意 Elastic Managed LLM 按输入与输出的 token 量计费每百万 token 收费详情见 定价说明。使用该功能的账号需要具备manage_inference集群权限内置inference_admin角色已包含该权限。Managed OTLP 端点 URL 与 API key打开Cloud Console Manage Application endpoints Ingest。复制 Managed OTLP endpoint URL并在同一页面生成 API key。准备好这些之后打开Observability Streams确认列表中存在一个logs.otelwired stream。这个 stream 就是我们接下来要进行分区的父 stream。如果侧边栏中没有显示Streams你的 Kibana space 可能使用的不是 Observability 的 solution view。你可以在 Stack Management Spaces 中修改编辑你的 space将 Solution view 设置为Observability。生成多团队日志数据本示例使用来自同一家虚构公司的三个应用这些应用分别由不同团队产生payments-api结构化 JSON包含transaction_id和amount_cents。数据敏感保留周期较长。checkout-webJSON 格式包含cart_id和customer_id。主要为 INFO 和 ERROR 日志。notifications-worker结构较松散包含recipient和channel。日志量较大。我们使用 Python 脚本通过 OpenTelemetry Python SDK 将三个团队的日志通过 OTLP 直接发送到 Managed OTLP 端点。完整代码包括配置与执行在 配套 notebook 中提供。每个团队都通过 service name 定义并配有一组消息模板以及一个用于生成团队特定属性的函数TEAMS { payments: { service: payments-api, messages: [ (INFO, charge captured tx{tx} amount_cents{amt}), (ERROR, charge declined tx{tx} reasoninsufficient_funds), (INFO, refund issued tx{tx} amount_cents{amt}), ], extra: lambda: { transaction_id: ftx_{random.randint(10000, 99999)}, amount_cents: random.randint(100, 50000), }, }, checkout: { service: checkout-web, messages: [ (INFO, cart updated cart{cart} customer{cust}), (INFO, checkout started cart{cart} customer{cust}), (ERROR, checkout failed cart{cart} stageaddress_validation), ], extra: lambda: { cart_id: fc_{random.randint(1000, 9999)}, customer_id: fu_{random.randint(100, 999)}, }, }, notifications: { service: notifications-worker, messages: [ (INFO, email queued recipient{rcp} channelemail), (INFO, sms queued recipient{rcp} channelsms), (ERROR, webhook failed recipient{rcp} channelwebhook status503), ], extra: lambda: { recipient: f1555{random.randint(1000000, 9999999)}, channel: random.choice([email, sms, webhook]), }, }, }将elasticsearch.index设置为logs.otel作为资源属性会将数据路由到 wired streams 根 stream而不是默认的 OTLP 数据 stream。def setup_provider(): resource Resource.create({elasticsearch.index: logs.otel}) provider LoggerProvider(resourceresource) provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter())) set_logger_provider(provider) handler LoggingHandler(levellogging.INFO, logger_providerprovider) root logging.getLogger() root.setLevel(logging.INFO) root.addHandler(handler) return provider运行 notebook发送 115 条记录并在三个团队之间分布不均。打开Observability Streams logs.otel切换到Partitioning分区标签页。你应该能在预览面板中看到已接收的数据其中包含team、service.name等属性以及各团队特定字段在列视图中可见。Streams AI 分区如何提出子 streamStreams AI Partitioning 会分析来自父 stream 的最多 1,000 条文档识别属性的聚类情况与基数分布然后基于能够最好区分数据的字段提出子 stream用于此分析的机器学习方法详见 Streams 中的自动日志解析。对于我们刚刚发送的数据AI 基于attributes.service.name提出了三个子 stream每个建议都会展示一个 Streamlang 条件以及在采样文档中匹配该条件的百分比。AI 选择service.name是因为它是一个标准的 OpenTelemetry 属性并且是任何单一工作负载的自然标识符。这个初始建议是合理的但值得思考当部署规模增长时会发生什么。目前只有三个服务因为只有三个团队。明天Payments 可能会新增refunds-api和fraud-detector。每新增一个 service都会机械地创建一个新的子 stream随着时间推移你会为实际上只有三个组织边界的系统生成数十个分区。Elastic 的 分区建议 更倾向于按 team 或技术类型进行逻辑分组并目标控制在几十个分区而不是数百个。基于team的分区更稳定因为即使 Payments 团队运营再多服务它仍然只会对应一个子 stream。用自然语言优化 Streams AI 分区建议在 Streams AI Partitioning 中审查完初始 AI 建议后点击Modify suggestions打开自由文本输入框。提交后AI 会重新生成建议。现在这三个卡片的分组键从service.name改为attributes.team勾选全部三个然后点击Accept selected。会弹出一个确认对话框显示将要创建的 streams每一个都带有WHERE attributes.team equals team条件。点击Create all streams。现在 Partitioning 标签页会显示三个子 streams它们都作为logs.otel父 stream 的一部分每一条通过 OTLP 端点进入的新文档都会根据这些条件被路由到对应的子 stream。你可以打开任意子 stream 来验证其数据。例如logs.otel.checkout只会显示 checkout 日志如何在 Elasticsearch Streams 中设置按团队的日志保留策略在 Streams AI Partitioning 创建子 stream 之后每个子 stream 都可以拥有独立的生命周期配置而不需要依赖父 stream。由于 wired streams 采用父子层级结构每个子 stream 默认会继承父级的保留策略、processors 和 schema。你只需要覆盖那些需要调整的分区即可。打开子 streamlogs.otel.payments进入 Retention 标签页。点击Edit retention method选择Custom period并将其设置为 90 天。对其他团队做同样操作并设置符合其需求的保留策略StreamRetentionRationalelogs.otel.payments90 天敏感金融数据满足合规要求logs.otel.checkout30 天用于排查问题无需长期保存logs.otel.notifications7 天高吞吐量送达确认后价值较低结论从共享索引到按团队 streams无需路由规则一个由多个团队向 Elastic 部署发送日志的共享系统是最常见的起点。过去要对其进行组织通常需要提前编写路由规则或者手动维护不同的 index templates 和 ILM policies。有了 Streams AI Partitioning流程变得不同你让数据先进入系统让 AI 读取实际到达的数据在需要时用自然语言优化建议然后直接接受结果。最终得到的是一组子 streams它们继承父 stream 的全部配置同时为每个团队提供独立的保留策略和处理能力而无需任何手动模板管理。下一步运行这个 配套 notebook生成你自己的多团队数据。阅读 Elastic 可观测性 Streams 如何简化保留管理深入理解保留模型。阅读 Streams Processing停止与 Grok 作斗争探索当不同团队需要不同解析逻辑时 Streams 的处理能力。阅读 为可观测性引入 Streams了解 Streams 所在的整体调查能力体系。原文Log routing without rules: Elastic Streams AI Partitioning — Elastic Observability Labs