
Client-go

1. The four clients in client-go

  • ClientSet
  • DynamicClient
  • RESTClient
  • DiscoveryClient

In most cases the first two are enough; RESTClient is used directly only rarely, and DiscoveryClient is mainly worth knowing about (a short sketch of it follows below). From the examples below you can see that ClientSet, DynamicClient, and DiscoveryClient are all built on top of RESTClient, and it is RESTClient that ultimately sends the requests to the API server. The ones to focus on are ClientSet and DynamicClient.
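
DiscoveryClient is only touched on here, so below is a minimal sketch of what it is typically used for: asking the API server which groups, versions, and resources it serves. The kubeconfig path follows the other examples; everything else in this snippet is my own illustration rather than part of the original text.

package main

import (
    "flag"
    "fmt"

    "k8s.io/client-go/discovery"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // DiscoveryClient asks the API server which groups/versions/resources it serves
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    _, resourceLists, err := discoveryClient.ServerGroupsAndResources()
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    for _, list := range resourceLists {
        for _, r := range list.APIResources {
            fmt.Println(list.GroupVersion, r.Name)
        }
    }
}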


1.1 RESTClient

RESTClient is the lowest-level client. The example below uses RESTClient to implement a simple client that lists Pods.

package main

import (
    "context"
    "flag"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // Target the core API group ("/api") and the corev1 types
    config.APIPath = "api"
    config.GroupVersion = &corev1.SchemeGroupVersion
    // Specify the serializer (codec)
    config.NegotiatedSerializer = scheme.Codecs
    // Initialize the RESTClient
    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }

    // RESTClient returns raw JSON, so it has to be unmarshalled
    // into an empty struct that will hold the pod list
    podList := &corev1.PodList{}
    err = restClient.Get().
        Namespace("monitoring").
        Resource("pods").
        VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).
        Do(context.Background()).
        Into(podList)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    for _, pod := range podList.Items {
        fmt.Println(pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
    }
}

If you click into the Get() method you will see that there are many more builder methods, all following a RESTful style (a sketch of fetching a single Pod follows below). Because RESTClient is so low level, you have to wire up the API path, group/version, and result structs yourself, which is fairly cumbersome.
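
For example, fetching a single Pod only requires adding Name() to the request builder. A minimal sketch, assuming the restClient and imports from the example above; the getPod helper name is my own:

// getPod (hypothetical helper) fetches a single Pod by namespace and name,
// reusing the restClient built in the example above.
func getPod(restClient *rest.RESTClient, namespace, name string) (*corev1.Pod, error) {
    pod := &corev1.Pod{}
    err := restClient.Get().
        Namespace(namespace).
        Resource("pods").
        Name(name). // address one object instead of listing
        Do(context.Background()).
        Into(pod)
    return pod, err
}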

1.2 ClientSet

Source code:

https://github.com/kubernetes/client-go/blob/master/kubernetes/clientset.go

The most commonly used client; it provides typed interfaces for all standard Kubernetes objects.

package main

import (
    "context"
    "flag"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/utils/pointer"
)

// Create a Deployment through the ClientSet
func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }

    // Define an nginx Deployment
    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name: "nginx-clientset",
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: pointer.Int32Ptr(1),
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": "nginx",
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": "nginx",
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "nginx",
                            Image: "nginx:1.7.9",
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: 80,
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    // Create the nginx Deployment in the default namespace
    deploymentClient := clientset.AppsV1().Deployments(corev1.NamespaceDefault)
    result, err := deploymentClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    fmt.Printf("Deployment created: %s\n", result.GetObjectMeta().GetName())
}

Run:

go mod init && go run main.go

If you want to create a Service, the approach is essentially the same as for the Deployment above (see the sketch below).
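
A minimal sketch of the Service case, assuming the same clientset, imports, and error-handling style as the Deployment example above; the Service name nginx-svc-clientset is my own placeholder:

// Define a Service that selects the nginx Pods created by the Deployment above
service := &corev1.Service{
    ObjectMeta: metav1.ObjectMeta{
        Name: "nginx-svc-clientset",
    },
    Spec: corev1.ServiceSpec{
        Selector: map[string]string{
            "app": "nginx",
        },
        Ports: []corev1.ServicePort{
            {
                Port: 80, // TargetPort defaults to the same value when unset
            },
        },
    },
}

// Create it through the typed CoreV1 client
svcClient := clientset.CoreV1().Services(corev1.NamespaceDefault)
if _, err := svcClient.Create(context.TODO(), service, metav1.CreateOptions{}); err != nil {
    fmt.Printf("error: %s", err.Error())
    return
}
fmt.Println("Service created: nginx-svc-clientset")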

Summary: ClientSet can create, read, update, and delete essentially every standard Kubernetes resource, with one exception: CRDs. ClientSet only wraps the built-in resource types, so custom CRD resources are not covered. To operate on custom CRD resources, use the DynamicClient described below (a CRD sketch follows the listing example in 1.3.1).

1.3 DynamicClient

Source code:

https://github.com/kubernetes/client-go/tree/master/dynamic

Dynamic client: it can operate on any resource, including both standard Kubernetes resources and custom CRDs.

All returned data comes back as the generic Unstructured struct.

1.3.1 Listing resources with DynamicClient

package main

import (
    "context"
    "flag"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // With a GVR the dynamic client can address any resource
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // Specify the GVR (group is empty for the core group)
    gvr := schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "pods",
    }

    // List the pods; the returned "pod list" is unstructured data, not a typed PodList
    unStructPodList, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // Define a typed PodList and convert the Unstructured content into it
    podList := &corev1.PodList{}
    err = runtime.DefaultUnstructuredConverter.FromUnstructured(unStructPodList.UnstructuredContent(), podList)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    for _, pod := range podList.Items {
        fmt.Println(pod.Name, pod.Namespace, pod.Status.Phase)
    }
}
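
The same pattern also covers the custom CRD resources mentioned in the ClientSet summary: only the GVR changes. A minimal sketch, assuming the dynamicClient from above, the extra import k8s.io/apimachinery/pkg/apis/meta/v1/unstructured, and a hypothetical crontabs.stable.example.com CRD installed in the cluster:

// GVR of a hypothetical custom resource (group, version, and plural name come from its CRD)
crdGvr := schema.GroupVersionResource{
    Group:    "stable.example.com",
    Version:  "v1",
    Resource: "crontabs",
}

// List the custom objects; the result is again Unstructured
crList, err := dynamicClient.Resource(crdGvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
    fmt.Printf("error: %s", err.Error())
    return
}
for _, item := range crList.Items {
    // Without a typed Go struct, nested fields are read with the unstructured helpers
    cronSpec, _, _ := unstructured.NestedString(item.Object, "spec", "cronSpec")
    fmt.Println(item.GetName(), cronSpec)
}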

1.3.2 Creating resources from a YAML file

package main

import (
    "context"
    _ "embed"
    "flag"
    "fmt"
    "strings"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/yaml"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

var (
    // nginx.yaml must sit next to this source file (go:embed, Go 1.16+)
    //go:embed nginx.yaml
    deployYaml string
)

func main() {
    // With a GVR the dynamic client can create any resource
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }

    // Parse the YAML into an Unstructured object
    deployObj := &unstructured.Unstructured{}
    if err = yaml.Unmarshal([]byte(deployYaml), deployObj); err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }

    // Derive the GVR from apiVersion and kind
    apiVersion, found, err := unstructured.NestedString(deployObj.Object, "apiVersion")
    if err != nil || !found {
        fmt.Printf("apiVersion not found: %v", err)
        return
    }
    kind, found, err := unstructured.NestedString(deployObj.Object, "kind")
    if err != nil || !found {
        fmt.Printf("kind not found: %v", err)
        return
    }

    gvr := schema.GroupVersionResource{}
    versionParts := strings.Split(apiVersion, "/")
    if len(versionParts) == 2 {
        // e.g. "apps/v1" -> group "apps", version "v1"
        gvr.Group = versionParts[0]
        gvr.Version = versionParts[1]
    } else {
        // e.g. "v1" -> core group (empty group), version "v1"
        gvr.Version = versionParts[0]
    }
    // Map the kind to its plural resource name
    switch kind {
    case "Deployment":
        gvr.Resource = "deployments"
    case "Pod":
        gvr.Resource = "pods"
    default:
        fmt.Printf("Unsupported kind: %s", kind)
        return
    }
    // Create the resource with the dynamic client
    _, err = dynamicClient.Resource(gvr).Namespace("default").Create(context.TODO(), deployObj, metav1.CreateOptions{})
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    fmt.Println("Deployment created successfully")
}

nginx.yaml (the file embedded by the program above):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-dynamic-2
spec:
  selector:
    matchLabels:
      app: nginx
  replicas: 2 # tells deployment to run 2 pods matching the template
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80

2. Client-go Watch

2.1 Using Watch to listen for events

package main

import (
    "context"
    "flag"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // How long the watch connection stays open, in seconds
    timeout := int64(600)
    watcher, err := clientSet.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{TimeoutSeconds: &timeout})
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    for event := range watcher.ResultChan() {
        item := event.Object.(*corev1.Pod)
        switch event.Type {
        case watch.Added:
            processPod(item.GetName())
        case watch.Modified:
            // handle updates here
        case watch.Deleted:
            // handle deletions here
        }
    }
}

func processPod(podName string) {
    fmt.Printf("processing pod %s\n", podName)
}

timeout := int64(600)

sets how long the watch connection stays open (in seconds); once it expires, the result channel is closed and the caller has to reconnect.

Why using Watch directly is not recommended

  • Handling watch timeouts, disconnects, and reconnection is complicated

  • Watch talks to the Kubernetes API server directly, adding load to the cluster

  • Watching several resources requires a separate connection for each, with no sharing, which increases resource consumption and cluster load

  • Reconnecting can lose events

  • There is no rate limiting when a burst of events arrives, which can overload and crash the consuming business logic

  • Once the business logic has received an event, a processing failure gets no second chance

2.2 Using an Informer instead

Characteristics of the Informer:

  • Built on top of Watch, but provides a higher-level abstraction that is simpler, safer, and faster

  • Handles timeouts and reconnection automatically

  • Keeps a local cache, so there is no need to hit the API server frequently

  • Has built-in full (resync) and incremental synchronization, so events are not lost

  • Can be combined with rate limiting and delaying queues to control how fast events are processed, avoiding overload and supporting error handling and retries

    Using a SharedInformerFactory you create shared informer instances that can watch several resources at once, for example Deployments and Services, which reduces network and resource consumption and the load on the Kubernetes API server

The example below uses an Informer, which addresses the problems of raw Watch listed above; for instance, the Informer's local cache reduces the pressure on the API server.

package main

import (
    "flag"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    // The second argument is the resync (full re-list) period
    informerFactory := informers.NewSharedInformerFactory(clientSet, time.Minute*12)

    // Watch Deployments
    deploymentInformer := informerFactory.Apps().V1().Deployments()
    informer := deploymentInformer.Informer()

    deployLister := deploymentInformer.Lister()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            fmt.Println("Add deployment")
        },
        UpdateFunc: func(old, new interface{}) {
            fmt.Println("Update deployment")
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("Delete deployment")
        },
    })

    // Watch Services through the same shared factory
    serviceInformer := informerFactory.Core().V1().Services()
    informer = serviceInformer.Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            fmt.Println("Add service")
        },
        UpdateFunc: func(old, new interface{}) {
            fmt.Println("Update service")
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("Delete service")
        },
    })
    stopper := make(chan struct{})
    defer close(stopper)
    // Start the informers (list & watch)
    informerFactory.Start(stopper)
    // Wait until all informer caches are synced
    informerFactory.WaitForCacheSync(stopper)
    // Read Deployments from the local cache through the lister
    deployments, err := deployLister.List(labels.Everything())
    if err != nil {
        fmt.Printf("error: %s", err.Error())
    }
    for idx, deploy := range deployments {
        fmt.Println(idx, deploy.Name)
    }
    <-stopper
}

2.3 Introducing a RateLimitingQueue

In the example above, what happens if the business logic inside an EventHandler fails?

Because the handlers are purely event driven, there is no second chance to process the event.

Introducing a WorkQueue (a RateLimitingQueue) between the event handlers and the business logic provides error retries and protects against hot loops (overload); a note on what the default rate limiter consists of follows the example.

package main

import (
    "flag"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    kubeconfig := flag.String("kubeconfig", "/etc/rancher/k8s/k8s_test.yaml", "absolute path to the kubeconfig file")
    flag.Parse()
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
        return
    }
    informerFactory := informers.NewSharedInformerFactory(clientSet, time.Second*30)

    // Rate-limited work queue holding object keys
    queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
    // Watch Deployments and push their keys onto the queue
    deployInformer := informerFactory.Apps().V1().Deployments()
    informer := deployInformer.Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { onAddDeployment(obj, queue) },
        UpdateFunc: func(old, new interface{}) { onUpdateDeployment(new, queue) },
        DeleteFunc: func(obj interface{}) { onDeleteDeployment(obj, queue) },
    })

    controller := NewController(queue, informer.GetIndexer(), informer)
    stopper := make(chan struct{})
    defer close(stopper)

    // Start the informer
    informerFactory.Start(stopper)
    informerFactory.WaitForCacheSync(stopper)

    // Drain the queue and process its items
    go func() {
        for {
            if !controller.processNextItem() {
                break
            }
        }
    }()
    <-stopper
}

type Controller struct {
    indexer  cache.Indexer
    queue    workqueue.TypedRateLimitingInterface[string]
    informer cache.Controller
}

func NewController(queue workqueue.TypedRateLimitingInterface[string], indexer cache.Indexer, informer cache.Controller) *Controller {
    return &Controller{
        indexer:  indexer,
        queue:    queue,
        informer: informer,
    }
}

func (c *Controller) processNextItem() bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    // Mark the key as done once processing finishes
    defer c.queue.Done(key)
    err := c.syncToStdout(key)
    c.handleErr(err, key)
    return true
}

func (c *Controller) syncToStdout(key string) error {
    // Fetch the object straight from the indexer (local cache) by key
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
        fmt.Printf("error getting object with key %s from store: %v\n", key, err)
        return err
    }

    if !exists {
        fmt.Printf("key %s does not exist anymore\n", key)
    } else {
        deployment := obj.(*appsv1.Deployment)
        fmt.Printf("sync object %s to stdout\n", deployment.GetName())
        // Simulate a business failure so the retry path can be observed
        if deployment.Name == "test-deployment" {
            return fmt.Errorf("test-deployment is not allowed")
        }
    }
    return nil
}

func (c *Controller) handleErr(err error, key string) {
    if err == nil {
        // Processing succeeded: reset the rate-limit history for this key
        c.queue.Forget(key)
        return
    }
    // Retry up to 5 times with rate limiting before giving up
    if c.queue.NumRequeues(key) < 5 {
        fmt.Printf("Error syncing deployment %v: %v\n", key, err)
        c.queue.AddRateLimited(key)
        return
    }
    // Too many retries: drop the key
    c.queue.Forget(key)
    fmt.Printf("Dropping deployment %v out of the queue: %v\n", key, err)
}

func onAddDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
    // Convert the object into a namespace/name key and enqueue it
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err == nil {
        queue.Add(key)
    }
}

func onUpdateDeployment(newObj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
    key, err := cache.MetaNamespaceKeyFunc(newObj)
    if err == nil {
        queue.Add(key)
    }
}

func onDeleteDeployment(obj interface{}, queue workqueue.TypedRateLimitingInterface[string]) {
    // DeletionHandlingMetaNamespaceKeyFunc also handles tombstone objects
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err == nil {
        queue.Add(key)
    }
}
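
For reference, the DefaultTypedControllerRateLimiter used above combines per-item exponential backoff with an overall token bucket. A rough sketch of building an equivalent limiter explicitly, assuming the same typed workqueue API as in the example plus the extra import golang.org/x/time/rate (the concrete default values are taken from client-go and may vary between versions):

// Per-item exponential backoff (5ms base, capped at 1000s) combined with
// an overall 10 qps / burst 100 token bucket for the whole queue.
rateLimiter := workqueue.NewTypedMaxOfRateLimiter(
    workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
    &workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
queue := workqueue.NewTypedRateLimitingQueue(rateLimiter)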

The effect of this code: if a Deployment named test-deployment is created, its sync fails and the key is re-queued with rate limiting, simulating the retry mechanism:

kubectl create deploy test-deployment --image nginx