# IT明星不是梦 # 图解kubernetes资源扩展机制实现(上)

k8s目前主要支持CPU和内存两种资源,为了支持用户需要按需分配的其他硬件类型的资源的调度分配,k8s实现了设备插件框架(device plugin framework)来用于其他硬件类型的资源集成,比如现在机器学习要使用GPU等资源,今天来看下其内部的关键实现

1. 基础概念

1.1 集成方式

1.1.1 DaemonSet与服务

当我们要集成本地硬件的资源的时候,我们可以在当前节点上通过DaemonSet来运行一个GRPC服务,通过这个服务来进行本地硬件资源的上报与分配

1.1.2 服务注册设计

当提供硬件服务需要与kubelet进行通信的时候,则首先需要进行注册,注册的方式,则是通过最原始的底层的socket文件,并且通过Linux文件系统的inotify机制,来实现服务的注册

1.2 插件服务感知

1.2.1 Watcher

Watcher主要是负责感知当前节点上注册的服务,当发现新的要注册的插件服务,则会产生对应的事件,注册到当前的kubelet中

1.2.2 期望状态与实际状态

这里的状态主要是指的是否需要注册,因为kubelet与对应的插件服务是通过网络进行通信的,当网络出现问题、或者对应的插件服务故障,则可能会导致服务注册失败,但此时对应的服务的socket还依旧存在,即对应的插件服务依旧存在

此时就会有两种状态:期望状态与实际状态, 因为socket存在所以服务的期望状态其实是需要注册这个插件服务,但是实际上因为某些原因,这个插件服务并没有完成注册,后续会不断的通过期望状态,调整实际状态,从而达到一致

1.2.3 协调器

协调器则就是完成上述两种状态之间操作的核心,其通过调用对应插件的回调函数,其实就是调用对应的grpc接口,来完成期望状态与实际状态的一致性

1.2.4 插件控制器

针对每种类型的插件,都会有对应的控制器,其实也就是实现对应设备注册和反注册并且完成底层资源的分配(Allocate)和收集(ListWatch)操作

2. 插件服务发现

2.1 核心数据结构

type Watcher struct {
    // 感知插件服务注册的socket的路径
    path                string
    fs                  utilfs.Filesystem
    // inotify监测插件服务socket变化
    fsWatcher           *fsnotify.Watcher
    stopped             chan struct{}
    // 存储期望状态
    desiredStateOfWorld cache.DesiredStateOfWorld
}

2.2 初始化

初始化其实就是创建对应的目录

func (w *Watcher) init() error {
    klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)

    if err := w.fs.MkdirAll(w.path, 0755); err != nil {
        return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
    }

    return nil
}

2.3 插件服务发现核心

    go func(fsWatcher *fsnotify.Watcher) {
        defer close(w.stopped)
        for {
            select {
            case event := <-fsWatcher.Events:
                //如果发现对应目录的文件的变化,则会触发对应的事件
                if event.Op&fsnotify.Create == fsnotify.Create {
                    err := w.handleCreateEvent(event)
                    if err != nil {
                        klog.Errorf("error %v when handling create event: %s", err, event)
                    }
                } else if event.Op&fsnotify.Remove == fsnotify.Remove {
                    w.handleDeleteEvent(event)
                }
                continue
            case err := <-fsWatcher.Errors:
                if err != nil {
                    klog.Errorf("fsWatcher received error: %v", err)
                }
                continue
            case <-stopCh:
                // In case of plugin watcher being stopped by plugin manager, stop
                // probing the creation/deletion of plugin sockets.
                // Also give all pending go routines a chance to complete
                select {
                case <-w.stopped:
                case <-time.After(11 * time.Second):
                    klog.Errorf("timeout on stopping watcher")
                }
                w.fsWatcher.Close()
                return
            }
        }
    }(fsWatcher)

2.4 补偿机制

其实补偿机制主要是在重新启动kubelet的时候,需要将之前已经存在的socket重新注册到当前的kubelet中

func (w *Watcher) traversePluginDir(dir string) error {
    return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            if path == dir {
                return fmt.Errorf("error accessing path: %s error: %v", path, err)
            }

            klog.Errorf("error accessing path: %s error: %v", path, err)
            return nil
        }

        switch mode := info.Mode(); {
        case mode.IsDir():
            if err := w.fsWatcher.Add(path); err != nil {
                return fmt.Errorf("failed to watch %s, err: %v", path, err)
            }
        case mode&os.ModeSocket != 0:
            event := fsnotify.Event{
                Name: path,
                Op:   fsnotify.Create,
            }
            //TODO: Handle errors by taking corrective measures
            if err := w.handleCreateEvent(event); err != nil {
                klog.Errorf("error %v when handling create event: %s", err, event)
            }
        default:
            klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
        }

        return nil
    })
}

