# IT明星不是梦 #图解kubernetes容器探活机制核心实现状态管理

k8s为实现容器探活worker的管理构建了一个Manager组件,该组件负责底层探活worker的管理,并且缓存当前的容器的状态,并对外同步容器的当前状态,今天我们就来分析下其部分核心组件

1. 核心原理实现


Manager缓存的状态主要是会被kubelet、状态组件消费,并且在Pod同步状态的时候,会通过当前Manager里面的探测状态来更新Pod的容器的就绪与启动状态的更新,让我们一起看看Manager自身的一些关键实现吧

2. 探活结果管理

即prober/results/results_manager组件,其主要作用是:存储探测结果和通知探测结果

2.1 核心数据结构

cache负责容器的探测结果的保存,updates则负责对外更新状态的订阅,其通过新的结果和cache中的状态进行对比,从而决定是否对外通知

// Manager implementation.
type manager struct {
    // 保护cache
    sync.RWMutex
    // 容器ID->探测结果
    cache map[kubecontainer.ContainerID]Result
    // 更新管道
    updates chan Update
}

2.2 更新缓存通知事件

更新缓存的时候回通过对比前后状态来进行是否发布变更事件,从而通知到外部订阅容器变更的kubelet核心流程


func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) {
    // 修改内部状态
    if m.setInternal(id, result) {
        // 同步更新事件
        m.updates <- Update{id, result, pod.UID}
    }
}

内部状态修改与判断是否进行同步实现

// 如果之前的缓存不存在,或者前后状态不一致则会返回true触发更新
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
    m.Lock()
    defer m.Unlock()
    prev, exists := m.cache[id]
    if !exists || prev != result {
        m.cache[id] = result
        return true
    }
    return false
}

2.3 对外更新管道


func (m *manager) Updates() <-chan Update {
    return m.updates
}

3.探测管理器

探测管理器是指的prober/prober)manager的Manager组件,其负责当前kubelet上面探活组件的管理,并且进行探测状态结果的缓存与同步,并且内部还通过statusManager来进行apiserver状态的同步

3.1 容器探测Key

每个探测Key包含要探测的目标信息:pod的ID、容器名、探测类型

type probeKey struct {
    podUID        types.UID
    containerName string
    probeType     probeType
}

3.2 核心数据结构

statusManager组件在后续章节里面会进行详细分析,说下livenessManager该组件即探活的结果,所以当一个容器探测失败,则会由kubelet本地先进行处理,而readlinessManager和startupManager则需要通过statusManager同步给apiserver进行同步

type manager struct {
    //探测Key与worker映射
    workers map[probeKey]*worker
    // 读写锁
    workerLock sync.RWMutex

    //statusManager缓存为探测提供pod IP和容器id。
    statusManager status.Manager

    // 存储readiness探测结果
    readinessManager results.Manager

    // 存储liveness探测结果
    livenessManager results.Manager

    // 存储startup探测结果
    startupManager results.Manager

    // 执行探测操作
    prober *prober
}

3.3 同步startup探测结果

func (m *manager) updateStartup() {
    // 从管道获取数据进行同步
    update := <-m.startupManager.Updates()

    started := update.Result == results.Success
    m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}

3.4 同步readiness探测结果

func (m *manager) updateReadiness() {
    update := <-m.readinessManager.Updates()

    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

3.5 启动同步探测结果后台任务

func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
    // Start syncing startup.
    go wait.Forever(m.updateStartup, 0)
}

3.6 添加Pod探测

添加 Pod的时候会遍历Pod的所有容器,并根据探测类型来进行对应探测worker的构建

func (m *manager) AddPod(pod *v1.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name

        // 针对startupProbe的探测任务的构建
        if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
            key.probeType = startup
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Startup probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            // 构建新的worker
            w := newWorker(m, startup, pod, c)
            m.workers[key] = w
            go w.run()
        }

        // 针对ReadinessProbe的探测任务的构建
        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Readiness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        // 针对LivenessProbe的探测任务的构建
        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Liveness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}

