OpenTelemetry, a relatively new project in the CNCF community, has the Collector at its core. The Collector is built from three kinds of components:

- Receivers: receive data
- Processors: process data
- Exporters: send the data onward

Pipelines tie these three together; beyond that, there are also extensions for extending the collector.
```yaml
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  otlp:
    endpoint: otelcol:55680

extensions:
  health_check:
  pprof:
  zpages:

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
```
This configuration gives us a first glimpse of how the pieces fit together.
Exploring the source

(Below, OpenTelemetry is abbreviated as OT.)

Component

OT abstracts what all components have in common into the Component interface:
```go
type Component interface {
	Start(ctx context.Context, host Host) error
	Shutdown(ctx context.Context) error
}
```
Building on Component, Receiver is derived:

```go
type Receiver interface {
	Component
}
```
And from Receiver, TracesReceiver, MetricsReceiver, and LogsReceiver are derived in turn; the OOP flavor here feels very familiar.
Service

When reading code, grab the main thread first. From the configuration it is clear that pipelines are the backbone of processing, and pipelines in turn live inside the service, so let's see how the service strings everything together.
buildPipelines()

```go
func (srv *service) buildPipelines() error {
	var err error
	srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.startInfo, srv.config, srv.factories.Exporters)
	if err != nil {
		return fmt.Errorf("cannot build builtExporters: %w", err)
	}

	srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.startInfo, srv.config, srv.builtExporters, srv.factories.Processors)
	if err != nil {
		return fmt.Errorf("cannot build pipelines: %w", err)
	}

	srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.startInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
	if err != nil {
		return fmt.Errorf("cannot build receivers: %w", err)
	}

	return nil
}
```
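Notice that buildPipelines builds the stages back to front: exporters first, then pipelines, then receivers, because each stage needs a handle on its downstream consumer before it can be created. A minimal sketch of that dependency direction (the `consumer` type and build helpers here are invented for illustration, not the real builder API):

```go
package main

import "fmt"

// consumer takes a piece of data; exporters, processors, and
// receivers all reduce to this shape in the sketch.
type consumer func(data string)

// buildExporter is the sink: it just records what arrived.
func buildExporter(out *[]string) consumer {
	return func(data string) { *out = append(*out, data) }
}

// buildProcessor wraps its downstream consumer, so the exporter
// must already exist when the processor is built.
func buildProcessor(next consumer) consumer {
	return func(data string) { next("processed:" + data) }
}

// buildReceiver is built last, pointing at the head of the pipeline.
func buildReceiver(next consumer) func(string) {
	return func(data string) { next(data) }
}

func main() {
	var out []string
	exp := buildExporter(&out)  // 1. exporters first
	proc := buildProcessor(exp) // 2. then the pipeline (processors)
	recv := buildReceiver(proc) // 3. receivers last

	recv("span-1")
	fmt.Println(out) // [processed:span-1]
}
```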
Once everything is built, calling Start brings it all up:
Start()

```go
func (srv *service) Start(ctx context.Context) error {
	if err := srv.startExtensions(ctx); err != nil {
		return fmt.Errorf("cannot setup extensions: %w", err)
	}

	if err := srv.startPipelines(ctx); err != nil {
		return fmt.Errorf("cannot setup pipelines: %w", err)
	}

	return srv.builtExtensions.NotifyPipelineReady()
}
```
The build steps are all much alike, so let's take BuildExporters as an example:
BuildExporters

```go
func BuildExporters(
	logger *zap.Logger,
	appInfo component.ApplicationStartInfo,
	config *configmodels.Config,
	factories map[configmodels.Type]component.ExporterFactory,
) (Exporters, error) {
	eb := &exportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}

	exporterInputDataTypes := eb.calcExportersRequiredDataTypes()

	exporters := make(Exporters)
	for _, cfg := range eb.config.Exporters {
		componentLogger := eb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
		exp, err := eb.buildExporter(context.Background(), componentLogger, eb.appInfo, cfg, exporterInputDataTypes)
		if err != nil {
			return nil, err
		}

		exporters[cfg] = exp
	}

	return exporters, nil
}
```
Depending on the data type, the final creation call will be one of CreateTracesExporter, CreateMetricsExporter, or CreateLogsExporter. Taking CreateTracesExporter, the implementing type is factory:
CreateTracesExporter

```go
func (f *factory) CreateTracesExporter(
	ctx context.Context,
	params component.ExporterCreateParams,
	cfg configmodels.Exporter) (component.TracesExporter, error) {
	if f.createTracesExporter != nil {
		return f.createTracesExporter(ctx, params, cfg)
	}
	return nil, configerror.ErrDataTypeIsNotSupported
}
```
As the name makes clear, this is a factory pattern, and the function that actually builds the Exporter is passed in by whoever constructs the factory:
WithTraces

```go
func WithTraces(createTraceExporter CreateTracesExporter) FactoryOption {
	return func(o *factory) {
		o.createTracesExporter = createTraceExporter
	}
}
```
And this factory is in turn constructed by the exporter itself:
NewFactory

```go
func NewFactory() component.ExporterFactory {
	return exporterhelper.NewFactory(
		typeStr,
		createDefaultConfig,
		exporterhelper.WithTraces(createTraceExporter))
}
```
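WithTraces is the classic functional options pattern: the factory starts empty, each option registers one capability, and an unregistered data type falls back to an error, mirroring configerror.ErrDataTypeIsNotSupported above. A self-contained sketch of the pattern (all the names here are simplified stand-ins, not the real OT types):

```go
package main

import (
	"errors"
	"fmt"
)

// createTracesFunc stands in for OT's exporter-creation callback.
type createTracesFunc func() (string, error)

type factory struct {
	createTraces createTracesFunc
}

// FactoryOption mutates the factory under construction,
// just as OT's WithTraces does.
type FactoryOption func(f *factory)

func WithTraces(create createTracesFunc) FactoryOption {
	return func(f *factory) { f.createTraces = create }
}

func NewFactory(opts ...FactoryOption) *factory {
	f := &factory{}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

// CreateTracesExporter errors out when the capability
// was never registered via an option.
func (f *factory) CreateTracesExporter() (string, error) {
	if f.createTraces != nil {
		return f.createTraces()
	}
	return "", errors.New("data type is not supported")
}

func main() {
	withTraces := NewFactory(WithTraces(func() (string, error) { return "traces-exporter", nil }))
	exp, _ := withTraces.CreateTracesExporter()
	fmt.Println(exp) // traces-exporter

	bare := NewFactory()
	_, err := bare.CreateTracesExporter()
	fmt.Println(err != nil) // true
}
```

The pattern lets each exporter declare only the signals it supports, while the factory surface stays uniform.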
These factories exist as hard-coded defaults inside the system:
service/defaultcomponents/defaults.go:54

```go
receivers, err := component.MakeReceiverFactoryMap(
	jaegerreceiver.NewFactory(),
	fluentforwardreceiver.NewFactory(),
	zipkinreceiver.NewFactory(),
	prometheusreceiver.NewFactory(),
	opencensusreceiver.NewFactory(),
	otlpreceiver.NewFactory(),
	hostmetricsreceiver.NewFactory(),
	kafkareceiver.NewFactory(),
)
```
A pipeline is not limited to a single entry point either; multiple receivers are supported as well:
```yaml
service:
  pipelines:
    traces:
      receivers: [otlp, jaeger, zipkin]
      processors: [memory_limiter, batch]
      exporters: [otlp, jaeger, zipkin]
```
At this point the role of the service is clear: read the configuration file and build the system accordingly. Now let's get back to the parts that actually run.
Receiver

For the Receiver component, let's use ZipkinReceiver to see how it works. We already know how it is built and how it is started; what we really need to see is what it actually does.
Start (github)

```go
func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error {
	var err error
	zr.startOnce.Do(func() {
		err = nil
		zr.host = host
		zr.server = zr.config.HTTPServerSettings.ToServer(zr)
		var listener net.Listener
		listener, err = zr.config.HTTPServerSettings.ToListener()
		if err != nil {
			return
		}
		zr.shutdownWG.Add(1)
		go func() {
			defer zr.shutdownWG.Done()

			if errHTTP := zr.server.Serve(listener); errHTTP != http.ErrServerClosed {
				host.ReportFatalError(errHTTP)
			}
		}()
	})
	return err
}
```
Start just opens a listener: a standard HTTP server. The handling logic, then, must live in the familiar ServeHTTP:
ServeHTTP (github)

```go
// Abridged excerpt: error handling and the zipkin v1/v2 detection are elided.
func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	pr := processBodyIfNecessary(r)
	slurp, _ := ioutil.ReadAll(pr)
	if c, ok := pr.(io.Closer); ok {
		_ = c.Close()
	}
	_ = r.Body.Close()

	var td pdata.Traces
	var err error
	if asZipkinv1 {
		td, err = zr.v1ToTraceSpans(slurp, r.Header)
	} else {
		td, err = zr.v2ToTraceSpans(slurp, r.Header)
	}

	consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)

	obsreport.EndTraceDataReceiveOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)
}
```
Clearly the core call is ConsumeTraces. The code that puts this hub into motion is here:
attach (github)

```go
// Abridged excerpt: only the traces branch of attachReceiverToPipelines is shown.
func (rb *receiversBuilder) attachReceiverToPipelines() {
	case configmodels.TracesDataType:
		junction := buildFanoutTraceConsumer(builtPipelines)
		createdReceiver, err = factory.CreateTracesReceiver(ctx, creationParams, config, junction)
}

func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces {
	if len(pipelines) == 1 {
		return pipelines[0].firstTC
	}

	var pipelineConsumers []consumer.Traces
	anyPipelineMutatesData := false
	for _, pipeline := range pipelines {
		pipelineConsumers = append(pipelineConsumers, pipeline.firstTC)
		anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData
	}

	if anyPipelineMutatesData {
		return fanoutconsumer.NewTracesCloning(pipelineConsumers)
	}
	return fanoutconsumer.NewTraces(pipelineConsumers)
}
```
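When several pipelines share one receiver, buildFanoutTraceConsumer either forwards the same data to all of them or, if any pipeline mutates what it consumes, hands each one its own copy. A toy version of the cloning fan-out (the types below are simplified stand-ins, not the real fanoutconsumer package):

```go
package main

import "fmt"

// tracesConsumer is a simplified stand-in for consumer.Traces.
type tracesConsumer interface {
	ConsumeTraces(td []string) error
}

// recordingConsumer keeps everything it receives, so we can inspect it.
type recordingConsumer struct{ got [][]string }

func (c *recordingConsumer) ConsumeTraces(td []string) error {
	c.got = append(c.got, td)
	return nil
}

// fanoutCloning hands each downstream consumer its own copy, so a
// mutating pipeline cannot corrupt its siblings; this is the role
// played by fanoutconsumer.NewTracesCloning in the Collector.
type fanoutCloning struct{ consumers []tracesConsumer }

func (f *fanoutCloning) ConsumeTraces(td []string) error {
	for _, c := range f.consumers {
		clone := append([]string(nil), td...) // defensive copy per pipeline
		if err := c.ConsumeTraces(clone); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	a, b := &recordingConsumer{}, &recordingConsumer{}
	fan := &fanoutCloning{consumers: []tracesConsumer{a, b}}
	_ = fan.ConsumeTraces([]string{"span-1"})
	fmt.Println(len(a.got), len(b.got)) // 1 1
}
```

The non-cloning variant skips the copy, which is safe only when every pipeline promises not to mutate consumed data.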
Exporter

For the Exporter side, let's look at the JaegerExporter; its logic is just as clear:
pushTraceData (github)

```go
func (s *protoGRPCSender) pushTraceData(
	ctx context.Context,
	td pdata.Traces,
) error {
	batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
	if err != nil {
		return consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err))
	}

	if s.metadata.Len() > 0 {
		ctx = metadata.NewOutgoingContext(ctx, s.metadata)
	}

	for _, batch := range batches {
		_, err = s.client.PostSpans(
			ctx,
			&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))

		if err != nil {
			s.logger.Debug("failed to push trace data to Jaeger", zap.Error(err))
			return fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
		}
	}

	return nil
}
```
And this client is exactly jaegerproto.CollectorServiceClient. To string all of these pieces together, a stable ABI is needed, and that is:
consumer/consumer.go:33

```go
type Traces interface {
	ConsumeTraces(ctx context.Context, td pdata.Traces) error
}
```
And so we discover that, in fact, all of these components implement this interface.
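Because every stage speaks the same ConsumeTraces contract, a pipeline is nothing more than consumers linked end to end; a processor simply implements the interface on one side and holds the next consumer on the other. A minimal batch-style sketch of that symmetry (the `traces`, `printExporter`, and `batchProcessor` types are invented for illustration, not the real batch processor):

```go
package main

import "fmt"

// traces stands in for pdata.Traces; consumer mirrors consumer.Traces.
type traces []string

type consumer interface {
	ConsumeTraces(td traces) error
}

// printExporter is the tail of the chain: it records what it receives.
type printExporter struct{ exported []traces }

func (e *printExporter) ConsumeTraces(td traces) error {
	e.exported = append(e.exported, td)
	return nil
}

// batchProcessor implements the same interface AND holds the next
// consumer: this symmetry is what lets the Collector chain
// receiver -> processor -> exporter freely.
type batchProcessor struct {
	next    consumer
	size    int
	pending traces
}

func (b *batchProcessor) ConsumeTraces(td traces) error {
	b.pending = append(b.pending, td...)
	if len(b.pending) >= b.size {
		batch := b.pending
		b.pending = nil
		return b.next.ConsumeTraces(batch)
	}
	return nil
}

func main() {
	exp := &printExporter{}
	proc := &batchProcessor{next: exp, size: 2}

	_ = proc.ConsumeTraces(traces{"span-1"}) // buffered, nothing exported yet
	_ = proc.ConsumeTraces(traces{"span-2"}) // reaches size 2, flushes downstream
	fmt.Println(len(exp.exported), len(exp.exported[0])) // 1 2
}
```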