Ingress nginx Controller源码分析01-主体结构


主要结构图

主要逻辑

  1. 构建NginxController
  2. Store对Informer进行封装,用来监听ingress, ingressClass, Endpoint, Service, Secret, ConfigMap, Namespace等的变化,当发生变化时,写入到 updateCh
  3. NginxController的Start方法最后,开启对updateCh的处理,如有数据,则写入syncQueue
  4. Queue启动worker, 如queue中有数据,则回调syncIngress进行处理
  5. syncIngress方法中,判断是否可以动态更新 (IsDynamicConfigurationEnough),还是需要nginx进行Reload(OnUpdate方法中)
  6. generateTemplate方法,根据nginx的配置文件的模板文件,生成对应的配置文件。

入口函数

cmd/nginx/main.go

// main is the entry point of the ingress-nginx controller binary. It parses
// the flags, builds the Kubernetes client, validates the configured services
// and namespace, prepares the Prometheus registry and metric collector,
// creates the NGINX controller, starts the health/metrics HTTP server and the
// controller in their own goroutines, and finally hands control to
// handleSigterm with a callback that exits the process.
func main() {
   klog.InitFlags(nil)

   // Seed the global math/rand source with the current time.
   rand.Seed(time.Now().UnixNano())

   fmt.Println(version.String())

   // The version string has already been printed above, so a "show version"
   // request only needs a successful exit. Note this check runs before the
   // parseFlags error is inspected.
   showVersion, conf, err := parseFlags()
   if showVersion {
      os.Exit(0)
   }

   if err != nil {
      klog.Fatal(err)
   }

   err = file.CreateRequiredDirectories()
   if err != nil {
      klog.Fatal(err)
   }

   // Build the Kubernetes API client from the configured API server host,
   // root CA file and kubeconfig file.
   kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
   if err != nil {
      handleFatalInitError(err)
   }

   // When a default backend service is configured, abort if checkService
   // reports an error for it.
   if len(conf.DefaultService) > 0 {
      err := checkService(conf.DefaultService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }

      klog.InfoS("Valid default backend", "service", conf.DefaultService)
   }

   // Same validation for the publish service, when one is configured.
   if len(conf.PublishService) > 0 {
      err := checkService(conf.PublishService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }
   }

   // When conf.Namespace is set, abort if that namespace cannot be fetched.
   if conf.Namespace != "" {
      _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
      if err != nil {
         klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
      }
   }

   conf.FakeCertificate = ssl.GetFakeSSLCert()
   klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName)

   if !k8s.NetworkingIngressAvailable(kubeClient) {
      klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher")
   }

   // Probe whether IngressClasses can be listed. Only a Forbidden error is
   // acted upon: it disables IngressClass handling.
   // NOTE(review): any other error that is not NotFound is silently ignored
   // here — confirm this is intended.
   _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
   if err != nil {
      if !errors.IsNotFound(err) {
         if errors.IsForbidden(err) {
            klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err)
            conf.IngressClassConfiguration.IgnoreIngressClass = true
         }
      }
   }
   conf.Client = kubeClient

   err = k8s.GetIngressPod(kubeClient)
   if err != nil {
      klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err)
   }

   // Dedicated Prometheus registry carrying the Go runtime collector and a
   // process collector bound to this process's PID.
   reg := prometheus.NewRegistry()

   reg.MustRegister(prometheus.NewGoCollector())
   reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
      PidFn:        func() (int, error) { return os.Getpid(), nil },
      ReportErrors: true,
   }))

   // Start from a dummy collector and replace it with a real one only when
   // metrics are enabled.
   mc := metric.NewDummyCollector()
   if conf.EnableMetrics {
      mc, err = metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller)
      if err != nil {
         klog.Fatalf("Error creating prometheus collector:  %v", err)
      }
   }
   // Pass the ValidationWebhook status to determine if we need to start the collector
   // for the admissionWebhook
   mc.Start(conf.ValidationWebhook)

   if conf.EnableProfiling {
      go registerProfiler()
   }

   ngx := controller.NewNGINXController(conf, mc)

   // Health and metrics endpoints share one mux, served on the health
   // check host and port.
   mux := http.NewServeMux()
   registerHealthz(nginx.HealthPath, ngx, mux)
   registerMetrics(reg, mux)

   // Both the HTTP server and the controller run in their own goroutines;
   // main continues into handleSigterm.
   go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
   go ngx.Start()

   // The callback passed to handleSigterm terminates the process with the
   // exit code it is given.
   handleSigterm(ngx, func(code int) {
      os.Exit(code)
   })
}

