跳到主要内容

控制器 CRD 介绍

1、控制器理解

学习demo地址: https://github.com/schwarzeni/kubebuilder-appservice

1.1 项目架构描述

 +---------------------+               +-----------------------------+
| | (1) Create | |
| User (kubectl) +---------------> Kubernetes API |
| | | |
+---------------------+ +-----------------------------+
^
|
| (2) Watch
|
+----------------------+ +-------------+-------------------+
| | | |
| Operator +-----------> Operator Controller |
| Controller (Reconcile) | |
| | | (Reconcile AppService) |
+----------------------+ +-------------+-------------------+
|
| (3) Create/Update
|
+-----------------------+ | +------------------+
| | | | |
| Kubernetes | +----------------> Deployment |
| Resources (Service, | | |
| Deployment, etc.) | +------------------+
+-----------------------+

1.2 项目结构组成描述

.
├── Dockerfile # Docker 文件,用于打包 Operator 镜像
├── Makefile # 构建和管理项目的 Makefile
├── PROJECT # kubebuilder 项目的定义文件
├── config/ # 配置目录,存放 CRD、RBAC、Webhook 等配置
│ ├── crd/
│ │ ├── bases/
│ │ │ └── <group>_<version>_<resource>.yaml # 生成的 CRD 定义
│ ├── default/
│ │ └── kustomization.yaml # 默认的 kustomize 文件,用于生成 YAML 清单
│ ├── manager/
│ │ └── manager.yaml # 管理器 Deployment 的 YAML 文件
│ ├── rbac/
│ │ └── role.yaml # 生成的 RBAC 权限控制文件
│ ├── samples/
│ │ └── <group>_<version>_<resource>.yaml # 示例 CRD 文件
├── go.mod # Go 依赖管理文件
├── go.sum # Go 依赖版本锁定文件
├── hack/ # 辅助脚本
│ └── boilerplate.go.txt
├── api/ # API 定义目录
│ ├── v1alpha1/ # 定义自定义资源的版本
│ │ ├── appservice_types.go # AppService CRD 的结构定义
│ │ ├── groupversion_info.go # 注册 API 组和版本信息
│ │ └── zz_generated.deepcopy.go # 自动生成的 deepcopy 函数
├── controllers/ # 控制器逻辑目录
│ └── appservice_controller.go # 上面提供的 Reconcile 代码应该放在这里
└── main.go # 项目的入口文件

精简版本

.
├── config/ # 配置目录
│ ├── crd/ # CRD 定义
│ │ └── bases/
│ │ └── batch.example.com_appservices.yaml # CRD YAML 文件
│ ├── manager/ # 管理器配置
│ │ └── manager.yaml # 部署控制器的 YAML 文件
│ ├── rbac/ # RBAC 配置
│ │ └── role.yaml # 控制器需要的权限
├── api/ # API 定义目录
│ └── v1alpha1/
│ ├── appservice_types.go # 自定义资源的结构定义
│ └── groupversion_info.go # API 组和版本的注册
├── controllers/ # 控制器逻辑目录
│ └── appservice_controller.go # AppService 控制器逻辑
├── go.mod # Go 依赖管理文件
├── go.sum # Go 依赖版本锁定文件
└── main.go # 项目的入口文件

目录简要说明:

  • config/: 配置文件目录,包含 CRD 定义、RBAC 和控制器部署等相关 YAML 文件。
  • api/v1alpha1/: API 定义目录,包括自定义资源的 SpecStatus 结构体,以及注册 API 组和版本的代码。
  • controllers/: 控制器逻辑所在目录,appservice_controller.go 实现了 Reconcile 逻辑。
  • main.go: 项目入口文件,启动控制器管理器。

1.3 主要文件示例

api/v1alpha1/appservice_types.go (简化版本)

这是定义 AppService 自定义资源的文件。

package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// AppServiceSpec defines the desired state of AppService
type AppServiceSpec struct {
// 定义你的 AppService 的 Spec 字段
Replicas int32 `json:"replicas"`
}

