Ingress Nginx Controller源码分析03-nginx配置如何更新
更新方法syncIngress
经过前面分析,当ingress发生变更时,会回调到syncIngress方法
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncQueue.IsShuttingDown() {go
return nil
}
//step1
ings := n.store.ListIngresses()
//step2
hosts, servers, pcfg := n.getConfiguration(ings)
n.metricCollector.SetSSLExpireTime(servers)
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}
n.metricCollector.SetHosts(hosts)
//step3
if !n.IsDynamicConfigurationEnough(pcfg) {
klog.InfoS("Configuration changes detected, backend reload required")
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
//step4
err := n.OnUpdate(*pcfg)
if err != nil {
n.metricCollector.IncReloadErrorCount()
n.metricCollector.ConfigSuccess(hash, false)
klog.Errorf("Unexpected failure reloading the backend:\n%v", err)
n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
return err
}
klog.InfoS("Backend successfully reloaded")
n.metricCollector.ConfigSuccess(hash, true)
n.metricCollector.IncReloadCount()
n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
}
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
klog.InfoS("Initial sync, sleeping for 1 second")
time.Sleep(1 * time.Second)
}
retry := wait.Backoff{
Steps: 15,
Duration: 1 * time.Second,
Factor: 0.8,
Jitter: 0.1,
}
err := wait.ExponentialBackoff(retry, func() (bool, error) {
//step5
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
}
klog.Warningf("Dynamic reconfiguration failed: %v", err)
return false, err
})
if err != nil {
klog.Errorf("Unexpected failure reconfiguring NGINX:\n%v", err)
return err
}
ri := getRemovedIngresses(n.runningConfig, pcfg)
re := getRemovedHosts(n.runningConfig, pcfg)
n.metricCollector.RemoveMetrics(ri, re)
n.runningConfig = pcfg
return nil
}
主要逻辑
- Step1, 调用store.ListIngress,取到所有的ingress
- step2, getConfiguration,构造所有的host, server, cfg
- Step3, IsDynamicConfigurationEnough, 判断是不是只需要动态更新
- 如果不能动态更新,则进行step4, OnUpdate进行nginx的reload
- 如果可以进行动态更新,进入step5,configureDynamically进行配置的动态更新
主要涉及三个点
- 什么情况下需要动态更新,什么情况下需要reload
- 如何进行reaload
- 如何进行动态更新
动态更新的判断标准
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
copyOfRunningConfig := *n.runningConfig
copyOfPcfg := *pcfg
copyOfRunningConfig.Backends = []*ingress.Backend{}
copyOfPcfg.Backends = []*ingress.Backend{}
clearL4serviceEndpoints(©OfRunningConfig)
clearL4serviceEndpoints(©OfPcfg)
clearCertificates(©OfRunningConfig)
clearCertificates(©OfPcfg)
return copyOfRunningConfig.Equal(©OfPcfg)
}
IsDynamicConfigurationEnough的逻辑
-
取到当前正在运行的runningConfig
-
处理当前的和目标的数据(L4serviceEndpoints和Certificates)主要是清理掉TCP和UDP的Endpoint中的Service 以及Certificates
// Helper function to clear endpoints from the ingress configuration since they should be ignored when // checking if the new configuration changes can be applied dynamically. func clearL4serviceEndpoints(config *ingress.Configuration) { var clearedTCPL4Services []ingress.L4Service var clearedUDPL4Services []ingress.L4Service for _, service := range config.TCPEndpoints { copyofService := ingress.L4Service{ Port: service.Port, Backend: service.Backend, Endpoints: []ingress.Endpoint{}, Service: nil, } clearedTCPL4Services = append(clearedTCPL4Services, copyofService) } for _, service := range config.UDPEndpoints { copyofService := ingress.L4Service{ Port: service.Port, Backend: service.Backend, Endpoints: []ingress.Endpoint{}, Service: nil, } clearedUDPL4Services = append(clearedUDPL4Services, copyofService) } config.TCPEndpoints = clearedTCPL4Services config.UDPEndpoints = clearedUDPL4Services }
注意其中的
Endpoints: []ingress.Endpoint{}
Service: nil
// Helper function to clear Certificates from the ingress configuration since they should be ignored when // checking if the new configuration changes can be applied dynamically if dynamic certificates is on func clearCertificates(config *ingress.Configuration) { var clearedServers []*ingress.Server for _, server := range config.Servers { copyOfServer := *server copyOfServer.SSLCert = nil clearedServers = append(clearedServers, ©OfServer) } config.Servers = clearedServers }
copyOfServer.SSLCert = nil
-
调用Equals进行比较
// Equal tests for equality between two Configuration types
func (c1 *Configuration) Equal(c2 *Configuration) bool {
if c1 == c2 {
return true
}
if c1 == nil || c2 == nil {
return false
}
if !c1.DefaultSSLCertificate.Equal(c2.DefaultSSLCertificate) {
return false
}
match := compareBackends(c1.Backends, c2.Backends)
if !match {
return false
}
if len(c1.Servers) != len(c2.Servers) {
return false
}
// Servers are sorted
for idx, c1s := range c1.Servers {
if !c1s.Equal(c2.Servers[idx]) {
return false
}
}
match = compareL4Service(c1.TCPEndpoints, c2.TCPEndpoints)
if !match {
return false
}
match = compareL4Service(c1.UDPEndpoints, c2.UDPEndpoints)
if !match {
return false
}
if len(c1.PassthroughBackends) != len(c2.PassthroughBackends) {
return false
}
for _, ptb1 := range c1.PassthroughBackends {
found := false
for _, ptb2 := range c2.PassthroughBackends {
if ptb1.Equal(ptb2) {
found = true
break
}
}
if !found {
return false
}
}
if c1.BackendConfigChecksum != c2.BackendConfigChecksum {
return false
}
return true
}
不能动态更新的条件
Backend, Servers, L4Service, PassthroughBackends,BackendConfigChecksum等发生变更时,都不能进行动态更新。执行OnUpdate进行nginx reload
那剩下的,如backend中的endpoint、upstream、configureCertificates发生变更,则返回true,可以进行动态更新。
如何reload
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
//step1
content, err := n.generateTemplate(cfg, ingressCfg)
if err != nil {
return err
}
err = createOpentracingCfg(cfg)
if err != nil {
return err
}
//step2
err = n.testTemplate(content)
if err != nil {
return err
}
if klog.V(2).Enabled() {
src, _ := os.ReadFile(cfgPath)
if !bytes.Equal(src, content) {
tmpfile, err := os.CreateTemp("", "new-nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
err = os.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
if err != nil {
return err
}
diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
if ws.ExitStatus() == 2 {
klog.Warningf("Failed to executing diff command: %v", err)
}
}
}
klog.InfoS("NGINX configuration change", "diff", string(diffOutput))
// we do not defer the deletion of temp files in order
// to keep them around for inspection in case of error
os.Remove(tmpfile.Name())
}
}
//step3
err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
if err != nil {
return err
}
//step4
o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}
return nil
}
主逻辑
- step1, generateTemplate,根据模板生成配置文件类内容
- step2, 验证配置文件
- step3, 写入配置文件
- step4, 执行nginx reload
如何动态更新
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
}
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
}
return nil
}
以configureBackends举例
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
}
luaBackend := &ingress.Backend{
Name: backend.Name,
Port: backend.Port,
SSLPassthrough: backend.SSLPassthrough,
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
NoServer: backend.NoServer,
TrafficShapingPolicy: backend.TrafficShapingPolicy,
AlternativeBackends: backend.AlternativeBackends,
}
var endpoints []ingress.Endpoint
for _, endpoint := range backend.Endpoints {
endpoints = append(endpoints, ingress.Endpoint{
Address: endpoint.Address,
Port: endpoint.Port,
})
}
luaBackend.Endpoints = endpoints
backends[i] = luaBackend
}
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil
}
构造出backends的json数据,调用nginx的接口/configuration/backends, 进行backend的更新