From a20b82b9cf8f082d865f3e0ebfd832907484f9b7 Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Thu, 16 Jan 2025 23:02:08 +0800 Subject: [PATCH] feat: re-run workflow nodes when critical configurations changed --- go.mod | 2 +- go.sum | 4 +- .../workflow/node-processor/apply_node.go | 89 ++++++++++++------- .../workflow/node-processor/deploy_node.go | 68 ++++++++------ .../workflow/node-processor/notify_node.go | 8 +- .../workflow/node-processor/start_node.go | 8 +- 6 files changed, 107 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 89e2287e..152d66c6 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/volcengine/volc-sdk-golang v1.0.189 github.com/volcengine/volcengine-go-sdk v1.0.177 golang.org/x/crypto v0.32.0 - golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 + golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 k8s.io/api v0.32.0 k8s.io/apimachinery v0.32.0 k8s.io/client-go v0.32.0 diff --git a/go.sum b/go.sum index eda86588..57ff9916 100644 --- a/go.sum +++ b/go.sum @@ -957,8 +957,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 h1:1UoZQm6f0P/ZO0w1Ri+f+ifG/gXhegadRdwBIXEFWDo= -golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= +golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA= +golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index 30e28aad..4e20ebcb 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -5,6 +5,8 @@ import ( "strings" "time" + "golang.org/x/exp/maps" + "github.com/usual2970/certimate/internal/applicant" "github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/pkg/utils/certs" @@ -29,35 +31,29 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode { // 申请节点根据申请类型执行不同的操作 func (a *applyNode) Run(ctx context.Context) error { - const validityDuration = time.Hour * 24 * 10 - a.AddOutput(ctx, a.node.Name, "开始执行") - // 查询是否申请过,已申请过则直接返回 - // TODO: 先保持和 v0.2 一致,后续增加是否强制申请的参数 - output, err := a.outputRepo.GetByNodeId(ctx, a.node.Id) + + // 查询上次执行结果 + lastOutput, err := a.outputRepo.GetByNodeId(ctx, a.node.Id) if err != nil && !domain.IsRecordNotFoundError(err) { a.AddOutput(ctx, a.node.Name, "查询申请记录失败", err.Error()) return err } - if output != nil && output.Succeeded { - lastCertificate, _ := a.certRepo.GetByWorkflowNodeId(ctx, a.node.Id) - if lastCertificate != nil { - if time.Until(lastCertificate.ExpireAt) > validityDuration { - a.AddOutput(ctx, a.node.Name, "已申请过证书,且证书在有效期内") - return nil - } - } + // 检测是否可以跳过本次执行 + if skippable, skipReason := a.checkCanSkip(ctx, lastOutput); skippable { + a.AddOutput(ctx, a.node.Name, skipReason) + return nil } - // 获取Applicant + // 初始化申请器 applicant, err := applicant.NewWithApplyNode(a.node) if err != nil { a.AddOutput(ctx, a.node.Name, "获取申请对象失败", err.Error()) return err } - // 申请 + // 申请证书 applyResult, err := applicant.Apply() if err != nil { a.AddOutput(ctx, a.node.Name, "申请失败", err.Error()) @@ -65,27 +61,12 @@ func (a *applyNode) Run(ctx context.Context) error { } a.AddOutput(ctx, a.node.Name, "申请成功") - // 记录申请结果 - // 保持一个节点只有一个输出 - outputId := "" - if output != nil { - outputId = output.Id - } - output = &domain.WorkflowOutput{ - Meta: domain.Meta{Id: outputId}, - WorkflowId: GetWorkflowId(ctx), - NodeId: a.node.Id, - Node: a.node, - Succeeded: true, - Outputs: a.node.Outputs, - } - + // 解析证书并生成实体 certX509, err := certs.ParseCertificateFromPEM(applyResult.CertificateFullChain) if err != nil { a.AddOutput(ctx, a.node.Name, "解析证书失败", err.Error()) return err } - certificate := &domain.Certificate{ Source: domain.CertificateSourceTypeWorkflow, SubjectAltNames: strings.Join(certX509.DNSNames, ";"), @@ -100,7 +81,19 @@ func (a *applyNode) Run(ctx context.Context) error { WorkflowNodeId: a.node.Id, } - if err := a.outputRepo.Save(ctx, output, certificate, func(id string) error { + // 保存执行结果 + // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 + currentOutput := &domain.WorkflowOutput{ + WorkflowId: GetWorkflowId(ctx), + NodeId: a.node.Id, + Node: a.node, + Succeeded: true, + Outputs: a.node.Outputs, + } + if lastOutput != nil { + currentOutput.Id = lastOutput.Id + } + if err := a.outputRepo.Save(ctx, currentOutput, certificate, func(id string) error { if certificate != nil { certificate.WorkflowOutputId = id } @@ -110,8 +103,38 @@ func (a *applyNode) Run(ctx context.Context) error { a.AddOutput(ctx, a.node.Name, "保存申请记录失败", err.Error()) return err } - a.AddOutput(ctx, a.node.Name, "保存申请记录成功") return nil } + +func (a *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) { + const validityDuration = time.Hour * 24 * 10 + + // TODO: 可控制是否强制申请 + if lastOutput != nil && lastOutput.Succeeded { + // 比较和上次申请时的关键配置(即影响证书签发的)参数是否一致 + if lastOutput.Node.GetConfigString("domains") != a.node.GetConfigString("domains") { + return false, "配置项变化:域名" + } + if lastOutput.Node.GetConfigString("contactEmail") != a.node.GetConfigString("contactEmail") { + return false, "配置项变化:联系邮箱" + } + if lastOutput.Node.GetConfigString("provider") != a.node.GetConfigString("provider") { + return false, "配置项变化:DNS 提供商授权" + } + if !maps.Equal(lastOutput.Node.GetConfigMap("providerConfig"), a.node.GetConfigMap("providerConfig")) { + return false, "配置项变化:DNS 提供商参数" + } + if lastOutput.Node.GetConfigString("keyAlgorithm") != a.node.GetConfigString("keyAlgorithm") { + return false, "配置项变化:数字签名算法" + } + + lastCertificate, _ := a.certRepo.GetByWorkflowNodeId(ctx, a.node.Id) + if lastCertificate != nil && time.Until(lastCertificate.ExpireAt) > validityDuration { + return true, "已申请过证书,且证书尚未临近过期" + } + } + + return false, "无历史申请记录" +} diff --git a/internal/workflow/node-processor/deploy_node.go b/internal/workflow/node-processor/deploy_node.go index 135e8cfd..6b4fb512 100644 --- a/internal/workflow/node-processor/deploy_node.go +++ b/internal/workflow/node-processor/deploy_node.go @@ -8,6 +8,7 @@ import ( "github.com/usual2970/certimate/internal/deployer" "github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/repository" + "golang.org/x/exp/maps" ) type deployNode struct { @@ -28,77 +29,88 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode { func (d *deployNode) Run(ctx context.Context) error { d.AddOutput(ctx, d.node.Name, "开始执行") - // 检查是否部署过(部署过则直接返回,和 v0.2 暂时保持一致) - output, err := d.outputRepo.GetByNodeId(ctx, d.node.Id) + + // 查询上次执行结果 + lastOutput, err := d.outputRepo.GetByNodeId(ctx, d.node.Id) if err != nil && !domain.IsRecordNotFoundError(err) { d.AddOutput(ctx, d.node.Name, "查询部署记录失败", err.Error()) return err } - // 获取部署对象 - // 获取证书 - certSource := d.node.GetConfigString("certificate") + // 获取前序节点输出证书 + certSource := d.node.GetConfigString("certificate") certSourceSlice := strings.Split(certSource, "#") if len(certSourceSlice) != 2 { d.AddOutput(ctx, d.node.Name, "证书来源配置错误", certSource) return fmt.Errorf("证书来源配置错误: %s", certSource) } - - cert, err := d.certRepo.GetByWorkflowNodeId(ctx, certSourceSlice[0]) + certificate, err := d.certRepo.GetByWorkflowNodeId(ctx, certSourceSlice[0]) if err != nil { d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error()) return err } - // 未部署过,开始部署 - // 部署过但是证书更新了,重新部署 - // 部署过且证书未更新,直接返回 - - if d.deployed(output) && cert.CreatedAt.Before(output.UpdatedAt) { - d.AddOutput(ctx, d.node.Name, "已部署过且证书未更新") + // 检测是否可以跳过本次执行 + if skippable, skipReason := d.checkCanSkip(ctx, lastOutput); skippable { + if certificate.CreatedAt.Before(lastOutput.UpdatedAt) { + d.AddOutput(ctx, d.node.Name, "已部署过且证书未更新") + } else { + d.AddOutput(ctx, d.node.Name, skipReason) + } return nil } + // 初始化部署器 deploy, err := deployer.NewWithDeployNode(d.node, struct { Certificate string PrivateKey string - }{Certificate: cert.Certificate, PrivateKey: cert.PrivateKey}) + }{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey}) if err != nil { d.AddOutput(ctx, d.node.Name, "获取部署对象失败", err.Error()) return err } - // 部署 + // 部署证书 if err := deploy.Deploy(ctx); err != nil { d.AddOutput(ctx, d.node.Name, "部署失败", err.Error()) return err } - d.AddOutput(ctx, d.node.Name, "部署成功") - // 记录部署结果 - outputId := "" - if output != nil { - outputId = output.Id - } - output = &domain.WorkflowOutput{ - Meta: domain.Meta{Id: outputId}, + // 保存执行结果 + // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 + currentOutput := &domain.WorkflowOutput{ + Meta: domain.Meta{}, WorkflowId: GetWorkflowId(ctx), NodeId: d.node.Id, Node: d.node, Succeeded: true, } - - if err := d.outputRepo.Save(ctx, output, nil, nil); err != nil { + if lastOutput != nil { + currentOutput.Id = lastOutput.Id + } + if err := d.outputRepo.Save(ctx, currentOutput, nil, nil); err != nil { d.AddOutput(ctx, d.node.Name, "保存部署记录失败", err.Error()) return err } - d.AddOutput(ctx, d.node.Name, "保存部署记录成功") return nil } -func (d *deployNode) deployed(output *domain.WorkflowOutput) bool { - return output != nil && output.Succeeded +func (d *deployNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) { + // TODO: 可控制是否强制部署 + if lastOutput != nil && lastOutput.Succeeded { + // 比较和上次部署时的关键配置(即影响证书部署的)参数是否一致 + if lastOutput.Node.GetConfigString("provider") != d.node.GetConfigString("provider") { + return false, "配置项变化:主机提供商授权" + } + if !maps.Equal(lastOutput.Node.GetConfigMap("providerConfig"), d.node.GetConfigMap("providerConfig")) { + return false, "配置项变化:主机提供商参数" + } + + return true, "已部署过证书" + } + + return false, "无历史部署记录" } diff --git a/internal/workflow/node-processor/notify_node.go b/internal/workflow/node-processor/notify_node.go index ffb40566..cbc7fa47 100644 --- a/internal/workflow/node-processor/notify_node.go +++ b/internal/workflow/node-processor/notify_node.go @@ -26,18 +26,20 @@ func (n *notifyNode) Run(ctx context.Context) error { n.AddOutput(ctx, n.node.Name, "开始执行") // 获取通知配置 - setting, err := n.settingsRepo.GetByName(ctx, "notifyChannels") + settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels") if err != nil { n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error()) return err } - channelConfig, err := setting.GetNotifyChannelConfig(n.node.GetConfigString("channel")) + // 获取通知渠道 + channelConfig, err := settings.GetNotifyChannelConfig(n.node.GetConfigString("channel")) if err != nil { n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error()) return err } + // 发送通知 if err := notify.SendToChannel(n.node.GetConfigString("subject"), n.node.GetConfigString("message"), n.node.GetConfigString("channel"), @@ -46,7 +48,7 @@ func (n *notifyNode) Run(ctx context.Context) error { n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "发送通知成功") + return nil } diff --git a/internal/workflow/node-processor/start_node.go b/internal/workflow/node-processor/start_node.go index 6dc641ab..81d93de6 100644 --- a/internal/workflow/node-processor/start_node.go +++ b/internal/workflow/node-processor/start_node.go @@ -18,11 +18,9 @@ func NewStartNode(node *domain.WorkflowNode) *startNode { } } -// 开始节点没有任何操作 func (s *startNode) Run(ctx context.Context) error { - s.AddOutput(ctx, - s.node.Name, - "完成", - ) + // 开始节点没有任何操作 + s.AddOutput(ctx, s.node.Name, "完成") + return nil }