Kubernetes自定义调度器


介绍

其实想自定义调度器是一件非常简单的事情,可无奈的是很难检索到相关博客或者资料,经过苦心寻找最终还是在 这篇博客 上找到了答案。想要自定义调度器,必须先要理解和会用 Client-go ,想要和 Kubernetes 的 APIServer 交互这是必不可少的

基本原理

既然要自定义调度器,那就要知道调度器的输入和输出是什么,其实很简单。

  • 输入:未调度的 Pod
  • 输出:给未调度的 Pod 的 Spec.NodeName 字段赋值,并发送给 APIServer

由于 k8s 优秀的设计,使得各个组件直接相互解耦,只需通过 APIServer 来通信。调度器只需要告诉 APIServer 该 Pod 要调度到那里,具体调度是由 Kubelet 负责的。

为此,就需要拿到未调度的 Pod,想拿到未调度的 Pod 有两种方式:

  1. 通过 Clients 客户端对象Watch APIServer 中的 Pod 资源,一旦发生 ADDED 事件,则说明有新的Pod加入,此时去处理即可。
  2. 通过 [[Client-go#4. Informer]] 作为缓存,然后同样监听 Informer 中提供的 PodInformer 的 ADDED 事件即可。

    两者本质是一样的,但是更推荐使用 Informer,降低 APIServer 的压力

拿到了未调度的 Pod 后,还需要执行以下操作:

  • 查看一下 Spec.ScheduleName 是不是当前写的调度器的名字,如果是则运行自己的调度逻辑,找出合适的node并给 Spec.NodeName 赋值
  • 将选择的节点和待调度的 pod 进行 bind 操作
  • 手动发送一个 Scheduled 事件以便进行监控

简单实现

为了简单起见,只写一个简单的随机调度器,主要是为了研究如何从零写一个调度器

源码解析:

  1. 创建clientset,也即创建一个和 APIServer 沟通的客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
        // 创建配置
        config, err := clientcmd.BuildConfigFromFlags("", "config")
        if err != nil {
            panic(err.Error())
        }

        // 创建 clientset
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err.Error())
        }
  2. 使用 watch 进行监听
    有了客户端后,接下来要做的就是通过 watch 建立一个长链接,监听 pod 资源的改变

    注意这里指定了一个 FieldSelector ,只监听 schedulerName 等于当前调度器的且未被调取的 pod

    1
    2
    3
    4
    5
    6
    7
        watch, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{
            FieldSelector: fmt.Sprintf("spec.schedulerName=%s,spec.nodeName=", schedulerName),
        })

        if err != nil {
            panic(err.Error())
        }

    这里获取到的 watch 是一个 chan 对象,只要 pod 资源发生变化,那么变化的事件就会通过该通道传递。因此我们需要去遍历该通道,等待我们期待的 ADDED 事件

  3. 执行调度算法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    fmt.Println("start watching ...")
        for event := range watch.ResultChan() {
    // 判断事件类型
            if event.Type != "ADDED" {
                continue
            }
            p := event.Object.(*corev1.Pod)
            fmt.Println("found a pod to schedule:", p.Namespace, "/", p.Name)

            // 调度
            scheduledNode, err := ChooseFitNode(clientset)
            fmt.Printf("schedule the pod to %s\n", scheduledNode.Name)

            if err != nil {
                panic(err.Error())
            }

    当发现 ADDED 事件时,则通过该 pod 去执行调度算法(这里是一个随机选择的算法)

随机选择算法如下:

1
2
3
4
5
6
7
8
func ChooseFitNode(clientset *kubernetes.Clientset) (*corev1.Node, error) {
    nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if len(nodes.Items) == 0 {
        return nil, fmt.Errorf("no nodes can be scheduled")
    }
    return &nodes.Items[rand.Intn(len(nodes.Items))], nil

}
  1. bind操作
    当找到合适的 node 后,就可以将该 nodepod 进行绑定

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // bind 操作
    clientset.CoreV1().Pods(p.Namespace).Bind(context.TODO(), &corev1.Binding{
    ObjectMeta: metav1.ObjectMeta{
    Name: p.Name,
    Namespace: p.Namespace,
    },
    Target: corev1.ObjectReference{
    APIVersion: "v1",
    Kind: "Node",
    Name: scheduledNode.Name,
    },
    }, metav1.CreateOptions{})
  2. 最后发送一个 Scheduled 事件,方便监控

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 发送一个 `Scheduled` 事件,以便监控
    timestamp := time.Now().UTC()
    message := fmt.Sprintf("schedule pod %s/%s to node %s\n", p.Namespace, p.Name, scheduledNode.Name)
    clientset.CoreV1().Events(p.Namespace).Create(context.TODO(), &corev1.Event{
    Count: 1,
    Message: message,
    Reason: "Scheduled",
    LastTimestamp: metav1.NewTime(timestamp),
    FirstTimestamp: metav1.NewTime(timestamp),
    Type: "Normal",
    Source: v1.EventSource{
    Component: schedulerName,
    },
    InvolvedObject: v1.ObjectReference{
    Kind: "Pod",
    Name: p.Name,
    Namespace: p.Namespace,
    UID: p.UID,
    },
    ObjectMeta: metav1.ObjectMeta{
    GenerateName: p.Name + "-",
    },
    }, metav1.CreateOptions{})
  3. 编写一个 pod 并指定 Spec.scheduleName为当前调度器名字
    运行结果:

Informer 版实现

虽然只是将 watch 修改为了 Informer 进行缓存,但整体实现起来还是有在较大差别

首先是 Scheduler 的定义

1
2
3
4
5
6
7
8
9
10
type Schedule struct {
// 调度器名字
scheduleName string
// 和apiserver交互的客户端
client *kubernetes.Clientset
// informer的工厂
informerFactory informers.SharedInformerFactory
// 工作队列
queue chan *v1.Pod
}

具体原理是:

  • informerFactory 中获取 informer 对象,并绑定对应资源的监听事件,当发生 ADDED 时,将对应的 pod 送入工作队列中
  • 启动所有 informer ,并死循环执行调度函数
  • 调度函数负责从工作队列从获取 pod 并进行后续的调度和bind,发送事件等操作

具体源码见 potapotato/my-scheduler: k8s custom scheduler (github.com)


Kubernetes自定义调度器
http://potatotato.github.io/2024/06/23/Kubernetes自定义调度器/
作者
dango
发布于
2024年6月23日
许可协议