From 0f945881a172f0a57c775e82e3f1d5e04aef28d7 Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Wed, 22 Jan 2025 04:13:16 +0800 Subject: [PATCH] feat: cancel workflow run --- internal/domain/dtos/workflow.go | 7 +- internal/rest/handlers/certificate.go | 4 +- internal/rest/handlers/workflow.go | 24 +++-- internal/workflow/event.go | 2 +- internal/workflow/service.go | 10 ++- ui/src/api/certificates.ts | 4 +- ui/src/api/workflows.ts | 21 ++++- ui/src/components/workflow/WorkflowRuns.tsx | 88 +++++++++++++------ ui/src/i18n/locales/en/nls.workflow.runs.json | 6 +- ui/src/i18n/locales/zh/nls.workflow.runs.json | 8 +- ui/src/pages/workflows/WorkflowDetail.tsx | 4 +- 11 files changed, 130 insertions(+), 48 deletions(-) diff --git a/internal/domain/dtos/workflow.go b/internal/domain/dtos/workflow.go index 3a760971..9d1f5781 100644 --- a/internal/domain/dtos/workflow.go +++ b/internal/domain/dtos/workflow.go @@ -2,7 +2,12 @@ import "github.com/usual2970/certimate/internal/domain" -type WorkflowRunReq struct { +type WorkflowStartRunReq struct { WorkflowId string `json:"-"` Trigger domain.WorkflowTriggerType `json:"trigger"` } + +type WorkflowCancelRunReq struct { + WorkflowId string `json:"-"` + RunId string `json:"-"` +} diff --git a/internal/rest/handlers/certificate.go b/internal/rest/handlers/certificate.go index 26788de6..5f240bb0 100644 --- a/internal/rest/handlers/certificate.go +++ b/internal/rest/handlers/certificate.go @@ -24,12 +24,12 @@ func NewCertificateHandler(router *router.RouterGroup[*core.RequestEvent], servi } group := router.Group("/certificates") - group.POST("/{id}/archive", handler.run) + group.POST("/{certificateId}/archive", handler.run) } func (handler *CertificateHandler) run(e *core.RequestEvent) error { req := &dtos.CertificateArchiveFileReq{} - req.CertificateId = e.Request.PathValue("id") + req.CertificateId = e.Request.PathValue("certificateId") if err := e.BindBody(req); err != nil { return resp.Err(e, err) } diff --git a/internal/rest/handlers/workflow.go b/internal/rest/handlers/workflow.go index b1ba70e4..83b3302b 100644 --- a/internal/rest/handlers/workflow.go +++ b/internal/rest/handlers/workflow.go @@ -11,7 +11,8 @@ import ( ) type workflowService interface { - Run(ctx context.Context, req *dtos.WorkflowRunReq) error + StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error + CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error Stop(ctx context.Context) } @@ -25,17 +26,30 @@ func NewWorkflowHandler(router *router.RouterGroup[*core.RequestEvent], service } group := router.Group("/workflows") - group.POST("/{id}/run", handler.run) + group.POST("/{workflowId}/runs", handler.run) + group.POST("/{workflowId}/runs/{runId}/cancel", handler.cancel) } func (handler *WorkflowHandler) run(e *core.RequestEvent) error { - req := &dtos.WorkflowRunReq{} - req.WorkflowId = e.Request.PathValue("id") + req := &dtos.WorkflowStartRunReq{} + req.WorkflowId = e.Request.PathValue("workflowId") if err := e.BindBody(req); err != nil { return resp.Err(e, err) } - if err := handler.service.Run(e.Request.Context(), req); err != nil { + if err := handler.service.StartRun(e.Request.Context(), req); err != nil { + return resp.Err(e, err) + } + + return resp.Ok(e, nil) +} + +func (handler *WorkflowHandler) cancel(e *core.RequestEvent) error { + req := &dtos.WorkflowCancelRunReq{} + req.WorkflowId = e.Request.PathValue("workflowId") + req.RunId = e.Request.PathValue("runId") + + if err := handler.service.CancelRun(e.Request.Context(), req); err != nil { return resp.Err(e, err) } diff --git a/internal/workflow/event.go b/internal/workflow/event.go index 9bb3b310..f8117dbc 100644 --- a/internal/workflow/event.go +++ b/internal/workflow/event.go @@ -65,7 +65,7 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er // 反之,重新添加定时任务 err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() { - NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &dtos.WorkflowRunReq{ + NewWorkflowService(repository.NewWorkflowRepository()).StartRun(ctx, &dtos.WorkflowStartRunReq{ WorkflowId: workflowId, Trigger: domain.WorkflowTriggerTypeAuto, }) diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 25564ad8..1f103d72 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -60,7 +60,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { scheduler := app.GetScheduler() for _, workflow := range workflows { err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() { - s.Run(ctx, &dtos.WorkflowRunReq{ + s.StartRun(ctx, &dtos.WorkflowStartRunReq{ WorkflowId: workflow.Id, Trigger: domain.WorkflowTriggerTypeAuto, }) @@ -74,7 +74,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { return nil } -func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) error { +func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error { workflow, err := s.repo.GetById(ctx, req.WorkflowId) if err != nil { app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err) @@ -102,6 +102,12 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err return nil } +func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error { + // TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行 + + return nil +} + func (s *WorkflowService) Stop(ctx context.Context) { s.cancel() s.wg.Wait() diff --git a/ui/src/api/certificates.ts b/ui/src/api/certificates.ts index 4b566ecf..71ef30aa 100644 --- a/ui/src/api/certificates.ts +++ b/ui/src/api/certificates.ts @@ -3,10 +3,10 @@ import { ClientResponseError } from "pocketbase"; import { type CertificateFormatType } from "@/domain/certificate"; import { getPocketBase } from "@/repository/_pocketbase"; -export const archive = async (id: string, format?: CertificateFormatType) => { +export const archive = async (certificateId: string, format?: CertificateFormatType) => { const pb = getPocketBase(); - const resp = await pb.send>(`/api/certificates/${encodeURIComponent(id)}/archive`, { + const resp = await pb.send>(`/api/certificates/${encodeURIComponent(certificateId)}/archive`, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/ui/src/api/workflows.ts b/ui/src/api/workflows.ts index 4ad9e2d3..6a9d69f6 100644 --- a/ui/src/api/workflows.ts +++ b/ui/src/api/workflows.ts @@ -3,10 +3,10 @@ import { ClientResponseError } from "pocketbase"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; import { getPocketBase } from "@/repository/_pocketbase"; -export const run = async (id: string) => { +export const startRun = async (workflowId: string) => { const pb = getPocketBase(); - const resp = await pb.send(`/api/workflows/${encodeURIComponent(id)}/run`, { + const resp = await pb.send(`/api/workflows/${encodeURIComponent(workflowId)}/runs`, { method: "POST", headers: { "Content-Type": "application/json", @@ -22,3 +22,20 @@ export const run = async (id: string) => { return resp; }; + +export const cancelRun = async (workflowId: string, runId: string) => { + const pb = getPocketBase(); + + const resp = await pb.send(`/api/workflows/${encodeURIComponent(workflowId)}/runs/${encodeURIComponent(runId)}/cancel`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }); + + if (resp.code != 0) { + throw new ClientResponseError({ status: resp.code, response: resp, data: {} }); + } + + return resp; +}; diff --git a/ui/src/components/workflow/WorkflowRuns.tsx b/ui/src/components/workflow/WorkflowRuns.tsx index 788bf1e0..c90ab5a3 100644 --- a/ui/src/components/workflow/WorkflowRuns.tsx +++ b/ui/src/components/workflow/WorkflowRuns.tsx @@ -6,6 +6,7 @@ import { CloseCircleOutlined as CloseCircleOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon, PauseCircleOutlined as PauseCircleOutlinedIcon, + PauseOutlined as PauseOutlinedIcon, SelectOutlined as SelectOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; @@ -14,6 +15,7 @@ import { Button, Empty, Modal, Table, type TableProps, Tag, Tooltip, notificatio import dayjs from "dayjs"; import { ClientResponseError } from "pocketbase"; +import { cancelRun as cancelWorkflowRun } from "@/api/workflows"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun"; import { list as listWorkflowRuns, remove as removeWorkflowRun } from "@/repository/workflowRun"; @@ -125,35 +127,51 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => { align: "end", fixed: "right", width: 120, - render: (_, record) => ( - - -