kubelet
没有特别指出的情况下,该系列的代码分析均基于Kubernetes v1.11.7版本

kubelet代码采用了Cobra命令行框架,核心代码如下:

func main()

kubernetes/cmd/kubelet/kubelet.go

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
rand.Seed(time.Now().UTC().UnixNano())
// 初始化命令行
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()
// 执行
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

这里为kubelet启动的入口,其中app.NewKubeletCommand(server.SetupSignalHandler())是这个函数主要执行的过程,包括kubelet启动参数的验证、kubelet对象的生成以及kubelet维护pod所需要用到的服务。

func NewKubeletCommand(stopCh <-chan struct{})

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
// 生成kubelet命令行的flag信息
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
// 设置默认的KubeletFlags的值,包括docker、证书路径、插件目录、CIDR等信息
kubeletFlags := options.NewKubeletFlags()
// 生成kubelet默认的配置文件
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
glog.Fatal(err)
}
// 封装命令行工具
cmd := &cobra.Command{
Use: componentKubelet,
Long: `The kubelet is the primary "node agent" that runs on each
node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
that describes a pod. The kubelet takes a set of PodSpecs that are provided through
various mechanisms (primarily through the apiserver) and ensures that the containers
described in those PodSpecs are running and healthy. The kubelet doesn't manage
containers which were not created by Kubernetes.

Other than from an PodSpec from the apiserver, there are three ways that a container
manifest can be provided to the Kubelet.

File: Path passed as a flag on the command line. Files under this path will be monitored
periodically for updates. The monitoring period is 20s by default and is configurable
via a flag.

HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
is checked every 20 seconds (also configurable with a flag).

HTTP server: The kubelet can also listen for HTTP and respond to a simple API
(underspec'd currently) to submit a new manifest.`,
// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
// so we do all our parsing manually in Run, below.
// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
// `args` arg to Run, without Cobra's interference.
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
……
},
}

// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
// 为kubelet命令行工具结合所有的flags
kubeletFlags.AddFlags(cleanFlagSet) // kubelet flag
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig) // kubelet config
options.AddGlobalFlags(cleanFlagSet) // global: glog/cadvisor/credential……
cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name())) // help

// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
})

return cmd
}

通过对各种特定参数的解析,最终生成kubeletFlagskubeletConfig两个重要的参数对象,用来构造kubeletServer和其他需求。

func (c *Command) Execute()

kubernetes/vendor/github.com/spf13/cobra/command.go

1
2
3
4
5
6
7
// Execute uses the args (os.Args[1:] by default)
// and run through the command tree finding appropriate matches
// for commands and then corresponding flags.
func (c *Command) Execute() error {
_, err := c.ExecuteC()
return err
}

完成上面的初始化过程后,从此函数启动kubelet,包括读取解析args等过程,然后执行Command.Run()

Run: func(cmd *cobra.Command, args []string)

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
Run: func(cmd *cobra.Command, args []string) {
// 解析参数,判断是否合法
// initial flag parse, since we disable cobra's flag parsing
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
glog.Fatal(err)
}

// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
cmd.Usage()
glog.Fatalf("unknown command: %s", cmds[0])
}
// 是否为调用kubelet help,默认为false
// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
glog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
}
if help {
cmd.Help()
return
}
// 是否为调用kubelet version,默认为VersionFalse
// short-circuit on verflag
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cleanFlagSet)

// set feature gates from initial flags-based config
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
// 验证参数是否合法:dynamic config、cadvisor port……
// validate the initial KubeletFlags
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
glog.Fatal(err)
}

// 指定默认的pod运行所需要的镜像
if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
glog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}

// load kubelet config file, if provided
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
glog.Fatal(err)
}
// We must enforce flag precedence by re-parsing the command line into the new object.
// This is necessary to preserve backwards-compatibility across binary upgrades.
// See issue #56171 for more details.
if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
glog.Fatal(err)
}
// update feature gates based on new config
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
}

// We always validate the local configuration (command line + config file).
// This is the default "last-known-good" config for dynamic config, and must always remain valid.
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
glog.Fatal(err)
}
// 是否使用动态配置
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
glog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
}
}

// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// 获取tlsOptions、mountPath、dockerConfig、volumePlugin等
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
glog.Fatal(err)
}

// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController

// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
glog.Fatal(err)
}
return
}

// run the kubelet
glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
glog.Fatal(err)
}
}

此过程主要是对参数的处理,以生成kubelet server的配置及依赖,然后通过Run(kubeletServer, kubeletDeps, stopCh)启动kubelet。

func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{})

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{})

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// 验证kubelet config及flags
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
if err := options.ValidateKubeletServer(s); err != nil {
return err
}

// Obtain Kubelet Lock File
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
glog.Infof("acquiring file lock on %q", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
glog.Infof("watching for inotify events for: %v", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}

// Register current configuration with /configz endpoint
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
glog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}

// About to get clients and such, detect standaloneMode
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}

// 加载依赖服务相关,生成dockerClientConfig、mounter OOMAdjuster、ProbeVolumePlugins、GetDynamicPulginProber、tlsOptions等
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s)
if err != nil {
return err
}
}
// 是否使用cloud provider
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
if cloud == nil {
glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
} else {
glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
}

nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
if err != nil {
return err
}

// 验证kubeconfig,如果kubeconfig为空,则向kube-apiserver申请生成证书
if s.BootstrapKubeconfig != "" {
if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
return err
}
}

// if in standalone mode, indicate as much by setting all clients to nil
if standaloneMode {
kubeDeps.KubeClient = nil
kubeDeps.ExternalKubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
glog.Warningf("standalone mode, no API client")
} else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil {
// initialize clients if not standalone mode and any of the clients are not provided
var kubeClient clientset.Interface
var eventClient v1core.EventsGetter
var heartbeatClient v1core.CoreV1Interface
var externalKubeClient clientset.Interface
// 生成api server客户端配置
clientConfig, err := createAPIServerClientConfig(s)
if err != nil {
return fmt.Errorf("invalid kubeconfig: %v", err)
}

// 启动证书管理客户端,是否自动续期
var clientCertificateManager certificate.Manager
if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
clientCertificateManager, err = kubeletcertificate.NewKubeletClientCertificateManager(s.CertDirectory, nodeName, clientConfig.CertData, clientConfig.KeyData, clientConfig.CertFile, clientConfig.KeyFile)
if err != nil {
return err
}
}
// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
// or the bootstrapping credentials to potentially lay down new initial config.
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
if err != nil {
return err
}
// 获取kubeClient客户端
kubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("New kubeClient from clientConfig error: %v", err)
} else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil {
glog.V(2).Info("Starting client certificate rotation.")
clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())
clientCertificateManager.Start()
}
externalKubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("New kubeClient from clientConfig error: %v", err)
}

// make a separate client for events
// 获取事件客户端
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
eventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for Events: %v", err)
}

// make a separate client for heartbeat with throttling disabled and a timeout attached
// 获取心跳客户端
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
heartbeatClientConfig.QPS = float32(-1)
heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
}

kubeDeps.KubeClient = kubeClient
kubeDeps.ExternalKubeClient = externalKubeClient
if heartbeatClient != nil {
kubeDeps.HeartbeatClient = heartbeatClient
kubeDeps.OnHeartbeatFailure = closeAllConns
}
if eventClient != nil {
kubeDeps.EventClient = eventClient
}
}

// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
// kubelet 认证授权服务端
if kubeDeps.Auth == nil {
auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
}

// 启动cAdvisor客户端
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(s.Address, uint(s.CAdvisorPort), imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}

// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

// 启动容器管理服务:cgroups、mount、qos……
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}

devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)

if err != nil {
return err
}
}

if err := checkPermissions(); err != nil {
glog.Error(err)
}

utilruntime.ReallyCrash = s.ReallyCrashForTesting

rand.Seed(time.Now().UTC().UnixNano())

// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
glog.Warning(err)
}

if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce); err != nil {
return err
}

// 开启健康检查端口
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}

if s.RunOnce {
return nil
}

// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")

select {
case <-done:
break
case <-stopCh:
break
}

return nil
}