// AppServiceStatus defines the observed state of AppService
type AppServiceStatus struct {
// 定义你的 AppService 的 Status 字段
AvailableReplicas int32 `json:"availableReplicas"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// AppService is the Schema for the appservices API
type AppService struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec AppServiceSpec `json:"spec,omitempty"`
Status AppServiceStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// AppServiceList contains a list of AppService
type AppServiceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []AppService `json:"items"`
}

func init() {
SchemeBuilder.Register(&AppService{}, &AppServiceList{})
}

controllers/appservice_controller.go

这就是你给出的 Reconcile 函数的地方

package controllers

import (
"context"
"encoding/json"
"reflect"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

batchv1alpha1 "example.com/api/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
)

func (r *AppServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
instance := &batchv1alpha1.AppService{}
if err := r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if instance.DeletionTimestamp != nil {
return reconcile.Result{}, nil
}

deployment := &appsv1.Deployment{}
if err := r.Client.Get(ctx, req.NamespacedName, deployment); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, err
}

deployment = NewDeployment(instance)
if err := r.Client.Create(ctx, deployment); err != nil {
return ctrl.Result{}, err
}

svc := NewService(instance)
if err := r.Client.Create(ctx, svc); err != nil {
return ctrl.Result{}, err
}
} else {
oldSpec := &batchv1alpha1.AppServiceSpec{}
if err := json.Unmarshal([]byte(instance.Annotations["spec"]), oldSpec); err != nil {
return ctrl.Result{}, err
}
if !reflect.DeepEqual(instance.Spec, *oldSpec) {
newDeployment := NewDeployment(instance)
currDeployment := &appsv1.Deployment{}
if err := r.Client.Get(ctx, req.NamespacedName, currDeployment); err != nil {
return ctrl.Result{}, err
}
currDeployment.Spec = newDeployment.Spec
if err := r.Client.Update(ctx, currDeployment); err != nil {
return ctrl.Result{}, err
}

newService := NewService(instance)
currService := &corev1.Service{}
if err := r.Client.Get(ctx, req.NamespacedName, currService); err != nil {
return ctrl.Result{}, err
}
currIP := currService.Spec.ClusterIP
currService.Spec = newService.Spec
currService.Spec.ClusterIP = currIP
if err := r.Client.Update(ctx, currService); err != nil {
return ctrl.Result{}, err
}
}
}

data, _ := json.Marshal(instance.Spec)
if instance.Annotations != nil {
instance.Annotations["spec"] = string(data)
} else {
instance.Annotations = map[string]string{"spec": string(data)}
}
if err := r.Client.Update(ctx, instance); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

1.4 如何生成这些文件

你可以通过以下步骤使用 kubebuilder 工具生成这些文件:

安装 kubebuilder

go install sigs.k8s.io/kubebuilder/cmd@latest

创建项目

kubebuilder init --domain example.com --repo example.com/api

创建 API 和控制器

kubebuilder create api --group batch --version v1alpha1 --kind AppService

编辑生成的控制器文件和 API 文件:根据你的逻辑,修改控制器文件并添加 AppService 资源的业务逻辑。

2、常用组件的一些概念

2.1 informer使用

一个控制器每次获取对象的时候都要访问APIServer,这会给系统带来很高的负载, Informers的内存缓存就是来解决这个问题的。Informers还可以几乎实时的监控对象的变化,而不需要轮训请求,这样就可以保证客户端的缓存数据和服务端的数据一致。就可以大大降低APIServer的压力。

2.2 Reflector

Relector反射器他能感知到k8s中的事件发生变化时,他能够自动拉取最新的数据到Delta中,这个逻辑原理是什么呢?

2.2.1 Reflector 的工作原理

  1. 监视 API Server:Reflector 会使用 Kubernetes 的 Watch 接口,持续监控特定资源的变化(如 Pod、Service 等)。通过 Watch,Reflector 可以接收事件通知,而不需要频繁地向 API Server 查询。
  2. 事件处理:当 Kubernetes 中发生事件(例如资源的创建、更新或删除)时,Reflector 会收到相应的通知。这些通知通常是基于 WebSocket 或 HTTP/2 的长连接,确保能即时感知变化。
  3. 更新本地缓存:一旦收到事件,Reflector 会根据事件类型(添加、更新、删除)对本地缓存进行相应的更新。这是通过调用相应的处理函数(通常是同步到 Delta FIFO 队列)来完成的。
  4. Delta FIFO 队列:Delta FIFO 队列是用来存储最新的数据变更,它可以按顺序处理事件并提供给下游的消费者(如 Informer)。这样,消费者就能处理最新的状态,而不需要直接与 API Server 交互。

为什么能够自动拉取最新数据

Reflector 不需要手动拉取最新数据的原因在于:

  • 事件驱动:Reflector 通过 Watch 接口获取实时事件,这样它不需要依赖定时拉取。
  • 高效的资源管理:它能迅速反应到状态变化,确保本地缓存和 Kubernetes 集群的状态一致。

通过这种机制,Kubernetes 的 Informer 能够高效地维护状态一致性,同时减少对 API Server 的压力。这样设计使得系统更为高效且响应迅速。

2.2.2 Reflector是如何实现监视watch变化

Reflector 实现监视 Kubernetes 中资源的变化主要依赖于 Watch API,而不是定时拉取。具体来说,Reflector 通过以下方式来监视变化:

1. 使用 Watch API
  • 建立连接:Reflector 向 API Server 发送一个 Watch 请求。这个请求是通过 HTTP/1.1 或 HTTP/2 建立的长连接。一旦连接建立,API Server 会保持该连接开放。
  • 实时推送事件:当资源(如 Pod、Service 等)发生变化时,API Server 会立即将这些变化(事件)推送到 Reflector,而不是让 Reflector 定期询问。
2. 处理事件
  • 事件类型:每当 Reflector 收到事件时,会根据事件的类型(添加、更新、删除)来更新其本地缓存或 Delta FIFO 队列。
  • 高效处理:这种方式允许 Reflector 几乎实时地感知变化,确保本地状态与集群状态一致。
3. 连接管理
  • 重连机制:如果连接意外中断(如网络问题或 API Server 重启),Reflector 会尝试重新建立连接,并从上一次的状态开始继续监听。
  • 初始数据同步:在建立 Watch 连接之前,Reflector 通常会先从 API Server 拉取当前资源的完整列表,以便在后续的事件处理中有一个初始状态。

在kubernetes中,watch这类接口很多,比如有下面:

kubernetes controller
informer
kubelet
通过Watch接口监视与其相关的Pod和Node状态变化,以便及时响应
Custom Resource Definitions(CRD)
Kubectl命令
kubect get命令时,可以加上--watch选项,这实际上利用了Watch接口来实现显示资源状态的变化
Metrics Server
监视节点和Pod的资源使用情况,提供集群的性能数据
Kubernetes Dashboard
web ui 也会使用Watch接口来实时更新界面上的资源状态

Watch的基本原理

反向推送:与传统的拉取模型(客户端定期请求服务器)不同,Watch 使用长连接,API Server 会主动将变化推送给客户端。这种方式显著减少了不必要的网络请求和延迟。

事件驱动:当 Kubernetes 集群中的某个资源(如 Pod、Service 等)发生变化时,API Server 会立即将相关事件(如添加、更新、删除)发送到已建立 Watch 的客户端。

2.2.3 watch的demo实现

以下是一个简单的示例代码,模拟 Watch 机制,监听 Pod 的变化

go get k8s.io/client-go@v0.23.0 # 确保版本与你的集群兼容

package main

import (
"context"
"fmt"
"log"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

func main() {
// 加载 kubeconfig
kubeconfig := "/path/to/your/kubeconfig" // 替换为你的 kubeconfig 路径
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Failed to build kubeconfig: %v", err)
}

// 创建 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create clientset: %v", err)
}

// 设置 Watch 选项
options := metav1.ListOptions{
Watch: true,
}

// 启动 Watch
watchPods(clientset, options)
}

func watchPods(clientset *kubernetes.Clientset, options metav1.ListOptions) {
// 监听 Pod 变化
watchInterface, err := clientset.CoreV1().Pods("").Watch(context.TODO(), options)
if err != nil {
log.Fatalf("Failed to watch pods: %v", err)
}

fmt.Println("Watching for pod changes...")

// 处理事件
for event := range watchInterface.ResultChan() {
pod, ok := event.Object.(*metav1.Pod)
if !ok {
log.Fatalf("Unexpected type")
}

switch event.Type {
case watch.Added:
fmt.Printf("Pod added: %s\n", pod.Name)
case watch.Modified:
fmt.Printf("Pod modified: %s\n", pod.Name)
case watch.Deleted:
fmt.Printf("Pod deleted: %s\n", pod.Name)
}
}
}