idou老师教你学Istio 27:解读Mixer Report流程

1、概述

Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。

1.1 核心功能

?前置检查(Check):某服务接收并响应外部请求前,先通过Envoy向Mixer(Policy组件)发送Check请求,做一些access检查,同时确认adaptor所需cache字段,供之后Report接口使用;

?配额管理(Quota):通过配额管理机制,处理多请求时发生的资源竞争;

?遥测数据上报(Report):该服务请求处理结束后,将请求相关的日志,监控等数据,通过Envoy上报给Mixer(telemetry)

1.2 示例图

2、代码分析

2.1 Report代码分析

本节主要介绍Report的详细流程(基于Istio release1.0.0版本,commit id为3a136c90)。Report是mixer server的一个接口,供Envoy通过grpc调用。首先,我们从mixer server的启动入口main函数看起:

func main() {
   rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)

   if err := rootCmd.Execute(); err != nil {
      os.Exit(-1)
   }
}

在rootCmd中,mixs通过server命令启动了mixer server,从而触发了runserver函数,在runserver中初始化(New)了一个server,我们一起看下newServer的函数,在这个函数中,与我们本节相关的内容就是Mixer初始化了一个grpc服务器NewGRPCServer。

rootCmd.AddCommand(serverCmd(info, adapters, printf, fatalf))
func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {
   sa := server.DefaultArgs()
   sa.Templates = info
   sa.Adapters = adapters

   serverCmd := &cobra.Command{
      Use:   "server",
      Short: "Starts Mixer as a server",
      Run: func(cmd *cobra.Command, args []string) {
         runServer(sa, printf, fatalf)
      },
   }… …
}
func newServer(a *Args, p *patchTable) (*Server, error) {
   grpc.EnableTracing = a.EnableGRPCTracing
   s.server = grpc.NewServer(grpcOptions...)
   mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache))
}

在这个grpc的服务端中,定义了一个Report接口,这就是我们这节课主要关注的内容(可以看到Check接口也在此定义,我们下节再讲)

func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
   lg.Debugf("Report (Count: %d)", len(req.Attributes))
   // 校验attribute是否为空,空则直接return
   if len(req.Attributes) == 0 {
      return reportResp, nil
   }

   // 若属性word为空,赋为默认值
   for i := 0; i < len(req.Attributes); i++ {
      iflen(req.Attributes[i].Words) == 0 {
         req.Attributes[i].Words = req.DefaultWords
      }
   }

   // 根据第一条attribute,生成proto包,proto包能跟踪一组attributes
   protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)

   // 初始化,开始跟踪attributes各个条目中属性
   accumBag := attribute.GetMutableBag(protoBag)

   // 保存accumBag的增量状态
   reportBag := attribute.GetMutableBag(accumBag)

   reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report")
   reporter := s.dispatcher.GetReporter(reportCtx)

   var errors *multierror.Error
   for i := 0; i < len(req.Attributes); i++ {
      span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i))

      // 第一个属性已经在创建proto包时创建,在此追踪所有attributes
      if i > 0 {
         if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {
            err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err)
            span.LogFields(otlog.String("error", err.Error()))
            span.Finish()
            errors = multierror.Append(errors, err)
            break
         }
      }

      lg.Debug("Dispatching Preprocess")
      // 真正开始分发,预处理阶段
      if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {
         err = fmt.Errorf("preprocessing attributes failed: %v", err)
         span.LogFields(otlog.String("error", err.Error()))
         span.Finish()
         errors = multierror.Append(errors, err)
         continue
      }

      lg.Debug("Dispatching to main adapters after running preprocessors")
      lg.Debuga("Attribute Bag: \n", reportBag)
      lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes))
      // 真正开始分发,将数据逐步加入到缓存中
      if err := reporter.Report(reportBag); err != nil {
         span.LogFields(otlog.String("error", err.Error()))
         span.Finish()
         errors = multierror.Append(errors, err)
         continue
      }

      span.Finish()

      // purge the effect of the Preprocess call so that the next time through everything is clean
      reportBag.Reset()
   }

   reportBag.Done()
   accumBag.Done()
   protoBag.Done()
   // 真正的发送函数,从缓存中取出并发送到adaptor
   if err := reporter.Flush(); err != nil {
      errors = multierror.Append(errors, err)
   }
   reporter.Done()

   if errors != nil {
      reportSpan.LogFields(otlog.String("error", errors.Error()))
   }
   reportSpan.Finish()

   if errors != nil {
      lg.Errora("Report failed:", errors.Error())
      return nil, grpc.Errorf(codes.Unknown, errors.Error())
   }
   // 过程结束
   return reportResp, nil
}

