#IT明星不是梦#图解kubernetes容器探活机制核心实现

在k8s中通过kubelet拉起一个容器之后,用户可以指定探活的方式用于实现容器的健康性检查,目前支持TCP、Http和命令三种方式,今天介绍其整个探活模块的实现, 了解其周期性探测、计数器、延迟等设计的具体实现

1. 探活的整体设计

1.1 线程模型


探活的线程模型设计相对简单一些,其通过worker来进行底层探活任务的执行,并通过Manager来负责worker的管理, 同时缓存探活的结果

1.2 周期性探活


根据每个探活任务的周期,来生成定时器,则只需要监听定时器事件即可

1.3 探活机制的实现


探活机制的实现除了命令Http和Tcp都相对简单,Tcp只需要直接通过net.DialTimeout链接即可,而Http则是通过构建一个http.Transport构造Http请求执行Do操作即可

相对复杂的则是exec, 其首先要根据当前container的环境变量生成command,然后通过容器、命令、超时时间等构建一个Command最后才是调用runtimeService调用csi执行命令?

2.探活接口实现

2.1 核心成员结构

type prober struct {
    exec execprobe.Prober
    // 我们可以看到针对readiness/liveness会分别启动一个http Transport来进行链接
    readinessHTTP httpprobe.Prober
    livenessHTTP  httpprobe.Prober
    startupHTTP   httpprobe.Prober
    tcp           tcpprobe.Prober
    runner        kubecontainer.ContainerCommandRunner

    // refManager主要是用于获取成员的引用对象
    refManager *kubecontainer.RefManager
    // recorder会负责探测结果事件的构建,并最终传递回 apiserver
    recorder   record.EventRecorder
}

2.2 探活主流程

探活的主流程主要是位于prober的probe方法中,其核心流程分为三段

2.2.1 获取探活的目标配置

func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
// 根据探活的类型来获取对应位置的探活配置
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    case startup:
        probeSpec = container.StartupProbe
    default:
        return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    }

2.2.2 执行探活记录错误信息

如果返回的错误,或者不是成功或者警告的状态,则会获取对应的引用对象,然后通过 recorder进行事件的构造,发送结果返回apiserver

// 执行探活流程
result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)

    if err != nil || (result != probe.Success && result != probe.Warning) {
        // // 如果返回的错误,或者不是成功或者警告的状态
        // 则会获取对应的引用对象,然后通过
        ref, hasRef := pb.refManager.GetRef(containerID)
        if !hasRef {
            klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
        }
        if err != nil {
            klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
            recorder进行事件的构造,发送结果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
            }
        } else { // result != probe.Success
            klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
            // recorder进行事件的构造,发送结果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
            }
        }
        return results.Failure, err
    }

2.2.3 探活重试实现

func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}

2.2.4 根据探活类型执行探活

func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    if p.Exec != nil {
        klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
    }
    if p.HTTPGet != nil {
        // 获取协议类型与 http参数信息
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        switch probeType {
        case liveness:
            return pb.livenessHTTP.Probe(url, headers, timeout)
        case startup:
            return pb.startupHTTP.Probe(url, headers, timeout)
        default:
            return pb.readinessHTTP.Probe(url, headers, timeout)
        }
    }
    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        return pb.tcp.Probe(host, port, timeout)
    }
    klog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

3. worker工作线程

Worker工作线程执行探测,要考虑几个问题:
1.容器刚启动的时候可能需要等待一段时间,比如应用程序可能要做一些初始化的工作,还没有准备好
2.如果发现容器探测失败后重新启动,则在启动之前重复的探测也是没有意义的
3.无论是成功或者失败,可能需要一些阈值来进行辅助,避免单次小概率失败,重启容器

3.1 核心成员?

其中关键参数除了探测配置相关,则主要是onHold参数,该参数用于决定是否延缓对容器的探测,即当容器重启的时候,需要延缓探测,resultRun则是一个计数器,不论是连续成功或者连续失败,都通过该计数器累加,后续会判断是否超过给定阈值

