简介
argo是一个工作流的调度引擎,支持 Steps 和 DAG 这两种工作流。
- Steps: 是按照步骤,从前往后的工作流调度方案。工作流中的每一步都只依赖上一步的结果
- DAG: 全称是 directed acyclic graph,译为有向无环图。与 Steps 的区别在于每一步可能依赖之前的多步输出,但是不会循环依赖(也就是无环)
不论是在什么类型的工作流上,argo都抽象出了两种输入输出:
- parameters: 通常情况下都是字符串,该字符串可以来源于标准输出,也可以来源于文件的内容
- artifacts: 可以理解成文件
输入输出是连接整个工作流的核心。每一步都可以看作是一次函数调用。那么在argo中,它是如何实现在多步之间输入输出的传输呢?下面会通过源代码进行分析。
在看代码之前,可以看一个 argo 的工作流中的一个pod,为了查看更方便,我删除一些不需要关注的字段:
$ kubectl -n workflow describe pods custom-workflow-111-2fw2f-2639432629
Name: custom-workflow-111-2fw2f-2639432629
Namespace: workflow
Labels: pipeline.starx.com/nodeID=743
workflows.argoproj.io/completed=true
workflows.argoproj.io/workflow=custom-workflow-111-2fw2f
Annotations: cni.projectcalico.org/podIP: 10.42.0.83/32
workflows.argoproj.io/node-name: custom-workflow-111-2fw2f.yolov3-evaluate-743
workflows.argoproj.io/outputs:
{"result":...
workflows.argoproj.io/template:
{"name":"yolov3-evaluate-743","inputs":{"parameters":[{"name":"userParam","value":"eyJTY29yZVRocmVzaG9sZCI6MC41LCJJb3VfVGhyZXNob2xkIjowLjQ...
Controlled By: Workflow/custom-workflow-111-2fw2f
Init Containers:
init:
Image: argoproj/argoexec:v2.3.0
Command:
argoexec
init
Environment:
ARGO_POD_NAME: custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
Mounts:
/argo/inputs/artifacts from input-artifacts (rw)
/argo/podmetadata from podmetadata (rw)
/argo/staging from argo-staging (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
Containers:
wait:
Image: argoproj/argoexec:v2.3.0
Command:
argoexec
wait
Environment:
ARGO_POD_NAME: custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
Mounts:
/argo/podmetadata from podmetadata (rw)
/mainctrfs/argo/staging from argo-staging (rw)
/mainctrfs/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
/mainctrfs/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
/var/run/docker.sock from docker-sock (ro)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
main:
Image: registry.cn-shanghai.aliyuncs.com/xinhuodev/wt:0.4
Command:
sh
Args:
/argo/staging/script
Mounts:
/argo/staging from argo-staging (rw)
/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
Volumes:
podmetadata:
Type: DownwardAPI (a volume populated by information about the pod)
Items:
metadata.annotations -> annotations
docker-sock:
Type: HostPath (bare host directory volume)
Path: /var/run/docker.sock
HostPathType: Socket
input-artifacts:
Type: EmptyDir (a temporary directory that shares a pod's lifetime)
Medium:
SizeLimit: <unset>
argo-staging:
Type: EmptyDir (a temporary directory that shares a pod's lifetime)
Medium:
SizeLimit: <unset>
default-token-lfk5b:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-lfk5b
Optional: false
我们需要关注的信息有:
- Pod 的 Annotations
- Init Containers 启动的初始化容器
- Containers 中的 wait 容器和 main 容器
- Pod 的 Volumes 和每个容器的 Mounts
Init 容器
argo 创建的 Pod 的初始化容器执行了 argoexec init
命令,从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go
, 我们的分析也从这里开始。在执行 argo exec init
之后,第一个调用的函数应该是loadArtifacts()
。这个方法中做了三件事: initExecutor()
、wfExecutor.StageFiles()
、wfExecutor.LoadArtifacts()
initExecutor:
initExecutor 的代码如下(删除了不重要的代码):
func initExecutor() *executor.WorkflowExecutor {
tmpl, err := executor.LoadTemplate(podAnnotationsPath)
var cre executor.ContainerRuntimeExecutor
switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
case common.ContainerRuntimeExecutorK8sAPI:
cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
case common.ContainerRuntimeExecutorKubelet:
cre, err = kubelet.NewKubeletExecutor()
case common.ContainerRuntimeExecutorPNS:
cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
default:
cre, err = docker.NewDockerExecutor()
}
wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
yamlBytes, _ := json.Marshal(&wfExecutor.Template)
return &wfExecutor
}
从 podAnnotationsPath
加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations
,这正好是 init
容器的挂载,而这个挂载对应的卷是:
podmetadata:
Type: DownwardAPI (a volume populated by information about the pod)
Items:
metadata.annotations -> annotations
这里的 DownwardAPI
也解释一下,它是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。
接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor
实例化了一个 wfExecutor
StageFiles()
源代码如下:
func (we *WorkflowExecutor) StageFiles() error {
var filePath string
var body []byte
switch we.Template.GetType() {
case wfv1.TemplateTypeScript:
log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
filePath = common.ExecutorScriptSourcePath
body = []byte(we.Template.Script.Source)
case wfv1.TemplateTypeResource:
log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
filePath = common.ExecutorResourceManifestPath
body = []byte(we.Template.Resource.Manifest)
default:
return nil
}
err := ioutil.WriteFile(filePath, body, 0644)
if err != nil {
return errors.InternalWrapError(err)
}
return nil
}
职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script
。这就是我们在 main 容器中执行的脚本了。
LoadArtifacts
// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
for _, art := range we.Template.Inputs.Artifacts {
artDriver, err := we.InitDriver(art)
var artPath string
mnt := common.FindOverlappingVolume(&we.Template, art.Path)
if mnt == nil {
artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
} else {
// If we get here, it means the input artifact path overlaps with an user specified
// volumeMount in the container. Because we also implement input artifacts as volume
// mounts, we need to load the artifact into the user specified volume mount,
// as opposed to the `input-artifacts` volume that is an implementation detail
// unbeknownst to the user.
log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
}
// The artifact is downloaded to a temporary location, after which we determine if
// the file is a tarball or not. If it is, it is first extracted then renamed to
// the desired location. If not, it is simply renamed to the location.
tempArtPath := artPath + ".tmp"
err = artDriver.Load(&art, tempArtPath)
if err != nil {
return err
}
if isTarball(tempArtPath) {
err = untar(tempArtPath, artPath)
_ = os.Remove(tempArtPath)
} else {
err = os.Rename(tempArtPath, artPath)
}
if art.Mode != nil {
err = os.Chmod(artPath, os.FileMode(*art.Mode))
}
}
return nil
}
InitDriver
是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。
FindOverlappingVolume
是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts
作为 artifacts 的基础路径。否则使用 /mainctrfs
作为路径。
下面就是下载文件,解压文件并修改权限了。
注意在这里,init、wait和main容器都挂载了input-artifacts
和argo-staging
,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。
wait 容器
wait容器的职责有以下几点:
- 等待 main 容器结束
- 杀死 sidecar
- 保存日志
- 保存 parameters
- 保存 artifacts
- 获取脚本的输出流
- 将输出放在 Annotations 上
下面我们看这些功能点的实现:
等待 main 容器结束
// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
// WaitInit() 是初始化操作,只有 PSN 需要
err := we.RuntimeExecutor.WaitInit()
if err != nil {
return err
}
log.Infof("Waiting on main container")
// waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
// 不为空说明启动了
mainContainerID, err := we.waitMainContainerStart()
if err != nil {
return err
}
log.Infof("main container started with container ID: %s", mainContainerID)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// monitorAnnotations是因为pod的annotations会更改
annotationUpdatesCh := we.monitorAnnotations(ctx)
// 超时会杀死
go we.monitorDeadline(ctx, annotationUpdatesCh)
// 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
err = we.RuntimeExecutor.Wait(mainContainerID)
if err != nil {
return err
}
log.Infof("Main container completed")
return nil
}
杀死 sidecar
main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。
// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
if len(we.Template.Sidecars) == 0 {
log.Infof("No sidecars")
return nil
}
log.Infof("Killing sidecars")
pod, err := we.getPod()
if err != nil {
return err
}
sidecarIDs := make([]string, 0)
// 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
for _, ctrStatus := range pod.Status.ContainerStatuses {
if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
continue
}
if ctrStatus.State.Terminated != nil {
continue
}
containerID := containerID(ctrStatus.ContainerID)
log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
sidecarIDs = append(sidecarIDs, containerID)
}
if len(sidecarIDs) == 0 {
return nil
}
return we.RuntimeExecutor.Kill(sidecarIDs)
}
保存日志
argo 是支持将 main 容器中的日志持久化并保存到指定的地方的(s3, hdfs, Artifactory)。这在 argo 的文档上好像没有提到过。这一部分的逻辑比较简单,就是通过 ContainerRuntime 获取获取容器中的输出流,然后存成文件,通过 argo 中的 storage driver 保存下来。
保存 parameters
// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters() error {
if len(we.Template.Outputs.Parameters) == 0 {
log.Infof("No output parameters")
return nil
}
log.Infof("Saving output parameters")
mainCtrID, err := we.GetMainContainerID()
if err != nil {
return err
}
// 遍历模板参数
for i, param := range we.Template.Outputs.Parameters {
log.Infof("Saving path output parameter: %s", param.Name)
// Determine the file path of where to find the parameter
if param.ValueFrom == nil || param.ValueFrom.Path == "" {
continue
}
var output string
if we.isBaseImagePath(param.ValueFrom.Path) {
log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
// 容器内,通过 runtime 获取
output, err = we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
if err != nil {
return err
}
} else {
log.Infof("Copying %s from from volume mount", param.ValueFrom.Path)
mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
// 容器的挂载卷,直接获取
out, err := ioutil.ReadFile(mountedPath)
if err != nil {
return err
}
output = string(out)
}
outputLen := len(output)
// Trims off a single newline for user convenience
if outputLen > 0 && output[outputLen-1] == '\n' {
output = output[0 : outputLen-1]
}
// 保存下来
we.Template.Outputs.Parameters[i].Value = &output
log.Infof("Successfully saved output parameter: %s", param.Name)
}
return nil
}
保存 artifacts
保存 artifacts 和 保存 parameters 的操作是一样的。
// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
if len(we.Template.Outputs.Artifacts) == 0 {
log.Infof("No output artifacts")
return nil
}
log.Infof("Saving output artifacts")
mainCtrID, err := we.GetMainContainerID()
if err != nil {
return err
}
err = os.MkdirAll(tempOutArtDir, os.ModePerm)
if err != nil {
return errors.InternalWrapError(err)
}
for i, art := range we.Template.Outputs.Artifacts {
err := we.saveArtifact(mainCtrID, &art)
if err != nil {
return err
}
we.Template.Outputs.Artifacts[i] = art
}
return nil
}
获取脚本的输出流
直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中
func (we *WorkflowExecutor) CaptureScriptResult() error {
if we.Template.Script == nil {
return nil
}
log.Infof("Capturing script output")
mainContainerID, err := we.GetMainContainerID()
if err != nil {
return err
}
reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
if err != nil {
return err
}
defer func() { _ = reader.Close() }()
bytes, err := ioutil.ReadAll(reader)
if err != nil {
return errors.InternalWrapError(err)
}
out := string(bytes)
// Trims off a single newline for user convenience
outputLen := len(out)
if outputLen > 0 && out[outputLen-1] == '\n' {
out = out[0 : outputLen-1]
}
we.Template.Outputs.Result = &out
return nil
}
将输出放在 Annotations 上
将 outputs 存在 pod 的 annotations 上。
func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
if logArt != nil {
outputs.Artifacts = append(outputs.Artifacts, *logArt)
}
if !outputs.HasOutputs() {
return nil
}
log.Infof("Annotating pod with output")
outputBytes, err := json.Marshal(outputs)
if err != nil {
return errors.InternalWrapError(err)
}
return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}
总结
init 容器做了 pod 的初始化,包括存储 script,下载 artifacts等等,这样我们的 main 容器就不用关心输入的来源,只需要在指定地方使用即可。wait 容器负责监控 main 容器的生命周期,在 main 容器中的主要逻辑运行结束之后,负责将输出部分读取,持久化,这样 main 容器就不用操心如何将该步产生的结果传到后面的步骤上的问题。