主要逻辑

  1. Step1 初始化配置,获取kubeClient

    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)

  2. Step2 检查DefaultService、PublishService、Namespace等是否存在

  3. Step3 检查IngressClasses权限

  4. Step4 检查ingress controller pod是否存在

  5. Step5 创建NGINXController, 开启健康检查,metrics

    ngx := controller.NewNGINXController(conf, mc)

方法NewNGINXController

//internal/ingress/controller/nginx.go
// NewNGINXController creates a new NGINX Ingress controller.
//
// It wires up the event recorder, builds the NGINXController value, optionally
// creates the validation webhook HTTPS server, creates the store (handing it
// n.updateCh) and the sync queue (with n.syncIngress as its callback), loads
// the NGINX configuration template, and installs file watchers on the
// template and on every file found under /etc/nginx/geoip/.
//
// An invalid template or a failure to create any file watcher is fatal.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
   // Events are logged through klog and recorded to the Events API of
   // config.Namespace.
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartLogging(klog.Infof)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
      Interface: config.Client.CoreV1().Events(config.Namespace),
   })

   // A failure to read the system nameservers is only logged; h is still
   // used as the resolver below.
   h, err := dns.GetSystemNameServers()
   if err != nil {
      klog.Warningf("Error reading system nameservers: %v", err)
   }

   n := &NGINXController{
      isIPV6Enabled: ing_net.IsIPv6Enabled(),

      resolver:        h,
      cfg:             config,
      syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

      recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
         Component: "nginx-ingress-controller",
      }),

      stopCh:   make(chan struct{}),
      updateCh: channels.NewRingChannel(1024),

      ngxErrCh: make(chan error),

      stopLock: &sync.Mutex{},

      runningConfig: new(ingress.Configuration),

      Proxy: &TCPProxy{},

      metricCollector: mc,

      command: NewNginxCommand(),
   }

   // The admission webhook server is only built when a webhook address is
   // configured; the controller itself is passed as the Checker.
   if n.cfg.ValidationWebhook != "" {
      n.validationWebhookServer = &http.Server{
         Addr:      config.ValidationWebhook,
         Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
         TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
         // disable http/2
         // https://github.com/kubernetes/kubernetes/issues/80313
         // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
         TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
      }
   }

   // The store is given n.updateCh, the same channel the controller keeps
   // in its updateCh field.
   n.store = store.New(
      config.Namespace,
      config.WatchNamespaceSelector,
      config.ConfigMapName,
      config.TCPConfigMapName,
      config.UDPConfigMapName,
      config.DefaultSSLCertificate,
      config.ResyncPeriod,
      config.Client,
      n.updateCh,
      config.DisableCatchAll,
      config.IngressClassConfiguration)

   // n.syncIngress is registered as the function of the task queue.
   n.syncQueue = task.NewTaskQueue(n.syncIngress)

   if config.UpdateStatus {
      n.syncStatus = status.NewStatusSyncer(status.Config{
         Client:                 config.Client,
         PublishService:         config.PublishService,
         PublishStatusAddress:   config.PublishStatusAddress,
         IngressLister:          n.store,
         UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
         UseNodeInternalIP:      config.UseNodeInternalIP,
      })
   } else {
      klog.Warning("Update of Ingress status is disabled (flag --update-status)")
   }

   // onTemplateChange reloads the template from nginx.TemplatePath. On a
   // load error it returns early, leaving n.t untouched; on success it
   // swaps the template and enqueues a "template-change" dummy task.
   onTemplateChange := func() {
      template, err := ngx_template.NewTemplate(nginx.TemplatePath)
      if err != nil {
         // this error is different from the rest because it must be clear why nginx is not working
         klog.ErrorS(err, "Error loading new template")
         return
      }

      n.t = template
      klog.InfoS("New NGINX configuration template loaded")
      n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
   }

   // The initial template load, unlike a reload, is fatal on error.
   ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
   if err != nil {
      klog.Fatalf("Invalid NGINX configuration template: %v", err)
   }

   n.t = ngxTpl

   _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
   if err != nil {
      klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
   }

   // Collect every non-directory entry below /etc/nginx/geoip/; any walk
   // error aborts the walk and is fatal below.
   filesToWatch := []string{}
   err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
      if err != nil {
         return err
      }

      if info.IsDir() {
         return nil
      }

      filesToWatch = append(filesToWatch, path)
      return nil
   })

   if err != nil {
      klog.Fatalf("Error creating file watchers: %v", err)
   }

   // One watcher per collected file; a change enqueues a "file-change"
   // dummy task.
   // NOTE(review): the callback closes over the loop variable f. With the
   // pre-Go 1.22 loop semantics all callbacks share one f and would log the
   // last path — confirm the Go version in use or shadow f inside the loop.
   for _, f := range filesToWatch {
      _, err = watch.NewFileWatcher(f, func() {
         klog.InfoS("File changed detected. Reloading NGINX", "path", f)
         n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
      })
      if err != nil {
         klog.Fatalf("Error creating file watcher for %v: %v", f, err)
      }
   }

   return n
}

