feat: generate run record at the beginning of the workflow execution

This commit is contained in:
Fu Diwei 2025-01-22 03:48:58 +08:00
parent 7e0f575e0a
commit bee4ba10cb
2 changed files with 46 additions and 18 deletions

View File

@ -55,10 +55,10 @@ func (r *WorkflowRepository) GetById(ctx context.Context, id string) (*domain.Wo
return r.castRecordToModel(record) return r.castRecordToModel(record)
} }
func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) error { func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error) {
collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflow) collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflow)
if err != nil { if err != nil {
return err return workflow, err
} }
var record *core.Record var record *core.Record
@ -68,9 +68,9 @@ func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow
record, err = app.GetApp().FindRecordById(domain.CollectionNameWorkflow, workflow.Id) record, err = app.GetApp().FindRecordById(domain.CollectionNameWorkflow, workflow.Id)
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return domain.ErrRecordNotFound return workflow, domain.ErrRecordNotFound
} }
return err return workflow, err
} }
} }
@ -86,18 +86,36 @@ func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow
record.Set("lastRunStatus", string(workflow.LastRunStatus)) record.Set("lastRunStatus", string(workflow.LastRunStatus))
record.Set("lastRunTime", workflow.LastRunTime) record.Set("lastRunTime", workflow.LastRunTime)
return app.GetApp().Save(record) if err := app.GetApp().Save(record); err != nil {
return workflow, err
} }
func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error { workflow.Id = record.Id
workflow.CreatedAt = record.GetDateTime("created").Time()
workflow.UpdatedAt = record.GetDateTime("updated").Time()
return workflow, nil
}
func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error) {
collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflowRun) collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflowRun)
if err != nil { if err != nil {
return err return workflowRun, err
}
var workflowRunRecord *core.Record
if workflowRun.Id == "" {
workflowRunRecord = core.NewRecord(collection)
} else {
workflowRunRecord, err = app.GetApp().FindRecordById(domain.CollectionNameWorkflowRun, workflowRun.Id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return workflowRun, err
}
workflowRunRecord = core.NewRecord(collection)
}
} }
err = app.GetApp().RunInTransaction(func(txApp core.App) error { err = app.GetApp().RunInTransaction(func(txApp core.App) error {
workflowRunRecord := core.NewRecord(collection)
workflowRunRecord.Id = workflowRun.Id
workflowRunRecord.Set("workflowId", workflowRun.WorkflowId) workflowRunRecord.Set("workflowId", workflowRun.WorkflowId)
workflowRunRecord.Set("trigger", string(workflowRun.Trigger)) workflowRunRecord.Set("trigger", string(workflowRun.Trigger))
workflowRunRecord.Set("status", string(workflowRun.Status)) workflowRunRecord.Set("status", string(workflowRun.Status))
@ -115,6 +133,7 @@ func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.Wo
return err return err
} }
workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunId", workflowRunRecord.Id) workflowRecord.Set("lastRunId", workflowRunRecord.Id)
workflowRecord.Set("lastRunStatus", workflowRunRecord.GetString("status")) workflowRecord.Set("lastRunStatus", workflowRunRecord.GetString("status"))
workflowRecord.Set("lastRunTime", workflowRunRecord.GetString("startedAt")) workflowRecord.Set("lastRunTime", workflowRunRecord.GetString("startedAt"))
@ -123,13 +142,17 @@ func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.Wo
return err return err
} }
workflowRun.Id = workflowRunRecord.Id
workflowRun.CreatedAt = workflowRunRecord.GetDateTime("created").Time()
workflowRun.UpdatedAt = workflowRunRecord.GetDateTime("updated").Time()
return nil return nil
}) })
if err != nil { if err != nil {
return err return workflowRun, err
} }
return nil return workflowRun, nil
} }
func (r *WorkflowRepository) castRecordToModel(record *core.Record) (*domain.Workflow, error) { func (r *WorkflowRepository) castRecordToModel(record *core.Record) (*domain.Workflow, error) {

View File

@ -23,8 +23,8 @@ type workflowRunData struct {
type workflowRepository interface { type workflowRepository interface {
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error)
Save(ctx context.Context, workflow *domain.Workflow) error Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
} }
type WorkflowService struct { type WorkflowService struct {
@ -88,9 +88,10 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err
workflow.LastRunTime = time.Now() workflow.LastRunTime = time.Now()
workflow.LastRunStatus = domain.WorkflowRunStatusTypePending workflow.LastRunStatus = domain.WorkflowRunStatusTypePending
workflow.LastRunId = "" workflow.LastRunId = ""
if resp, err := s.repo.Save(ctx, workflow); err != nil {
if err := s.repo.Save(ctx, workflow); err != nil {
return err return err
} else {
workflow = resp
} }
s.ch <- &workflowRunData{ s.ch <- &workflowRunData{
@ -122,13 +123,17 @@ func (s *WorkflowService) run(ctx context.Context) {
func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunData) error { func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunData) error {
workflow := runData.Workflow workflow := runData.Workflow
run := &domain.WorkflowRun{ run := &domain.WorkflowRun{
WorkflowId: workflow.Id, WorkflowId: workflow.Id,
Status: domain.WorkflowRunStatusTypeRunning, Status: domain.WorkflowRunStatusTypeRunning,
Trigger: runData.RunTrigger, Trigger: runData.RunTrigger,
StartedAt: time.Now(), StartedAt: time.Now(),
} }
if resp, err := s.repo.SaveRun(ctx, run); err != nil {
return err
} else {
run = resp
}
processor := processor.NewWorkflowProcessor(workflow) processor := processor.NewWorkflowProcessor(workflow)
if runErr := processor.Run(ctx); runErr != nil { if runErr := processor.Run(ctx); runErr != nil {
@ -136,7 +141,7 @@ func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunD
run.EndedAt = time.Now() run.EndedAt = time.Now()
run.Logs = processor.GetRunLogs() run.Logs = processor.GetRunLogs()
run.Error = runErr.Error() run.Error = runErr.Error()
if err := s.repo.SaveRun(ctx, run); err != nil { if _, err := s.repo.SaveRun(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err) app.GetLogger().Error("failed to save workflow run", "err", err)
} }
@ -151,7 +156,7 @@ func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunD
} else { } else {
run.Status = domain.WorkflowRunStatusTypeFailed run.Status = domain.WorkflowRunStatusTypeFailed
} }
if err := s.repo.SaveRun(ctx, run); err != nil { if _, err := s.repo.SaveRun(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err) app.GetLogger().Error("failed to save workflow run", "err", err)
return err return err
} }