3.7 更新Pod状态

更新Pod状态主要是根据当前Manager里面缓存的之前的状态信息来更新Pod里面对应容器的状态,这些状态是Pod里面容器最新的探测状态,获取这些状态则是检测当前的容器是否已经就绪和启动,为后续更新流程做基础数据

3.7.1 容器状态更新

    for i, c := range podStatus.ContainerStatuses {
        var ready bool
        // 检测容器状态
        if c.State.Running == nil {
            ready = false
        } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
            // 检测readinessMnager里面的状态,如果是成功则就是已经就绪
            ready = result == results.Success
        } else {
            // 检查是否有尚未运行的探测器。只要存在则认为就绪
            _, exists := m.getWorker(podUID, c.Name, readiness)
            ready = !exists
        }
        podStatus.ContainerStatuses[i].Ready = ready

        var started bool
        if c.State.Running == nil {
            started = false
        } else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
            // 容器正在运行,如果StartupProbe功能被禁用,则假定它已启动
            started = true
        } else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
            // 如果startupManager里面的状态是成功的则认为是已经启动的
            started = result == results.Success
        } else {
            // 检查是否有尚未运行的探测器。
            _, exists := m.getWorker(podUID, c.Name, startup)
            started = !exists
        }
        podStatus.ContainerStatuses[i].Started = &started
    }

3.7.2 初始化容器状态更新

针对初始化容器主要容器已经终止并且退出的状态码为0,则认为初始化容器已经就绪

    for i, c := range podStatus.InitContainerStatuses {
        var ready bool
        if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
            // 容器状态
            ready = true
        }
        podStatus.InitContainerStatuses[i].Ready = ready
    }

3.8 存活状态通知

存活状态通知主要是在kubelet的核心流程循环中进行的,如果检测到容器的状态失败,会立刻进行对应pod的容器状态的同步,从而决定下一步的操作是做什么

    case update := <-kl.livenessManager.Updates():
        // 如果探测状态失败
        if update.Result == proberesults.Failure {
            // 省略代码
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }

探活整体的设计大概就是这样,接下来会分期其statusManager组件,即将将探测的状态与apiserver的同步的实现, k8s源码阅读电子书地址: https://www.yuque.com/baxiaoshi/tyado3

原文地址:https://blog.51cto.com/srexin/2470468

时间: 2024-07-30 15:16:19

# IT明星不是梦 #图解kubernetes容器探活机制核心实现状态管理的相关文章

#IT明星不是梦#图解kubernetes容器探活机制核心实现

在k8s中通过kubelet拉起一个容器之后,用户可以指定探活的方式用于实现容器的健康性检查,目前支持TCP.Http和命令三种方式,今天介绍其整个探活模块的实现, 了解其周期性探测.计数器.延迟等设计的具体实现 1. 探活的整体设计 1.1 线程模型 探活的线程模型设计相对简单一些,其通过worker来进行底层探活任务的执行,并通过Manager来负责worker的管理, 同时缓存探活的结果 1.2 周期性探活 根据每个探活任务的周期,来生成定时器,则只需要监听定时器事件即可 1.3 探活机制

# IT明星不是梦 # 图解kubernetes容器状态同步机制核心实现

在K8s中将Pod调度到某一台Node节点之后,后续的状态维护信息则是由对应机器上的kubelet进行维护,如何实时反馈本地运行状态,并通知apiserver则是设计的难点, 本节主要是通过感知Pod状态变化和探测状态改变两个流程来实际分析其核心数据结构,来了解内部设计 1. 状态管理 1.1 静态Pod 静态Pod主要是指的那些不是通过感知apiserver创建的pod, 因为apiserver上并不包含,但是同时也需要维护和获取这类Pod的状态, k8s中就设计了一个镜像Pod的概念,其实就