主要逻辑:

  1. 实例化NGINXController

  2. 创建validationWebhookServer

  3. 创建store缓存

    n.store = store.New
    
  4. 监听模板文件以及/etc/nginx/geoip下的文件变化,如有变化,向syncQueue中加入对应的任务(template-change / file-change)

  5. 当ingress有变化时,执行syncIngress方法回调

    n.syncQueue = task.NewTaskQueue(n.syncIngress)

上面的逻辑已经创建好了主体的结构。下面我们看下当ingress发生改变时,如何回调执行到syncIngress方法。

  1. 监听ingress的变更

    store.go中构造各种资源的变更监听,以ingress为例

    //internal/ingress/controller/store/store.go
    // ingEventHandler reacts to Ingress add, update and delete notifications.
    // Accepted adds and updates refresh the store (ingress, secret-to-ingress
    // map, secrets) and then publish an Event on updateCh.
    ingEventHandler := cache.ResourceEventHandlerFuncs{
       AddFunc: func(obj interface{}) {
          // NOTE(review): the second return value of toIngress is discarded;
          // ing is used without a check — confirm it cannot be nil here.
          ing, _ := toIngress(obj)
    
          if !watchedNamespace(ing.Namespace) {
             return
          }
    
          // An ingress whose class cannot be resolved is ignored.
          ic, err := store.GetIngressClass(ing, icConfig)
          if err != nil {
             klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
             return
          }
    
          klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)
    
          // Catch-all ingresses are dropped when disableCatchAll is set.
          if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
             klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
             return
          }
    
          recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
    
          store.syncIngress(ing)
          store.updateSecretIngressMap(ing)
          store.syncSecrets(ing)
    
          // Publish the creation on the update channel.
          updateCh.In() <- Event{
             Type: CreateEvent,
             Obj:  obj,
          }
       },
       DeleteFunc: ingDeleteHandler,
       UpdateFunc: func(old, cur interface{}) {
          // NOTE(review): conversion errors are discarded for both objects.
          oldIng, _ := toIngress(old)
          curIng, _ := toIngress(cur)
    
          // The namespace filter is applied to the old object.
          if !watchedNamespace(oldIng.Namespace) {
             return
          }
    
          // When IngressClass handling is ignored, both errors stay nil, so
          // only the "changed" and "no changes" branches below can be taken.
          var errOld, errCur error
          var classCur string
          if !icConfig.IgnoreIngressClass {
             _, errOld = store.GetIngressClass(oldIng, icConfig)
             classCur, errCur = store.GetIngressClass(curIng, icConfig)
          }
          if errOld != nil && errCur == nil {
             // The class was not resolvable before and is now: handled like
             // a creation.
             if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
                klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
                return
             }
    
             klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
             recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
          } else if errOld == nil && errCur != nil {
             // The class was resolvable before and no longer is: handled
             // like a deletion of the old object.
             klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
             ingDeleteHandler(old)
             return
          } else if errCur == nil && !reflect.DeepEqual(old, cur) {
             // The class resolves and the object changed: a regular update,
             // unless it became a disallowed catch-all ingress.
             if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
                klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
                ingDeleteHandler(old)
                return
             }
    
             recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
          } else {
             klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
             return
          }
    
          store.syncIngress(curIng)
          store.updateSecretIngressMap(curIng)
          store.syncSecrets(curIng)
    
          // Publish the update, carrying the current object.
          updateCh.In() <- Event{
             Type: UpdateEvent,
             Obj:  cur,
          }
       },
    }
    

    internal/ingress/controller/store/store.go

    New方法构造各种资源的处理EventHandler

    Ingress举例:ResourceEventHandlerFuncs

    处理Ingress的Add、Delete、Update

    // Attach one event handler to each informer. The IngressClass handler is
    // only attached when IngressClass handling is not ignored.
    store.informers.Ingress.AddEventHandler(ingEventHandler)
    if !icConfig.IgnoreIngressClass {
       store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
    }
    store.informers.Endpoint.AddEventHandler(epEventHandler)
    store.informers.Secret.AddEventHandler(secrEventHandler)
    store.informers.ConfigMap.AddEventHandler(cmEventHandler)
    store.informers.Service.AddEventHandler(serviceHandler)
    

    为Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service对应的Informer(k8s client-go提供)分别注册EventHandler

    当发生变更时,向updateCh中写入数据。

    万能的死循环

    Start方法,开启万能的死循环,处理updateCh中的各种Event

    //internal/ingress/controller/nginx.go
    // Endless loop of Start: multiplexes NGINX errors, store events and the
    // stop signal. It only ends through one of the return statements.
    for {
       select {
       case err := <-n.ngxErrCh:
          if n.isShuttingDown {
             return
          }
    
          // if the nginx master process dies, the workers continue to process requests
          // until the failure of the configured livenessProbe and restart of the pod.
          if process.IsRespawnIfRequired(err) {
             return
          }
    
       case event := <-n.updateCh.Out():
          // break leaves only the select statement, not the for loop: an
          // event received while shutting down is dropped and the loop
          // keeps running.
          if n.isShuttingDown {
             break
          }
    
          if evt, ok := event.(store.Event); ok {
             klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
             // Configuration events are enqueued as a non-skippable dummy
             // task instead of the event object itself.
             if evt.Type == store.ConfigurationEvent {
                // TODO: is this necessary? Consider removing this special case
                n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                continue
             }
    
             // Every other event is enqueued as a skippable task carrying
             // the event's object.
             n.syncQueue.EnqueueSkippableTask(evt.Obj)
          } else {
             klog.Warningf("Unexpected event type received %T", event)
          }
       case <-n.stopCh:
          return
       }
    }
    

    updateCh中有数据时,放到syncQueue中

  2. syncQueue中的数据处理

    //internal/task/queue.go
    // worker processes work in the queue through sync.
    //
    // It loops until the queue reports quit, at which point it closes
    // t.workerDone (unless already closed) and returns. Each item is either
    // skipped, synced successfully, or re-added rate-limited on error.
    func (t *Queue) worker() {
       for {
          key, quit := t.queue.Get()
          if quit {
             if !isClosed(t.workerDone) {
                close(t.workerDone)
             }
             return
          }
          // The timestamp is taken before the sync runs; on success it
          // becomes t.lastSync.
          ts := time.Now().UnixNano()
    
          // Items with a non-zero timestamp older than the last successful
          // sync are dropped without calling t.sync.
          // NOTE(review): the log key "now" actually carries item.Timestamp.
          item := key.(Element)
          if item.Timestamp != 0 && t.lastSync > item.Timestamp {
             klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
             t.queue.Forget(key)
             t.queue.Done(key)
             continue
          }
    
          klog.V(3).InfoS("syncing", "key", item.Key)
          if err := t.sync(key); err != nil {
             // Failed items are re-added with Timestamp 0, so the skip check
             // above never applies to the retry.
             klog.ErrorS(err, "requeuing", "key", item.Key)
             t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: 0,
             })
          } else {
             t.queue.Forget(key)
             t.lastSync = ts
          }
    
          t.queue.Done(key)
       }
    }
    

    当syncQueue中有数据,回调syncIngress方法 t.sync(key)

    该方法构造时被设置为NGINXController的syncIngress方法

    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    syncIngress方法会生成nginx配置文件:如果需要reload,则调用nginx -s reload重新加载配置文件;如果不需要reload,则调用lua接口使配置动态生效。