From bee4ba10cbb15a7d09fd6c3cef56fde04fc644bc Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Wed, 22 Jan 2025 03:48:58 +0800 Subject: [PATCH] feat: generate run record at the beginning of the workflow execution --- internal/repository/workflow.go | 45 +++++++++++++++++++++++++-------- internal/workflow/service.go | 19 +++++++++----- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/internal/repository/workflow.go b/internal/repository/workflow.go index ac9b3e9f..edf7cf7f 100644 --- a/internal/repository/workflow.go +++ b/internal/repository/workflow.go @@ -55,10 +55,10 @@ func (r *WorkflowRepository) GetById(ctx context.Context, id string) (*domain.Wo return r.castRecordToModel(record) } -func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) error { +func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error) { collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflow) if err != nil { - return err + return workflow, err } var record *core.Record @@ -68,9 +68,9 @@ func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow record, err = app.GetApp().FindRecordById(domain.CollectionNameWorkflow, workflow.Id) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return domain.ErrRecordNotFound + return workflow, domain.ErrRecordNotFound } - return err + return workflow, err } } @@ -86,18 +86,36 @@ func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow record.Set("lastRunStatus", string(workflow.LastRunStatus)) record.Set("lastRunTime", workflow.LastRunTime) - return app.GetApp().Save(record) + if err := app.GetApp().Save(record); err != nil { + return workflow, err + } + + workflow.Id = record.Id + workflow.CreatedAt = record.GetDateTime("created").Time() + workflow.UpdatedAt = record.GetDateTime("updated").Time() + return workflow, nil } -func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error { +func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error) { collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflowRun) if err != nil { - return err + return workflowRun, err + } + + var workflowRunRecord *core.Record + if workflowRun.Id == "" { + workflowRunRecord = core.NewRecord(collection) + } else { + workflowRunRecord, err = app.GetApp().FindRecordById(domain.CollectionNameWorkflowRun, workflowRun.Id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return workflowRun, err + } + workflowRunRecord = core.NewRecord(collection) + } } err = app.GetApp().RunInTransaction(func(txApp core.App) error { - workflowRunRecord := core.NewRecord(collection) - workflowRunRecord.Id = workflowRun.Id workflowRunRecord.Set("workflowId", workflowRun.WorkflowId) workflowRunRecord.Set("trigger", string(workflowRun.Trigger)) workflowRunRecord.Set("status", string(workflowRun.Status)) @@ -115,6 +133,7 @@ func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.Wo return err } + workflowRecord.IgnoreUnchangedFields(true) workflowRecord.Set("lastRunId", workflowRunRecord.Id) workflowRecord.Set("lastRunStatus", workflowRunRecord.GetString("status")) workflowRecord.Set("lastRunTime", workflowRunRecord.GetString("startedAt")) @@ -123,13 +142,17 @@ func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.Wo return err } + workflowRun.Id = workflowRunRecord.Id + workflowRun.CreatedAt = workflowRunRecord.GetDateTime("created").Time() + workflowRun.UpdatedAt = workflowRunRecord.GetDateTime("updated").Time() + return nil }) if err != nil { - return err + return workflowRun, err } - return nil + return workflowRun, nil } func (r *WorkflowRepository) castRecordToModel(record *core.Record) (*domain.Workflow, error) { diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 5a34805e..25564ad8 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -23,8 +23,8 @@ type workflowRunData struct { type workflowRepository interface { ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error) - Save(ctx context.Context, workflow *domain.Workflow) error - SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error + Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error) + SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error) } type WorkflowService struct { @@ -88,9 +88,10 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err workflow.LastRunTime = time.Now() workflow.LastRunStatus = domain.WorkflowRunStatusTypePending workflow.LastRunId = "" - - if err := s.repo.Save(ctx, workflow); err != nil { + if resp, err := s.repo.Save(ctx, workflow); err != nil { return err + } else { + workflow = resp } s.ch <- &workflowRunData{ @@ -122,13 +123,17 @@ func (s *WorkflowService) run(ctx context.Context) { func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunData) error { workflow := runData.Workflow - run := &domain.WorkflowRun{ WorkflowId: workflow.Id, Status: domain.WorkflowRunStatusTypeRunning, Trigger: runData.RunTrigger, StartedAt: time.Now(), } + if resp, err := s.repo.SaveRun(ctx, run); err != nil { + return err + } else { + run = resp + } processor := processor.NewWorkflowProcessor(workflow) if runErr := processor.Run(ctx); runErr != nil { @@ -136,7 +141,7 @@ func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunD run.EndedAt = time.Now() run.Logs = processor.GetRunLogs() run.Error = runErr.Error() - if err := s.repo.SaveRun(ctx, run); err != nil { + if _, err := s.repo.SaveRun(ctx, run); err != nil { app.GetLogger().Error("failed to save workflow run", "err", err) } @@ -151,7 +156,7 @@ func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunD } else { run.Status = domain.WorkflowRunStatusTypeFailed } - if err := s.repo.SaveRun(ctx, run); err != nil { + if _, err := s.repo.SaveRun(ctx, run); err != nil { app.GetLogger().Error("failed to save workflow run", "err", err) return err }