2.5 注册事件回调

注册其实就只需要感知到的socket文件路径传递给期望状态进行管理

func (w *Watcher) handlePluginRegistration(socketPath string) error {
    if runtime.GOOS == "windows" {
        socketPath = util.NormalizePath(socketPath)
    }
    // 调用期望状态进行更新
    klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
    err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    if err != nil {
        return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
    }
    return nil
}

2.6 删除事件回调

注册其实就只需要感知到的socket文件路径传递给期望状态进行管理

func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
    klog.V(6).Infof("Handling delete event: %v", event)

    socketPath := event.Name
    klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
    w.desiredStateOfWorld.RemovePlugin(socketPath)
}

3.期望状态与实际状态

3.1 插件信息

插件信息其实只是存储了对应socket的路径和最近更新的时间

type PluginInfo struct {
    SocketPath string
    Timestamp  time.Time
}

3.2 期望状态

期望状态与实际状态在数据结构上都是一样的,因为本质上只是为了存储插件的当前的状态信息,即更新时间,这里不在赘述

type desiredStateOfWorld struct {
    socketFileToInfo map[string]PluginInfo
    sync.RWMutex
}
type actualStateOfWorld struct {

    socketFileToInfo map[string]PluginInfo
    sync.RWMutex
}

4.OperationExecutor

目前k8s中支持两大类的插件的管理一类是DevicePlugin即我们本文说的这些都是这种概念,一类是CSIPlugin,其中针对每一类DRiver的处理其实内部都是不一样的,那其实在操作之前就要先感知到当前的Driver是那种类型的

OperationExecutor主要就是做这件事的,其根据不同的plugin类型,生成不同的要执行的操作,即对应的Plugin类型获取对应的handler,就生成了一个要执行的操作

4.1 生成注册插件回调函数

4.1.1 通过socket连接对应的插件服务

    registerPluginFunc := func() error {
        client, conn, err := dial(socketPath, dialTimeoutDuration)
        if err != nil {
            return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
        }
        defer conn.Close()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
        if err != nil {
            return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
        }

4.1.2 根据插件类型验证服务


        handler, ok := pluginHandlers[infoResp.Type]
        if !ok {
            if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
                return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
            }
            return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
        }

        if infoResp.Endpoint == "" {
            infoResp.Endpoint = socketPath
        }
        if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
                return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
            }
            return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
        }

4.1.3 注册插件到实际状态

        err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
            SocketPath: socketPath,
            Timestamp:  timestamp,
        })
        if err != nil {
            klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
        }
            // 调用插件的注册回调函数
        if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
        }

4.1.4 通知对应的服务注册成功


        if err := og.notifyPlugin(client, true, ""); err != nil {
            return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
        }

4.2 通过socket构建注册client

func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
        grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, "unix", addr)
        }),
    )

    if err != nil {
        return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
    }

    return registerapi.NewRegistrationClient(c), c, nil
}

今天就先到这里,下一章会继续介绍如何组合上述组件以及默认的回调管理机制的实现,进探究到这里谢谢大家,感谢分享点赞,反转又不花钱

k8s源码阅读电子书地址: https://www.yuque.com/baxiaoshi/tyado3

原文地址:https://blog.51cto.com/srexin/2471583

时间: 2024-10-31 11:42:22

# IT明星不是梦 # 图解kubernetes资源扩展机制实现(上)的相关文章

# IT明星不是梦 #图解kubernetes调度器SchedulingQueue核心源码实现

chedulingQueue是kubernetes scheduler中负责进行等待调度pod存储的对,Scheduler通过SchedulingQueue来获取当前系统中等待调度的Pod,本文主要讨论SchedulingQueue的设计与实现的各种实现, 了解探究其内部实现与底层源码,本系列代码基于kubernets1.1.6分析而来 SchedulingQueue设计 队列与优先级 队列与场景 类型 描述 通常实现 队列 普通队列是一个FIFO的数据结构,根据元素入队的次序依次出队 数组或者

# IT明星不是梦 # 图解kubernetes容器状态同步机制核心实现

在K8s中将Pod调度到某一台Node节点之后,后续的状态维护信息则是由对应机器上的kubelet进行维护,如何实时反馈本地运行状态,并通知apiserver则是设计的难点, 本节主要是通过感知Pod状态变化和探测状态改变两个流程来实际分析其核心数据结构,来了解内部设计 1. 状态管理 1.1 静态Pod 静态Pod主要是指的那些不是通过感知apiserver创建的pod, 因为apiserver上并不包含,但是同时也需要维护和获取这类Pod的状态, k8s中就设计了一个镜像Pod的概念,其实就

