feat: cancel workflow run

This commit is contained in:
Fu Diwei 2025-01-22 04:13:16 +08:00
parent bee4ba10cb
commit 0f945881a1
11 changed files with 130 additions and 48 deletions

View File

@ -2,7 +2,12 @@
import "github.com/usual2970/certimate/internal/domain" import "github.com/usual2970/certimate/internal/domain"
type WorkflowRunReq struct { type WorkflowStartRunReq struct {
WorkflowId string `json:"-"` WorkflowId string `json:"-"`
Trigger domain.WorkflowTriggerType `json:"trigger"` Trigger domain.WorkflowTriggerType `json:"trigger"`
} }
type WorkflowCancelRunReq struct {
WorkflowId string `json:"-"`
RunId string `json:"-"`
}

View File

@ -24,12 +24,12 @@ func NewCertificateHandler(router *router.RouterGroup[*core.RequestEvent], servi
} }
group := router.Group("/certificates") group := router.Group("/certificates")
group.POST("/{id}/archive", handler.run) group.POST("/{certificateId}/archive", handler.run)
} }
func (handler *CertificateHandler) run(e *core.RequestEvent) error { func (handler *CertificateHandler) run(e *core.RequestEvent) error {
req := &dtos.CertificateArchiveFileReq{} req := &dtos.CertificateArchiveFileReq{}
req.CertificateId = e.Request.PathValue("id") req.CertificateId = e.Request.PathValue("certificateId")
if err := e.BindBody(req); err != nil { if err := e.BindBody(req); err != nil {
return resp.Err(e, err) return resp.Err(e, err)
} }

View File

@ -11,7 +11,8 @@ import (
) )
type workflowService interface { type workflowService interface {
Run(ctx context.Context, req *dtos.WorkflowRunReq) error StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error
CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error
Stop(ctx context.Context) Stop(ctx context.Context)
} }
@ -25,17 +26,30 @@ func NewWorkflowHandler(router *router.RouterGroup[*core.RequestEvent], service
} }
group := router.Group("/workflows") group := router.Group("/workflows")
group.POST("/{id}/run", handler.run) group.POST("/{workflowId}/runs", handler.run)
group.POST("/{workflowId}/runs/{runId}/cancel", handler.cancel)
} }
func (handler *WorkflowHandler) run(e *core.RequestEvent) error { func (handler *WorkflowHandler) run(e *core.RequestEvent) error {
req := &dtos.WorkflowRunReq{} req := &dtos.WorkflowStartRunReq{}
req.WorkflowId = e.Request.PathValue("id") req.WorkflowId = e.Request.PathValue("workflowId")
if err := e.BindBody(req); err != nil { if err := e.BindBody(req); err != nil {
return resp.Err(e, err) return resp.Err(e, err)
} }
if err := handler.service.Run(e.Request.Context(), req); err != nil { if err := handler.service.StartRun(e.Request.Context(), req); err != nil {
return resp.Err(e, err)
}
return resp.Ok(e, nil)
}
func (handler *WorkflowHandler) cancel(e *core.RequestEvent) error {
req := &dtos.WorkflowCancelRunReq{}
req.WorkflowId = e.Request.PathValue("workflowId")
req.RunId = e.Request.PathValue("runId")
if err := handler.service.CancelRun(e.Request.Context(), req); err != nil {
return resp.Err(e, err) return resp.Err(e, err)
} }

View File

@ -65,7 +65,7 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er
// 反之,重新添加定时任务 // 反之,重新添加定时任务
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() { err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() {
NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &dtos.WorkflowRunReq{ NewWorkflowService(repository.NewWorkflowRepository()).StartRun(ctx, &dtos.WorkflowStartRunReq{
WorkflowId: workflowId, WorkflowId: workflowId,
Trigger: domain.WorkflowTriggerTypeAuto, Trigger: domain.WorkflowTriggerTypeAuto,
}) })

View File

@ -60,7 +60,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
scheduler := app.GetScheduler() scheduler := app.GetScheduler()
for _, workflow := range workflows { for _, workflow := range workflows {
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() { err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
s.Run(ctx, &dtos.WorkflowRunReq{ s.StartRun(ctx, &dtos.WorkflowStartRunReq{
WorkflowId: workflow.Id, WorkflowId: workflow.Id,
Trigger: domain.WorkflowTriggerTypeAuto, Trigger: domain.WorkflowTriggerTypeAuto,
}) })
@ -74,7 +74,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
return nil return nil
} }
func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) error { func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
workflow, err := s.repo.GetById(ctx, req.WorkflowId) workflow, err := s.repo.GetById(ctx, req.WorkflowId)
if err != nil { if err != nil {
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err) app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
@ -102,6 +102,12 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err
return nil return nil
} }
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
// TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行
return nil
}
func (s *WorkflowService) Stop(ctx context.Context) { func (s *WorkflowService) Stop(ctx context.Context) {
s.cancel() s.cancel()
s.wg.Wait() s.wg.Wait()

View File

@ -3,10 +3,10 @@ import { ClientResponseError } from "pocketbase";
import { type CertificateFormatType } from "@/domain/certificate"; import { type CertificateFormatType } from "@/domain/certificate";
import { getPocketBase } from "@/repository/_pocketbase"; import { getPocketBase } from "@/repository/_pocketbase";
export const archive = async (id: string, format?: CertificateFormatType) => { export const archive = async (certificateId: string, format?: CertificateFormatType) => {
const pb = getPocketBase(); const pb = getPocketBase();
const resp = await pb.send<BaseResponse<string>>(`/api/certificates/${encodeURIComponent(id)}/archive`, { const resp = await pb.send<BaseResponse<string>>(`/api/certificates/${encodeURIComponent(certificateId)}/archive`, {
method: "POST", method: "POST",
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",

View File

@ -3,10 +3,10 @@ import { ClientResponseError } from "pocketbase";
import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow";
import { getPocketBase } from "@/repository/_pocketbase"; import { getPocketBase } from "@/repository/_pocketbase";
export const run = async (id: string) => { export const startRun = async (workflowId: string) => {
const pb = getPocketBase(); const pb = getPocketBase();
const resp = await pb.send<BaseResponse>(`/api/workflows/${encodeURIComponent(id)}/run`, { const resp = await pb.send<BaseResponse>(`/api/workflows/${encodeURIComponent(workflowId)}/runs`, {
method: "POST", method: "POST",
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
@ -22,3 +22,20 @@ export const run = async (id: string) => {
return resp; return resp;
}; };
export const cancelRun = async (workflowId: string, runId: string) => {
const pb = getPocketBase();
const resp = await pb.send<BaseResponse>(`/api/workflows/${encodeURIComponent(workflowId)}/runs/${encodeURIComponent(runId)}/cancel`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
});
if (resp.code != 0) {
throw new ClientResponseError({ status: resp.code, response: resp, data: {} });
}
return resp;
};

View File

@ -6,6 +6,7 @@ import {
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon, PauseCircleOutlined as PauseCircleOutlinedIcon,
PauseOutlined as PauseOutlinedIcon,
SelectOutlined as SelectOutlinedIcon, SelectOutlined as SelectOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
@ -14,6 +15,7 @@ import { Button, Empty, Modal, Table, type TableProps, Tag, Tooltip, notificatio
import dayjs from "dayjs"; import dayjs from "dayjs";
import { ClientResponseError } from "pocketbase"; import { ClientResponseError } from "pocketbase";
import { cancelRun as cancelWorkflowRun } from "@/api/workflows";
import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow";
import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun"; import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun";
import { list as listWorkflowRuns, remove as removeWorkflowRun } from "@/repository/workflowRun"; import { list as listWorkflowRuns, remove as removeWorkflowRun } from "@/repository/workflowRun";
@ -125,7 +127,14 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
align: "end", align: "end",
fixed: "right", fixed: "right",
width: 120, width: 120,
render: (_, record) => ( render: (_, record) => {
const allowCancel = record.status === WORKFLOW_RUN_STATUSES.PENDING || record.status === WORKFLOW_RUN_STATUSES.RUNNING;
const aloowDelete =
record.status === WORKFLOW_RUN_STATUSES.SUCCEEDED ||
record.status === WORKFLOW_RUN_STATUSES.FAILED ||
record.status === WORKFLOW_RUN_STATUSES.CANCELED;
return (
<Button.Group> <Button.Group>
<WorkflowRunDetailDrawer <WorkflowRunDetailDrawer
data={record} data={record}
@ -136,15 +145,23 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
} }
/> />
<Tooltip title={t("workflow_run.action.cancel")}>
<Button
color="default"
disabled={!allowCancel}
icon={<PauseOutlinedIcon />}
variant="text"
onClick={() => {
handleCancelClick(record);
}}
/>
</Tooltip>
<Tooltip title={t("workflow_run.action.delete")}> <Tooltip title={t("workflow_run.action.delete")}>
<Button <Button
color="danger" color="danger"
danger danger
disabled={ disabled={!aloowDelete}
record.status !== WORKFLOW_RUN_STATUSES.SUCCEEDED &&
record.status !== WORKFLOW_RUN_STATUSES.FAILED &&
record.status !== WORKFLOW_RUN_STATUSES.CANCELED
}
icon={<DeleteOutlinedIcon />} icon={<DeleteOutlinedIcon />}
variant="text" variant="text"
onClick={() => { onClick={() => {
@ -153,7 +170,8 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
/> />
</Tooltip> </Tooltip>
</Button.Group> </Button.Group>
), );
},
}, },
]; ];
const [tableData, setTableData] = useState<WorkflowRunModel[]>([]); const [tableData, setTableData] = useState<WorkflowRunModel[]>([]);
@ -193,6 +211,24 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
} }
); );
const handleCancelClick = (workflowRun: WorkflowRunModel) => {
modalApi.confirm({
title: t("workflow_run.action.cancel"),
content: t("workflow_run.action.cancel.confirm"),
onOk: async () => {
try {
const resp = await cancelWorkflowRun(workflowId, workflowRun.id);
if (resp) {
refreshData();
}
} catch (err) {
console.error(err);
notificationApi.error({ message: t("common.text.request_error"), description: getErrMsg(err) });
}
},
});
};
const handleDeleteClick = (workflowRun: WorkflowRunModel) => { const handleDeleteClick = (workflowRun: WorkflowRunModel) => {
modalApi.confirm({ modalApi.confirm({
title: t("workflow_run.action.delete"), title: t("workflow_run.action.delete"),

View File

@ -1,7 +1,9 @@
{ {
"workflow_run.action.view": "View detail", "workflow_run.action.view": "View detail",
"workflow_run.action.delete": "Delete history run", "workflow_run.action.cancel": "Cancel run",
"workflow_run.action.delete.confirm": "Are you sure to delete this history run?", "workflow_run.action.cancel.confirm": "Are you sure to cancel this run?",
"workflow_run.action.delete": "Delete run",
"workflow_run.action.delete.confirm": "Are you sure to delete this run?",
"workflow_run.props.id": "ID", "workflow_run.props.id": "ID",
"workflow_run.props.status": "Status", "workflow_run.props.status": "Status",

View File

@ -1,7 +1,9 @@
{ {
"workflow_run.action.edit": "查看详情", "workflow_run.action.view": "查看详情",
"workflow_run.action.delete": "删除执行历史", "workflow_run.action.cancel": "取消执行",
"workflow_run.action.delete.confirm": "确定要删除此执行历史吗?此操作仅清除日志,不会影响各节点的执行结果和签发的证书。", "workflow_run.action.cancel.confirm": "确定要取消此执行吗?请注意此操作仅中止流程,但不会回滚已执行的节点。",
"workflow_run.action.delete": "删除执行",
"workflow_run.action.delete.confirm": "确定要删除此执行吗?请注意此操作仅清除日志历史,但不会影响各节点的执行结果和签发的证书。",
"workflow_run.props.id": "ID", "workflow_run.props.id": "ID",
"workflow_run.props.status": "状态", "workflow_run.props.status": "状态",

View File

@ -16,7 +16,7 @@ import { createSchemaFieldRule } from "antd-zod";
import { isEqual } from "radash"; import { isEqual } from "radash";
import { z } from "zod"; import { z } from "zod";
import { run as runWorkflow } from "@/api/workflows"; import { startRun as startWorkflowRun } from "@/api/workflows";
import ModalForm from "@/components/ModalForm"; import ModalForm from "@/components/ModalForm";
import Show from "@/components/Show"; import Show from "@/components/Show";
import WorkflowElements from "@/components/workflow/WorkflowElements"; import WorkflowElements from "@/components/workflow/WorkflowElements";
@ -187,7 +187,7 @@ const WorkflowDetail = () => {
} }
}); });
await runWorkflow(workflowId!); await startWorkflowRun(workflowId!);
messageApi.info(t("workflow.detail.orchestration.action.run.prompt")); messageApi.info(t("workflow.detail.orchestration.action.run.prompt"));
} catch (err) { } catch (err) {