type worker struct {
    // 停止channel
    stopCh chan struct{}

    // 包含探针的pod
    pod *v1.Pod

    // 容器探针
    container v1.Container

    // 探针配置
    spec *v1.Probe

    // 探针类型
    probeType probeType

    // The probe value during the initial delay.
    initialValue results.Result

    // 存储探测结果
    resultsManager results.Manager
    probeManager   *manager

    // 此工作进程的最后一个已知容器ID。
    containerID kubecontainer.ContainerID
    // 最后一次探测结果
    lastResult results.Result
    // 探测连续返回相同结果的此时
    resultRun int

    // 探测失败会设置为true不会进行探测
    onHold bool

    // proberResultsMetricLabels holds the labels attached to this worker
    // for the ProberResults metric by result.
    proberResultsSuccessfulMetricLabels metrics.Labels
    proberResultsFailedMetricLabels     metrics.Labels
    proberResultsUnknownMetricLabels    metrics.Labels
}

3.2 探测实现核心流程

3.2.1 失败容器探测中断

如果当前容器的状态已经被终止了,则就不需要对其进行探测了,直接返回即可

    // 获取当前worker对应pod的状态
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }
    // 如果pod终止worker应该终止
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }

3.2.2 延缓探测恢复

延缓探测恢复主要是指的在发生探测失败的情况下,会进行重启操作,在此期间不会进行探测,恢复的逻辑则是通过判断对应容器的id是否改变,通过修改onHold实现

// 通过容器名字获取最新的容器信息
c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        // 如果容器改变,则表明重新启动了一个容器
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // 获取到一个新的容器,则就需要重新开启探测
        w.onHold = false
    }

    if w.onHold {
        //如果当前设置延缓状态为true,则不进行探测
        return true
    }

3.2.3 初始化延迟探测

初始化延迟探测主要是指的容器的Running的运行时间小于配置的InitialDelaySeconds则直接返回


if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

3.2.4 执行探测逻辑

    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }

3.2.5 累加探测计数

在累加探测计数之后,会判断累加后的计数是否超过设定的阈值,如果未超过则不进行状态变更

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        // 成功或失败低于阈值-保持探测器状态不变。
        return true
    }

3.2.6 修改探测状态

如果探测状态发送改变,则需要先进行状态的保存,同时如果是探测失败,则需要修改onHOld状态为true即延缓探测,同时将计数器归0

// 这里会修改对应的状态信息
w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // 容器运行liveness/starup检测失败,他们需要重启, 停止探测,直到有新的containerID
        // 这是为了减少命中#21751的机会,其中在容器停止时运行 docker exec可能会导致容器状态损坏
        w.onHold = true
        w.resultRun = 0
    }

3.3 探测主循环流程

主流程就很简答了执行上面的探测流程

func (w *worker) run() {
    // 根据探活周期来构建定时器
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}

今天就先到这里面,明天再聊proberManager的实现,大家分享转发,就算对我的支持了,动动手就绪

原文地址:https://blog.51cto.com/srexin/2470431

时间: 2024-07-30 15:16:43

#IT明星不是梦#图解kubernetes容器探活机制核心实现的相关文章

# IT明星不是梦 #图解kubernetes容器探活机制核心实现状态管理

k8s为实现容器探活worker的管理构建了一个Manager组件,该组件负责底层探活worker的管理,并且缓存当前的容器的状态,并对外同步容器的当前状态,今天我们就来分析下其部分核心组件 1. 核心原理实现 Manager缓存的状态主要是会被kubelet.状态组件消费,并且在Pod同步状态的时候,会通过当前Manager里面的探测状态来更新Pod的容器的就绪与启动状态的更新,让我们一起看看Manager自身的一些关键实现吧 2. 探活结果管理 即prober/results/results

# IT明星不是梦 # 图解kubernetes容器状态同步机制核心实现

在K8s中将Pod调度到某一台Node节点之后,后续的状态维护信息则是由对应机器上的kubelet进行维护,如何实时反馈本地运行状态,并通知apiserver则是设计的难点, 本节主要是通过感知Pod状态变化和探测状态改变两个流程来实际分析其核心数据结构,来了解内部设计 1. 状态管理 1.1 静态Pod 静态Pod主要是指的那些不是通过感知apiserver创建的pod, 因为apiserver上并不包含,但是同时也需要维护和获取这类Pod的状态, k8s中就设计了一个镜像Pod的概念,其实就

