diff --git a/internal/domain/expr.go b/internal/domain/expr.go new file mode 100644 index 00000000..3b312642 --- /dev/null +++ b/internal/domain/expr.go @@ -0,0 +1,262 @@ +package domain + +import ( + "encoding/json" + "fmt" +) + +type Value any + +type ( + ComparisonOperator string + LogicalOperator string +) + +const ( + GreaterThan ComparisonOperator = ">" + LessThan ComparisonOperator = "<" + GreaterOrEqual ComparisonOperator = ">=" + LessOrEqual ComparisonOperator = "<=" + Equal ComparisonOperator = "==" + NotEqual ComparisonOperator = "!=" + Is ComparisonOperator = "is" + + And LogicalOperator = "and" + Or LogicalOperator = "or" + Not LogicalOperator = "not" +) + +type Expr interface { + GetType() string + Eval(variables map[string]map[string]any) (any, error) +} + +type ConstExpr struct { + Type string `json:"type"` + Value Value `json:"value"` +} + +func (c ConstExpr) GetType() string { return c.Type } + +type VarExpr struct { + Type string `json:"type"` + Selector WorkflowNodeIOValueSelector `json:"selector"` +} + +func (v VarExpr) GetType() string { return v.Type } + +func (v VarExpr) Eval(variables map[string]map[string]any) (any, error) { + if v.Selector.Id == "" { + return nil, fmt.Errorf("node id is empty") + } + if v.Selector.Name == "" { + return nil, fmt.Errorf("name is empty") + } + + if _, ok := variables[v.Selector.Id]; !ok { + return nil, fmt.Errorf("node %s not found", v.Selector.Id) + } + + if _, ok := variables[v.Selector.Id][v.Selector.Name]; !ok { + return nil, fmt.Errorf("variable %s not found in node %s", v.Selector.Name, v.Selector.NodeId) + } + + return variables[v.Selector.Id][v.Selector.Name], nil +} + +type CompareExpr struct { + Type string `json:"type"` // compare + Op ComparisonOperator `json:"op"` + Left Expr `json:"left"` + Right Expr `json:"right"` +} + +func (c CompareExpr) GetType() string { return c.Type } + +func (c CompareExpr) Eval(variables map[string]map[string]any) (any, error) { + left, err := c.Left.Eval(variables) + if err != nil { + return nil, err + } + right, err := c.Right.Eval(variables) + if err != nil { + return nil, err + } + + switch c.Op { + case GreaterThan: + return left.(float64) > right.(float64), nil + case LessThan: + return left.(float64) < right.(float64), nil + case GreaterOrEqual: + return left.(float64) >= right.(float64), nil + case LessOrEqual: + return left.(float64) <= right.(float64), nil + case Equal: + return left == right, nil + case NotEqual: + return left != right, nil + case Is: + return left == right, nil + default: + return nil, fmt.Errorf("unknown operator: %s", c.Op) + } +} + +type LogicalExpr struct { + Type string `json:"type"` // logical + Op LogicalOperator `json:"op"` + Left Expr `json:"left"` + Right Expr `json:"right"` +} + +func (l LogicalExpr) GetType() string { return l.Type } + +func (l LogicalExpr) Eval(variables map[string]map[string]any) (any, error) { + left, err := l.Left.Eval(variables) + if err != nil { + return nil, err + } + right, err := l.Right.Eval(variables) + if err != nil { + return nil, err + } + + switch l.Op { + case And: + return left.(bool) && right.(bool), nil + case Or: + return left.(bool) || right.(bool), nil + default: + return nil, fmt.Errorf("unknown operator: %s", l.Op) + } +} + +type NotExpr struct { + Type string `json:"type"` // not + Expr Expr `json:"expr"` +} + +func (n NotExpr) GetType() string { return n.Type } + +func (n NotExpr) Eval(variables map[string]map[string]any) (any, error) { + inner, err := n.Expr.Eval(variables) + if err != nil { + return nil, err + } + return !inner.(bool), nil +} + +type rawExpr struct { + Type string `json:"type"` +} + +func MarshalExpr(e Expr) ([]byte, error) { + return json.Marshal(e) +} + +func UnmarshalExpr(data []byte) (Expr, error) { + var typ rawExpr + if err := json.Unmarshal(data, &typ); err != nil { + return nil, err + } + + switch typ.Type { + case "const": + var e ConstExpr + if err := json.Unmarshal(data, &e); err != nil { + return nil, err + } + return e, nil + case "var": + var e VarExpr + if err := json.Unmarshal(data, &e); err != nil { + return nil, err + } + return e, nil + case "compare": + var e CompareExprRaw + if err := json.Unmarshal(data, &e); err != nil { + return nil, err + } + return e.ToCompareExpr() + case "logical": + var e LogicalExprRaw + if err := json.Unmarshal(data, &e); err != nil { + return nil, err + } + return e.ToLogicalExpr() + case "not": + var e NotExprRaw + if err := json.Unmarshal(data, &e); err != nil { + return nil, err + } + return e.ToNotExpr() + default: + return nil, fmt.Errorf("unknown expr type: %s", typ.Type) + } +} + +type CompareExprRaw struct { + Type string `json:"type"` + Op ComparisonOperator `json:"op"` + Left json.RawMessage `json:"left"` + Right json.RawMessage `json:"right"` +} + +func (r CompareExprRaw) ToCompareExpr() (CompareExpr, error) { + leftExpr, err := UnmarshalExpr(r.Left) + if err != nil { + return CompareExpr{}, err + } + rightExpr, err := UnmarshalExpr(r.Right) + if err != nil { + return CompareExpr{}, err + } + return CompareExpr{ + Type: r.Type, + Op: r.Op, + Left: leftExpr, + Right: rightExpr, + }, nil +} + +type LogicalExprRaw struct { + Type string `json:"type"` + Op LogicalOperator `json:"op"` + Left json.RawMessage `json:"left"` + Right json.RawMessage `json:"right"` +} + +func (r LogicalExprRaw) ToLogicalExpr() (LogicalExpr, error) { + left, err := UnmarshalExpr(r.Left) + if err != nil { + return LogicalExpr{}, err + } + right, err := UnmarshalExpr(r.Right) + if err != nil { + return LogicalExpr{}, err + } + return LogicalExpr{ + Type: r.Type, + Op: r.Op, + Left: left, + Right: right, + }, nil +} + +type NotExprRaw struct { + Type string `json:"type"` + Expr json.RawMessage `json:"expr"` +} + +func (r NotExprRaw) ToNotExpr() (NotExpr, error) { + inner, err := UnmarshalExpr(r.Expr) + if err != nil { + return NotExpr{}, err + } + return NotExpr{ + Type: r.Type, + Expr: inner, + }, nil +} diff --git a/internal/domain/workflow.go b/internal/domain/workflow.go index 6f3cccea..e1e72354 100644 --- a/internal/domain/workflow.go +++ b/internal/domain/workflow.go @@ -81,6 +81,10 @@ type WorkflowNodeConfigForApply struct { SkipBeforeExpiryDays int32 `json:"skipBeforeExpiryDays,omitempty"` // 证书到期前多少天前跳过续期(零值将使用默认值 30) } +type WorkflowNodeConfigForCondition struct { + Expression Expr `json:"expression"` // 条件表达式 +} + type WorkflowNodeConfigForUpload struct { Certificate string `json:"certificate"` PrivateKey string `json:"privateKey"` @@ -104,6 +108,22 @@ type WorkflowNodeConfigForNotify struct { Message string `json:"message"` // 通知内容 } +func (n *WorkflowNode) GetConfigForCondition() WorkflowNodeConfigForCondition { + raw := maputil.GetString(n.Config, "expression") + if raw == "" { + return WorkflowNodeConfigForCondition{} + } + + expr, err := UnmarshalExpr([]byte(raw)) + if err != nil { + return WorkflowNodeConfigForCondition{} + } + + return WorkflowNodeConfigForCondition{ + Expression: expr, + } +} + func (n *WorkflowNode) GetConfigForApply() WorkflowNodeConfigForApply { skipBeforeExpiryDays := maputil.GetInt32(n.Config, "skipBeforeExpiryDays") if skipBeforeExpiryDays == 0 { @@ -171,6 +191,7 @@ type WorkflowNodeIO struct { type WorkflowNodeIOValueSelector struct { Id string `json:"id"` Name string `json:"name"` + Type string `json:"type"` } const WorkflowNodeIONameCertificate string = "certificate" diff --git a/internal/workflow/dispatcher/invoker.go b/internal/workflow/dispatcher/invoker.go index c644b26b..a4de08e7 100644 --- a/internal/workflow/dispatcher/invoker.go +++ b/internal/workflow/dispatcher/invoker.go @@ -101,6 +101,11 @@ func (w *workflowInvoker) processNode(ctx context.Context, node *domain.Workflow processor.GetLogger().Error(procErr.Error()) break } + + nodeOutputs := processor.GetOutputs() + if len(nodeOutputs) > 0 { + ctx = nodes.AddNodeOutput(ctx, current.Id, nodeOutputs) + } } break diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index 97b7575d..468f553b 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -16,6 +16,7 @@ import ( type applyNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer certRepo certificateRepository outputRepo workflowOutputRepository @@ -25,6 +26,7 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode { return &applyNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), @@ -71,6 +73,7 @@ func (n *applyNode) Process(ctx context.Context) error { n.logger.Warn("failed to parse certificate, may be the CA responded error") return err } + certificate := &domain.Certificate{ Source: domain.CertificateSourceTypeWorkflow, Certificate: applyResult.CertificateFullChain, @@ -96,6 +99,10 @@ func (n *applyNode) Process(ctx context.Context) error { return err } + // 添加中间结果 + n.outputs["certificate.validated"] = true + n.outputs["certificate.daysLeft"] = int(time.Until(certificate.ExpireAt).Hours() / 24) + n.logger.Info("apply completed") return nil @@ -139,6 +146,10 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24 expirationTime := time.Until(lastCertificate.ExpireAt) if expirationTime > renewalInterval { + + n.outputs["certificate.validated"] = true + n.outputs["certificate.daysLeft"] = int(expirationTime.Hours() / 24) + return true, fmt.Sprintf("the certificate has already been issued (expires in %dd, next renewal in %dd)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) } } diff --git a/internal/workflow/node-processor/condition_node.go b/internal/workflow/node-processor/condition_node.go index 2bac55fa..f8ed228b 100644 --- a/internal/workflow/node-processor/condition_node.go +++ b/internal/workflow/node-processor/condition_node.go @@ -2,6 +2,7 @@ package nodeprocessor import ( "context" + "errors" "github.com/usual2970/certimate/internal/domain" ) @@ -9,16 +10,43 @@ import ( type conditionNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer } func NewConditionNode(node *domain.WorkflowNode) *conditionNode { return &conditionNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), } } func (n *conditionNode) Process(ctx context.Context) error { - // 此类型节点不需要执行任何操作,直接返回 + n.logger.Info("enter condition node: " + n.node.Name) + + nodeConfig := n.node.GetConfigForCondition() + if nodeConfig.Expression == nil { + return nil + } return nil } + +func (n *conditionNode) eval(ctx context.Context, expression domain.Expr) (any, error) { + switch expr:=expression.(type) { + case domain.CompareExpr: + left,err:= n.eval(ctx, expr.Left) + if err != nil { + return nil, err + } + right,err:= n.eval(ctx, expr.Right) + if err != nil { + return nil, err + } + + case domain.LogicalExpr: + case domain.NotExpr: + case domain.VarExpr: + case domain.ConstExpr: + } + return false, errors.New("unknown expression type") +} diff --git a/internal/workflow/node-processor/context.go b/internal/workflow/node-processor/context.go new file mode 100644 index 00000000..adceacf6 --- /dev/null +++ b/internal/workflow/node-processor/context.go @@ -0,0 +1,126 @@ +package nodeprocessor + +import ( + "context" + "sync" +) + +// 定义上下文键类型,避免键冲突 +type workflowContextKey string + +const ( + nodeOutputsKey workflowContextKey = "node_outputs" +) + +// 带互斥锁的节点输出容器 +type nodeOutputsContainer struct { + sync.RWMutex + outputs map[string]map[string]any +} + +// 创建新的并发安全的节点输出容器 +func newNodeOutputsContainer() *nodeOutputsContainer { + return &nodeOutputsContainer{ + outputs: make(map[string]map[string]any), + } +} + +// 添加节点输出到上下文 +func AddNodeOutput(ctx context.Context, nodeId string, output map[string]any) context.Context { + container := getNodeOutputsContainer(ctx) + if container == nil { + container = newNodeOutputsContainer() + } + + container.Lock() + defer container.Unlock() + + // 创建输出的深拷贝以避免后续修改 + outputCopy := make(map[string]any, len(output)) + for k, v := range output { + outputCopy[k] = v + } + + container.outputs[nodeId] = outputCopy + return context.WithValue(ctx, nodeOutputsKey, container) +} + +// 从上下文获取节点输出 +func GetNodeOutput(ctx context.Context, nodeId string) map[string]any { + container := getNodeOutputsContainer(ctx) + if container == nil { + return nil + } + + container.RLock() + defer container.RUnlock() + + output, exists := container.outputs[nodeId] + if !exists { + return nil + } + + outputCopy := make(map[string]any, len(output)) + for k, v := range output { + outputCopy[k] = v + } + + return outputCopy +} + +// 获取特定节点的特定输出项 +func GetNodeOutputValue(ctx context.Context, nodeId string, key string) (any, bool) { + output := GetNodeOutput(ctx, nodeId) + if output == nil { + return nil, false + } + + value, exists := output[key] + return value, exists +} + +// 获取所有节点输出 +func GetNodeOutputs(ctx context.Context) map[string]map[string]any { + container := getNodeOutputsContainer(ctx) + if container == nil { + return nil + } + + container.RLock() + defer container.RUnlock() + + // 创建所有输出的深拷贝 + allOutputs := make(map[string]map[string]any, len(container.outputs)) + for nodeId, output := range container.outputs { + nodeCopy := make(map[string]any, len(output)) + for k, v := range output { + nodeCopy[k] = v + } + allOutputs[nodeId] = nodeCopy + } + + return allOutputs +} + +// 获取节点输出容器 +func getNodeOutputsContainer(ctx context.Context) *nodeOutputsContainer { + value := ctx.Value(nodeOutputsKey) + if value == nil { + return nil + } + return value.(*nodeOutputsContainer) +} + +// 检查节点是否有输出 +func HasNodeOutput(ctx context.Context, nodeId string) bool { + container := getNodeOutputsContainer(ctx) + if container == nil { + return false + } + + container.RLock() + defer container.RUnlock() + + _, exists := container.outputs[nodeId] + return exists +} diff --git a/internal/workflow/node-processor/deploy_node.go b/internal/workflow/node-processor/deploy_node.go index d60a5a7a..3819b4a2 100644 --- a/internal/workflow/node-processor/deploy_node.go +++ b/internal/workflow/node-processor/deploy_node.go @@ -15,6 +15,7 @@ import ( type deployNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer certRepo certificateRepository outputRepo workflowOutputRepository @@ -24,6 +25,7 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode { return &deployNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), diff --git a/internal/workflow/node-processor/execute_failure_node.go b/internal/workflow/node-processor/execute_failure_node.go index 59f6a5bd..d3f61e30 100644 --- a/internal/workflow/node-processor/execute_failure_node.go +++ b/internal/workflow/node-processor/execute_failure_node.go @@ -9,12 +9,14 @@ import ( type executeFailureNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer } func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode { return &executeFailureNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), } } diff --git a/internal/workflow/node-processor/execute_success_node.go b/internal/workflow/node-processor/execute_success_node.go index e5b65860..46a74482 100644 --- a/internal/workflow/node-processor/execute_success_node.go +++ b/internal/workflow/node-processor/execute_success_node.go @@ -9,12 +9,14 @@ import ( type executeSuccessNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer } func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode { return &executeSuccessNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), } } diff --git a/internal/workflow/node-processor/notify_node.go b/internal/workflow/node-processor/notify_node.go index 1840938b..8f336931 100644 --- a/internal/workflow/node-processor/notify_node.go +++ b/internal/workflow/node-processor/notify_node.go @@ -12,6 +12,7 @@ import ( type notifyNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer settingsRepo settingsRepository } @@ -20,6 +21,7 @@ func NewNotifyNode(node *domain.WorkflowNode) *notifyNode { return ¬ifyNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), settingsRepo: repository.NewSettingsRepository(), } diff --git a/internal/workflow/node-processor/processor.go b/internal/workflow/node-processor/processor.go index 4523b13a..eb7bc155 100644 --- a/internal/workflow/node-processor/processor.go +++ b/internal/workflow/node-processor/processor.go @@ -14,6 +14,8 @@ type NodeProcessor interface { SetLogger(*slog.Logger) Process(ctx context.Context) error + + GetOutputs() map[string]any } type nodeProcessor struct { @@ -32,6 +34,20 @@ func (n *nodeProcessor) SetLogger(logger *slog.Logger) { n.logger = logger } +type nodeOutputer struct { + outputs map[string]any +} + +func newNodeOutputer() *nodeOutputer { + return &nodeOutputer{ + outputs: make(map[string]any), + } +} + +func (n *nodeOutputer) GetOutputs() map[string]any { + return n.outputs +} + type certificateRepository interface { GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error) } diff --git a/internal/workflow/node-processor/start_node.go b/internal/workflow/node-processor/start_node.go index 5bbc1c09..30dee424 100644 --- a/internal/workflow/node-processor/start_node.go +++ b/internal/workflow/node-processor/start_node.go @@ -9,12 +9,14 @@ import ( type startNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer } func NewStartNode(node *domain.WorkflowNode) *startNode { return &startNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), } } diff --git a/internal/workflow/node-processor/upload_node.go b/internal/workflow/node-processor/upload_node.go index 2da19eed..7fbb1515 100644 --- a/internal/workflow/node-processor/upload_node.go +++ b/internal/workflow/node-processor/upload_node.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/repository" @@ -12,6 +13,7 @@ import ( type uploadNode struct { node *domain.WorkflowNode *nodeProcessor + *nodeOutputer certRepo certificateRepository outputRepo workflowOutputRepository @@ -21,6 +23,7 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode { return &uploadNode{ node: node, nodeProcessor: newNodeProcessor(node), + nodeOutputer: newNodeOutputer(), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), @@ -66,6 +69,9 @@ func (n *uploadNode) Process(ctx context.Context) error { return err } + n.outputs["certificate.validated"] = true + n.outputs["certificate.daysLeft"] = int(time.Until(certificate.ExpireAt).Hours() / 24) + n.logger.Info("upload completed") return nil @@ -85,6 +91,8 @@ func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workfl lastCertificate, _ := n.certRepo.GetByWorkflowNodeId(ctx, n.node.Id) if lastCertificate != nil { + n.outputs["certificate.validated"] = true + n.outputs["certificate.daysLeft"] = int(time.Until(lastCertificate.ExpireAt).Hours() / 24) return true, "the certificate has already been uploaded" } } diff --git a/ui/src/components/workflow/node/ConditionNode.tsx b/ui/src/components/workflow/node/ConditionNode.tsx index 43b32e60..d3f1defc 100644 --- a/ui/src/components/workflow/node/ConditionNode.tsx +++ b/ui/src/components/workflow/node/ConditionNode.tsx @@ -5,7 +5,7 @@ import { Button, Card, Popover } from "antd"; import SharedNode, { type SharedNodeProps } from "./_SharedNode"; import AddNode from "./AddNode"; import ConditionNodeConfigForm, { ConditionItem, ConditionNodeConfigFormFieldValues, ConditionNodeConfigFormInstance } from "./ConditionNodeConfigForm"; -import { Expr, WorkflowNodeIoValueType } from "@/domain/workflow"; +import { Expr, WorkflowNodeIoValueType, Value } from "@/domain/workflow"; import { produce } from "immer"; import { useWorkflowStore } from "@/stores/workflow"; import { useZustandShallowSelector } from "@/hooks"; @@ -30,15 +30,34 @@ const ConditionNode = ({ node, disabled, branchId, branchIndex }: ConditionNodeP // 创建单个条件的表达式 const createComparisonExpr = (condition: ConditionItem): Expr => { const selectors = condition.leftSelector.split("#"); + const t = selectors[2] as WorkflowNodeIoValueType; const left: Expr = { type: "var", selector: { id: selectors[0], name: selectors[1], - type: selectors[2] as WorkflowNodeIoValueType, + type: t, }, }; - const right: Expr = { type: "const", value: condition.rightValue || "" }; + + let value: Value = condition.rightValue; + switch (t) { + case "boolean": + if (value === "true") { + value = true; + } else if (value === "false") { + value = false; + } + break; + case "number": + value = parseInt(value as string); + break; + case "string": + value = value as string; + break; + } + + const right: Expr = { type: "const", value: value }; return { type: "compare", diff --git a/ui/src/domain/workflow.ts b/ui/src/domain/workflow.ts index 792b7d45..9cd12287 100644 --- a/ui/src/domain/workflow.ts +++ b/ui/src/domain/workflow.ts @@ -232,7 +232,7 @@ export const workflowNodeIOOptions = (node: WorkflowNode) => { // #region Condition expression -type Value = string | number | boolean; +export type Value = string | number | boolean; export type ComparisonOperator = ">" | "<" | ">=" | "<=" | "==" | "!=" | "is";