Client-go的四种客户端的简单使用


Client-go的四种客户端使用

我们知道kubectl是通过命令行交互的方式与Kubernetes API Server进行交互的,Kubernetes还提供了通过编程的方式与Kubernetes API-Server进行通信。

client-go是从 Kubernetes的代码中单独抽离出来的包,并作为官方提 供的Go语言的客户端发挥作用。

client-go简单、易用, Kubernetes系统的其他组件与Kubernetes API Server通信 的方式也基于client-go实现。

client-go在Kubernetes系统上做了大量的 优化,Kubernetes核心组件(如kube-scheduler、kube- controller-manager等)都通过client-go与Kubernetes

API Server进行交互。

Client-go为我们提供了四种客户端。

  • RESTClient:这是最基础的客户端对象,仅对HTTP Request进行了封装,实现RESTFul风格API,这个对象的使用并不方便,因为很多参数都要使用者来设置;
  • ClientSet:把Resource和Version也封装成方法了,用起来更简单直接,一个资源是一个客户端,多个资源就对应了多个客户端,所以ClientSet就是多个客户端的集合,不过ClientSet只能访问内置资源,访问不了自定义资源;
  • DynamicClient:可以访问内置资源和自定义资源,拿出的内容是Object类型,按实际情况自己去做强制转换,当然了也会有强转失败的风险;
  • DiscoveryClient:用于发现kubernetes的API Server支持的Group、Version、Resources等信息。

1、RESTClient

我们来使用RESTClinet实现获取kubernetes集群中所有的pod资源。

package main

import (
	"context"
	"fmt"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func restClientGetPods() {
	// 获取config,从本机中获取kubeconfig的配置文件。
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// 因为pod的group为空,version为v1
	config.GroupVersion = &v1.SchemeGroupVersion
	// 设置反序列化
	config.NegotiatedSerializer = scheme.Codecs
	// 指定ApiPath,参考/api/v1/namespaces/{namespace}/pods
	config.APIPath = "/api"

	// 根据配置文件,获取rest客户端
	client, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	// 指定接收参数
	result := &v1.PodList{}

	// 指定namespace
	namespace := "kube-system"
	// 通过rest client获取pod的信息
	err = client.Get().
		Namespace(namespace).                                                    // 指定namespace
		Resource("pods").                                                        // 指定要获取的资源对象
		VersionedParams(&metav1.ListOptions{Limit: 100}, scheme.ParameterCodec). // 指定大小限制和序列化工具
		Do(context.TODO()).                                                      // 执行
		Into(result)                                                             // 将结果写入result
	if err != nil {
		panic(err)
	}

	for _, pod := range result.Items {
		fmt.Println(pod.Name)
	}
}

运行我们的程序,我们就可以获取到"kube-system"命名空间下的所有Pod的列表,但是我们也可以通过RESTClient来增删改查kubernetes其他的内置资源。

2、ClientSet

上面演示了使用RESTClient对kubernetes的内置资源的操作,但是为什么还需要ClientSet呢?

这是因为ClientSet对RESTClient做了一层封装,把每一种资源都封装了对应的客户端。

下面来演示使用ClientSet创建Namespace,在Namespace里创建Deploy,创建Service,使用Service来访问Deploy所管理的Pod。

package main

import (
	"context"
	"fmt"
  "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/pointer"
)

const (
	NAMESPACE  = "dev"
	DEPLOYMENT = "dev-deploy"
	SERVICE    = "dev-service"
)

func initConfig() (*rest.Config, error) {
	// config
	// 从本机中获取kubeconfig的配置文件,因此第一个参数是空字符串
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		return nil, err
	}
	return config, nil
}


// 初始化客户端
func initClient() (*kubernetes.Clientset, error) {
	config, err := initConfig()
	if err != nil {
		panic(err)
	}
	// 实例化clientSet对象
	return kubernetes.NewForConfig(config)
}

func createNamespace(clientSet *kubernetes.Clientset) {
	// 获取namespace的接口
	namespaceClient := clientSet.CoreV1().Namespaces()
	namespace := &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name: NAMESPACE,
		},
		Status: corev1.NamespaceStatus{},
	}
	// 调用Create方法进行创建
	ns, err := namespaceClient.Create(context.TODO(), namespace, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}

	fmt.Println(ns.GetName())
}

