Kubectl依赖cobra包来构建命令行支持,cobra是一个通用的命令行构建库。

Kubectl流程分析

大致工作流程为:

  • 用户发起请求
  • 根据用户执行的动作,分发给处理对应动作的Cmd
  • 解析用户命令
  • 向ApiServer获取数据
  • 整理返回为通用的数据集合
  • 找到解释查询数据的句柄
  • 使用句柄对整理出的数据集合进行打印输出

以kubectl describe node node1为例:

func main()

kubernetes/cmd/kubectl/kubectl.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// main is the kubectl entry point. It seeds the rand package from the current
// time, builds the default kubectl command, wires the Go flag set into pflag,
// initializes logging and executes the command. If execution fails, the error
// is written to stderr and the process exits with status 1.
func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	command := cmd.NewDefaultKubectlCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
	pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		// os.Exit terminates the process immediately and does not run deferred
		// calls, so the logs must be flushed by hand on the error path.
		logs.FlushLogs()
		os.Exit(1)
	}
}

其中,NewDefaultKubectlCommand()向cobra注册了kubectl的命令。

func NewDefaultKubectlCommand()

kubernetes/pkg/kubectl/cmd/cmd.go

1
2
3
// NewDefaultKubectlCommand returns the kubectl root command wired to the
// process's standard input, output and error streams.
func NewDefaultKubectlCommand() *cobra.Command {
	stdin, stdout, stderr := os.Stdin, os.Stdout, os.Stderr
	return NewKubectlCommand(stdin, stdout, stderr)
}

func NewKubectlCommand(in io.Reader, out, err io.Writer)

kubernetes/pkg/kubectl/cmd/cmd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// NewKubectlCommand creates the `kubectl` command and its nested children.
//
// in, out and err are bundled into a genericclioptions.IOStreams value that is
// handed to the subcommand constructors. The returned root command has the
// persistent flags, the grouped subcommands and the ungrouped subcommands
// (alpha, config, plugin, version, api-versions, api-resources, options)
// registered on it.
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
// Parent command to which all subcommands are added.
cmds := &cobra.Command{
Use: "kubectl",
Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
Long: templates.LongDesc(`
kubectl controls the Kubernetes cluster manager.

Find more information at:
https://kubernetes.io/docs/reference/kubectl/overview/`),
Run: runHelp,
BashCompletionFunction: bashCompletionFunc,
}

flags := cmds.PersistentFlags()
flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags

// Normalize all flags that are coming from other packages or pre-configurations
// a.k.a. change all "_" to "-". e.g. glog package
// NOTE(review): this second SetNormalizeFunc call presumably replaces the
// warning normalizer installed just above — confirm against pflag.
flags.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)

// Register the kubeconfig flags and the version-matching wrapper's flags on
// the root command's persistent flag set.
kubeConfigFlags := genericclioptions.NewConfigFlags()
kubeConfigFlags.AddFlags(flags)
matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())

cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)

// The factory is built from the version-matching config flags and is passed
// to the subcommand constructors below.
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)

// Sending in 'nil' for the getLanguageFn() results in using
// the LANG environment variable.
//
// TODO: Consider adding a flag or file preference for setting
// the language, instead of just loading from the LANG env. variable.
i18n.LoadTranslations("kubectl", nil)

// From this point and forward we get warnings on flags that contain "_" separators
cmds.SetGlobalNormalizationFunc(utilflag.WarnWordSepNormalizeFunc)

// Bundle the caller-supplied streams so every subcommand receives the same set.
ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}