通过上述代码解读,我们了解了Report接口的工作流程,但此时我们还并不知道一个请求的状态是如何报给adaptor的,下面我们通过简要的函数串接,把这部分流程串起来:

上述的预处理阶段Preprocess与上报阶段Report,最终都会调用到dispatch函数,仅通过不同的type来区分要做的事情;

func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error {
   s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag)
   s.responseBag = responseBag
   err := s.dispatch()
   if err == nil {
      err = s.err
   }
   … …
}
func (r *reporter) Report(bag attribute.Bag) error {
   s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, bag)
   s.reportStates = r.states
   err := s.dispatch()
   if err == nil {
      err = s.err
   }
   … …
}

而dispatch函数中做了真正的分发动作,包括:

1.遍历所有adaptor,调用adaptor中的函数,针对不同的adaptor生成不同的instance,并将instance缓存放入reportstates

var instance interface{}
if instance, err = input.Builder(s.bag); err != nil {
   log.Errorf("error creating instance: destination=‘%v‘, error=‘%v‘", destination.FriendlyName, err)
   s.err = multierror.Append(s.err, err)
   continue
}
type NamedBuilder struct {
   InstanceShortName string
   Builder           template.InstanceBuilderFn
}
InstanceBuilderFn func(attrs attribute.Bag) (interface{}, error)
CreateInstanceBuilder: func(instanceName string, param proto.Message, expb *compiled.ExpressionBuilder) (template.InstanceBuilderFn, error)
builder.build(attr)
// For report templates, accumulate instances as much as possible before commencing dispatch.
if s.variety == tpb.TEMPLATE_VARIETY_REPORT {
   state.instances = append(state.instances, instance)
   continue
}

2.将instance分发到所有adaptor,最终调用并分发到adaptor的HandleMetric函数中

func (r *reporter) Flush() error {
   s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, nil)
   s.reportStates = r.states

   s.dispatchBufferedReports()
   err := s.err
   … …
}
func (s *session) dispatchBufferedReports() {
   // Ensure that we can run dispatches to all destinations in parallel.
   s.ensureParallelism(len(s.reportStates))

   // dispatch the buffered dispatchStates we‘ve got
   for k, v := range s.reportStates {
      s.dispatchToHandler(v)
      delete(s.reportStates, k)
   }

   s.waitForDispatched()
}
func (s *session) dispatchToHandler(ds *dispatchState) {
   s.activeDispatches++
   ds.session = s
   s.impl.gp.ScheduleWork(ds.invokeHandler, nil)
}
case tpb.TEMPLATE_VARIETY_REPORT:
   ds.err = ds.destination.Template.DispatchReport(
      ctx, ds.destination.Handler, ds.instances)
type TemplateInfo struct {
   Name             string
   Variety          tpb.TemplateVariety
   DispatchReport   template.DispatchReportFn
   DispatchCheck    template.DispatchCheckFn
   DispatchQuota    template.DispatchQuotaFn
   DispatchGenAttrs template.DispatchGenerateAttributesFn
}
DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {

   // Convert the instances from the generic []interface{}, to their specialized type.
   instances := make([]*metric.Instance, len(inst))
   for i, instance := range inst {
      instances[i] = instance.(*metric.Instance)
   }

   // Invoke the handler.
   if err := handler.(metric.Handler).HandleMetric(ctx, instances); err != nil {
      return fmt.Errorf("failed to report all values: %v", err)
   }
   return nil
}

