kubernetes的rolling update机制解析

commit: d577db99873cbf04b8e17b78f17ec8f3a27eca30
Date: Fri Apr 10 23:45:36 2015 -0700

0.命令行和依赖的基础知识

Synopsis

Perform a rolling update of the given ReplicationController.

Replaces the specified controller with new controller, updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing controller and overwrite at least one (common) label in its replicaSelector.

kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC

Examples

// Update pods of frontend-v1 using new controller data in frontend-v2.json.
$ kubectl rolling-update frontend-v1 -f frontend-v2.json

// Update pods of frontend-v1 using JSON data passed into stdin.
$ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -

ReplicationController,简称rc,是kubernet体系中某一种类型pod的集合,rc有一个关键参数叫做replicas,也是就是pod的数量。

那么rc有什么用呢?这是为了解决在集群上一堆pod中有些如果挂了,那么就在别的宿主机上把容器启动起来,并让业务流量导入到正确启动的pod上。也就是说,rc保证了集群服务的可用性,当你有很多个服务启动在一个集群中,你需要用程序去监控这些服务的运行状况,并动态保证服务可用。

rc和pod的对应关系是怎么样的?rc通过selector来选择一些pod作为他的控制范围。只要pod的标签(label)符合seletor,则属于这个rc,下面是pod和rc的示例。

xx-controller.json

   "spec":{
      "replicas":1,
      "selector":{
         "name":"redis",
         "role":"master"
      },

xx-pod.json

  "labels": {
    "name": "redis"
  },

kubernetes被我们简称为k8s,如果对其中的基础概念有兴趣可以看这篇

1.kubctl入口

/cmd/kubectl/kubctl.go

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
    if err := cmd.Execute(); err != nil {
        os.Exit(1)
    }
}

2.实际调用

源代码在pkg包内,/pkg/kubectl/cmd/cmd.go,每个子命令都实现统一的接口,rollingupdate这行是:

    cmds.AddCommand(NewCmdRollingUpdate(f, out))

这个函数的实现在:/pkg/kubectl/cmd/rollingupdate.go

func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command {
    cmd := &cobra.Command{
        Use: "rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC",
        // rollingupdate is deprecated.
        Aliases: []string{"rollingupdate"},
        Short:   "Perform a rolling update of the given ReplicationController.",
        Long:    rollingUpdate_long,
        Example: rollingUpdate_example,
        Run: func(cmd *cobra.Command, args []string) {
            err := RunRollingUpdate(f, out, cmd, args)
            cmdutil.CheckErr(err)
        },
    }
}

可以看到实际调用时的执行函数是RunRollingUpdate,算是进入正题了

func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error {
...
    mapper, typer := f.Object()
    // TODO: use resource.Builder instead
    obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
        NamespaceParam(cmdNamespace).RequireNamespace().
        FilenameParam(filename).
        Do().
        Object()
    if err != nil {
        return err
    }
    newRc, ok := obj.(*api.ReplicationController)
    if !ok {
        return cmdutil.UsageError(cmd, "%s does not specify a valid ReplicationController", filename)
    }

这是建立一个新的rc的代码,其中resource是kubneter所有资源(pod,service,rc)的基类。可以看到新的rc从json参数文件中获取所有信息,然后转义为ReplicationController这个类。

    if oldName == newName {
        return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s",
            filename, oldName)
    }

    var hasLabel bool
    for key, oldValue := range oldRc.Spec.Selector {
        if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {
            hasLabel = true
            break
        }
    }
    if !hasLabel {
        return cmdutil.UsageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s",
            filename, oldName)
    }

这里可以看到,对于新的rc和旧的rc,有2项限制,一个是新旧名字需要不同,另一个是rc的selector中需要至少有一项的值不一样。

    updater := kubectl.NewRollingUpdater(newRc.Namespace, client)

    // fetch rc
    oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName)
    if err != nil {
        return err
    }
...
    err = updater.Update(out, oldRc, newRc, period, interval, timeout)
    if err != nil {
        return err
    }

在做rolling update的时候,有两个条件限制,一个是新的rc的名字需要和旧的不一样,第二是至少有个一个标签的值不一样。其中namespace是k8s用来做多租户资源隔离的,可以先忽略不计。

3. 数据结构和实现

这段代码出现了NewRollingUpdater,是在上一层的/pkg/kubectl/rollingupdate.go这个文件中,更加接近主体了

// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
    // Client interface for creating and updating controllers
    c client.Interface
    // Namespace for resources
    ns string
}

可以看到这里的RollingUpdater里面是一个k8s的client的结构来向api server发送命令

