Ingress nginx Controller源码分析01-主体结构
主要结构图
主要逻辑
- 构建NginxController
- Store对Informer进行封装,用来监听ingress, ingressClass, Endpoint, Service, Secret, ConfigMap, Namespace等的变化,当发生变化时,写入到 updateCh
- NginxController的Start方法最后,开启对updateCh的处理,如有数据,则写入syncQueue
- Queue启动worker, 如queue中有数据,则回调syncIngress进行处理
- syncIngress方法中,判断是否可以动态更新 (IsDynamicConfigurationEnough),还是需要nginx进行Reload(OnUpdate方法中)
- generateTemplate方法,根据nginx的配置文件的模板文件,生成对应的配置文件。
入口函数
cmd/nginx/main.go
// main is the entry point of the ingress-nginx controller binary.
//
// It parses the flags, creates the API server client, checks the
// cluster-side prerequisites (default/publish services, watched namespace,
// IngressClass list permission, controller pod), sets up the Prometheus
// registry and metric collector, builds the NGINX controller, starts the
// health/metrics HTTP server and the controller in goroutines, and finally
// hands control to handleSigterm.
func main() {
	klog.InitFlags(nil)
	rand.Seed(time.Now().UnixNano())
	fmt.Println(version.String())

	showVersion, conf, err := parseFlags()
	if showVersion {
		os.Exit(0)
	}
	if err != nil {
		klog.Fatal(err)
	}

	if dirErr := file.CreateRequiredDirectories(); dirErr != nil {
		klog.Fatal(dirErr)
	}

	kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
	if err != nil {
		handleFatalInitError(err)
	}

	// The default backend service is optional; when configured it must pass checkService.
	if len(conf.DefaultService) > 0 {
		if svcErr := checkService(conf.DefaultService, kubeClient); svcErr != nil {
			klog.Fatal(svcErr)
		}
		klog.InfoS("Valid default backend", "service", conf.DefaultService)
	}

	// Same check for the publish service, without a success log.
	if len(conf.PublishService) > 0 {
		if svcErr := checkService(conf.PublishService, kubeClient); svcErr != nil {
			klog.Fatal(svcErr)
		}
	}

	// When a namespace is configured it has to exist.
	if conf.Namespace != "" {
		if _, nsErr := kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{}); nsErr != nil {
			klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, nsErr)
		}
	}

	conf.FakeCertificate = ssl.GetFakeSSLCert()
	klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName)

	if !k8s.NetworkingIngressAvailable(kubeClient) {
		klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher")
	}

	// Probe the IngressClass API. Only a "forbidden" answer changes the
	// configuration (the IngressClass feature is switched off); any other
	// outcome is ignored here.
	_, listErr := kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
	if listErr != nil && !errors.IsNotFound(listErr) && errors.IsForbidden(listErr) {
		klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", listErr)
		conf.IngressClassConfiguration.IgnoreIngressClass = true
	}

	conf.Client = kubeClient

	if podErr := k8s.GetIngressPod(kubeClient); podErr != nil {
		klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", podErr)
	}

	// Prometheus registry with the Go runtime and process collectors.
	reg := prometheus.NewRegistry()
	reg.MustRegister(prometheus.NewGoCollector())
	procOpts := prometheus.ProcessCollectorOpts{
		PidFn:        func() (int, error) { return os.Getpid(), nil },
		ReportErrors: true,
	}
	reg.MustRegister(prometheus.NewProcessCollector(procOpts))

	// Start from the dummy collector and replace it when metrics are enabled.
	mc := metric.NewDummyCollector()
	if conf.EnableMetrics {
		mc, err = metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller)
		if err != nil {
			klog.Fatalf("Error creating prometheus collector: %v", err)
		}
	}
	// Pass the ValidationWebhook status to determine if we need to start the collector
	// for the admissionWebhook
	mc.Start(conf.ValidationWebhook)

	if conf.EnableProfiling {
		go registerProfiler()
	}

	ngx := controller.NewNGINXController(conf, mc)

	// Health check and metrics share one mux, served in its own goroutine.
	mux := http.NewServeMux()
	registerHealthz(nginx.HealthPath, ngx, mux)
	registerMetrics(reg, mux)
	go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)

	go ngx.Start()

	exit := func(code int) {
		os.Exit(code)
	}
	handleSigterm(ngx, exit)
}
主要逻辑
-
Step1 初始化配置,获取kubeClient
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
-
Step2 检查DefaultService、PublishService、Namespace等是否存在
-
Step3 检查是否有IngressClasses的访问权限
-
Step4 检查ingress controller pod是否存在
-
Step5 创建NGINXController, 开启健康检查,metrics
ngx := controller.NewNGINXController(conf, mc)
方法NewNGINXController
//internal/ingress/controller/nginx.go
// NewNGINXController creates a new NGINX Ingress controller.
//
// config supplies the controller settings and the Kubernetes client; mc is
// the metric collector stored on the controller. The function:
//   - starts an event broadcaster that logs and records events,
//   - builds the NGINXController value (channels, rate limiter, recorder, ...),
//   - creates the validation webhook server when config.ValidationWebhook is set,
//   - creates the store, handing it n.updateCh,
//   - creates the sync queue with n.syncIngress as its callback,
//   - creates the status syncer when config.UpdateStatus is true,
//   - loads the NGINX template and watches it, plus every file found under
//     /etc/nginx/geoip/, enqueuing a dummy task on change.
//
// An invalid template or a failure to create a file watcher is fatal
// (klog.Fatalf).
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: config.Client.CoreV1().Events(config.Namespace),
	})

	// A failure to read the system nameservers is only logged; h is used as returned.
	h, err := dns.GetSystemNameServers()
	if err != nil {
		klog.Warningf("Error reading system nameservers: %v", err)
	}

	n := &NGINXController{
		isIPV6Enabled: ing_net.IsIPv6Enabled(),

		resolver:        h,
		cfg:             config,
		syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
			Component: "nginx-ingress-controller",
		}),

		stopCh:   make(chan struct{}),
		updateCh: channels.NewRingChannel(1024),

		ngxErrCh: make(chan error),

		stopLock: &sync.Mutex{},

		runningConfig: new(ingress.Configuration),

		Proxy: &TCPProxy{},

		metricCollector: mc,

		command: NewNginxCommand(),
	}

	if n.cfg.ValidationWebhook != "" {
		n.validationWebhookServer = &http.Server{
			Addr:      config.ValidationWebhook,
			Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
			TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
			// disable http/2
			// https://github.com/kubernetes/kubernetes/issues/80313
			// https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
			TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
		}
	}

	// The store receives n.updateCh so it can publish events to the controller.
	n.store = store.New(
		config.Namespace,
		config.WatchNamespaceSelector,
		config.ConfigMapName,
		config.TCPConfigMapName,
		config.UDPConfigMapName,
		config.DefaultSSLCertificate,
		config.ResyncPeriod,
		config.Client,
		n.updateCh,
		config.DisableCatchAll,
		config.IngressClassConfiguration)

	// syncIngress is the callback invoked for the tasks placed on the queue.
	n.syncQueue = task.NewTaskQueue(n.syncIngress)

	if config.UpdateStatus {
		n.syncStatus = status.NewStatusSyncer(status.Config{
			Client:                 config.Client,
			PublishService:         config.PublishService,
			PublishStatusAddress:   config.PublishStatusAddress,
			IngressLister:          n.store,
			UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
			UseNodeInternalIP:      config.UseNodeInternalIP,
		})
	} else {
		klog.Warning("Update of Ingress status is disabled (flag --update-status)")
	}

	// onTemplateChange reloads the template and enqueues a sync. A template
	// that fails to load is logged and the previous template stays in place.
	onTemplateChange := func() {
		template, err := ngx_template.NewTemplate(nginx.TemplatePath)
		if err != nil {
			// this error is different from the rest because it must be clear why nginx is not working
			klog.ErrorS(err, "Error loading new template")
			return
		}

		n.t = template
		klog.InfoS("New NGINX configuration template loaded")
		n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
	}

	ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
	if err != nil {
		klog.Fatalf("Invalid NGINX configuration template: %v", err)
	}

	n.t = ngxTpl

	_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
	if err != nil {
		klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
	}

	// Collect every regular entry (non-directory) below the geoip directory.
	var filesToWatch []string
	err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.IsDir() {
			return nil
		}

		filesToWatch = append(filesToWatch, path)
		return nil
	})
	if err != nil {
		klog.Fatalf("Error creating file watchers: %v", err)
	}

	for _, f := range filesToWatch {
		// Rebind the loop variable: with Go versions before 1.22 all closures
		// would otherwise share a single f and log the last path in the slice
		// regardless of which file actually changed.
		f := f
		_, err = watch.NewFileWatcher(f, func() {
			klog.InfoS("File changed detected. Reloading NGINX", "path", f)
			n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
		})
		if err != nil {
			klog.Fatalf("Error creating file watcher for %v: %v", f, err)
		}
	}

	return n
}
主要逻辑:
-
实例化NGINXController
-
创建validationWebhookServer
-
创建store缓存
n.store = store.New
-
监听模板文件以及/etc/nginx/geoip下的文件变化,如有变化,发送对应的文件变化事件
-
当ingress有变化时,执行syncIngress方法回调
n.syncQueue = task.NewTaskQueue(n.syncIngress)
上面的逻辑已经创建好了主体的结构。下面我们看下当ingress发生改变时,如何回调执行到syncIngress方法。
-
监听ingress的变更
store.go中构造各种资源的变更监听,以ingress为例
//internal/ingress/controller/store/store.go ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ing, _ := toIngress(obj) if !watchedNamespace(ing.Namespace) { return } ic, err := store.GetIngressClass(ing, icConfig) if err != nil { klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err) return } klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic) if hasCatchAllIngressRule(ing.Spec) && disableCatchAll { klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing)) return } recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync") store.syncIngress(ing) store.updateSecretIngressMap(ing) store.syncSecrets(ing) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: ingDeleteHandler, UpdateFunc: func(old, cur interface{}) { oldIng, _ := toIngress(old) curIng, _ := toIngress(cur) if !watchedNamespace(oldIng.Namespace) { return } var errOld, errCur error var classCur string if !icConfig.IgnoreIngressClass { _, errOld = store.GetIngressClass(oldIng, icConfig) classCur, errCur = store.GetIngressClass(curIng, icConfig) } if errOld != nil && errCur == nil { if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll { klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng)) return } klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur) recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync") } else if errOld == nil && errCur != nil { klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng)) ingDeleteHandler(old) return } else if errCur == nil && !reflect.DeepEqual(old, cur) { if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll { klog.InfoS("ignoring update for catch-all ingress and delete old one because of 
--disable-catch-all", "ingress", klog.KObj(curIng)) ingDeleteHandler(old) return } recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync") } else { klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng)) return } store.syncIngress(curIng) store.updateSecretIngressMap(curIng) store.syncSecrets(curIng) updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } }, }
internal/ingress/controller/store/store.go
New方法构造各种资源的处理EventHandler
Ingress举例:ResourceEventHandlerFuncs
处理Ingress的Add、Delete、Update
// Attach the event handlers to their informers. The IngressClass handler is
// registered only when IngressClass handling has not been disabled through
// icConfig.IgnoreIngressClass.
store.informers.Ingress.AddEventHandler(ingEventHandler)
if !icConfig.IgnoreIngressClass {
	store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
将Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service对应的EventHandler注册到k8s sdk的Informers中
当发生变更时,向updateCh中写入数据。
万能的死循环
Start方法,开启万能的死循环,处理updateCh中的各种Event
// internal/ingress/controller/nginx.go
//
// Event loop (excerpt): waits on the nginx error channel, the update channel
// and the stop channel, and returns when the controller should stop.
for {
	select {
	case err := <-n.ngxErrCh:
		if n.isShuttingDown {
			return
		}

		// if the nginx master process dies, the workers continue to process requests
		// until the failure of the configured livenessProbe and restart of the pod.
		if process.IsRespawnIfRequired(err) {
			return
		}

	case event := <-n.updateCh.Out():
		// This break only leaves the select statement; the enclosing for
		// loop keeps running and the event is dropped.
		if n.isShuttingDown {
			break
		}

		if evt, ok := event.(store.Event); ok {
			klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
			if evt.Type == store.ConfigurationEvent {
				// TODO: is this necessary? Consider removing this special case
				n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
				continue
			}

			// Every other event type is enqueued as a skippable task carrying the object.
			n.syncQueue.EnqueueSkippableTask(evt.Obj)
		} else {
			klog.Warningf("Unexpected event type received %T", event)
		}

	case <-n.stopCh:
		return
	}
}
updateCh中有数据时,放到syncQueue中
-
syncQueue中的数据处理
//internal/task/queue.go // worker processes work in the queue through sync. func (t *Queue) worker() { for { key, quit := t.queue.Get() if quit { if !isClosed(t.workerDone) { close(t.workerDone) } return } ts := time.Now().UnixNano() item := key.(Element) if item.Timestamp != 0 && t.lastSync > item.Timestamp { klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp) t.queue.Forget(key) t.queue.Done(key) continue } klog.V(3).InfoS("syncing", "key", item.Key) if err := t.sync(key); err != nil { klog.ErrorS(err, "requeuing", "key", item.Key) t.queue.AddRateLimited(Element{ Key: item.Key, Timestamp: 0, }) } else { t.queue.Forget(key) t.lastSync = ts } t.queue.Done(key) } }
当syncQueue中有数据,回调syncIngress方法 t.sync(key)
该方法构造时被设置为NGINXController的syncIngress方法
n.syncQueue = task.NewTaskQueue(n.syncIngress)
syncIngress方法会进行配置文件的生成,调用nginx -s reload加载配置文件,有些不需要reload会调用lua接口实现动态生效。