2.2 相关结构体定义

Report接口请求体定义

// Used to report telemetry after performing one or more actions.
type ReportRequest struct {
   // 代表一个请求中的属性
   // 每个attribute代表一个请求动作,多个动作可汇总在一条message中以提高效率
   //虽然每个“属性”消息在语义上被视为与消息中的其他属性无关的独立独立实体,但此消息格式利用属性消息之间的增量编码,以便大幅减少请求大小并改进端到端 效率。 每组单独的属性用于修改前一组。 这消除了在单个请求中多次冗余地发送相同属性的需要。
   // 如果客户端上报时不想使用增量编码,可全量的发送所有属性.
   Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`
   // 所有属性的默认消息级字典.
   // 这使得可以为该请求中的所有属性共享相同的字典,这可以大大减少整体请求大小
   DefaultWords []string `protobuf:"bytes,2,rep,name=default_words,json=defaultWords" json:"default_words,omitempty"`
   // 全局字典的词条数,可检测客户端与服务端之间的全局字典是否同步
   GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`
}

3、总结

Mixer中涉及很多缓存命中等用于优化性能的设计,本文仅介绍了Mixer中Report接口发送到adaptor的过程,一些性能优化设计,如protobag,dispatch缓存等内容,将会在后续文章中解析。

相关服务请访问https://support.huaweicloud.com/cce/index.html?cce_helpcenter_2019

原文地址:https://blog.51cto.com/14051317/2353294

时间: 2024-10-12 04:11:54

idou老师教你学Istio 27:解读Mixer Report流程的相关文章

idou老师教你学Istio 29:Envoy启动流程

功能概述 Envoy启动时,会启动一个进程,并在这个进程中启动很多线程,这样,可以启动很多worker线程,一般worker线程数与核心数相同,每个worker线程处理所有已配置的listener上的请求,管理连接并处理filterchain,非阻塞:同时,在这个进程中会启动一个主线程,它负责启动和停止envoy,也是通过API提供配置管理的线程,同时它收集不同的指标,管理其它线程,也是非阻塞的. 重要数据结构定义 2.1 Filter 过滤器,包括listener filter.network

idou老师教你学Istio 07: 如何用istio实现请求超时管理

前言 在前面的文章中,大家都已经熟悉了Istio的故障注入和流量迁移.这两个方面的功能都是Istio流量治理的一部分.今天将继续带大家了解Istio的另一项功能,关于请求超时的管理. 首先我们可以通过一个简单的Bookinfo的微服务应用程序来动手实践一下Istio是如何实现请求超时的管理.看过idou老师前面文章的老司机应该都已经对Bookinfo这个实例驾轻就熟了,当然还存在部分被idou老师的文采刚吸引过来的新同学. 下面先简单的介绍一下Bookinfo这个样例应用整体架构,以便我们更好地

idou老师教你学Istio 04:Istio性能及扩展性介绍

Istio的性能问题一直是国内外相关厂商关注的重点,Istio对于数据面应用请求时延的影响更是备受关注,而以现在Istio官方与相关厂商的性能测试结果来看,四位数的qps显然远远不能满足应用于生产的要求.从发布以来,Istio官方也在不断的对其性能进行优化增强.同时,Istio控制面的可靠性是Istio用于生产的另一项重要考量标准,自动伸缩扩容,自然是可靠性保证的重要手段.下面我们先从性能测试的角度入手,了解下Istio官方提供的性能测试方法与基准,主要分为以下四个方面展开. 一.函数级别测试

idou老师教你学Istio 16:如何用 Istio 实现微服务间的访问控制

