K8S APISERVER源码: 服务启动

基于版本 1.6.7

启动流程

  • cmd/kube-apiserver/apiserver.go
func main() {
   app.Run(s)
}
  • cmd/kube-apiserver/app/server.go
func Run(s *options.ServerRunOptions) error {
    // 构建master配置信息
    config, sharedInformers, err := BuildMasterConfig(s)
    // 调用RunServer
    return RunServer(config, sharedInformers, wait.NeverStop)
}

func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
    // 执行相关初始化
    m, err := config.Complete().New()     // => TO: Container初始化
    // 启动
    return m.GenericAPIServer.PrepareRun().Run(stopCh)  // => next
}
  • vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

启动主体函数都在这个文件中, 绑定地址/端口号, 并最终启动

func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    s.NonBlockingRun(stopCh)
}

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
    s.serveSecurely(internalStopCh)
    // or
    s.serveInsecurely(internalStopCh) // => next
}
  • vendor/k8s.io/apiserver/pkg/server/serve.go
func (s *GenericAPIServer) serveInsecurely(stopCh <-chan struct{}) error {
    insecureServer := &http.Server{
        Addr:           s.InsecureServingInfo.BindAddress,
        Handler:        s.InsecureHandler,   // s.Handler for secure
        MaxHeaderBytes: 1 << 20,
    }
   runServer(insecureServer, s.InsecureServingInfo.BindNetwork, stopCh) // => next
}


func runServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
    go func() {
        for {
            var listener net.Listener
            listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
            // *http.Server
            err := server.Serve(listener)
            }
    }()
}

Container初始化

  • cmd/kube-apiserver/app/server.go
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
    // 执行相关初始化
    m, err := config.Complete().New()     // => TO: Container初始化
    // 启动
    return m.GenericAPIServer.PrepareRun().Run(stopCh)  // => next
}
  • kubernetes/pkg/master/master.go
func (c completedConfig) New() (*Master, error) {
   // m.GenericAPIServer.HandlerContainer = APIContainer,   APIContainer.Container =  restful.NewContainer()
    s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
   m := &Master{
        GenericAPIServer: s,
    }
}
  • vendor/k8s.io/apiserver/pkg/server/config.go

到这里, 完成了 s.Handler, s.InsecureHandler 的初始化

func (c completedConfig) New() (*GenericAPIServer, error) {
  s := &GenericAPIServer{
  }
  // s.HandlerContainer = APIContainer
    s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)  // => next 1

  // 生成 Handler
    s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)  // => next 2
}
  • 1: vendor/k8s.io/apiserver/pkg/server/mux/container.go

新建一个APIContainer, 其中包含一个新建的 restful.Container, 并把传入的 http.ServeMux 设置为该 Container 的 ServeMux

// NewAPIContainer constructs a new container for APIs
func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APIContainer {
    c := APIContainer{
        // 新建一个Container
        Container: restful.NewContainer(),
        NonSwaggerRoutes: PathRecorderMux{
            mux: mux,
        },
        UnlistedRoutes: mux,
    }
    // 配置 http.ServeMux
    c.Container.ServeMux = mux
    // 配置路由方式, 使用CurlyRouter
    c.Container.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
    return &c
}
  • 2: vendor/k8s.io/apiserver/pkg/server/config.go
type Config struct {
    BuildHandlerChainsFunc func(apiHandler http.Handler, c *Config) (secure, insecure http.Handler)
}


func NewConfig() *Config {
    return &Config{
            BuildHandlerChainsFunc:      DefaultBuildHandlerChain,
    }
}


func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insecure http.Handler) {
    return generic(protect(apiHandler)), generic(audit(apiHandler)) // add filters to handler
}

注意, 这里传入的实参是 s.HandlerContainer.ServeMux (类型为 *http.ServeMux), 而 DefaultBuildHandlerChain 的形参是 apiHandler http.Handler; *http.ServeMux 实现了 ServeHTTP 方法, 因此满足 http.Handler 接口, 可以直接传入.

  • net/http/server.go
type Handler interface {
        ServeHTTP(ResponseWriter, *Request)
}

// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
    if r.RequestURI == "*" {
        if r.ProtoAtLeast(1, 1) {
            w.Header().Set("Connection", "close")
        }
        w.WriteHeader(StatusBadRequest)
        return
    }
    h, _ := mux.Handler(r)
    h.ServeHTTP(w, r)
}

初始化后, Handler 以及 InsecureHandler 都是在 Container 的 ServeMux 外层包装过滤器得到的, 然后在创建 http.Server 时, 将对应的 handler 传入:

&http.Server{
        Addr:           s.InsecureServingInfo.BindAddress,
        Handler:        s.InsecureHandler,   // s.Handler for secure
        MaxHeaderBytes: 1 << 20,
}
发布于:2019-11-08 02:13:56