// Subcommands, grouped under the heading given by each group's Message.
groups := templates.CommandGroups{
{
Message: "Basic Commands (Beginner):",
Commands: []*cobra.Command{
create.NewCmdCreate(f, ioStreams),
NewCmdExposeService(f, ioStreams),
NewCmdRun(f, ioStreams),
set.NewCmdSet(f, ioStreams),
deprecatedAlias("run-container", NewCmdRun(f, ioStreams)),
},
},
{
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
NewCmdExplain("kubectl", f, ioStreams),
get.NewCmdGet("kubectl", f, ioStreams),
NewCmdEdit(f, ioStreams),
NewCmdDelete(f, ioStreams),
},
},
{
Message: "Deploy Commands:",
Commands: []*cobra.Command{
rollout.NewCmdRollout(f, ioStreams),
NewCmdRollingUpdate(f, ioStreams),
NewCmdScale(f, ioStreams),
NewCmdAutoscale(f, ioStreams),
},
},
{
Message: "Cluster Management Commands:",
Commands: []*cobra.Command{
NewCmdCertificate(f, ioStreams),
NewCmdClusterInfo(f, ioStreams),
NewCmdTop(f, ioStreams),
NewCmdCordon(f, ioStreams),
NewCmdUncordon(f, ioStreams),
NewCmdDrain(f, ioStreams),
NewCmdTaint(f, ioStreams),
},
},
{
Message: "Troubleshooting and Debugging Commands:",
Commands: []*cobra.Command{
NewCmdDescribe("kubectl", f, ioStreams), // the describe command is registered here
NewCmdLogs(f, ioStreams),
NewCmdAttach(f, ioStreams),
NewCmdExec(f, ioStreams),
NewCmdPortForward(f, ioStreams),
NewCmdProxy(f, ioStreams),
NewCmdCp(f, ioStreams),
auth.NewCmdAuth(f, ioStreams),
},
},
{
Message: "Advanced Commands:",
Commands: []*cobra.Command{
NewCmdApply("kubectl", f, ioStreams),
NewCmdPatch(f, ioStreams),
NewCmdReplace(f, ioStreams),
wait.NewCmdWait(f, ioStreams),
NewCmdConvert(f, ioStreams),
},
},
{
Message: "Settings Commands:",
Commands: []*cobra.Command{
NewCmdLabel(f, ioStreams),
NewCmdAnnotate("kubectl", f, ioStreams),
NewCmdCompletion(ioStreams.Out, ""),
},
},
}
// Presumably attaches every grouped command to cmds — see templates.CommandGroups.Add.
groups.Add(cmds)

// Names handed to templates.ActsAsRootCommand as filters; "options" is always present.
filters := []string{"options"}

// Hide the "alpha" subcommand if there are no alpha commands in this build.
alpha := NewCmdAlpha(f, ioStreams)
if !alpha.HasSubCommands() {
filters = append(filters, alpha.Name())
}

templates.ActsAsRootCommand(cmds, filters, groups...)

// For each root flag that has an entry in bash_completion_flags, append the
// completion to the flag's cobra.BashCompCustom annotation, creating the
// annotation map first when the flag has none.
for name, completion := range bash_completion_flags {
if cmds.Flag(name) != nil {
if cmds.Flag(name).Annotations == nil {
cmds.Flag(name).Annotations = map[string][]string{}
}
cmds.Flag(name).Annotations[cobra.BashCompCustom] = append(
cmds.Flag(name).Annotations[cobra.BashCompCustom],
completion,
)
}
}

// Commands added directly to the root, outside the groups above.
cmds.AddCommand(alpha)
cmds.AddCommand(cmdconfig.NewCmdConfig(f, clientcmd.NewDefaultPathOptions(), ioStreams))
cmds.AddCommand(NewCmdPlugin(f, ioStreams))
cmds.AddCommand(NewCmdVersion(f, ioStreams))
cmds.AddCommand(NewCmdApiVersions(f, ioStreams))
cmds.AddCommand(NewCmdApiResources(f, ioStreams))
cmds.AddCommand(NewCmdOptions(ioStreams.Out))

return cmds
}

注册所有的cmd命令

func NewCmdDescribe(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams)