# IT明星不是梦 #图解kubernetes调度器SchedulingQueue核心源码实现

chedulingQueue是kubernetes scheduler中负责进行等待调度pod存储的对,Scheduler通过SchedulingQueue来获取当前系统中等待调度的Pod,本文主要讨论SchedulingQueue的设计与实现的各种实现, 了解探究其内部实现与底层源码,本系列代码基于kubernets1.1.6分析而来 SchedulingQueue设计 队列与优先级 队列与场景 类型 描述 通常实现 队列 普通队列是一个FIFO的数据结构,根据元素入队的次序依次出队 数组或者

#IT明星不是梦#图解kubernetes Pod生命周期事件生成器

PLEG(PodLifecycleEventGenerator)主要是用于周期性检测Pod的运行状态,从而对比Pod前后状态生成事件从而触发kubelet进行Pod容器状态的校证,让我们一起来初探下其内部实现机制 1. 图解设计 1.1 Pod事件生成 Pod事件生成主要是根据对应Pod前后的状态对比来实现,首先通过runtime来获取当前节点的所有Pod的列表,并将对应的状态进行保存,这样在下一个轮训周期就可以通过前后状态的对比去发现状态发生改变的Pod的容器,并且产生对应的事件 1.2 事件

# IT明星不是梦 # 图解kubernetes资源扩展机制实现(上)

k8s目前主要支持CPU和内存两种资源,为了支持用户需要按需分配的其他硬件类型的资源的调度分配,k8s实现了设备插件框架(device plugin framework)来用于其他硬件类型的资源集成,比如现在机器学习要使用GPU等资源,今天来看下其内部的关键实现 1. 基础概念 1.1 集成方式 1.1.1 DaemonSet与服务 当我们要集成本地硬件的资源的时候,我们可以在当前节点上通过DaemonSet来运行一个GRPC服务,通过这个服务来进行本地硬件资源的上报与分配 1.1.2 服务注册

图解kubernetes中api聚合机制的实现

Kubernetes kubernetes中apiserver的设计无疑是复杂的,其自身内部就包含了三种角色的api服务,今天我们一起来臆测下其内部的设计,搞明白aggregator.apiserver.apiExtensionsServer(crd server)的设计精要 1.从web服务到web网关到CRD apiserver还是蛮复杂的,今天我们只讨论其kube-aggregator/apiserver/apiextensions三者架构上的设计,而不关注诸如请求认证.准入控制.权限等等

kubernetes容器编排系统介绍

版权声明:本文由turboxu原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/152 来源:腾云阁 https://www.qcloud.com/community Kubernetes作为容器编排生态圈中重要一员,是Google大规模容器管理系统borg的开源版本实现,吸收借鉴了google过去十年间在生产环境上所学到的经验与教训. Kubernetes提供应用部署.维护. 扩展机制等功能,利用Kubernetes能方

10分钟快速搭建Kubernetes容器集群平台

官方提供Kubernetes部署3种方式 minikube Minikube是一个工具,可以在本地快速运行一个单点的Kubernetes,尝试Kubernetes或日常开发的用户使用.不能用于生产环境. 官方文档:https://kubernetes.io/docs/setup/minikube/ kubeadm kubeadm可帮助你快速部署一套kubernetes集群.kubeadm设计目的为新用户开始尝试kubernetes提供一种简单的方法.目前是Beta版. 官方文档:https://

Docker Kubernetes 容器扩容与缩容

Docker Kubernetes 容器扩容与缩容 环境: 系统:Centos 7.4 x64 Docker版本:18.09.0 Kubernetes版本:v1.8 管理节点:192.168.1.79 工作节点:192.168.1.78 工作节点:192.168.1.77 创建环境: 1.Deployment名称:nginx-deployment 2.pods副本数为:3  3.image镜像:nginx1.9 管理节点:扩容或缩容deploymnet的pod副本数. kubectl scale