作者来自 Elastic Jeffrey Rengifo使用 OpenTelemetry 为 Flask API 添加监测并仅通过 2 个环境变量将追踪、指标和日志发送到 Elastic Cloud无需采集器。Elastic 的 托管 OTLP 端点 允许你的 Python 应用仅使用标准的 OpenTelemetry SDK 和 2 个环境变量直接将追踪、指标和日志发送到 Elastic Cloud。无需部署、配置或维护 Collector。本文将通过一个完整添加监测的 Flask API 示例演示从创建第一个 span 到在 Kibana 服务地图中查看服务的全过程并实现日志与追踪关联。前提条件一个 Elastic Cloud 账户Serverless 或 Cloud Hosted v9.0Python 3.9Kibana 内置的示例 eCommerce orders 数据集Add data Sample data Sample eCommerce orders安装所需的包pip install flask elasticsearch python-dotenv \ opentelemetry-api opentelemetry-sdk \ opentelemetry-exporter-otlp什么是 Elastic 托管 OTLP 端点Elastic 托管 OTLP 端点是一个托管的摄取层支持应用直接通过 HTTP 或 gRPC 接收标准 OTLP 数据无需使用采集器。OpenTelemetry 使用 OTLPOpenTelemetry 协议 来传输遥测数据。在传统架构中应用会将数据发送到 OpenTelemetry Collector再由 Collector 转发到 Elasticsearch 等后端系统。Collector 负责批处理、重试和路由但也意味着你需要额外部署、配置并维护一个组件。Elastic 的 托管 OTLP 端点近期正式发布移除了这一步骤。你的应用可以直接将 OTLP 数据发送到 Elastic 托管的端点。该端点支持标准 OTLP over HTTP 或 gRPC并由托管摄取层提供支撑负责扩展、缓冲和可靠性保障。你使用的仍然是同一套 opentelemetry-exporter-otlp 包这些包与任何后端兼容。使用 OpenTelemetry 构建 Python Flask API我们将用 Python 和 Flask 构建一个小型 REST API用于列出和查询存储在 Elasticsearch 中的 eCommerce 订单。作为数据源我们将使用 Kibana 内置的示例数据集。你可以在 配套代码仓库 中找到完整的应用代码。下面的教程会逐步构建代码但如果你愿意也可以直接克隆仓库并跟着一起操作。在添加遥测之前我们先看基础应用一个用于查询 Kibana 示例 eCommerce 索引的 Flask API。它包含两个端点列出最近订单以及通过 ID 查询单个订单。创建一个名为 app.py 的文件import os from dotenv import load_dotenv from elasticsearch import Elasticsearch from flask import Flask, jsonify load_dotenv() # Elasticsearch client es Elasticsearch( hosts[os.environ[ES_URL]], api_keyos.environ[ES_API_KEY], ) INDEX kibana_sample_data_ecommerce app Flask(__name__) app.route(/orders) def list_orders(): response es.search( indexINDEX, size10, sort[{order_date: desc}], aggs{total_revenue: {sum: {field: taxful_total_price}}}, ) hits response[hits][hits] total_revenue response[aggregations][total_revenue][value] orders [ { order_id: h[_source][order_id], customer: h[_source][customer_full_name], total: h[_source][taxful_total_price], date: h[_source][order_date], } for h in hits ] return jsonify({orders: orders, total_revenue: total_revenue}) app.route(/orders/order_id) def get_order(order_id): response es.search( indexINDEX, size1, query{term: {order_id: order_id}}, ) hits response[hits][hits] if not hits: return jsonify({error: Order not found}), 404 return jsonify(hits[0][_source]) if __name__ __main__: app.run(host0.0.0.0, port5001)创建一个 .env 文件填写你的 Elasticsearch 凭证。要获取 OTLP 端点和 API Key请参考 快速开始将 OTLP 数据发送到 Elastic 指南。OTEL_EXPORTER_OTLP_ENDPOINT OTEL_EXPORTER_OTLP_HEADERSAuthorizationApiKey api_key ES_URL ES_API_KEY这是一个未进行任何监测instrumentation的可运行 API。接下来的部分将逐步为其添加追踪、指标和日志每次只引入一种信号。从 Python 发送 traces我们先从 traces 开始使用 Python OpenTelemetry SDK。我们会用 spans 包裹 Elasticsearch 调用从而观察每个查询的耗时。设置 tracer在 app.py 的顶部添加以下内容from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource, SERVICE_NAME # Create a resource identifying your service resource Resource.create({ SERVICE_NAME: my-python-app }) # Set up the tracer provider with OTLP export tracer_provider TracerProvider(resourceresource) tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) trace.set_tracer_provider(tracer_provider) tracer trace.get_tracer(__name__)为你的 endpoints 添加 spans现在将每个 route handler 用一个 parent span 包裹并在 Elasticsearch 调用外再嵌套一个 child span。这样你就得到一个两层 trace一个 span 表示 HTTP 请求另一个 span 表示内部数据库查询。app.route(/orders) def list_orders(): with tracer.start_as_current_span(list-orders) as span: with tracer.start_as_current_span(es.search) as es_span: es_span.set_attribute(db.system, elasticsearch) es_span.set_attribute(db.elasticsearch.index, INDEX) response es.search( indexINDEX, size10, sort[{order_date: desc}], aggs{total_revenue: {sum: {field: taxful_total_price}}}, ) hits response[hits][hits] total_revenue response[aggregations][total_revenue][value] orders [ { order_id: h[_source][order_id], customer: h[_source][customer_full_name], total: h[_source][taxful_total_price], date: h[_source][order_date], } for h in hits ] span.set_attribute(orders.returned, len(orders)) return jsonify({orders: orders, total_revenue: total_revenue})每个请求都会创建一个 parent spanlist-orders并包含一个 child spanes.search用于包裹 Elasticsearch 调用。db.system和db.elasticsearch.index 属性是 Kibana 用来在 trace waterfall 中识别该调用为 Elasticsearch 查询的关键字段。从 Python 发送 metricsmetrics 用于揭示跨所有请求的模式有多少调用命中了每个 endpoint、响应时间如何随时间变化。我们现在添加一个 counter 和 histogram 来捕获这些信息。设置 meter在 app.py 中添加以下内容from opentelemetry import metrics from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter # 设置 metric reader 和 provider metric_reader PeriodicExportingMetricReader( OTLPMetricExporter(), export_interval_millis10000 # 每 10 秒导出一次 ) meter_provider MeterProvider( resourceresource, metric_readers[metric_reader] ) metrics.set_meter_provider(meter_provider) meter metrics.get_meter(__name__)创建 instruments# 请求总数 counter request_counter meter.create_counter( nameapp.requests.total, descriptionTotal number of requests, unit1 ) # 请求耗时 histogram request_duration meter.create_histogram( nameapp.request.duration, descriptionRequest duration in seconds, units )在 endpoints 中记录 metrics更新 route handler 来记录 metricsimport time app.route(/orders) def list_orders(): start_time time.time() with tracer.start_as_current_span(list-orders) as span: with tracer.start_as_current_span(es.search) as es_span: es_span.set_attribute(db.system, elasticsearch) es_span.set_attribute(db.elasticsearch.index, INDEX) response es.search( indexINDEX, size10, sort[{order_date: desc}], aggs{total_revenue: {sum: {field: taxful_total_price}}}, ) hits response[hits][hits] total_revenue response[aggregations][total_revenue][value] orders [ { order_id: h[_source][order_id], customer: h[_source][customer_full_name], total: h[_source][taxful_total_price], date: h[_source][order_date], } for h in hits ] duration time.time() - start_time span.set_attribute(orders.returned, len(orders)) request_counter.add(1, {endpoint: /orders, status: 200}) request_duration.record(duration, {endpoint: /orders}) return jsonify({orders: orders, total_revenue: total_revenue})从 Python 发送 logstraces 和 metrics 告诉你发生了什么以及发生频率。logs 捕获细节错误信息、参数值和调试上下文。OpenTelemetry 通过桥接 Python 的标准 模块让你现有的日志语句与 traces 和 metrics 一起通过 OTLP 导出。当你在 span 上下文中输出 log 时SDK 会自动附加 trace ID 和 span ID因此你可以从一条 log 直接跳转到生成它的 trace。设置 log providerimport logging from opentelemetry._logs import set_logger_provider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter # 设置 logger provider logger_provider LoggerProvider(resourceresource) logger_provider.add_log_record_processor( BatchLogRecordProcessor(OTLPLogExporter()) ) set_logger_provider(logger_provider) # 将 Python logging 接入 OpenTelemetry handler LoggingHandler( levellogging.INFO, logger_providerlogger_provider ) logging.getLogger().addHandler(handler) logging.getLogger().addHandler(logging.StreamHandler()) # 同时输出到终端 logging.getLogger().setLevel(logging.INFO) logger logging.getLogger(__name__)LoggingHandler()确保 logs 在开发时也会显示在你的终端中。如果没有它logs 只会发送到 Elastic而终端不会输出内容这在测试时可能会让人困惑。从 endpoints 发送 logsapp.route(/orders) def list_orders(): start_time time.time() with tracer.start_as_current_span(list-orders) as span: logger.info(列出最近订单) with tracer.start_as_current_span(es.search) as es_span: es_span.set_attribute(db.system, elasticsearch) es_span.set_attribute(db.elasticsearch.index, INDEX) response es.search( indexINDEX, size10, sort[{order_date: desc}], aggs{total_revenue: {sum: {field: taxful_total_price}}}, ) hits response[hits][hits] total_revenue response[aggregations][total_revenue][value] orders [ { order_id: h[_source][order_id], customer: h[_source][customer_full_name], total: h[_source][taxful_total_price], date: h[_source][order_date], } for h in hits ] duration time.time() - start_time span.set_attribute(orders.returned, len(orders)) logger.info( 订单已列出, extra{orders.returned: len(orders), duration_s: round(duration, 4)} ) request_counter.add(1, {endpoint: /orders, status: 200}) request_duration.record(duration, {endpoint: /orders}) return jsonify({orders: orders, total_revenue: total_revenue})因为这些 logger.info() 调用发生在 span 上下文中OpenTelemetry 会自动为每条日志记录附加 trace ID 和 span ID。在 Kibana 中这意味着你可以从一条 log 直接跳转到对应的 trace。运行已添加监测的 Flask 应用并生成流量Traces、metrics 和 logs 已经全部接入后应用就完成了完整的可观测性配置。现在我们运行它、发送请求并验证数据是否进入 Kibana。python app.py注意Flask 自带的开发服务器适用于本教程。在生产环境中应使用类似 gunicorn (花生壳) 的 WSGI 服务器。生成一些流量curl http://localhost:5001/orders curl http://localhost:5001/orders/584677 curl http://localhost:5001/orders/does-not-exist几秒后打开 Kibana进入Observability APM Services。你应该能看到 my-python-app 被列出。在 Kibana 中查看 traces、metrics 和 logs现在遥测数据已经开始流动我们来看看每种信号在 Kibana 中的落点以及它们是如何相互关联的。Traces进入APM Services my-python-app Transactions。点击一个 list-orders transaction 来打开 trace waterfall。你应该会看到 parent span list-orders以及一个 child spanes.search。我们之前设置的 db.system和db.elasticsearch.index 属性会显示在 span 详情中同时 Kibana 会将这个 child 识别为一个 Elasticsearch 查询。失败的 transactions为了查看错误可见性打开一个针对/orders/does-not-exist请求的get-order transaction。transaction 详情中会显示 event.outcome: failure以及我们附加的order.id: does-not-exist 属性。这表明错误状态和自定义属性可以正确地通过托管端点进行传播。关联日志Correlated logs进入Discover按service.name: my-python-app. 进行过滤。展开一条日志文档可以看到 body.text,trace_id和 span_id 字段。由于我们是在 span 上下文中发送 logs每条日志记录都会携带 trace ID。你可以复制 trace_id 的值然后进入APM直接跳转到生成该日志的 trace。Metrics在Discover中切换到 metrics-* data view。你应该会看到 app.requests.total​​​​​​​counter 和app.request.duration histogram 以固定间隔持续到达。你也可以在 Lens 中创建一个可视化app.requests.total 在 Y 轴使用timestamp 在X 轴使用来观察请求量随时间的变化。服务地图Kibana 会基于我们之前设置的 span 属性自动构建 服务地图。因为我们在 Elasticsearch spans db.system: elasticsear 中标记了Kibana 会绘制my-python-app和 elasticsearch 之间的依赖关系从而为你提供服务及其后端的可视化概览。总结在 Python 中无需 Collector 即可实现完整 OpenTelemetry 可观测性在本教程中我们使用标准 OpenTelemetry SDK 和两个环境变量为 Flask API 添加了 traces、metrics 和 logs然后在 Kibana 中验证了所有三种信号并实现了内置的 log-to-trace 关联。托管 OTLP 端点负责处理扩展、缓冲和持久化因此你可以专注于应用本身而无需运维摄取基础设施。接下来你可以考虑使用 EDOT Python 进行自动埋点以移除手动 spans或者添加更贴合业务领域的自定义 metrics。下一步查看 Elastic 托管 OTLP 端点文档 了解高级配置。尝试 或 EDOT Python在无需手动 spans 的情况下自动为 Flask 和 Elasticsearch 客户端进行埋点。查阅 OpenTelemetry Python SDK 文档 获取更多埋点选项。如需对 histogram 的时序特性进行细粒度控制请参考 托管 OTLP 端点文档。原文OpenTelemetry Python to Elastic in 2 environment variables — Elastic Observability Labs