Ingress Nginx Controller源码分析03-nginx配置如何更新


更新方法syncIngress

经过前面分析,当ingress发生变更时,会回调到syncIngress方法

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
   n.syncRateLimiter.Accept()

   if n.syncQueue.IsShuttingDown() {go
      return nil
   }
   //step1
   ings := n.store.ListIngresses()
   //step2
   hosts, servers, pcfg := n.getConfiguration(ings)

   n.metricCollector.SetSSLExpireTime(servers)

   if n.runningConfig.Equal(pcfg) {
      klog.V(3).Infof("No configuration change detected, skipping backend reload")
      return nil
   }

   n.metricCollector.SetHosts(hosts)

   //step3
   if !n.IsDynamicConfigurationEnough(pcfg) {
      klog.InfoS("Configuration changes detected, backend reload required")

      hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
         TagName: "json",
      })

      pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

     //step4
      err := n.OnUpdate(*pcfg)
      if err != nil {
         n.metricCollector.IncReloadErrorCount()
         n.metricCollector.ConfigSuccess(hash, false)
         klog.Errorf("Unexpected failure reloading the backend:\n%v", err)
         n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
         return err
      }

      klog.InfoS("Backend successfully reloaded")
      n.metricCollector.ConfigSuccess(hash, true)
      n.metricCollector.IncReloadCount()

      n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
   }

   isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
   if isFirstSync {
      // For the initial sync it always takes some time for NGINX to start listening
      // For large configurations it might take a while so we loop and back off
      klog.InfoS("Initial sync, sleeping for 1 second")
      time.Sleep(1 * time.Second)
   }

   retry := wait.Backoff{
      Steps:    15,
      Duration: 1 * time.Second,
      Factor:   0.8,
      Jitter:   0.1,
   }

   err := wait.ExponentialBackoff(retry, func() (bool, error) {
     //step5
      err := n.configureDynamically(pcfg)
      if err == nil {
         klog.V(2).Infof("Dynamic reconfiguration succeeded.")
         return true, nil
      }

      klog.Warningf("Dynamic reconfiguration failed: %v", err)
      return false, err
   })
   if err != nil {
      klog.Errorf("Unexpected failure reconfiguring NGINX:\n%v", err)
      return err
   }

   ri := getRemovedIngresses(n.runningConfig, pcfg)
   re := getRemovedHosts(n.runningConfig, pcfg)
   n.metricCollector.RemoveMetrics(ri, re)

   n.runningConfig = pcfg

   return nil
}

主要逻辑

  1. Step1, 调用store.ListIngresses,取到所有的ingress
  2. step2, getConfiguration,构造所有的host, server, cfg
  3. Step3, IsDynamicConfigurationEnough, 判断是不是只需要动态更新
  4. 如果不能动态更新,则进行step4, OnUpdate进行nginx的reload
  5. 如果可以进行动态更新,进入step5,configureDynamically进行配置的动态更新

主要涉及三个点

  1. 什么情况下需要动态更新,什么情况下需要reload
  2. 如何进行reload
  3. 如何进行动态更新

动态更新的判断标准

// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
   copyOfRunningConfig := *n.runningConfig
   copyOfPcfg := *pcfg

   copyOfRunningConfig.Backends = []*ingress.Backend{}
   copyOfPcfg.Backends = []*ingress.Backend{}

   clearL4serviceEndpoints(©OfRunningConfig)
   clearL4serviceEndpoints(©OfPcfg)

   clearCertificates(©OfRunningConfig)
   clearCertificates(©OfPcfg)

   return copyOfRunningConfig.Equal(©OfPcfg)
}