func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
    oldName := oldRc.ObjectMeta.Name
    newName := newRc.ObjectMeta.Name
    retry := &RetryParams{interval, timeout}
    waitForReplicas := &RetryParams{interval, timeout}
    if newRc.Spec.Replicas <= 0 {
        return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)
    }
    desired := newRc.Spec.Replicas
    sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)

    // look for existing newRc, incase this update was previously started but interrupted
    rc, existing, err := r.getExistingNewRc(sourceId, newName)
    if existing {
        fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)
        if err != nil {
            return err
        }
        replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
        desired, err = strconv.Atoi(replicas)
        if err != nil {
            return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
                newName, desiredReplicasAnnotation, replicas)
        }
        newRc = rc
    } else {
        fmt.Fprintf(out, "Creating %s\n", newName)
        if newRc.ObjectMeta.Annotations == nil {
            newRc.ObjectMeta.Annotations = map[string]string{}
        }
        newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)
        newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
        newRc.Spec.Replicas = 0
        newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)
        if err != nil {
            return err
        }
    }

    // +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
    for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
        newRc.Spec.Replicas += 1
        oldRc.Spec.Replicas -= 1
        fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n",
            oldName, oldRc.Spec.Replicas,
            newName, newRc.Spec.Replicas)
        fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",
            oldName, oldRc.Spec.Replicas,
            newName, newRc.Spec.Replicas)

        newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
        if err != nil {
            return err
        }
        time.Sleep(updatePeriod)
        oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
        if err != nil {
            return err
        }
        fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n",
            oldName, oldRc.Spec.Replicas,
            newName, newRc.Spec.Replicas)
    }
    // delete remaining replicas on oldRc
    if oldRc.Spec.Replicas != 0 {
        fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
            oldName, oldRc.Spec.Replicas, 0)
        oldRc.Spec.Replicas = 0
        oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
        // oldRc, err = r.resizeAndWait(oldRc, interval, timeout)
        if err != nil {
            return err
        }
    }
    // add remaining replicas on newRc
    if newRc.Spec.Replicas != desired {
        fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",
            newName, newRc.Spec.Replicas, desired)
        newRc.Spec.Replicas = desired
        newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
        if err != nil {
            return err
        }
    }
    // Clean up annotations
    if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil {
        return err
    }
    delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
    delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
    newRc, err = r.updateAndWait(newRc, interval, timeout)
    if err != nil {
        return err
    }
    // delete old rc
    fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
    return r.c.ReplicationControllers(r.ns).Delete(oldName)
}

这段代码很长,但做的事情很简单:

  1. 如果新的rc没有被创建,就先创一下,如果已经创建了(在上次的rolling_update中创建了但超时了)
  2. 用几个循环,把新的rc的replicas增加上去,旧的rc的replicas降低下来,主要调用的函数是resizeAndWait和updateAndWait

4. 底层调用

接上一节的resizeAndWait,代码在/pkg/kubectl/resize.go,这里的具体代码就不贴了
其余的所有调用都发生/pkg/client这个目录下,这是一个http/json的client,主要功能就是向api-server发送请求
整体来说,上面的wait的实现都是比较土的,就是发一个update请求过去,后面轮询的调用get来检测状态是否符合最终需要的状态。

5. 总结

先说一下这三个时间参数的作用:

update-period:新rc增加一个pod后,等待这个period,然后从旧rc缩减一个pod
poll-interval:这个函数名来源于linux上的poll调用,就是每过一个poll-interval,向服务端发起请求,直到这个请求成功或者报失败
timeout:总操作的超时时间

rolling update主要是客户端这边实现的,分析完了,但还是有一些未知的问题,例如:

  1. api-server, cadvisor, kubelet, proxy, etcd这些服务端组件是怎么交互的?怎么保证在服务一直可用的情况下增减pod?
  2. 是否有可能在pod增减的时候插入自己的一些代码或者过程?因为我们目前的架构中没有使用k8s的proxy,需要自己去调用负载均衡的系统给这些pod导流量
  3. 对于具体的pod,我们怎么去做内部程序的健康检查?在业务不可用的情况下向k8s系统发送消息,干掉这个pod,在别的机器上创建新的来替代。
时间: 2024-08-25 12:54:58

kubernetes的rolling update机制解析的相关文章

ASP.NET Core on K8S深入学习(5)Rolling Update