kubernetes/pkg/kubectl/cmd/describe.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// NewCmdDescribe builds the cobra command for `describe`.
//
// parent is stored on the options as CmdParent and is also passed to
// cmdutil.SuggestApiResources for the long help text. f is forwarded to
// DescribeOptions.Complete when the command runs. streams becomes the
// options' IOStreams. Events are shown by default (ShowEvents starts as true)
// and can be toggled with --show-events.
func NewCmdDescribe(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
	opts := &DescribeOptions{
		FilenameOptions:   &resource.FilenameOptions{},
		DescriberSettings: &printers.DescriberSettings{ShowEvents: true},
		CmdParent:         parent,
		IOStreams:         streams,
	}

	// Callback installed as the command's Run: complete the options from the
	// arguments, then run them; either error goes through cmdutil.CheckErr.
	run := func(c *cobra.Command, args []string) {
		cmdutil.CheckErr(opts.Complete(f, c, args))
		cmdutil.CheckErr(opts.Run())
	}

	describeCmd := &cobra.Command{
		Use:                   "describe (-f FILENAME | TYPE [NAME_PREFIX | -l label] | TYPE/NAME)",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Show details of a specific resource or group of resources"),
		Long:                  describeLong + "\n\n" + cmdutil.SuggestApiResources(parent),
		Example:               describeExample,
		Run:                   run,
	}

	cmdutil.AddFilenameOptionFlags(describeCmd, opts.FilenameOptions, "containing the resource to describe")

	flags := describeCmd.Flags()
	flags.StringVarP(&opts.Selector, "selector", "l", opts.Selector, "Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
	flags.BoolVar(&opts.AllNamespaces, "all-namespaces", opts.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
	flags.BoolVar(&opts.DescriberSettings.ShowEvents, "show-events", opts.DescriberSettings.ShowEvents, "If true, display events related to the described object.")

	cmdutil.AddIncludeUninitializedFlag(describeCmd)
	return describeCmd
}

func (o *DescribeOptions) Run()

kubernetes/pkg/kubectl/cmd/describe.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Run builds a resource query from the options, obtains a describer for each
// matched object and writes the descriptions to o.Out, separating consecutive
// descriptions with two newlines.
//
// A builder error is returned immediately. If collecting the infos fails with
// NotFound and exactly two builder args were given, the call is delegated to
// DescribeMatchingResources. Describer and Describe errors are gathered, with
// any error whose message was already recorded in the loop skipped, and the
// gathered errors are returned as an aggregate.
func (o *DescribeOptions) Run() error {
	result := o.NewBuilder().
		Unstructured().
		ContinueOnError().
		NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
		FilenameParam(o.EnforceNamespace, o.FilenameOptions).
		LabelSelectorParam(o.Selector). // label selector supplied by the user
		IncludeUninitialized(o.IncludeUninitialized).
		ResourceTypeOrNameArgs(true, o.BuilderArgs...). // resource type/name arguments supplied by the user
		Flatten().                                      // request flattened results
		Do()                                            // produce the Result used to collect the objects
	if err := result.Err(); err != nil {
		return err
	}

	allErrs := []error{}
	infos, err := result.Infos()
	if err != nil {
		if apierrors.IsNotFound(err) && len(o.BuilderArgs) == 2 {
			return o.DescribeMatchingResources(err, o.BuilderArgs[0], o.BuilderArgs[1])
		}
		allErrs = append(allErrs, err)
	}

	// recordOnce keeps an error only if no error with the same message has
	// been recorded by this loop before.
	seen := sets.NewString()
	recordOnce := func(e error) {
		msg := e.Error()
		if seen.Has(msg) {
			return
		}
		allErrs = append(allErrs, e)
		seen.Insert(msg)
	}

	printed := false
	for _, info := range infos {
		describer, err := o.Describer(info.ResourceMapping())
		if err != nil {
			recordOnce(err)
			continue
		}

		// Describe renders the object as text; the text is written out below.
		text, err := describer.Describe(info.Namespace, info.Name, *o.DescriberSettings)
		if err != nil {
			recordOnce(err)
			continue
		}

		if printed {
			fmt.Fprintf(o.Out, "\n\n%s", text)
		} else {
			printed = true
			fmt.Fprint(o.Out, text)
		}
	}

	return utilerrors.NewAggregate(allErrs)
}

  • o.NewBuilder().Unstructured().ContinueOnError().NamespaceParam()....Flatten()该链式调用是为执行命令做准备,然后通过Do()注册向ApiServer请求数据、并将返回数据转化为通用结构的具体方法。
  • describer.Describe()函数将提取出的返回数据整理成可读文本,再打印出来做可视化。

func (b *Builder) Do()

kubernetes/pkg/kubectl/genericclioptions/resource/builder.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Do returns a Result whose visitor walks the resources identified by the
// Builder. The visitor honours the ContinueOnError setting. Stream inputs are
// consumed by the first execution, so use Infos() or Object() on the Result to
// capture a list for further iteration.
//
// If building the base visitor already failed, the Result is returned as is,
// with only its mapper set.
func (b *Builder) Do() *Result {
	result := b.visitorResult()
	result.mapper = b.Mapper()
	if result.err != nil {
		return result
	}

	v := result.visitor
	if b.flatten {
		v = NewFlattenListVisitor(v, b.objectTyper, b.mapper)
	}

	// Decorators run on every Info, in this order, before the caller's
	// visitor function sees it.
	decorators := []VisitorFunc{}
	if b.defaultNamespace {
		decorators = append(decorators, SetNamespace(b.namespace))
	}
	if b.requireNamespace {
		decorators = append(decorators, RequireNamespace(b.namespace))
	}
	decorators = append(decorators, FilterNamespace)
	if b.requireObject {
		// RetrieveLazy loads the object for Infos that do not carry one yet.
		decorators = append(decorators, RetrieveLazy)
	}
	v = NewDecoratedVisitor(v, decorators...)

	if b.continueOnError {
		v = ContinueOnErrorVisitor{v}
	}

	result.visitor = v
	return result
}

func RetrieveLazy(info *Info, err error)

kubernetes/pkg/kubectl/genericclioptions/resource/visitor.go

1
2
3
4
5
6
7
8
9
10
// RetrieveLazy loads the object behind info when it has not been loaded yet.
//
// An incoming err is returned unchanged and info is left untouched. If
// info.Object is already set, nothing is fetched and nil is returned.
// Otherwise the result of info.Get() is returned.
func RetrieveLazy(info *Info, err error) error {
	switch {
	case err != nil:
		return err
	case info.Object != nil:
		return nil
	default:
		// Not loaded yet: fetch it now.
		return info.Get()
	}
}

func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc)

kubernetes/pkg/kubectl/genericclioptions/resource/visitor.go

1
2
3
4
5
6
7
8
9
// NewDecoratedVisitor wraps v so that the given visitor functions run before
// the user-supplied visitor function, letting them mutate the Info or stop
// early with an error. When no functions are given, v is returned unwrapped.
func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
	if len(fn) > 0 {
		return DecoratedVisitor{visitor: v, decorators: fn}
	}
	return v
}

