diff --git a/internal/domain/workflow_run.go b/internal/domain/workflow_run.go index 25ba1d7a..ff5424c5 100644 --- a/internal/domain/workflow_run.go +++ b/internal/domain/workflow_run.go @@ -31,17 +31,26 @@ const ( type WorkflowRunLog struct { NodeId string `json:"nodeId"` NodeName string `json:"nodeName"` + Records []WorkflowRunLogRecord `json:"records"` Error string `json:"error"` - Outputs []WorkflowRunLogOutput `json:"outputs"` } -type WorkflowRunLogOutput struct { - Time string `json:"time"` - Title string `json:"title"` - Content string `json:"content"` - Error string `json:"error"` +type WorkflowRunLogRecord struct { + Time string `json:"time"` + Level WorkflowRunLogLevel `json:"level"` + Content string `json:"content"` + Error string `json:"error"` } +type WorkflowRunLogLevel string + +const ( + WorkflowRunLogLevelDebug WorkflowRunLogLevel = "DEBUG" + WorkflowRunLogLevelInfo WorkflowRunLogLevel = "INFO" + WorkflowRunLogLevelWarn WorkflowRunLogLevel = "WARN" + WorkflowRunLogLevelError WorkflowRunLogLevel = "ERROR" +) + type WorkflowRunLogs []WorkflowRunLog func (r WorkflowRunLogs) ErrorString() string { diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index d75486ea..3c974b8c 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -24,7 +24,7 @@ type applyNode struct { func NewApplyNode(node *domain.WorkflowNode) *applyNode { return &applyNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), @@ -32,40 +32,40 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode { } func (n *applyNode) Process(ctx context.Context) error { - n.AddOutput(ctx, n.node.Name, "开始执行") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入申请证书节点") // 查询上次执行结果 lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) if err != nil && !domain.IsRecordNotFoundError(err) { - n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error()) return err } // 检测是否可以跳过本次执行 if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { - n.AddOutput(ctx, n.node.Name, skipReason) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason) return nil } // 初始化申请器 applicant, err := applicant.NewWithApplyNode(n.node) if err != nil { - n.AddOutput(ctx, n.node.Name, "获取申请对象失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取申请对象失败", err.Error()) return err } // 申请证书 applyResult, err := applicant.Apply() if err != nil { - n.AddOutput(ctx, n.node.Name, "申请失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "申请失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "申请成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "申请成功") // 解析证书并生成实体 certX509, err := certs.ParseCertificateFromPEM(applyResult.CertificateFullChain) if err != nil { - n.AddOutput(ctx, n.node.Name, "解析证书失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败", err.Error()) return err } certificate := &domain.Certificate{ @@ -89,10 +89,10 @@ func (n *applyNode) Process(ctx context.Context) error { Outputs: n.node.Outputs, } if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil { - n.AddOutput(ctx, n.node.Name, "保存申请记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存申请记录失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "保存申请记录成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存申请记录成功") return nil } diff --git a/internal/workflow/node-processor/condition_node.go b/internal/workflow/node-processor/condition_node.go index 994965ba..499a5004 100644 --- a/internal/workflow/node-processor/condition_node.go +++ b/internal/workflow/node-processor/condition_node.go @@ -14,13 +14,11 @@ type conditionNode struct { func NewConditionNode(node *domain.WorkflowNode) *conditionNode { return &conditionNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), } } func (n *conditionNode) Process(ctx context.Context) error { // 此类型节点不需要执行任何操作,直接返回 - n.AddOutput(ctx, n.node.Name, "完成") - return nil } diff --git a/internal/workflow/node-processor/deploy_node.go b/internal/workflow/node-processor/deploy_node.go index e3b6137f..c7ab964f 100644 --- a/internal/workflow/node-processor/deploy_node.go +++ b/internal/workflow/node-processor/deploy_node.go @@ -22,7 +22,7 @@ type deployNode struct { func NewDeployNode(node *domain.WorkflowNode) *deployNode { return &deployNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), @@ -30,12 +30,12 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode { } func (n *deployNode) Process(ctx context.Context) error { - n.AddOutput(ctx, n.node.Name, "开始执行") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "开始执行") // 查询上次执行结果 lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) if err != nil && !domain.IsRecordNotFoundError(err) { - n.AddOutput(ctx, n.node.Name, "查询部署记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询部署记录失败", err.Error()) return err } @@ -43,19 +43,19 @@ func (n *deployNode) Process(ctx context.Context) error { previousNodeOutputCertificateSource := n.node.GetConfigForDeploy().Certificate previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#") if len(previousNodeOutputCertificateSourceSlice) != 2 { - n.AddOutput(ctx, n.node.Name, "证书来源配置错误", previousNodeOutputCertificateSource) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "证书来源配置错误", previousNodeOutputCertificateSource) return fmt.Errorf("证书来源配置错误: %s", previousNodeOutputCertificateSource) } certificate, err := n.certRepo.GetByWorkflowNodeId(ctx, previousNodeOutputCertificateSourceSlice[0]) if err != nil { - n.AddOutput(ctx, n.node.Name, "获取证书失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取证书失败", err.Error()) return err } // 检测是否可以跳过本次执行 if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) { if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { - n.AddOutput(ctx, n.node.Name, skipReason) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason) return nil } } @@ -66,16 +66,16 @@ func (n *deployNode) Process(ctx context.Context) error { PrivateKey string }{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey}) if err != nil { - n.AddOutput(ctx, n.node.Name, "获取部署对象失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取部署对象失败", err.Error()) return err } // 部署证书 if err := deployer.Deploy(ctx); err != nil { - n.AddOutput(ctx, n.node.Name, "部署失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "部署失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "部署成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "部署成功") // 保存执行结果 output := &domain.WorkflowOutput{ @@ -86,10 +86,10 @@ func (n *deployNode) Process(ctx context.Context) error { Succeeded: true, } if _, err := n.outputRepo.Save(ctx, output); err != nil { - n.AddOutput(ctx, n.node.Name, "保存部署记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存部署记录失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "保存部署记录成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存部署记录成功") return nil } diff --git a/internal/workflow/node-processor/execute_failure_node.go b/internal/workflow/node-processor/execute_failure_node.go index a64019bb..2516edb4 100644 --- a/internal/workflow/node-processor/execute_failure_node.go +++ b/internal/workflow/node-processor/execute_failure_node.go @@ -14,13 +14,13 @@ type executeFailureNode struct { func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode { return &executeFailureNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), } } func (n *executeFailureNode) Process(ctx context.Context) error { // 此类型节点不需要执行任何操作,直接返回 - n.AddOutput(ctx, n.node.Name, "进入执行失败分支") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行失败分支") return nil } diff --git a/internal/workflow/node-processor/execute_success_node.go b/internal/workflow/node-processor/execute_success_node.go index e0cfea1e..a7833a53 100644 --- a/internal/workflow/node-processor/execute_success_node.go +++ b/internal/workflow/node-processor/execute_success_node.go @@ -14,13 +14,13 @@ type executeSuccessNode struct { func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode { return &executeSuccessNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), } } func (n *executeSuccessNode) Process(ctx context.Context) error { // 此类型节点不需要执行任何操作,直接返回 - n.AddOutput(ctx, n.node.Name, "进入执行成功分支") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行成功分支") return nil } diff --git a/internal/workflow/node-processor/notify_node.go b/internal/workflow/node-processor/notify_node.go index 21e6ac15..e4c3da2d 100644 --- a/internal/workflow/node-processor/notify_node.go +++ b/internal/workflow/node-processor/notify_node.go @@ -18,37 +18,37 @@ type notifyNode struct { func NewNotifyNode(node *domain.WorkflowNode) *notifyNode { return ¬ifyNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), settingsRepo: repository.NewSettingsRepository(), } } func (n *notifyNode) Process(ctx context.Context) error { - n.AddOutput(ctx, n.node.Name, "开始执行") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入推送通知节点") nodeConfig := n.node.GetConfigForNotify() // 获取通知配置 settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels") if err != nil { - n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知配置失败", err.Error()) return err } // 获取通知渠道 channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel) if err != nil { - n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知渠道配置失败", err.Error()) return err } // 发送通知 if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil { - n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "发送通知失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "发送通知成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "发送通知成功") return nil } diff --git a/internal/workflow/node-processor/processor.go b/internal/workflow/node-processor/processor.go index 61155892..52576510 100644 --- a/internal/workflow/node-processor/processor.go +++ b/internal/workflow/node-processor/processor.go @@ -10,8 +10,9 @@ import ( type NodeProcessor interface { Process(ctx context.Context) error + GetLog(ctx context.Context) *domain.WorkflowRunLog - AddOutput(ctx context.Context, title, content string, err ...string) + AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string) } type nodeLogger struct { @@ -32,12 +33,12 @@ type settingsRepository interface { GetByName(ctx context.Context, name string) (*domain.Settings, error) } -func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger { +func newNodeLogger(node *domain.WorkflowNode) *nodeLogger { return &nodeLogger{ log: &domain.WorkflowRunLog{ NodeId: node.Id, NodeName: node.Name, - Outputs: make([]domain.WorkflowRunLogOutput, 0), + Records: make([]domain.WorkflowRunLogRecord, 0), }, } } @@ -46,17 +47,17 @@ func (l *nodeLogger) GetLog(ctx context.Context) *domain.WorkflowRunLog { return l.log } -func (l *nodeLogger) AddOutput(ctx context.Context, title, content string, err ...string) { - output := domain.WorkflowRunLogOutput{ +func (l *nodeLogger) AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string) { + record := domain.WorkflowRunLogRecord{ Time: time.Now().UTC().Format(time.RFC3339), - Title: title, + Level: level, Content: content, } if len(err) > 0 { - output.Error = err[0] - l.log.Error = err[0] + record.Error = err[0] } - l.log.Outputs = append(l.log.Outputs, output) + + l.log.Records = append(l.log.Records, record) } func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) { diff --git a/internal/workflow/node-processor/start_node.go b/internal/workflow/node-processor/start_node.go index 99e15af2..7d04685a 100644 --- a/internal/workflow/node-processor/start_node.go +++ b/internal/workflow/node-processor/start_node.go @@ -14,13 +14,13 @@ type startNode struct { func NewStartNode(node *domain.WorkflowNode) *startNode { return &startNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), } } func (n *startNode) Process(ctx context.Context) error { // 此类型节点不需要执行任何操作,直接返回 - n.AddOutput(ctx, n.node.Name, "完成") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入开始节点") return nil } diff --git a/internal/workflow/node-processor/upload_node.go b/internal/workflow/node-processor/upload_node.go index 9e316501..99108481 100644 --- a/internal/workflow/node-processor/upload_node.go +++ b/internal/workflow/node-processor/upload_node.go @@ -22,7 +22,7 @@ type uploadNode struct { func NewUploadNode(node *domain.WorkflowNode) *uploadNode { return &uploadNode{ node: node, - nodeLogger: NewNodeLogger(node), + nodeLogger: newNodeLogger(node), certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), @@ -30,20 +30,20 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode { } func (n *uploadNode) Process(ctx context.Context) error { - n.AddOutput(ctx, n.node.Name, "进入上传证书节点") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入上传证书节点") nodeConfig := n.node.GetConfigForUpload() // 查询上次执行结果 lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) if err != nil && !domain.IsRecordNotFoundError(err) { - n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error()) return err } // 检测是否可以跳过本次执行 if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { - n.AddOutput(ctx, n.node.Name, skipReason) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason) return nil } @@ -51,11 +51,11 @@ func (n *uploadNode) Process(ctx context.Context) error { // 如果证书过期,则直接返回错误 certX509, err := certs.ParseCertificateFromPEM(nodeConfig.Certificate) if err != nil { - n.AddOutput(ctx, n.node.Name, "解析证书失败") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败") return err } if time.Now().After(certX509.NotAfter) { - n.AddOutput(ctx, n.node.Name, "证书已过期") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelWarn, "证书已过期") return errors.New("certificate is expired") } @@ -75,10 +75,10 @@ func (n *uploadNode) Process(ctx context.Context) error { Outputs: n.node.Outputs, } if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil { - n.AddOutput(ctx, n.node.Name, "保存上传记录失败", err.Error()) + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存上传记录失败", err.Error()) return err } - n.AddOutput(ctx, n.node.Name, "保存上传记录成功") + n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存上传记录成功") return nil } diff --git a/ui/src/components/workflow/WorkflowRunDetail.tsx b/ui/src/components/workflow/WorkflowRunDetail.tsx index d2a20a5f..8c416bc2 100644 --- a/ui/src/components/workflow/WorkflowRunDetail.tsx +++ b/ui/src/components/workflow/WorkflowRunDetail.tsx @@ -39,7 +39,7 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => {