《k8s-1.13版本源码分析》-调度器初始化

源码分析系列文章已经开源到github,地址如下:

    • github:
      https://github.com/farmer-hutao/k8s-source-code-analysis
    • gitbook:
      https://farmer-hutao.github.io/k8s-source-code-analysis

本文大纲

1. 概述

今天我们要做一些琐碎的知识点分析,比如调度器启动的时候默认配置是怎么来的?默认生效了哪些调度算法?自定义的算法是如何注入的?诸如这些问题,我们顺带会看一下调度器相关的一些数据结构的含义。看完前面这些节的分析后再看完本篇文章你可能会有一种醍醐灌顶的感觉哦~

2. 从 --config 开始

如果我们编译出来一个 kube-scheduler 二进制文件,运行./kube-scheduler -h后会看到很多的帮助信息,这些信息是分组的,比如第一组 Misc,差不多是“大杂烩”的意思,不好分类的几个 flag,其实也是最重要的几个 flag,如下:

很好理解,第一个红框框圈出来的--config用于指定配置文件,老版本的各种参数基本都不建议使用了,所以这个 config flag 指定的 config 文件中基本包含了所有可配置项,我们看一下代码中获取这个 flag 的相关代码:

cmd/kube-scheduler/app/options/options.go:143

func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
   fs := nfs.FlagSet("misc")
    // 关注 --config
   fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
   fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
   fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

   o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
   o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
   o.Authentication.AddFlags(nfs.FlagSet("authentication"))
   o.Authorization.AddFlags(nfs.FlagSet("authorization"))
   o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

   leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
   utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))

   return nfs
}
Copy

上述代码中有几个点可以关注到:

  1. FlagSet 的含义,命令行输出的分组和这里的分组是对应的;
  2. 除了认证授权、选举等“非关键”配置外,其他配置基本 Deprecated 了,也就意味着建议使用 config file;

上面代码中可以看到o.ConfigFile接收了config配置,我们看看Option类型是什么样子的~

2.1. options.Option 对象

Options对象包含运行一个 Scheduler 所需要的所有参数

cmd/kube-scheduler/app/options/options.go:55

type Options struct {
    // 和命令行帮助信息的分组是一致的
   ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
   SecureServing           *apiserveroptions.SecureServingOptionsWithLoopback
   CombinedInsecureServing *CombinedInsecureServingOptions
   Authentication          *apiserveroptions.DelegatingAuthenticationOptions
   Authorization           *apiserveroptions.DelegatingAuthorizationOptions
   Deprecated              *DeprecatedOptions

   // config 文件的路径
   ConfigFile string

   // 如果指定了,会输出 config 的默认配置到这个文件
   WriteConfigTo string
   Master string
}
Copy

前面的 flag 相关代码中写到配置文件的内容给了o.ConfigFile,也就是Options.ConfigFile,那这个属性怎么使用呢?

我们来看下面这个 ApplyTo() 函数,这个函数要做的事情是把 options 配置 apply 给 scheduler app configuration(这个对象后面会讲到):

cmd/kube-scheduler/app/options/options.go:162

// 把 Options apply 给 Config
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    // --config 没有使用的情况
   if len(o.ConfigFile) == 0 {
      c.ComponentConfig = o.ComponentConfig
       // 使用 Deprecated 的配置
      if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
         return err
      }
      if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
         return err
      }
   } else {
       // 加载 config 文件中的内容
      cfg, err := loadConfigFromFile(o.ConfigFile)
      if err != nil {
         return err
      }

      // 上面加载到的配置赋值给 Config中的 ComponentConfig
      c.ComponentConfig = *cfg

      if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
         return err
      }
   }
   // ……

   return nil
}
Copy

这个函数中可以看到用 --config 和不用 --config 两种情况下 options 是如何应用到schedulerappconfig.Config中的。那么这里提到的 Config 对象又是什么呢?

2.2. config.Config对象

Config 对象包含运行一个 Scheduler 所需要的所有 context

cmd/kube-scheduler/app/config/config.go:32

type Config struct {
   // 调度器配置对象
   ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration

   LoopbackClientConfig *restclient.Config
   InsecureServing        *apiserver.DeprecatedInsecureServingInfo
   InsecureMetricsServing *apiserver.DeprecatedInsecureServingInfo
   Authentication         apiserver.AuthenticationInfo
   Authorization          apiserver.AuthorizationInfo
   SecureServing          *apiserver.SecureServingInfo
   Client          clientset.Interface
   InformerFactory informers.SharedInformerFactory
   PodInformer     coreinformers.PodInformer
   EventClient     v1core.EventsGetter
   Recorder        record.EventRecorder
   Broadcaster     record.EventBroadcaster
   LeaderElection *leaderelection.LeaderElectionConfig
}
Copy

所以前面的c.ComponentConfig = o.ComponentConfig这行代码也就是把 Options 中的 ComponentConfig 赋值给了 Config 中的 ComponentConfig;是哪里的逻辑让 Options 和 Config 对象产生了关联呢?(也就是说前面提到的 ApplyTo() 方法是再哪里被调用的?)

继续跟下去可以找到Config()函数,从这个函数的返回值*schedulerappconfig.Config可以看到它的目的,是需要得到一个 schedulerappconfig.Config,代码不长:

cmd/kube-scheduler/app/options/options.go:221

func (o *Options) Config() (*schedulerappconfig.Config, error) {
   // ……

   c := &schedulerappconfig.Config{}
    // 前面我们看到的 ApplyTo() 函数
   if err := o.ApplyTo(c); err != nil {
      return nil, err
   }

   // Prepare kube clients.
   // ……

   // Prepare event clients.
   eventBroadcaster := record.NewBroadcaster()
   recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})

   // Set up leader election if enabled.
   // ……

   c.Client = client
   c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
   c.PodInformer = factory.NewPodInformer(client, 0)
   c.EventClient = eventClient
   c.Recorder = recorder
   c.Broadcaster = eventBroadcaster
   c.LeaderElection = leaderElectionConfig

   return c, nil
}
Copy

那调用这个Config()函数的地方又在哪里呢?继续跟就到 runCommand 里面了~

2.3. runCommand

runCommand 这个函数我们不陌生:

cmd/kube-scheduler/app/server.go:117

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
   // ……

   // 这个地方完成了前面说到的配置文件和命令行参数等读取和应用工作
   c, err := opts.Config()
   if err != nil {
      fmt.Fprintf(os.Stderr, "%v\n", err)
      os.Exit(1)
   }

   stopCh := make(chan struct{})

   // Get the completed config
   cc := c.Complete()

   // To help debugging, immediately log version
   klog.Infof("Version: %+v", version.Get())

   // 这里有一堆逻辑
   algorithmprovider.ApplyFeatureGates()

   // Configz registration.

   // ……

   return Run(cc, stopCh)
}
Copy

runCommand 在最开始的时候我们有见到过,已经到 cobra 入口的 Run 中了:

cmd/kube-scheduler/app/server.go:85

Run: func(cmd *cobra.Command, args []string) {
   if err := runCommand(cmd, args, opts); err != nil {
      fmt.Fprintf(os.Stderr, "%v\n", err)
      os.Exit(1)
   }
},
Copy

上面涉及到2个知识点:

  • ApplyFeatureGates
  • Run 中的逻辑

我们下面分别来看看~

3. ApplyFeatureGates

这个函数跟进去可以看到如下几行简单的代码,这里很自然我们能够想到继续跟defaults.ApplyFeatureGates(),但是不能只看到这个函数哦,具体来看:

pkg/scheduler/algorithmprovider/plugins.go:17

package algorithmprovider

import (
   "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)

// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
   defaults.ApplyFeatureGates()
}
Copy

到这里分2条路:

  • import defaults 这个 package 的时候有一个init()函数调用的逻辑
  • defaults.ApplyFeatureGates() 函数调用本身。

3.1. 默认算法注册

pkg/scheduler/algorithmprovider/defaults/defaults.go:38

func init() {
   // ……
   registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
   // ……
}
Copy

init()函数中我们先关注 registerAlgorithmProvider() 函数,这里从字面上可以得到不少信息,大胆猜一下:是不是注册了默认的预选算法和优选算法?

pkg/scheduler/algorithmprovider/defaults/defaults.go:222

func registerAlgorithmProvider(predSet, priSet sets.String) {
   // 注册 algorithm provider. 默认使用 DefaultProvider
   factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
   factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
      copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
}
Copy

看到这里可以关注到 AlgorithmProvider 这个概念,后面会讲到。

先看一下里面调用的注册函数是怎么实现的:

pkg/scheduler/factory/plugins.go:387

func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
   schedulerFactoryMutex.Lock()
   defer schedulerFactoryMutex.Unlock()
   validateAlgorithmNameOrDie(name)
    // 很明显,关键逻辑在这里
   algorithmProviderMap[name] = AlgorithmProviderConfig{
      FitPredicateKeys:     predicateKeys,
      PriorityFunctionKeys: priorityKeys,
   }
   return name
}
Copy

首先,algorithmProviderMap 这个变量是一个包级变量,在86行做的定义:algorithmProviderMap = make(map[string]AlgorithmProviderConfig)

这里的 key 有2种情况:

  • "DefaultProvider"
  • "ClusterAutoscalerProvider"

混合云场景用得到 ClusterAutoscalerProvider,大家感兴趣可以研究一下 ClusterAutoscaler 特性,这块我们先不说。默认的情况是生效的 DefaultProvider,这块逻辑后面还会提到。

然后这个 map 的 value 的类型是一个简单的 struct:

pkg/scheduler/factory/plugins.go:99

type AlgorithmProviderConfig struct {
   FitPredicateKeys     sets.String
   PriorityFunctionKeys sets.String
}
Copy

接着看一下defaultPredicates()函数

pkg/scheduler/algorithmprovider/defaults/defaults.go:106

func defaultPredicates() sets.String {
   return sets.NewString(
      // Fit is determined by volume zone requirements.
      factory.RegisterFitPredicateFactory(
         predicates.NoVolumeZoneConflictPred,
         func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
            return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
         },
      ),

      // ……

      factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),

      // ……
   )
}
Copy

这个函数里面就2中类型的玩法,简化一些可以理解成上面这个样子,我们一个个来看。

先认识一下 sets.NewString()函数要干嘛:

vendor/k8s.io/apimachinery/pkg/util/sets/string.go:27

type String map[string]Empty

// NewString creates a String from a list of values.
func NewString(items ...string) String {
   ss := String{}
   ss.Insert(items...)
   return ss
}
// ……

// Insert adds items to the set.
func (s String) Insert(items ...string) {
    for _, item := range items {
        s[item] = Empty{}
    }
}
Copy

如上,很简单的类型封装。里面的Empty是:type Empty struct{},所以本质上就是要用map[string]struct{}这个类型罢了。

因此上面defaultPredicates()函数中sets.NewString()内每一个参数本质上就是一个 string类型了,我们来看这一个个 string 是怎么来的。

pkg/scheduler/factory/plugins.go:195

func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
   schedulerFactoryMutex.Lock()
   defer schedulerFactoryMutex.Unlock()
   validateAlgorithmNameOrDie(name)
    // 唯一值的关注的逻辑
   fitPredicateMap[name] = predicateFactory
    // 返回 name
   return name
}
Copy

这个函数要返回一个 string 我们已经知道了,里面的逻辑也只有这一行需要我们关注:fitPredicateMap[name] = predicateFactory,这个 map 类型也是一个包级变量:fitPredicateMap = make(map[string]FitPredicateFactory),所以前面讲的注册本质也就是在填充这个变量而已。理解fitPredicateMap[name] = predicateFactory中 fitPredicateMap的 key 和 value,也就知道了这里的 Register 要做什么。

defaultPredicates()中的第二种注册方式 RegisterFitPredicate 区别不大,函数体也是调用的 RegisterFitPredicateFactory():

pkg/scheduler/factory/plugins.go:106

func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
   return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
}
Copy

3.2. 特性开关

pkg/scheduler/algorithmprovider/defaults/defaults.go:183

func ApplyFeatureGates() {
   if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {

      factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
      factory.RemoveFitPredicate(predicates.CheckNodeMemoryPressurePred)
      factory.RemoveFitPredicate(predicates.CheckNodeDiskPressurePred)
      factory.RemoveFitPredicate(predicates.CheckNodePIDPressurePred)
      factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeConditionPred)
      factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeMemoryPressurePred)
      factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeDiskPressurePred)
      factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodePIDPressurePred)

      factory.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)
      factory.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)
      factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.PodToleratesNodeTaintsPred)
      factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.CheckNodeUnschedulablePred)
   }

   if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
      factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1)
      factory.InsertPriorityKeyToAlgorithmProviderMap(factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1))
   }
}
Copy

这个函数看着几十行,实际上只在重复一件事情,增加或删除一些预选和优选算法。我们看一下这里的一些逻辑:

utilfeature.DefaultFeatureGate.Enabled() 函数要做的事情是判断一个 feature 是否开启;函数参数本质只是一个字符串:

pkg/features/kube_features.go:25

const (
   AppArmor utilfeature.Feature = "AppArmor"
   DynamicKubeletConfig utilfeature.Feature = "DynamicKubeletConfig"
   // ……
)
Copy

这里定义了很多的 feature,然后定义了哪些 feature 是开启的,处在 alpha 还是 beta 或者 GA 等:

pkg/features/kube_features.go:405

var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
   AppArmor:             {Default: true, PreRelease: utilfeature.Beta},
   DynamicKubeletConfig: {Default: true, PreRelease: utilfeature.Beta},
   ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
   ExperimentalCriticalPodAnnotation:           {Default: false, PreRelease: utilfeature.Alpha},
   DevicePlugins:                               {Default: true, PreRelease: utilfeature.Beta},
   TaintBasedEvictions:                         {Default: true, PreRelease: utilfeature.Beta},
   RotateKubeletServerCertificate:              {Default: true, PreRelease: utilfeature.Beta},
    // ……
}
Copy

所以回到前面ApplyFeatureGates()的逻辑,utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition)要判断的是 TaintNodesByCondition 这个特性是否开启了,如果开启了就把 predicates 中 "CheckNodeCondition", "CheckNodeMemoryPressure", "CheckNodePIDPressurePred", "CheckNodeDiskPressure" 这几个算法去掉,把 "PodToleratesNodeTaints", "CheckNodeUnschedulable" 加上。接着对于特性 "ResourceLimitsPriorityFunction" 的处理也是同一个逻辑。

4. Scheduler 的创建

我们换一条线,从 Scheduler 对象的创建再来看另外几个知识点。

前面分析到runCommand()函数的时候我们说到了需要关注最后一行return Run(cc, stopCh)的逻辑,在Run()函数中主要的逻辑就是创建 Scheduler 和启动 Scheduler;现在我们来看创建逻辑:

cmd/kube-scheduler/app/server.go:174

sched, err := scheduler.New(cc.Client,
   cc.InformerFactory.Core().V1().Nodes(),
   cc.PodInformer,
   cc.InformerFactory.Core().V1().PersistentVolumes(),
   cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
   cc.InformerFactory.Core().V1().ReplicationControllers(),
   cc.InformerFactory.Apps().V1().ReplicaSets(),
   cc.InformerFactory.Apps().V1().StatefulSets(),
   cc.InformerFactory.Core().V1().Services(),
   cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
   storageClassInformer,
   cc.Recorder,
   cc.ComponentConfig.AlgorithmSource,
   stopCh,
   scheduler.WithName(cc.ComponentConfig.SchedulerName),
   scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
   scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
   scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
   scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
   scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
Copy

这里调用了一个New()函数,传了很多参数进去。New()函数的定义如下:

pkg/scheduler/scheduler.go:131

func New(client clientset.Interface,
   nodeInformer coreinformers.NodeInformer,
   podInformer coreinformers.PodInformer,
   pvInformer coreinformers.PersistentVolumeInformer,
   pvcInformer coreinformers.PersistentVolumeClaimInformer,
   replicationControllerInformer coreinformers.ReplicationControllerInformer,
   replicaSetInformer appsinformers.ReplicaSetInformer,
   statefulSetInformer appsinformers.StatefulSetInformer,
   serviceInformer coreinformers.ServiceInformer,
   pdbInformer policyinformers.PodDisruptionBudgetInformer,
   storageClassInformer storageinformers.StorageClassInformer,
   recorder record.EventRecorder,
   schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
   stopCh <-chan struct{},
   opts ...func(o *schedulerOptions)) (*Scheduler, error)
Copy

这里涉及到的东西有点小多,我们一点点看:

options := defaultSchedulerOptions 这行代码的 defaultSchedulerOptions 是一个 schedulerOptions 对象:

pkg/scheduler/scheduler.go:121

// LINE 67
type schedulerOptions struct {
    schedulerName                  string
    hardPodAffinitySymmetricWeight int32
    enableEquivalenceClassCache    bool
    disablePreemption              bool
    percentageOfNodesToScore       int32
    bindTimeoutSeconds             int64
}

// ……

// LINE 121
var defaultSchedulerOptions = schedulerOptions{
    // "default-scheduler"
   schedulerName:                  v1.DefaultSchedulerName,
    // 1
   hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
   enableEquivalenceClassCache:    false,
   disablePreemption:              false,
    // 50
   percentageOfNodesToScore:       schedulerapi.DefaultPercentageOfNodesToScore,
    // 100
   bindTimeoutSeconds:             BindTimeoutSeconds,
}
Copy

回到New()函数的逻辑:

pkg/scheduler/scheduler.go:148

for _, opt := range opts {
   opt(&options)
}
Copy

这里的 opts 定义在参数里:opts ...func(o *schedulerOptions),我们看一个实参来理解一下:scheduler.WithName(cc.ComponentConfig.SchedulerName)

pkg/scheduler/scheduler.go:80

// 这个函数能够把给定的 schedulerName 赋值给 schedulerOptions.schedulerName
func WithName(schedulerName string) Option {
   return func(o *schedulerOptions) {
      o.schedulerName = schedulerName
   }
}
Copy

这种方式设置一个对象的属性还是挺有意思的。

4.1. 调度算法源

我们继续往后面看New()函数的其他逻辑:

source := schedulerAlgorithmSource 这行代码里的 schedulerAlgorithmSource 代表了什么?

形参中有这个变量的定义:schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,跟进去可以看到:

pkg/scheduler/apis/config/types.go:97

// 表示调度算法源,两个属性是互相排斥的,也就是二选一的意思
type SchedulerAlgorithmSource struct {
   // Policy is a policy based algorithm source.
   Policy *SchedulerPolicySource
   // Provider is the name of a scheduling algorithm provider to use.
   Provider *string
}
Copy

这两个属性肯定得理解一下了,目测挺重要的样子:

Policy

pkg/scheduler/apis/config/types.go:106

type SchedulerPolicySource struct {
   // 文件方式配置生效的调度算法
   File *SchedulerPolicyFileSource
   // ConfigMap 方式配置生效的调度算法
   ConfigMap *SchedulerPolicyConfigMapSource
}
// 下面分别是2个属性的结构定义:
// ……
type SchedulerPolicyFileSource struct {
    // Path is the location of a serialized policy.
    Path string
}

// ……
type SchedulerPolicyConfigMapSource struct {
    // Namespace is the namespace of the policy config map.
    Namespace string
    // Name is the name of hte policy config map.
    Name string
}
Copy

大家还记得我们在讲调度器设计的时候提到的 Policy 文件不?大概长这个样子:

{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "HostName"}
    ],
"priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
    ],
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : false
}
Copy

所以啊,这个 Policy原来是通过代码里的 SchedulerPolicySource 去配置的~

4.2. policy / provider 如何生效

前面讲到调度算法从何而来(源头),现在我们看一下这些算法配置如何生效的:

pkg/scheduler/scheduler.go:173

source := schedulerAlgorithmSource
switch {
    // 如果 Provider 配置了,就不用 policy 了
case source.Provider != nil:
   // 根据给定的 Provider 创建 scheduler config
   sc, err := configurator.CreateFromProvider(*source.Provider)
   if err != nil {
      return nil, fmt.Errorf("couldn‘t create scheduler using provider %q: %v", *source.Provider, err)
   }
   config = sc
    // 如果 Policy 提供了,就没有上面的 provider 的事情了
case source.Policy != nil:
   // 根据给定的 Policy 创建 scheduler config
   policy := &schedulerapi.Policy{}
   switch {
       // 是 File 的情况
   case source.Policy.File != nil:
      if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
         return nil, err
      }
       // 是 ConfigMap 的情况
   case source.Policy.ConfigMap != nil:
      if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
         return nil, err
      }
   }
   sc, err := configurator.CreateFromConfig(*policy)
   if err != nil {
      return nil, fmt.Errorf("couldn‘t create scheduler from policy: %v", err)
   }
   config = sc
default:
   return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
Copy

上面代码涉及到的2个类型我们再来关注一下:

  • schedulerapi.Policy
  • factory.Config

这个 Policy 就是具体用于存放我们配置的 policy 的载体,对照着这个结构我们可以判断自己在配置 policy 的时候应该按照什么格式:

pkg/scheduler/api/types.go:47

type Policy struct {
   metav1.TypeMeta
   Predicates []PredicatePolicy
   Priorities []PriorityPolicy
   ExtenderConfigs []ExtenderConfig
   HardPodAffinitySymmetricWeight int32
   AlwaysCheckAllPredicates bool
}
Copy

这个结构内部封装的一层层结构我就不继续贴了,大家感兴趣可以点开看一下,跟到底的落点都是基础类型的,string啊,int啊,bool啊这些~

关于 factory.Config 可能大家有印象,这个结构就是 Scheduler 对象的唯一属性:

pkg/scheduler/scheduler.go:58

type Scheduler struct {
   config *factory.Config
}
Copy

Config 结构体的属性不外乎 Scheduler 在落实调度、抢占等动作时所需要的一系列方法(或对象);在New()函数的最后有一行sched := NewFromConfig(config),实现是简单地实例化 Scheduler,然后将 config 赋值给 Scheduler 的 config 属性,然后返回 Scheduler 对象的地址。

5. 默认生效的算法

我们最后还是单独拎出来强调一下生效了哪些算法的具体逻辑吧,前面有提到一些了,我相信肯定有人很关注这个知识点。

前面提到 Scheduler 创建的时候使用的 New()函数,函数中 switch 判断 schedulerAlgorithmSource 是 Provider 还是 Policy,然后做了具体的初始化逻辑,我们具体看其中一个初始化, 串一下这些点:

sc, err := configurator.CreateFromProvider(*source.Provider)

如果我们配置的是 Provider,这时候代码逻辑调用的是这样一行,这个函数的实现如下:

pkg/scheduler/factory/factory.go:1156

func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
    // 比如说我们配置的 name 是 DefaultProvider,这个函数要获取一个 AlgorithmProviderConfig 类型的对象
   provider, err := GetAlgorithmProvider(providerName)
   if err != nil {
      return nil, err
   }
    // 下面详细看
   return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}
Copy

这个函数里有2个点需要关注,第一个是GetAlgorithmProvider()函数返回了什么:

pkg/scheduler/factory/plugins.go:99

type AlgorithmProviderConfig struct {
   FitPredicateKeys     sets.String
   PriorityFunctionKeys sets.String
}
Copy

看到这个返回值类型,心里就明朗了。

我们继续看比较重要的CreateFromKeys()方法调用的具体逻辑,这个函数的实参中 provider.FitPredicateKeys, provider.PriorityFunctionKeys 很明显和具体的 provider 相关,不同 provider 定义的预置算法不同。继续来看函数实现:

pkg/scheduler/factory/factory.go:1255

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {

    // ……

    // 根据 predicateKeys 得到 predicateFuncs
    predicateFuncs, err := c.GetPredicates(predicateKeys)
    // 根据 priorityKeys 得到 priorityConfigs
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)

    // ……

    // 创建一个 genericScheduler,这个对象我们很熟悉。algo 也就是 Algorithm 的简写;
   algo := core.NewGenericScheduler(
      c.schedulerCache,
      c.equivalencePodCache,
      c.podQueue,
      predicateFuncs, // 和 predicateKeys 对应
      predicateMetaProducer,
      priorityConfigs, // 和 priorityKeys 对应
      priorityMetaProducer,
      // ……
   )

   podBackoff := util.CreateDefaultPodBackoff()
   return &Config{
      // ……
      Algorithm:           algo, // 很清晰了
      // ……
   }, nil
}
Copy

上面的NewGenericScheduler()函数接收了这些参数之后丢给了 genericScheduler 对象,这个对象中 predicates 属性对应参数 predicateFuncs,prioritizers 属性对应参数 priorityConfigs;

从这里的代码可以看出来我们配置的算法源可以影响到 Scheduler 的初始化,最终体现在改变了 Scheduler 对象的 config 属性的 Algorithm 属性的 prioritizers 和 prioritizers 上。我们最后回顾一下这2个属性的类型,就和以前的预选、优选过程分析的时候关注的点对上了:

  • predicates --> map[string]algorithm.FitPredicate
  • prioritizers --> []algorithm.PriorityConfig

是不是很熟悉呢?

行,今天就讲到这里~

原文地址:https://www.cnblogs.com/cloudgeek/p/10651660.html

时间: 2024-11-09 01:40:58

《k8s-1.13版本源码分析》-调度器初始化的相关文章

Spring各版本源码下载

spring framework 各版本源码下载地址 现在spring的源码下载地址真是不好找,这次终于找到了.记录一下,以帮助需要的朋友. https://github.com/spring-projects/spring-framework/tags可以选择需要的版本进行下载. 感谢网友 xiyuan1999 在csdn上提供的此下载地址.

可解决由于IIS、VS低版本源码 兼容性问题。

安装完 Internet Information Services和 Web 管理工具 后,重启VS,以管理员身份打开,同时 选择源码--属性--安全--添加 everyone 所有权限,配置IIS,即可解决由于IIS.VS低版本源码 兼容性问题. 解决方法: 重新启动时  右键 选以管理员身份运行 那是兼容性还没配置把 控制面板-程序和功能 打开或关闭 Windows 功能 找到这个Internet Information Services 展开Web 管理工具 安装完 Internet In

nginx源码分析之模块初始化

在nginx启动过程中,模块的初始化是整个启动过程中的重要部分,而且了解了模块初始化的过程对应后面具体分析各个模块会有事半功倍的效果.在我看来,分析源码来了解模块的初始化是最直接不过的了,所以下面主要通过结合源码来分析模块的初始化过程. 稍微了解nginx的人都知道nginx是高度模块化的,各个功能都封装在模块中,而各个模块的初始化则是根据配置文件来进行的,下面我们会看到nginx边解析配置文件中的指令,边初始化指令所属的模块,指令其实就是指示怎样初始化模块的. 模块初始化框架 模块的初始化主要

nginx源码分析之网络初始化

nginx作为一个高性能的HTTP服务器,网络的处理是其核心,了解网络的初始化有助于加深对nginx网络处理的了解,本文主要通过nginx的源代码来分析其网络初始化. 从配置文件中读取初始化信息 与网络有关的配置命令主要有两个:listen和sever_name.首先先了解这两个命令的用法. listen listen命令设置nginx监听地址,nginx从这里接受请求.对于IP协议,这个地址就是address和port:对于UNIX域套接字协议,这个地址就是path. 一条listen指令只能

ThinkPHP6源码分析之应用初始化

ThinkPHP6 源码分析之应用初始化 官方群点击此处. App Construct 先来看看在 __construct 中做了什么,基本任何框架都会在这里做一些基本的操作,也就是从这里开始延伸出去. public function __construct(string $rootPath = '') { $this->thinkPath = dirname(__DIR__) . DIRECTORY_SEPARATOR; $this->rootPath = $rootPath ? rtrim

Android4.4版本源码变化

一,WifiManager 新增: 1 public void setTdlsEnabled (InetAddress remoteIPAddress, boolean enable) 2 3 Enable/Disable TDLS on a specific local route. 4 5 TDLS enables two wireless endpoints to talk to each other directly without going through the access po

Struts2 源码分析——拦截器的机制

本章简言 上一章讲到关于action代理类的工作.即是如何去找对应的action配置信息,并执行action类的实例.而这一章笔者将讲到在执行action需要用到的拦截器.为什么要讲拦截器呢?可以这样子讲吧.拦截器的应用是sturts2核心的亮点之一.如果不明白拦截器是什么的话,那么你相当于没有学习过struts2.笔者本来想直接讲这一章的知识点.可是又怕读者可能对拦截器没有一个概念化的理解.为什么这么讲呢?struts2在设计拦截器这一个部分的内容.在笔者看来事实是以AOP为核心思想来设计的.

linux内存源码分析 - 页表的初始化

本文为原创,转载请注明:http://www.cnblogs.com/tolimit/ 本文章中系统我们假设为x86下的32位系统,暂且不分析64位系统的页表结构. linux分页 linux下采用四级分页,一个线性地址会分为5个偏移量用于寻址,具体看图: 虽然有四级,但并不是每一级都会用到,在linux中,对于硬件体系的不同可能会用到二级页表,三级页表,四级页表中的其中一个,如下: 64位系统:使用四级分页或三级分页,跟硬件有关. 未开启PAE(物理地址扩展)的32位系统:只使用二级分页,页上

Spring IOC 容器源码分析 - 余下的初始化工作

1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bean 做最后的初始化工作.相较于之前几篇文章所分析的源码,initializeBean 的源码相对比较简单,大家可以愉快的阅读.好了,其他的不多说了,我们直入主题吧. 2. 源码分析 本章我们来分析一下 initializeBean 方法的源码.在完成分析后,还是像往常一样,把方法的执行流程列出来.好了,看源码