#IT明星不是梦#图解kubernetes容器探活机制核心实现

在k8s中通过kubelet拉起一个容器之后,用户可以指定探活的方式用于实现容器的健康性检查,目前支持TCP.Http和命令三种方式,今天介绍其整个探活模块的实现, 了解其周期性探测.计数器.延迟等设计的具体实现 1. 探活的整体设计 1.1 线程模型 探活的线程模型设计相对简单一些,其通过worker来进行底层探活任务的执行,并通过Manager来负责worker的管理, 同时缓存探活的结果 1.2 周期性探活 根据每个探活任务的周期,来生成定时器,则只需要监听定时器事件即可 1.3 探活机制

# IT明星不是梦 #图解kubernetes容器探活机制核心实现状态管理

k8s为实现容器探活worker的管理构建了一个Manager组件,该组件负责底层探活worker的管理,并且缓存当前的容器的状态,并对外同步容器的当前状态,今天我们就来分析下其部分核心组件 1. 核心原理实现 Manager缓存的状态主要是会被kubelet.状态组件消费,并且在Pod同步状态的时候,会通过当前Manager里面的探测状态来更新Pod的容器的就绪与启动状态的更新,让我们一起看看Manager自身的一些关键实现吧 2. 探活结果管理 即prober/results/results

#IT明星不是梦#图解kubernetes Pod生命周期事件生成器

PLEG(PodLifecycleEventGenerator)主要是用于周期性检测Pod的运行状态,从而对比Pod前后状态生成事件从而触发kubelet进行Pod容器状态的校证,让我们一起来初探下其内部实现机制 1. 图解设计 1.1 Pod事件生成 Pod事件生成主要是根据对应Pod前后的状态对比来实现,首先通过runtime来获取当前节点的所有Pod的列表,并将对应的状态进行保存,这样在下一个轮训周期就可以通过前后状态的对比去发现状态发生改变的Pod的容器,并且产生对应的事件 1.2 事件

# IT明星不是梦 # kubernetes调度器学习基础概览

scheudler是kubernetes中的核心组件,负责为用户声明的pod资源选择合适的node,同时保证集群资源的最大化利用,这里先介绍下资源调度系统设计里面的一些基础概念 基础任务资源调度 基础的任务资源调度通常包括三部分: 角色类型 功能 node node负责具体任务的执行,同时对包汇报自己拥有的资源 resource manager 汇总当前集群中所有node提供的资源,供上层的scheduler的调用获取,同时根据node汇报的任务信息来进行当前集群资源的更新 scheduler

kubernetes资源创建详解【持续完善中】

目录 资源创建详解 一:Pod及常用参数 1.简介 2.模板 3.删除pod 4.设置Pod主机名 5.镜像拉取策略(ImagePullPolicy) 二:RC 1.简介 2.模板 三:Deployment 1.简介 2.模板 四:HPA 1.简介 2.模板 五:StatefulSet 1.简介 2.模板 六:PV和PVC 八:扩展 8.1.Pod调度到指定的Node 资源创建详解 一:Pod及常用参数 1.简介 2.模板 3.删除pod 示例流程如下: 用户发送删除pod的命令,默认宽限期是3

深入理解Kubernetes资源限制:CPU

写在前面在上一篇关于Kubernetes资源限制的文章我们讨论了如何通过ResourceRequirements设置Pod中容器内存限制,以及容器运行时是如何利用Linux Cgroups实现这些限制的.也分析了requests是用来通知调度器Pod所需资源需求和limits是在宿主机遇到内存压力时帮助内核限制资源二者的区别. 在本文中,我会继续深入探讨CPU时间的requests和limits.你是否阅读过第一篇文章并不会影响本文的学习,但是我建议你两篇文章都读一读,从而得到工程师或者集群管理

kubefuse 让Kubernetes 资源成为fuse 文件系统

kubefuse 是基于fuse 开发的文件系统,我们可以像访问文件系统一样访问Kubernetes 资源,使用python开发 支持以下特性: 可以使用方便的linux tools: ls. vim .cat 像文件系统一样查看Kubernetes 资源 像文件系统一样访问Kubernetes 资源描述cat ~/kubernetes/default/pod/postgres-aazm1/describe 方便的备份,导出信息 一种可选的持续部署方案 说明 使用kubefuse 工具挂载我们的