func (r *Result) Infos()

kubernetes/pkg/kubectl/genericclioptions/resource/result.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Infos returns every resource Info gathered by walking the Result's visitor.
// The walk happens at most once: its outcome is stored on the Result and later
// calls return the stored error or the stored list.
//
// Errors matching r.ignoreErrors are filtered out of the walk's error before
// it is stored and returned.
func (r *Result) Infos() ([]*Info, error) {
	switch {
	case r.err != nil:
		return nil, r.err
	case r.info != nil:
		return r.info, nil
	}

	// Start from a non-nil empty slice so that an empty result still counts as
	// cached for the r.info != nil check above.
	collected := []*Info{}
	collect := func(info *Info, err error) error {
		if err != nil {
			return err
		}
		collected = append(collected, info)
		return nil
	}

	visitErr := utilerrors.FilterOut(r.visitor.Visit(collect), r.ignoreErrors...)

	r.info, r.err = collected, visitErr
	return collected, visitErr
}

func (v DecoratedVisitor) Visit(fn VisitorFunc)

kubernetes/pkg/kubectl/genericclioptions/resource/visitor.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Visit implements Visitor. For each Info produced by the wrapped visitor it
// runs the decorators in order and then fn. An error handed in by the wrapped
// visitor, or returned by any decorator, is returned immediately and the
// remaining steps are skipped for that Info.
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	decorated := func(info *Info, err error) error {
		if err != nil {
			return err
		}
		for _, decorate := range v.decorators {
			if derr := decorate(info, nil); derr != nil {
				return derr
			}
		}
		return fn(info, nil)
	}
	return v.visitor.Visit(decorated)
}

Visitor使用户可以将来自ApiServer的数据转化为通用数据集合。