# IT明星不是梦 #图解kubernetes调度器SchedulingQueue核心源码实现

chedulingQueue是kubernetes scheduler中负责进行等待调度pod存储的对,Scheduler通过SchedulingQueue来获取当前系统中等待调度的Pod,本文主要讨论SchedulingQueue的设计与实现的各种实现, 了解探究其内部实现与底层源码,本系列代码基于kubernets1.1.6分析而来 SchedulingQueue设计 队列与优先级 队列与场景 类型 描述 通常实现 队列 普通队列是一个FIFO的数据结构,根据元素入队的次序依次出队 数组或者

#IT明星不是梦#图解kubernetes Pod生命周期事件生成器

PLEG(PodLifecycleEventGenerator)主要是用于周期性检测Pod的运行状态,从而对比Pod前后状态生成事件从而触发kubelet进行Pod容器状态的校证,让我们一起来初探下其内部实现机制 1. 图解设计 1.1 Pod事件生成 Pod事件生成主要是根据对应Pod前后的状态对比来实现,首先通过runtime来获取当前节点的所有Pod的列表,并将对应的状态进行保存,这样在下一个轮训周期就可以通过前后状态的对比去发现状态发生改变的Pod的容器,并且产生对应的事件 1.2 事件

# IT明星不是梦 # 图解kubernetes资源扩展机制实现(上)

k8s目前主要支持CPU和内存两种资源,为了支持用户需要按需分配的其他硬件类型的资源的调度分配,k8s实现了设备插件框架(device plugin framework)来用于其他硬件类型的资源集成,比如现在机器学习要使用GPU等资源,今天来看下其内部的关键实现 1. 基础概念 1.1 集成方式 1.1.1 DaemonSet与服务 当我们要集成本地硬件的资源的时候,我们可以在当前节点上通过DaemonSet来运行一个GRPC服务,通过这个服务来进行本地硬件资源的上报与分配 1.1.2 服务注册

图解kubernetes中api聚合机制的实现

Kubernetes kubernetes中apiserver的设计无疑是复杂的,其自身内部就包含了三种角色的api服务,今天我们一起来臆测下其内部的设计,搞明白aggregator.apiserver.apiExtensionsServer(crd server)的设计精要 1.从web服务到web网关到CRD apiserver还是蛮复杂的,今天我们只讨论其kube-aggregator/apiserver/apiextensions三者架构上的设计,而不关注诸如请求认证.准入控制.权限等等

kubernetes容器编排系统介绍

版权声明:本文由turboxu原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/152 来源:腾云阁 https://www.qcloud.com/community Kubernetes作为容器编排生态圈中重要一员,是Google大规模容器管理系统borg的开源版本实现,吸收借鉴了google过去十年间在生产环境上所学到的经验与教训. Kubernetes提供应用部署.维护. 扩展机制等功能,利用Kubernetes能方

10分钟快速搭建Kubernetes容器集群平台

官方提供Kubernetes部署3种方式 minikube Minikube是一个工具,可以在本地快速运行一个单点的Kubernetes,尝试Kubernetes或日常开发的用户使用.不能用于生产环境. 官方文档:https://kubernetes.io/docs/setup/minikube/ kubeadm kubeadm可帮助你快速部署一套kubernetes集群.kubeadm设计目的为新用户开始尝试kubernetes提供一种简单的方法.目前是Beta版. 官方文档:https://

Docker Kubernetes 容器扩容与缩容

Docker Kubernetes 容器扩容与缩容 环境: 系统:Centos 7.4 x64 Docker版本:18.09.0 Kubernetes版本:v1.8 管理节点:192.168.1.79 工作节点:192.168.1.78 工作节点:192.168.1.77 创建环境: 1.Deployment名称:nginx-deployment 2.pods副本数为:3  3.image镜像:nginx1.9 管理节点:扩容或缩容deploymnet的pod副本数. kubectl scale