func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool)

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
// so that I could remove the associated fields from KubeletConfiginternal. I would
// prefer this to be done as part of an independent validation step on the
// KubeletConfiguration. But as far as I can tell, we don't have an explicit
// place for validation of the KubeletConfiguration yet.
hostNetworkSources, err := kubetypes.GetValidatedSources(kubeFlags.HostNetworkSources)
if err != nil {
return err
}

hostPIDSources, err := kubetypes.GetValidatedSources(kubeFlags.HostPIDSources)
if err != nil {
return err
}

hostIPCSources, err := kubetypes.GetValidatedSources(kubeFlags.HostIPCSources)
if err != nil {
return err
}

privilegedSources := capabilities.PrivilegedSources{
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
}
capabilities.Setup(kubeFlags.AllowPrivileged, privilegedSources, 0)

credentialprovider.SetPreferredDockercfgPath(kubeFlags.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kubeFlags.RootDirectory)

if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}

k, err := CreateAndInitKubelet(kubeCfg,
kubeDeps,
&kubeFlags.ContainerRuntimeOptions,
kubeFlags.ContainerRuntime,
kubeFlags.RuntimeCgroups,
kubeFlags.HostnameOverride,
kubeFlags.NodeIP,
kubeFlags.ProviderID,
kubeFlags.CloudProvider,
kubeFlags.CertDirectory,
kubeFlags.RootDirectory,
kubeFlags.RegisterNode,
kubeFlags.RegisterWithTaints,
kubeFlags.AllowedUnsafeSysctls,
kubeFlags.RemoteRuntimeEndpoint,
kubeFlags.RemoteImageEndpoint,
kubeFlags.ExperimentalMounterPath,
kubeFlags.ExperimentalKernelMemcgNotification,
kubeFlags.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeFlags.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeFlags.MinimumGCAge,
kubeFlags.MaxPerPodContainerCount,
kubeFlags.MaxContainerCount,
kubeFlags.MasterServiceNamespace,
kubeFlags.RegisterSchedulable,
kubeFlags.NonMasqueradeCIDR,
kubeFlags.KeepTerminatedPodVolumes,
kubeFlags.NodeLabels,
kubeFlags.SeccompProfileRoot,
kubeFlags.BootstrapCheckpointPath,
kubeFlags.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}

// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig

rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, kubeCfg, kubeDeps, kubeFlags.EnableServer)
glog.Infof("Started kubelet")
}
return nil
}

func CreateAndInitKubelet(……)

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *kubelet.Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
runtimeCgroups string,
hostnameOverride string,
nodeIP string,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
registerNode bool,
registerWithTaints []api.Taint,
allowedUnsafeSysctls []string,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
experimentalMounterPath string,
experimentalKernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string,
nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations

k, err = kubelet.NewMainKubelet(kubeCfg,
kubeDeps,
crOptions,
containerRuntime,
runtimeCgroups,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory,
registerNode,
registerWithTaints,
allowedUnsafeSysctls,
remoteRuntimeEndpoint,
remoteImageEndpoint,
experimentalMounterPath,
experimentalKernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount,
experimentalNodeAllocatableIgnoreEvictionThreshold,
minimumGCAge,
maxPerPodContainerCount,
maxContainerCount,
masterServiceNamespace,
registerSchedulable,
nonMasqueradeCIDR,
keepTerminatedPodVolumes,
nodeLabels,
seccompProfileRoot,
bootstrapCheckpointPath,
nodeStatusMaxImages)
if err != nil {
return nil, err
}

// 宣告出生
k.BirthCry()
// 启动垃圾回收服务
k.StartGarbageCollection()

return k, nil
}

创建kubelet对象,具体过程kubelet.NewMainKubelet()详见下一篇分析。

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool)

kubernetes/cmd/kubelet/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
wg := sync.WaitGroup{}

// start the kubelet
wg.Add(1)
go wait.Until(func() {
wg.Done()
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)

// start the kubelet server
if enableServer {
wg.Add(1)
go wait.Until(func() {
wg.Done()
k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}, 0, wait.NeverStop)
}
if kubeCfg.ReadOnlyPort > 0 {
wg.Add(1)
go wait.Until(func() {
wg.Done()
k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}, 0, wait.NeverStop)
}
wg.Wait()
}

运行kubelet及server,具体过程k.Run(podCfg.Updates())见下一篇分析。