本篇已加入<.NET Core on K8S学习实践系列文章索引>,可以点击查看更多容器化技术相关系列文章. 一.什么是Rolling Update? 为了服务升级过程中提供可持续的不中断的服务,K8S提供了Rolling Update机制,它可以使得服务近乎无缝地平滑升级,即在不停止对外服务的前提下完成应用的更新.滚动更新采用渐进的方式逐步替换旧版本Pod,如果更新不如预期,那么也可以通过回滚操作恢复到更新前的状态. 滚动更新的最大好处在于零停机,整个更新过程始终有副本在运行,从而保证了业务

在 Rolling Update 中使用 Health Check - 每天5分钟玩转 Docker

上一节讨论了 Health Check 在 Scale Up 中的应用,Health Check 另一个重要的应用场景是 Rolling Update.试想一下下面的情况: 现有一个正常运行的多副本应用,接下来对应用进行更新(比如使用更高版本的 image),Kubernetes 会启动新副本,然后发生了如下事件: 正常情况下新副本需要 10 秒钟完成准备工作,在此之前无法响应业务请求. 但由于人为配置错误,副本始终无法完成准备工作(比如无法连接后端数据库). 先别继续往下看,现在请花一分钟思考

在 Rolling Update 中使用 Health Check【转】

上一节讨论了 Health Check 在 Scale Up 中的应用,Health Check 另一个重要的应用场景是 Rolling Update.试想一下下面的情况: 现有一个正常运行的多副本应用,接下来对应用进行更新(比如使用更高版本的 image),Kubernetes 会启动新副本,然后发生了如下事件: 正常情况下新副本需要 10 秒钟完成准备工作,在此之前无法响应业务请求. 但由于人为配置错误,副本始终无法完成准备工作(比如无法连接后端数据库). 先别继续往下看,现在请花一分钟思考

Sql Server Tempdb原理-日志机制解析实践

笔者曾经在面试DBA时的一句”tempdb为什么比其他数据库快?”使得95%以上的应试者都一脸茫然.Tempdb作为Sqlserver的重要特征,一直以来大家对它可能即熟悉又陌生.熟悉是我们时时刻刻都在用,陌生可能是很少有人关注它的运行机制.这次我将通过实例给大家介绍下tempdb的日志机制. 测试用例 我们分别在用户数据库(testpage),tempdb中创建相似对象t1,#t1,并在tempdb中创建创建非临时表,然后执行相应的insert脚本(用以产生日志),并记录执行时间用以比较用以比

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

笔记:XML-解析文档-流机制解析器(SAX、StAX)

DOM 解析器完整的读入XML文档,然后将其转换成一个树型的数据结构,对于大多数应用,DOM 都运行很好,但是,如果文档很大,并且处理算法又非常简单,可以在运行时解析节点,而不必看到完整的树形结构,那么我们应该使用流机制解析器(streaming parser),Java 类库提供的流解析机制有 SAX 解析器和 StAX 解析器,SAX 解析器是基于事件回调机制,而 StAX解析器提供了解析事件的迭代器. 使用SAX解析器 SAX 解析器在解析XML 输入的组成部分时会报告事件,在使用 SAX

androd输入管理系统机制解析

 android的输入管理系统主要完成按键.触摸板.鼠标等输入设备的事件输入,功能包括,输入设备的事件输入及向焦点窗口和焦点视图的事件派发,事件的插入,事件的过滤,事件的拦截等功能. 整个输入系统包括服务端和客户端两部分,服务端部分主要完成输入设备事件的读取.事件的映射.事件的插入.事件的过滤.事件的拦截等功能:客户端部分主要完成事件向焦点窗口和焦点视图的派发. 输入系统的整个架构采用的是管道过滤器模式(Pipe and Filter)架构模式.服务端的InputReader和InputDi

android permission权限与安全机制解析(下)

在android permission权限与安全机制解析(上)篇博客中,我已经详细介绍了android相关系统permission和自定义permission,以及一些权限机制和安全机制.这篇博客主要将会介绍到android 6.0的相关权限更改,原理和相关的处理方式,解决方法等. 就以我以前的一个仿最新版微信相册为例子来分析. android 6.0权限全面详细分析和解决方案 Marshmallow版本权限修改 android的权限系统一直是首要的安全概念,因为这些权限只在安装的时候被询问一次

转 Java Classloader机制解析

转 Java Classloader机制解析 发表于11个月前(2014-05-09 11:36)   阅读(693) | 评论(0) 9人收藏此文章, 我要收藏 赞1 慕课网,程序员升职加薪神器,点击免费学习 目录[-] JDK默认ClassLoader 双亲委托模型 如何自定义ClassLoader 1.loadClass 方法 2.findClass 3.defineClass 不遵循“双亲委托机制”的场景 做Java开发,对于ClassLoader的机制是必须要熟悉的基础知识,本文针对J