From cbf711ee606e169cbae4948dd200fa4fb81ffd2b Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Mon, 10 Feb 2025 16:12:12 +0800 Subject: [PATCH] feat: save run logs when each workflow node completed --- internal/domain/workflow.go | 4 +- internal/workflow/dispatcher/dispatcher.go | 2 +- internal/workflow/dispatcher/invoker.go | 13 ++++++- internal/workflow/dispatcher/singleton.go | 1 + internal/workflow/node-processor/processor.go | 1 + .../components/workflow/WorkflowRunDetail.tsx | 39 ++++++++++--------- ui/src/i18n/locales/en/nls.workflow.runs.json | 1 + ui/src/i18n/locales/zh/nls.workflow.runs.json | 1 + 8 files changed, 40 insertions(+), 22 deletions(-) diff --git a/internal/domain/workflow.go b/internal/domain/workflow.go index ac8fbce6..1229da47 100644 --- a/internal/domain/workflow.go +++ b/internal/domain/workflow.go @@ -55,8 +55,8 @@ type WorkflowNode struct { Inputs []WorkflowNodeIO `json:"inputs"` Outputs []WorkflowNodeIO `json:"outputs"` - Next *WorkflowNode `json:"next"` - Branches []WorkflowNode `json:"branches"` + Next *WorkflowNode `json:"next,omitempty"` + Branches []WorkflowNode `json:"branches,omitempty"` Validated bool `json:"validated"` } diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 0b504c25..47b91ca0 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -242,7 +242,7 @@ func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) } // 执行工作流 - invoker := newWorkflowInvoker(data) + invoker := newWorkflowInvokerWithData(w.workflowRunRepo, data) if runErr := invoker.Invoke(ctx); runErr != nil { if errors.Is(runErr, context.Canceled) { run.Status = domain.WorkflowRunStatusTypeCanceled diff --git a/internal/workflow/dispatcher/invoker.go b/internal/workflow/dispatcher/invoker.go index 3033314e..d35cca3e 100644 --- a/internal/workflow/dispatcher/invoker.go +++ b/internal/workflow/dispatcher/invoker.go @@ -13,18 +13,23 @@ type workflowInvoker struct { workflowContent *domain.WorkflowNode runId string runLogs []domain.WorkflowRunLog + + workflowRunRepo workflowRunRepository } -func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker { +func newWorkflowInvokerWithData(workflowRunRepo workflowRunRepository, data *WorkflowWorkerData) *workflowInvoker { if data == nil { panic("worker data is nil") } + // TODO: 待优化,日志与执行解耦 return &workflowInvoker{ workflowId: data.WorkflowId, workflowContent: data.WorkflowContent, runId: data.RunId, runLogs: make([]domain.WorkflowRunLog, 0), + + workflowRunRepo: workflowRunRepo, } } @@ -70,6 +75,12 @@ func (w *workflowInvoker) processNode(ctx context.Context, node *domain.Workflow log := processor.GetLog(ctx) if log != nil { w.runLogs = append(w.runLogs, *log) + + // TODO: 待优化,把 /pkg/core/* 包下的输出写入到 DEBUG 级别的日志中 + if run, err := w.workflowRunRepo.GetById(ctx, w.runId); err == nil { + run.Logs = w.runLogs + w.workflowRunRepo.Save(ctx, run) + } } if procErr != nil { break diff --git a/internal/workflow/dispatcher/singleton.go b/internal/workflow/dispatcher/singleton.go index 37c34f56..b5834c48 100644 --- a/internal/workflow/dispatcher/singleton.go +++ b/internal/workflow/dispatcher/singleton.go @@ -23,6 +23,7 @@ var ( ) func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher { + // TODO: 待优化构造过程 intanceOnce.Do(func() { instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo) }) diff --git a/internal/workflow/node-processor/processor.go b/internal/workflow/node-processor/processor.go index 52576510..08712280 100644 --- a/internal/workflow/node-processor/processor.go +++ b/internal/workflow/node-processor/processor.go @@ -55,6 +55,7 @@ func (l *nodeLogger) AppendLogRecord(ctx context.Context, level domain.WorkflowR } if len(err) > 0 { record.Error = err[0] + l.log.Error = err[0] } l.log.Records = append(l.log.Records, record) diff --git a/ui/src/components/workflow/WorkflowRunDetail.tsx b/ui/src/components/workflow/WorkflowRunDetail.tsx index 8c416bc2..22ad5b1f 100644 --- a/ui/src/components/workflow/WorkflowRunDetail.tsx +++ b/ui/src/components/workflow/WorkflowRunDetail.tsx @@ -32,25 +32,28 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => { {t("workflow_run.props.status.failed")}} /> -
-
- {data.logs?.map((item, i) => { - return ( -
-
{item.nodeName}
-
- {item.records?.map((output, j) => { - return ( -
-
[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]
- {output.error ?
{output.error}
:
{output.content}
} -
- ); - })} +
+ {t("workflow_run.logs")} +
+
+ {data.logs?.map((item, i) => { + return ( +
+
{item.nodeName}
+
+ {item.records?.map((output, j) => { + return ( +
+
[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]
+ {output.error ?
{output.error}
:
{output.content}
} +
+ ); + })} +
-
- ); - })} + ); + })} +
diff --git a/ui/src/i18n/locales/en/nls.workflow.runs.json b/ui/src/i18n/locales/en/nls.workflow.runs.json index c0b2feb3..d26ddecd 100644 --- a/ui/src/i18n/locales/en/nls.workflow.runs.json +++ b/ui/src/i18n/locales/en/nls.workflow.runs.json @@ -18,6 +18,7 @@ "workflow_run.props.started_at": "Started at", "workflow_run.props.ended_at": "Ended at", + "workflow_run.logs": "Logs", "workflow_run.artifacts": "Artifacts", "workflow_run_artifact.props.type": "Type", diff --git a/ui/src/i18n/locales/zh/nls.workflow.runs.json b/ui/src/i18n/locales/zh/nls.workflow.runs.json index 8a7dcd18..4a57e4ca 100644 --- a/ui/src/i18n/locales/zh/nls.workflow.runs.json +++ b/ui/src/i18n/locales/zh/nls.workflow.runs.json @@ -18,6 +18,7 @@ "workflow_run.props.started_at": "开始时间", "workflow_run.props.ended_at": "完成时间", + "workflow_run.logs": "日志", "workflow_run.artifacts": "输出产物", "workflow_run_artifact.props.type": "类型",