IsDynamicConfigurationEnough的逻辑

  1. 取到当前正在运行的runningConfig

  2. 处理当前的和目标的数据(L4serviceEndpoints和Certificates)主要是清理掉TCP和UDP的Endpoint中的Service 以及Certificates

    // clearL4serviceEndpoints replaces the TCP and UDP service lists of config
    // with copies that keep only Port and Backend. Endpoints and Service are
    // blanked because they should be ignored when checking whether the new
    // configuration changes can be applied dynamically.
    func clearL4serviceEndpoints(config *ingress.Configuration) {
       // strip returns a new list in which every service has an empty
       // Endpoints slice and a nil Service; the result is nil for empty input.
       strip := func(services []ingress.L4Service) []ingress.L4Service {
          var stripped []ingress.L4Service
          for i := range services {
             stripped = append(stripped, ingress.L4Service{
                Port:      services[i].Port,
                Backend:   services[i].Backend,
                Endpoints: []ingress.Endpoint{},
                Service:   nil,
             })
          }
          return stripped
       }

       config.TCPEndpoints = strip(config.TCPEndpoints)
       config.UDPEndpoints = strip(config.UDPEndpoints)
    }
    

    注意其中的

    Endpoints: []ingress.Endpoint{}

    Service: nil

    // Helper function to clear Certificates from the ingress configuration since they should be ignored when
    // checking if the new configuration changes can be applied dynamically if dynamic certificates is on
    func clearCertificates(config *ingress.Configuration) {
       var clearedServers []*ingress.Server
       for _, server := range config.Servers {
          copyOfServer := *server
          copyOfServer.SSLCert = nil
          clearedServers = append(clearedServers, ©OfServer)
       }
       config.Servers = clearedServers
    }
    

    copyOfServer.SSLCert = nil

  3. 调用Equal进行比较

// Equal reports whether two Configuration values are equal.
//
// Two identical pointers (including two nils) are equal; a nil and a non-nil
// Configuration are not. Otherwise the default SSL certificate, the backends,
// the servers, the TCP and UDP services, the passthrough backends and the
// backend configuration checksum are compared in that order.
func (c1 *Configuration) Equal(c2 *Configuration) bool {
   switch {
   case c1 == c2:
      return true
   case c1 == nil || c2 == nil:
      return false
   }

   if !c1.DefaultSSLCertificate.Equal(c2.DefaultSSLCertificate) {
      return false
   }

   if !compareBackends(c1.Backends, c2.Backends) {
      return false
   }

   // Servers are sorted, so they are compared position by position.
   if len(c1.Servers) != len(c2.Servers) {
      return false
   }
   for i := range c1.Servers {
      if !c1.Servers[i].Equal(c2.Servers[i]) {
         return false
      }
   }

   if !compareL4Service(c1.TCPEndpoints, c2.TCPEndpoints) {
      return false
   }

   if !compareL4Service(c1.UDPEndpoints, c2.UDPEndpoints) {
      return false
   }

   // Passthrough backends are matched regardless of their position: every
   // entry of c1 needs an equal entry somewhere in c2.
   if len(c1.PassthroughBackends) != len(c2.PassthroughBackends) {
      return false
   }
nextPassthrough:
   for _, wanted := range c1.PassthroughBackends {
      for _, candidate := range c2.PassthroughBackends {
         if wanted.Equal(candidate) {
            continue nextPassthrough
         }
      }
      return false
   }

   return c1.BackendConfigChecksum == c2.BackendConfigChecksum
}

不能动态更新的条件

DefaultSSLCertificate、Servers(不含SSLCert)、L4Service(不含Endpoints和Service)、PassthroughBackends、BackendConfigChecksum等发生变更时,都不能进行动态更新,需要执行OnUpdate进行nginx reload。

那剩下的,如Backends(包括其中的endpoint)、L4Service的Endpoints、证书(SSLCert)发生变更时,由于这些字段在比较前已被清空,IsDynamicConfigurationEnough返回true,可以进行动态更新。

如何reload

// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
//
// Any error from rendering the template, creating the opentracing
// configuration, testing the rendered content, writing the file or running the
// reload command is returned to the caller.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
   cfg := n.store.GetBackendConfiguration()
   cfg.Resolver = n.resolver

   // step1: render the configuration content from the template.
   content, err := n.generateTemplate(cfg, ingressCfg)
   if err != nil {
      return err
   }

   err = createOpentracingCfg(cfg)
   if err != nil {
      return err
   }

   // step2: test the rendered content before it is written to cfgPath.
   err = n.testTemplate(content)
   if err != nil {
      return err
   }

   // At verbosity 2 and above, log a unified diff between the file currently
   // at cfgPath and the new content.
   if klog.V(2).Enabled() {
      // The read error is ignored; in that case src is compared as-is.
      src, _ := os.ReadFile(cfgPath)
      if !bytes.Equal(src, content) {
         tmpfile, err := os.CreateTemp("", "new-nginx-cfg")
         if err != nil {
            return err
         }
         defer tmpfile.Close()
         err = os.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
         if err != nil {
            return err
         }

         // exec.Command does not go through a shell, so the -I pattern must not
         // be wrapped in quotes: they would become part of the regular
         // expression and keep it from matching the "# Configuration" line.
         diffOutput, err := exec.Command("diff", "-I", "# Configuration.*", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
         if err != nil {
            if exitError, ok := err.(*exec.ExitError); ok {
               // Only exit status 2 is reported as a diff failure. The checked
               // assertion avoids a panic when Sys() is not a syscall.WaitStatus.
               if ws, ok := exitError.Sys().(syscall.WaitStatus); ok && ws.ExitStatus() == 2 {
                  klog.Warningf("Failed to executing diff command: %v", err)
               }
            }
         }

         klog.InfoS("NGINX configuration change", "diff", string(diffOutput))

         // we do not defer the deletion of temp files in order
         // to keep them around for inspection in case of error
         os.Remove(tmpfile.Name())
      }
   }

   // step3: write the new content to the configuration file.
   err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
   if err != nil {
      return err
   }

   // step4: run the reload command; its combined output is attached to the error on failure.
   o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
   if err != nil {
      return fmt.Errorf("%v\n%v", err, string(o))
   }

   return nil
}

主逻辑

  1. step1, generateTemplate,根据模板生成配置文件的内容
  2. step2, 验证配置文件
  3. step3, 写入配置文件
  4. step4, 执行nginx reload

如何动态更新

// configureDynamically pushes to NGINX the parts of pcfg that differ from the
// running configuration: new Backends are encoded in JSON format and POSTed to
// an internal HTTP endpoint handled by Lua, and the stream (TCP/UDP) endpoints
// and the servers' certificates are updated the same way when they changed.
// The first error encountered is returned; nil means every needed update succeeded.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
   running := n.runningConfig

   if !reflect.DeepEqual(running.Backends, pcfg.Backends) {
      if err := configureBackends(pcfg.Backends); err != nil {
         return err
      }
   }

   // The stream configuration is resent when either the TCP or the UDP list changed.
   if !reflect.DeepEqual(running.TCPEndpoints, pcfg.TCPEndpoints) ||
      !reflect.DeepEqual(running.UDPEndpoints, pcfg.UDPEndpoints) {
      if err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints); err != nil {
         return err
      }
   }

   if !reflect.DeepEqual(running.Servers, pcfg.Servers) {
      if err := configureCertificates(pcfg.Servers); err != nil {
         return err
      }
   }

   return nil
}

以configureBackends举例

// configureBackends posts a reduced copy of rawBackends to the
// "/configuration/backends" endpoint as application/json.
//
// Each copy carries only the fields listed below; its Service keeps just the
// Spec and its endpoints keep just Address and Port. An error is returned when
// the request fails or the status code is not http.StatusCreated.
func configureBackends(rawBackends []*ingress.Backend) error {
   payload := make([]*ingress.Backend, 0, len(rawBackends))

   for _, src := range rawBackends {
      // Left nil when the backend has no endpoints.
      var trimmedEndpoints []ingress.Endpoint
      for _, ep := range src.Endpoints {
         trimmedEndpoints = append(trimmedEndpoints, ingress.Endpoint{
            Address: ep.Address,
            Port:    ep.Port,
         })
      }

      // Left nil when the backend has no Service.
      var specOnly *apiv1.Service
      if src.Service != nil {
         specOnly = &apiv1.Service{Spec: src.Service.Spec}
      }

      payload = append(payload, &ingress.Backend{
         Name:                 src.Name,
         Port:                 src.Port,
         SSLPassthrough:       src.SSLPassthrough,
         SessionAffinity:      src.SessionAffinity,
         UpstreamHashBy:       src.UpstreamHashBy,
         LoadBalancing:        src.LoadBalancing,
         Service:              specOnly,
         NoServer:             src.NoServer,
         TrafficShapingPolicy: src.TrafficShapingPolicy,
         AlternativeBackends:  src.AlternativeBackends,
         Endpoints:            trimmedEndpoints,
      })
   }

   statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", payload)
   switch {
   case err != nil:
      return err
   case statusCode != http.StatusCreated:
      return fmt.Errorf("unexpected error code: %d", statusCode)
   }

   return nil
}

构造出backends的json数据,调用nginx的接口/configuration/backends, 进行backend的更新

相关