Kubernetes自定义调度器
- Metadata
- Link: Project__k8s 自定义调度器
- Date: [[2022-10-26_周三]]
- Tag: #云原生/kubernetes
- Index: Kubernetes 云原生操作系统
- Repo: potapotato/my-scheduler: k8s custom scheduler (github.com)
介绍
其实想自定义调度器是一件非常简单的事情,可无奈的是很难检索到相关博客或者资料,经过苦心寻找最终还是在 这篇博客 上找到了答案。想要自定义调度器,必须先要理解和会用 Client-go ,想要和 Kubernetes 的 APIServer 交互这是必不可少的
基本原理
既然要自定义调度器,那就要知道调度器的输入和输出是什么,其实很简单。
- 输入:未调度的 Pod
- 输出:给未调度的 Pod 的
Spec.NodeName字段赋值,并发送给 APIServer
由于 k8s 优秀的设计,使得各个组件直接相互解耦,只需通过 APIServer 来通信。调度器只需要告诉 APIServer 该 Pod 要调度到那里,具体调度是由 Kubelet 负责的。
为此,就需要拿到未调度的 Pod,想拿到未调度的 Pod 有两种方式:
- 通过 Clients 客户端对象 去
WatchAPIServer 中的 Pod 资源,一旦发生ADDED事件,则说明有新的Pod加入,此时去处理即可。 - 通过 [[Client-go#4. Informer]] 作为缓存,然后同样监听 Informer 中提供的 PodInformer 的
ADDED事件即可。两者本质是一样的,但是更推荐使用 Informer,降低 APIServer 的压力
拿到了未调度的 Pod 后,还需要执行以下操作:
- 查看一下
Spec.ScheduleName是不是当前写的调度器的名字,如果是则运行自己的调度逻辑,找出合适的node并给Spec.NodeName赋值 - 将选择的节点和待调度的 pod 进行 bind 操作
- 手动发送一个
Scheduled事件以便进行监控
简单实现
为了简单起见,只写一个简单的随机调度器,主要是为了研究如何从零写一个调度器
源码解析:
创建clientset,也即创建一个和
APIServer沟通的客户端1
2
3
4
5
6
7
8
9
10
11// 创建配置
config, err := clientcmd.BuildConfigFromFlags("", "config")
if err != nil {
panic(err.Error())
}
// 创建 clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}使用
watch进行监听
有了客户端后,接下来要做的就是通过watch建立一个长链接,监听pod资源的改变注意这里指定了一个 FieldSelector ,只监听 schedulerName 等于当前调度器的且未被调取的 pod
1
2
3
4
5
6
7watch, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.schedulerName=%s,spec.nodeName=", schedulerName),
})
if err != nil {
panic(err.Error())
}这里获取到的
watch是一个chan对象,只要pod资源发生变化,那么变化的事件就会通过该通道传递。因此我们需要去遍历该通道,等待我们期待的ADDED事件执行调度算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16fmt.Println("start watching ...")
for event := range watch.ResultChan() {
// 判断事件类型
if event.Type != "ADDED" {
continue
}
p := event.Object.(*corev1.Pod)
fmt.Println("found a pod to schedule:", p.Namespace, "/", p.Name)
// 调度
scheduledNode, err := ChooseFitNode(clientset)
fmt.Printf("schedule the pod to %s\n", scheduledNode.Name)
if err != nil {
panic(err.Error())
}当发现
ADDED事件时,则通过该pod去执行调度算法(这里是一个随机选择的算法)
随机选择算法如下:
1 | |
bind操作
当找到合适的node后,就可以将该node和pod进行绑定1
2
3
4
5
6
7
8
9
10
11
12// bind 操作
clientset.CoreV1().Pods(p.Namespace).Bind(context.TODO(), &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{
Name: p.Name,
Namespace: p.Namespace,
},
Target: corev1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
Name: scheduledNode.Name,
},
}, metav1.CreateOptions{})最后发送一个
Scheduled事件,方便监控1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// 发送一个 `Scheduled` 事件,以便监控
timestamp := time.Now().UTC()
message := fmt.Sprintf("schedule pod %s/%s to node %s\n", p.Namespace, p.Name, scheduledNode.Name)
clientset.CoreV1().Events(p.Namespace).Create(context.TODO(), &corev1.Event{
Count: 1,
Message: message,
Reason: "Scheduled",
LastTimestamp: metav1.NewTime(timestamp),
FirstTimestamp: metav1.NewTime(timestamp),
Type: "Normal",
Source: v1.EventSource{
Component: schedulerName,
},
InvolvedObject: v1.ObjectReference{
Kind: "Pod",
Name: p.Name,
Namespace: p.Namespace,
UID: p.UID,
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: p.Name + "-",
},
}, metav1.CreateOptions{})编写一个 pod 并指定
Spec.scheduleName为当前调度器名字
运行结果:
Informer 版实现
虽然只是将 watch 修改为了 Informer 进行缓存,但整体实现起来还是有在较大差别
首先是 Scheduler 的定义
1 | |
具体原理是:
- 从
informerFactory中获取informer对象,并绑定对应资源的监听事件,当发生ADDED时,将对应的pod送入工作队列中 - 启动所有
informer,并死循环执行调度函数 - 调度函数负责从工作队列从获取
pod并进行后续的调度和bind,发送事件等操作
具体源码见 potapotato/my-scheduler: k8s custom scheduler (github.com)