OpenTelemetry Collector: Reading the Source

OpenTelemetry is a new project in the CNCF community, and its most central piece is the Collector.

Core

The Collector is built from three kinds of components:

  • Receivers: receive data
  • Processors: process data
  • Exporters: send data onward

Pipelines wire these three together; beyond that, there are also extensions for extending the collector itself.

common

receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  otlp:
    endpoint: otelcol:55680

extensions:
  health_check:
  pprof:
  zpages:

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]

The configuration already gives us a first glimpse of the design.

Exploring the source

OpenTelemetry is abbreviated as OT below.

Component

OT abstracts what all components have in common into a Component interface:

type Component interface {
	Start(ctx context.Context, host Host) error
	Shutdown(ctx context.Context) error
}

Receiver extends Component:

type Receiver interface {
	Component
}

Receiver is then extended in turn into TracesReceiver, MetricsReceiver, and LogsReceiver. The OOP flavor here is very familiar.

Service

When reading code, grab the main thread first. From the configuration it is clear that pipelines run through the whole processing path, and pipelines live inside the service, so let's look at how the service ties everything together.

buildPipelines()

func (srv *service) buildPipelines() error {
	// Build the exporters first.
	var err error
	srv.builtExporters, err = builder.BuildExporters(srv.logger, srv.startInfo, srv.config, srv.factories.Exporters)
	if err != nil {
		return fmt.Errorf("cannot build builtExporters: %w", err)
	}

	// Build the pipelines, using the processors held by the service.
	srv.builtPipelines, err = builder.BuildPipelines(srv.logger, srv.startInfo, srv.config, srv.builtExporters, srv.factories.Processors)
	if err != nil {
		return fmt.Errorf("cannot build pipelines: %w", err)
	}

	// Finally build the receivers and attach them to the pipelines.
	srv.builtReceivers, err = builder.BuildReceivers(srv.logger, srv.startInfo, srv.config, srv.builtPipelines, srv.factories.Receivers)
	if err != nil {
		return fmt.Errorf("cannot build receivers: %w", err)
	}

	return nil
}

Once everything is built, Start is called to bring it up:

Start()

func (srv *service) Start(ctx context.Context) error {
	if err := srv.startExtensions(ctx); err != nil {
		return fmt.Errorf("cannot setup extensions: %w", err)
	}

	if err := srv.startPipelines(ctx); err != nil {
		return fmt.Errorf("cannot setup pipelines: %w", err)
	}

	return srv.builtExtensions.NotifyPipelineReady()
}

The build steps are all much the same, so let's take BuildExporters as an example:

BuildExporters

func BuildExporters(logger *zap.Logger, appInfo component.ApplicationStartInfo, config *configmodels.Config, factories map[configmodels.Type]component.ExporterFactory) (Exporters, error) {
	eb := &exportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}

	exporterInputDataTypes := eb.calcExportersRequiredDataTypes()

	exporters := make(Exporters)
	// Iterate over the exporters declared in the config file.
	for _, cfg := range eb.config.Exporters {
		componentLogger := eb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
		// Build each exporter according to its configuration.
		exp, err := eb.buildExporter(context.Background(), componentLogger, eb.appInfo, cfg, exporterInputDataTypes)
		if err != nil {
			return nil, err
		}

		exporters[cfg] = exp
	}

	return exporters, nil
}

Depending on the data type, the final creation call is one of CreateTracesExporter, CreateMetricsExporter, or CreateLogsExporter.

CreateTracesExporter is implemented by the factory type:

CreateTracesExporter

func (f *factory) CreateTracesExporter(
	ctx context.Context,
	params component.ExporterCreateParams,
	cfg configmodels.Exporter) (component.TracesExporter, error) {
	if f.createTracesExporter != nil {
		return f.createTracesExporter(ctx, params, cfg) // delegate to the function stored on f
	}
	return nil, configerror.ErrDataTypeIsNotSupported
}

The name gives it away: this is a factory pattern, and the function that actually builds the exporter is passed in by whoever constructs the factory:

WithTraces

func WithTraces(createTraceExporter CreateTracesExporter) FactoryOption {
	return func(o *factory) {
		o.createTracesExporter = createTraceExporter
	}
}

And this factory is in turn constructed by the exporter package itself:

NewFactory

func NewFactory() component.ExporterFactory {
	return exporterhelper.NewFactory(
		typeStr,
		createDefaultConfig,
		exporterhelper.WithTraces(createTraceExporter))
}

These factories exist as defaults, registered directly in code:

service/defaultcomponents/defaults.go:54

receivers, err := component.MakeReceiverFactoryMap(
	jaegerreceiver.NewFactory(),
	fluentforwardreceiver.NewFactory(),
	zipkinreceiver.NewFactory(),
	prometheusreceiver.NewFactory(),
	opencensusreceiver.NewFactory(),
	otlpreceiver.NewFactory(),
	hostmetricsreceiver.NewFactory(),
	kafkareceiver.NewFactory(),
)
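MakeReceiverFactoryMap is essentially indexing factories by their type name so the config can look them up. A minimal sketch of that idea; the names and signatures here are ours, not the collector's:

```go
package main

import (
	"fmt"
)

// Factory is a stand-in for component.ReceiverFactory: it exposes a type name.
type Factory interface {
	Type() string
}

// simpleFactory is an illustrative factory identified only by its type string.
type simpleFactory string

func (f simpleFactory) Type() string { return string(f) }

// MakeFactoryMap indexes factories by type and rejects duplicates,
// so that config entries like "otlp:" resolve to exactly one factory.
func MakeFactoryMap(factories ...Factory) (map[string]Factory, error) {
	m := map[string]Factory{}
	for _, f := range factories {
		if _, dup := m[f.Type()]; dup {
			return nil, fmt.Errorf("duplicate factory %q", f.Type())
		}
		m[f.Type()] = f
	}
	return m, nil
}

func main() {
	m, err := MakeFactoryMap(simpleFactory("otlp"), simpleFactory("zipkin"))
	fmt.Println(len(m), err)
}
```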

A pipeline is not limited to a single entry point; multiple receivers and exporters are supported as well:

service:
  pipelines:
    traces:
      receivers: [otlp, jaeger, zipkin]
      processors: [memory_limiter, batch]
      exporters: [otlp, jaeger, zipkin]

At this point the role of the service is clear: read the config file and build the system accordingly. Now let's return to the parts that actually do the work.

Receiver

Let's use ZipkinReceiver to see how a Receiver component works. We already know how it is built and started; what remains is to see what it actually does.

Start

func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error {
	var err = componenterror.ErrAlreadyStarted // returned if startOnce has already fired
	zr.startOnce.Do(func() {
		err = nil
		zr.host = host
		zr.server = zr.config.HTTPServerSettings.ToServer(zr)
		var listener net.Listener
		listener, err = zr.config.HTTPServerSettings.ToListener()
		if err != nil {
			return
		}
		zr.shutdownWG.Add(1)
		go func() {
			defer zr.shutdownWG.Done()

			if errHTTP := zr.server.Serve(listener); errHTTP != http.ErrServerClosed {
				host.ReportFatalError(errHTTP)
			}
		}()
	})

	return err
}

Start simply opens a listener: a standard HTTP server. The handling logic must therefore live in the familiar ServeHTTP.

ServeHTTP

func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Decompress the body if necessary (gzip and friends).
	pr := processBodyIfNecessary(r)
	slurp, _ := ioutil.ReadAll(pr)
	if c, ok := pr.(io.Closer); ok {
		_ = c.Close()
	}
	_ = r.Body.Close()

	var td pdata.Traces
	var err error
	if asZipkinv1 {
		// Parse the Zipkin v1 format.
		td, err = zr.v1ToTraceSpans(slurp, r.Header)
	} else {
		// Parse the Zipkin v2 format.
		td, err = zr.v2ToTraceSpans(slurp, r.Header)
	}
	// nextConsumer is the next hop in the pipeline.
	consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)

	// Finish the receive operation.
	obsreport.EndTraceDataReceiveOp(ctx, receiverTagValue, td.SpanCount(), consumerErr)

	// Write the HTTP response (elided).
}

The key call is clearly ConsumeTraces. How does this junction get wired up? The code lives in:

attachReceiverToPipelines

func (rb *receiversBuilder) attachReceiverToPipelines( /* ... */ ) {
	// (abridged) inside a switch over the pipeline's data type:
	switch dataType {
	case configmodels.TracesDataType:
		junction := buildFanoutTraceConsumer(builtPipelines)
		createdReceiver, err = factory.CreateTracesReceiver(ctx, creationParams, config, junction)
	}
}

func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces {
	if len(pipelines) == 1 {
		return pipelines[0].firstTC
	}

	var pipelineConsumers []consumer.Traces
	anyPipelineMutatesData := false
	for _, pipeline := range pipelines {
		pipelineConsumers = append(pipelineConsumers, pipeline.firstTC)
		anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData
	}

	// Create a junction point that fans out to all pipelines.
	if anyPipelineMutatesData {
		return fanoutconsumer.NewTracesCloning(pipelineConsumers)
	}
	return fanoutconsumer.NewTraces(pipelineConsumers)
}

Exporter

For Exporters let's look at JaegerExporter; the logic is equally clear.

pushTraceData

func (s *protoGRPCSender) pushTraceData(ctx context.Context, td pdata.Traces) error {
	batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
	if err != nil {
		return consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err))
	}

	if s.metadata.Len() > 0 {
		ctx = metadata.NewOutgoingContext(ctx, s.metadata)
	}

	for _, batch := range batches {
		_, err = s.client.PostSpans(
			ctx,
			&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))

		if err != nil {
			s.logger.Debug("failed to push trace data to Jaeger", zap.Error(err))
			return fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
		}
	}

	return nil
}

The client here is precisely a jaegerproto.CollectorServiceClient. To chain all of these components together we need one stable interface, and that is:

consumer/consumer.go:33

type Traces interface {
	// ConsumeTraces receives pdata.Traces for consumption.
	ConsumeTraces(ctx context.Context, td pdata.Traces) error
}

So in the end we find that all of these components implement this same interface.