func createDeploy(clientSet *kubernetes.Clientset) {
	// 获取deploy的接口
	deployClient := clientSet.AppsV1().Deployments(NAMESPACE)
	// 定义要创建的deployment
	deploy := &v1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      DEPLOYMENT,
			Namespace: NAMESPACE,
		},
		Spec: v1.DeploymentSpec{
			Replicas: pointer.Int32(2),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": "nginx",
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Name: "nginx",
					Labels: map[string]string{
						"app": "nginx",
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:1.17.1",
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: 80,
									Protocol:      corev1.ProtocolTCP,
								},
							},
							ImagePullPolicy: "IfNotPresent",
						},
					},
				},
			},
		},
		Status: v1.DeploymentStatus{},
	}
	// 创建
	result, err := deployClient.Create(context.TODO(), deploy, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(result.GetName())
}

// 创建svc
func createService(clientSet *kubernetes.Clientset) {
	// 获取serivce的接口
	serviceClient := clientSet.CoreV1().Services(NAMESPACE)
	// 定义service
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      SERVICE,
			Namespace: NAMESPACE,
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{
				{
					Port:     80,
					NodePort: 30001,
				},
			},
			Selector: map[string]string{
				"app": "nginx",
			},
			Type: corev1.ServiceTypeNodePort,
		},
		Status: corev1.ServiceStatus{},
	}

	result, err := serviceClient.Create(context.TODO(), service, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(result.GetName())
}

// 测试
func Run() {
	// 创建clientSet的客户端
	clientSet, err := initClient()
	if err != nil {
		panic(err)
	}

	// 通过clientSet创建namespace
	createNamespace(clientSet)

	// 通过clientSet创建deployment
	createDeploy(clientSet)

	// 通过clientSet创建service
	createService(clientSet)
}

func deleteAll() {
	emptyDeleteOptions := metav1.DeleteOptions{}
	clientSet, err := initClient()
	if err != nil {
		return
	}
	// 删除svc
	err = clientSet.CoreV1().Services(NAMESPACE).Delete(context.TODO(), SERVICE, emptyDeleteOptions)
	if err != nil {
		return
	}

	// 删除deployment
	err = clientSet.AppsV1().Deployments(NAMESPACE).Delete(context.TODO(), DEPLOYMENT, emptyDeleteOptions)
	if err != nil {
		return
	}

	// 删除namespace
	err = clientSet.CoreV1().Namespaces().Delete(context.TODO(), NAMESPACE, emptyDeleteOptions)
	if err != nil {
		return
	}

	// 删除pod
	err = clientSet.CoreV1().Pods("default").Delete(context.TODO(), "nginx", emptyDeleteOptions)
	if err != nil {
		return
	}
}

运行Run()后我们进行查看:

$ kubectl get ns | grep dev
dev               Active   57s

$ kubectl get all -n dev
NAME                              READY   STATUS    RESTARTS   AGE
pod/dev-deploy-757ccd788c-jfdsr   1/1     Running   0          2m3s
pod/dev-deploy-757ccd788c-ltp6g   1/1     Running   0          2m3s

NAME                  TYPE       CLUSTER-IP       EXTERNAL-IP   PORT(S)        AGE
service/dev-service   NodePort   10.104.118.223           80:30001/TCP   2m4s

NAME                         READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/dev-deploy   2/2     2            2           2m4s

NAME                                    DESIRED   CURRENT   READY   AGE
replicaset.apps/dev-deploy-757ccd788c   2         2         2       2m4s

# 访问
$ curl 192.168.216.196:30001



Welcome to nginx!



Welcome to nginx!

If you see this page, the nginx web server is successfully installed and working. Further configuration is required.

For online documentation and support please refer to nginx.org.
Commercial support is available at nginx.com.

Thank you for using nginx.

最后我们调用deleteAll()做一下清理工作。

$ kubectl get ns | grep dev
dev               Terminating   5m47s

$ kubectl get ns | grep dev 

3、DynamicClient

如果我们只是操作kubernetes的内置资源,前面介绍的两种客户端已经足够了,而且ClientSet把每一种资源都封装了对应的客户端。但是对于CRD资源,前面两种客户端就无能为力了。这时候就该我们的DynamicClient出场了。

在使用DynamicClient之前,我们必须先了解下Object.runtimeUnstructured分别是什么东西。

  • Object.runtime

在kubernetes的代码中,资源对象对应着具体的数据结构,这些数据结构都实现了同一个接口,名为Object.runtime,源码位置是k8s.io/apimachinery/pkg/runtime/interfaces.go,定义如下:

// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
	GetObjectKind() schema.ObjectKind
	DeepCopyObject() Object
}

DeepCopyObject():将内存中的对象克隆出一个新的对象;

GetObjectKind():处理Object.runtime类型的变量时,只要调用其GetObjectKind方法就知道它的具体身份了(GVK)。

  • Unstructured

与结构化数据相对的就是非结构化数据了。先看看什么是结构化数据:

{
  "id": 11,
  "name": "libi"
}

对于上面的json字串,key和value的类型是固定的,因此可以编写对应的数据结构来表示:

type Person struct{
  Id int `json:id`
  Name string `json:name`
}

在实际的kubernetes环境中,可能会遇到一些无法预知结构的数据,例如前面的JSON字符串中还有第三个字段,字段值的具体内容和类型在编码时并不确定,而是在真正运行的时候才知道,那么在编码时如何处理呢?在go中我们可以使用空接口interface{}来表示,实际上client-go也是这么做的。k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go:

type Unstructured struct {
	// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
	// map[string]interface{}
	// children.
	Object map[string]interface{}
}

下面我们进入实战环节,这里直接搬取了官方demo。

首先是定义了一个deployment资源实例,进行创建后,更新Pod的数量,接着又获取了一下containers以及更新第一个容器的镜像。接下来获取deploy资源列表并,最后清理deployment。

package main

import (
	"bufio"
	"context"
	"fmt"
  "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/util/retry"
	"os"
)

func initConfig() (*rest.Config, error) {
	// config
	// 从本机中获取kubeconfig的配置文件,因此第一个参数是空字符串
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		return nil, err
	}
	return config, nil
}


func dynamicClient() {
	config, err := initConfig()
	if err != nil {
		panic(err)
	}

	// 创建dynamic客户端
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	deploymentRes := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "deployments",
	}

	deployment := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name": "demo-deployment",
			},
			"spec": map[string]interface{}{
				"replicas": 2,
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": "demo",
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": "demo",
						},
					},
					"spec": map[string]interface{}{
						"containers": []map[string]interface{}{
							{
								"name":  "nginx",
								"image": "nginx:1.17.1",
								"ports": []map[string]interface{}{
									{
										"name":          "http",
										"protocol":      "TCP",
										"containerPort": 80,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	// 创建Deployment
	fmt.Println("Create deployment......")
	result, err := client.Resource(deploymentRes).Namespace("default").Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Create deployment %q.\n", result.GetName())

	// 更新
	prompt()
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		result, getErr := client.Resource(deploymentRes).Namespace("default").
			Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
		if err != nil {
			panic(fmt.Errorf("failed to get latest version of Deployment:%v", getErr))
		}

		// update replicas to 1
		if err := unstructured.SetNestedField(result.Object, int64(1), "spec", "replicas"); err != nil {
			panic(fmt.Errorf("failed to set replica value:%v", err))
		}

		// extract spec containers
		containers, found, err := unstructured.NestedSlice(result.Object, "spec", "template", "spec", "containers")
		if err != nil || !found || containers == nil {
			panic(fmt.Errorf("deployment containers not found or err in sepc:%v", err))
		}

		// update container[0] image
		if err := unstructured.SetNestedField(containers[0].(map[string]interface{}), "nginx:1.17.9", "image"); err != nil {
			panic(err)
		}

		_, updateErr := client.Resource(deploymentRes).Namespace("default").Update(context.TODO(), result, metav1.UpdateOptions{})
		return updateErr
	})
	if retryErr != nil {
		panic(fmt.Errorf("update failed:%v", retryErr))
	}
	fmt.Println("Update deployment......")

	// list Deployments
	prompt()
	fmt.Printf("Listing deployment in namesapce %q:\n", metav1.NamespaceDefault)
	list, err := client.Resource(deploymentRes).Namespace("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	for _, d := range list.Items {
		replicas, found, err := unstructured.NestedInt64(d.Object, "spec", "replicas")
		if err != nil || !found {
			fmt.Printf("Replicas not found for deployment %s: error=%s", d.GetName(), err)
			continue
		}
		fmt.Printf("* %s(%d replicas)\n", d.GetName(), replicas)
	}

	// Delete Deployment
	prompt()
	fmt.Println("Deleting deployment......")
	deletePolicy := metav1.DeletePropagationBackground
	deleteOptions := metav1.DeleteOptions{
		PropagationPolicy: &deletePolicy,
	}
	if err := client.Resource(deploymentRes).Namespace("default").Delete(context.TODO(), "demo-deployment", deleteOptions); err != nil {
		panic(err)
	}

}

func prompt() {
	fmt.Printf("-> Press Return key to continue.")
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		break
	}

	if err := scanner.Err(); err != nil {
		panic(err)
	}
	fmt.Println()
}

上面的例子都有一个共同的特点,都是需要我们去组装数据,那么现在有这样一个需求,我不想去组装数据,我已经有了对应的yaml了,我要读取里面的内容,将其转换为kubernetesr认识的资源:

package main

import (
	"context"
	"fmt"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
	"k8s.io/client-go/discovery"
	memory "k8s.io/client-go/discovery/cached"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/retry"
)

const deploymentYAML = `
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  namespace: default
spec:
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:latest
`

func dynamicClient_demo() {
	// config
	// 从本机中获取kubeconfig的配置文件,因此第一个参数是空字符串
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// 创建discovery客户端
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	obj := &unstructured.Unstructured{}
	_, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode([]byte(deploymentYAML), nil, obj)
	if err != nil {
		panic(err)
	}

	// 通过discoveryClient获取GVK GVR 映射
	resourceMapper, err := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)).RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return
	}

	fmt.Println(resourceMapper.Scope.Name())

	// 创建dynamic客户端
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	var dClient dynamic.ResourceInterface
	if resourceMapper.Scope.Name() == meta.RESTScopeNameNamespace {
		dClient = dynamicClient.Resource(resourceMapper.Resource).Namespace(obj.GetNamespace())
	} else {
		dClient = dynamicClient.Resource(resourceMapper.Resource)
	}

	// 创建
	result, err := dClient.Create(context.TODO(), obj, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}

	fmt.Println(result.GetName())

	// 更新
	if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// 先获取
		getResult, getErr := dClient.Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
		if getErr != nil {
			return err
		}

		// 更新replicas to 2
		err = unstructured.SetNestedField(getResult.Object, int64(2), "spec", "replicas")
		if err != nil {
			return err
		}

		// 更新镜像
		containers, found, err := unstructured.NestedSlice(getResult.Object, "spec", "template", "spec", "containers")
		if err != nil || !found {
			return err
		}
		if err = unstructured.SetNestedField(containers[0].(map[string]interface{}), "nginx:1.17.9", "image"); err != nil {
			return err
		}

		if err = unstructured.SetNestedField(getResult.Object, containers, "spec", "template", "spec", "containers"); err != nil {
			return err
		}

		_, updateErr := dClient.Update(context.TODO(), getResult, metav1.UpdateOptions{})
		return updateErr
	}); retryErr != nil {
		panic(retryErr)
	}

	// 删除
	err = dClient.Delete(context.TODO(), obj.GetName(), metav1.DeleteOptions{})
	if err != nil {
		panic(err)
	}
}

Unstructured与资源对象的相互转换

用Unstructured实例生成资源对象,也可以用资源对象生成Unstructured实例,这个能力是unstructuredConverter的FromUnstructured和ToUnstructured方法分别实现的。

4、DiscoveryClient

DiscoveryClient主要是用于发现kubernetes的API Server支持的Group、Version、Resources等信息。

package test

import (
	"fmt"
	"k8s.io/client-go/discovery"
  "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func initConfig() (*rest.Config, error) {
	// config
	// 从本机中获取kubeconfig的配置文件,因此第一个参数是空字符串
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		return nil, err
	}
	return config, nil
}

func discoveryClient() {
	config, err := initConfig()
	if err != nil {
		panic(config)
	}

	// 新建discoveryClient实例
	client, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	// 获取apiGroups 和 apiResources,返回是两个切片
	_, resources, err := client.ServerGroupsAndResources()
	if err != nil {
		panic(err)
	}

	// 先循环遍历看一下有哪些groups,这里直接打印名字
	//for _, group := range apiGroups {
	//	fmt.Println(group.Name)
	//}

	fmt.Println()
	// 查看resources
	for _, resource := range resources {
		// resource里面包含了groupVersion,是个字符串
		// groupVersionStr := resource.GroupVersion
		// ParseGroupVersion将gv字符串转换为struct
		//gv, err := schema.ParseGroupVersion(groupVersionStr)
		//if err != nil {
		//	panic(err)
		//}
		// fmt.Println(gv)
		for _, r := range resource.APIResources {
			fmt.Println(r.Name)
		}
	}
}