摘要使用 Istio 可以很方便地实现微服务间的访问控制.本文演示了使用 Denier 适配器实现拒绝访问,和 Listchecker 适配器实现黑白名单两种方法. 使用场景 有时需要对微服务间的相互访问进行控制,比如使满足某些条件(比如版本)的微服务能够(或不能)调用特定的微服务. 访问控制属于策略范畴,在 Istio 中由 Mixer 组件实现. Mixer拓扑图,来源官方文档 如上图所示,服务的外部请求会被 Envoy 拦截,每个经过 Envoy 的请求都会调用 Mixer,为 Mixer

idou老师教你学istio:监控能力介绍

经过了一年多的开发和测试,Istio于北京时间7月31日发布了1.0版本,并且宣布1.0版本已经可以成熟的应用于生产环境.对于istio的各项主要功能,之前的文章已经介绍的非常详细,并且还会有更多的文章来分析原理和实践功能.今天我们主要介绍的服务是istio流量监控能力.我们知道每个pod内都会有一个Envoy容器,其具备对流入和流出pod的流量进行管理,认证,控制的能力.Mixer则主要负责访问控制和遥测信息收集.如拓扑图所示,当某个服务被请求时,首先会请求istio-policy服务,来判定

idou老师教你学Istio 08: 调用链埋点是否真的“零修改”?

本文将结合一个具体例子中的细节详细描述Istio调用链的原理和使用方式.并基于Istio中埋点的原理解释来说明:为了输出一个质量良好的调用链,业务程序需根据自身特点做适当的修改,即并非官方一直在说的完全无侵入的做各种治理.另外还会描述Istio当前版本中收集调用链数据可以通过Envoy和Mixer两种不同的方式. Istio一直强调其无侵入的服务治理,服务运行可观察性.即用户完全无需修改代码,就可以通过和业务容器一起部署的proxy来执行服务治理和与性能数据的收集.原文是这样描述的: Istio

idou老师教你学Istio 14:如何用K8S对Istio Service进行流量健康检查

Istio利用k8s的探针对service进行流量健康检查,有两种探针可供选择,分别是liveness和readiness: liveness探针用来侦测什么时候需要重启容器.比如说当liveness探针捕获到程序运行时出现的一个死锁,这种情况下重启容器可以让程序更容易可用. readiness探针用来使容器准备好接收流量.当所有容器都ready时被视为pod此时ready.比如说用这种信号来控制一个后端服务,当pod没有到ready状态时,服务会从负载均衡被移除. 使用场景: liveness

idou老师教你学Istio 15:Istio实现双向TLS的迁移

在Istio中,双向TLS是传输身份验证的完整堆栈解决方案,它为每个服务提供可跨集群的强大身份.保护服务到服务通信和最终用户到服务通信,以及提供密钥管理系统.本文阐述如何在不中断通信的情况下,把现存Istio服务的流量从明文升级为双向TLS. 使用场景 在部署了Istio的集群中,使用人员刚开始可能更关注功能性,服务之间的通信配置的都是明文传输,当功能逐渐完善,开始关注安全性,部署有sidecar的服务需要使用双向TLS进行安全传输,但服务不能中断,这时,一个可取的方式就是进行双向TLS的迁移.

idou老师教你学Istio 17 : 通过HTTPS进行双向TLS传输

众所周知,HTTPS是用来解决 HTTP 明文协议的缺陷,在 HTTP 的基础上加入 SSL/TLS 协议,依靠 SSL 证书来验证服务器的身份,为客户端和服务器端之间建立"SSL"通道,确保数据运输安全.而Istio的双向TLS也用来保证数据传输安全.那么,Istio的双向TLS是如何与HTTPS服务一起工作的呢? 下面通过实例演示Istio的双向TLS是如何与HTTPS服务一起工作的,包括三个部分: ? 在没有 Istio sidecar 的情况下部署 HTTPS 服务 ? 关闭