refactor: new workflow run logs

This commit is contained in:
Fu Diwei 2025-02-10 13:04:31 +08:00
parent 75c89b3d0b
commit 4f5c1dc6d7
12 changed files with 71 additions and 63 deletions

View File

@ -31,17 +31,26 @@ const (
type WorkflowRunLog struct { type WorkflowRunLog struct {
NodeId string `json:"nodeId"` NodeId string `json:"nodeId"`
NodeName string `json:"nodeName"` NodeName string `json:"nodeName"`
Records []WorkflowRunLogRecord `json:"records"`
Error string `json:"error"` Error string `json:"error"`
Outputs []WorkflowRunLogOutput `json:"outputs"`
} }
type WorkflowRunLogOutput struct { type WorkflowRunLogRecord struct {
Time string `json:"time"` Time string `json:"time"`
Title string `json:"title"` Level WorkflowRunLogLevel `json:"level"`
Content string `json:"content"` Content string `json:"content"`
Error string `json:"error"` Error string `json:"error"`
} }
type WorkflowRunLogLevel string
const (
WorkflowRunLogLevelDebug WorkflowRunLogLevel = "DEBUG"
WorkflowRunLogLevelInfo WorkflowRunLogLevel = "INFO"
WorkflowRunLogLevelWarn WorkflowRunLogLevel = "WARN"
WorkflowRunLogLevelError WorkflowRunLogLevel = "ERROR"
)
type WorkflowRunLogs []WorkflowRunLog type WorkflowRunLogs []WorkflowRunLog
func (r WorkflowRunLogs) ErrorString() string { func (r WorkflowRunLogs) ErrorString() string {

View File

@ -24,7 +24,7 @@ type applyNode struct {
func NewApplyNode(node *domain.WorkflowNode) *applyNode { func NewApplyNode(node *domain.WorkflowNode) *applyNode {
return &applyNode{ return &applyNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
certRepo: repository.NewCertificateRepository(), certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(), outputRepo: repository.NewWorkflowOutputRepository(),
@ -32,40 +32,40 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode {
} }
func (n *applyNode) Process(ctx context.Context) error { func (n *applyNode) Process(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "开始执行") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入申请证书节点")
// 查询上次执行结果 // 查询上次执行结果
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
if err != nil && !domain.IsRecordNotFoundError(err) { if err != nil && !domain.IsRecordNotFoundError(err) {
n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error())
return err return err
} }
// 检测是否可以跳过本次执行 // 检测是否可以跳过本次执行
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
n.AddOutput(ctx, n.node.Name, skipReason) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
return nil return nil
} }
// 初始化申请器 // 初始化申请器
applicant, err := applicant.NewWithApplyNode(n.node) applicant, err := applicant.NewWithApplyNode(n.node)
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "获取申请对象失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取申请对象失败", err.Error())
return err return err
} }
// 申请证书 // 申请证书
applyResult, err := applicant.Apply() applyResult, err := applicant.Apply()
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "申请失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "申请失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "申请成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "申请成功")
// 解析证书并生成实体 // 解析证书并生成实体
certX509, err := certs.ParseCertificateFromPEM(applyResult.CertificateFullChain) certX509, err := certs.ParseCertificateFromPEM(applyResult.CertificateFullChain)
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "解析证书失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败", err.Error())
return err return err
} }
certificate := &domain.Certificate{ certificate := &domain.Certificate{
@ -89,10 +89,10 @@ func (n *applyNode) Process(ctx context.Context) error {
Outputs: n.node.Outputs, Outputs: n.node.Outputs,
} }
if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil { if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil {
n.AddOutput(ctx, n.node.Name, "保存申请记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存申请记录失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "保存申请记录成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存申请记录成功")
return nil return nil
} }

View File

@ -14,13 +14,11 @@ type conditionNode struct {
func NewConditionNode(node *domain.WorkflowNode) *conditionNode { func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
return &conditionNode{ return &conditionNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
} }
} }
func (n *conditionNode) Process(ctx context.Context) error { func (n *conditionNode) Process(ctx context.Context) error {
// 此类型节点不需要执行任何操作,直接返回 // 此类型节点不需要执行任何操作,直接返回
n.AddOutput(ctx, n.node.Name, "完成")
return nil return nil
} }

View File

@ -22,7 +22,7 @@ type deployNode struct {
func NewDeployNode(node *domain.WorkflowNode) *deployNode { func NewDeployNode(node *domain.WorkflowNode) *deployNode {
return &deployNode{ return &deployNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
certRepo: repository.NewCertificateRepository(), certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(), outputRepo: repository.NewWorkflowOutputRepository(),
@ -30,12 +30,12 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode {
} }
func (n *deployNode) Process(ctx context.Context) error { func (n *deployNode) Process(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "开始执行") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "开始执行")
// 查询上次执行结果 // 查询上次执行结果
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
if err != nil && !domain.IsRecordNotFoundError(err) { if err != nil && !domain.IsRecordNotFoundError(err) {
n.AddOutput(ctx, n.node.Name, "查询部署记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询部署记录失败", err.Error())
return err return err
} }
@ -43,19 +43,19 @@ func (n *deployNode) Process(ctx context.Context) error {
previousNodeOutputCertificateSource := n.node.GetConfigForDeploy().Certificate previousNodeOutputCertificateSource := n.node.GetConfigForDeploy().Certificate
previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#") previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#")
if len(previousNodeOutputCertificateSourceSlice) != 2 { if len(previousNodeOutputCertificateSourceSlice) != 2 {
n.AddOutput(ctx, n.node.Name, "证书来源配置错误", previousNodeOutputCertificateSource) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "证书来源配置错误", previousNodeOutputCertificateSource)
return fmt.Errorf("证书来源配置错误: %s", previousNodeOutputCertificateSource) return fmt.Errorf("证书来源配置错误: %s", previousNodeOutputCertificateSource)
} }
certificate, err := n.certRepo.GetByWorkflowNodeId(ctx, previousNodeOutputCertificateSourceSlice[0]) certificate, err := n.certRepo.GetByWorkflowNodeId(ctx, previousNodeOutputCertificateSourceSlice[0])
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "获取证书失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取证书失败", err.Error())
return err return err
} }
// 检测是否可以跳过本次执行 // 检测是否可以跳过本次执行
if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) { if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) {
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
n.AddOutput(ctx, n.node.Name, skipReason) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
return nil return nil
} }
} }
@ -66,16 +66,16 @@ func (n *deployNode) Process(ctx context.Context) error {
PrivateKey string PrivateKey string
}{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey}) }{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey})
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "获取部署对象失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取部署对象失败", err.Error())
return err return err
} }
// 部署证书 // 部署证书
if err := deployer.Deploy(ctx); err != nil { if err := deployer.Deploy(ctx); err != nil {
n.AddOutput(ctx, n.node.Name, "部署失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "部署失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "部署成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "部署成功")
// 保存执行结果 // 保存执行结果
output := &domain.WorkflowOutput{ output := &domain.WorkflowOutput{
@ -86,10 +86,10 @@ func (n *deployNode) Process(ctx context.Context) error {
Succeeded: true, Succeeded: true,
} }
if _, err := n.outputRepo.Save(ctx, output); err != nil { if _, err := n.outputRepo.Save(ctx, output); err != nil {
n.AddOutput(ctx, n.node.Name, "保存部署记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存部署记录失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "保存部署记录成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存部署记录成功")
return nil return nil
} }

View File

@ -14,13 +14,13 @@ type executeFailureNode struct {
func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode { func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode {
return &executeFailureNode{ return &executeFailureNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
} }
} }
func (n *executeFailureNode) Process(ctx context.Context) error { func (n *executeFailureNode) Process(ctx context.Context) error {
// 此类型节点不需要执行任何操作,直接返回 // 此类型节点不需要执行任何操作,直接返回
n.AddOutput(ctx, n.node.Name, "进入执行失败分支") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行失败分支")
return nil return nil
} }

View File

@ -14,13 +14,13 @@ type executeSuccessNode struct {
func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode { func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode {
return &executeSuccessNode{ return &executeSuccessNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
} }
} }
func (n *executeSuccessNode) Process(ctx context.Context) error { func (n *executeSuccessNode) Process(ctx context.Context) error {
// 此类型节点不需要执行任何操作,直接返回 // 此类型节点不需要执行任何操作,直接返回
n.AddOutput(ctx, n.node.Name, "进入执行成功分支") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行成功分支")
return nil return nil
} }

View File

@ -18,37 +18,37 @@ type notifyNode struct {
func NewNotifyNode(node *domain.WorkflowNode) *notifyNode { func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
return &notifyNode{ return &notifyNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
settingsRepo: repository.NewSettingsRepository(), settingsRepo: repository.NewSettingsRepository(),
} }
} }
func (n *notifyNode) Process(ctx context.Context) error { func (n *notifyNode) Process(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "开始执行") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入推送通知节点")
nodeConfig := n.node.GetConfigForNotify() nodeConfig := n.node.GetConfigForNotify()
// 获取通知配置 // 获取通知配置
settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels") settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels")
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知配置失败", err.Error())
return err return err
} }
// 获取通知渠道 // 获取通知渠道
channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel) channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel)
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知渠道配置失败", err.Error())
return err return err
} }
// 发送通知 // 发送通知
if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil { if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil {
n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "发送通知失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "发送通知成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "发送通知成功")
return nil return nil
} }

View File

@ -10,8 +10,9 @@ import (
type NodeProcessor interface { type NodeProcessor interface {
Process(ctx context.Context) error Process(ctx context.Context) error
GetLog(ctx context.Context) *domain.WorkflowRunLog GetLog(ctx context.Context) *domain.WorkflowRunLog
AddOutput(ctx context.Context, title, content string, err ...string) AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string)
} }
type nodeLogger struct { type nodeLogger struct {
@ -32,12 +33,12 @@ type settingsRepository interface {
GetByName(ctx context.Context, name string) (*domain.Settings, error) GetByName(ctx context.Context, name string) (*domain.Settings, error)
} }
func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger { func newNodeLogger(node *domain.WorkflowNode) *nodeLogger {
return &nodeLogger{ return &nodeLogger{
log: &domain.WorkflowRunLog{ log: &domain.WorkflowRunLog{
NodeId: node.Id, NodeId: node.Id,
NodeName: node.Name, NodeName: node.Name,
Outputs: make([]domain.WorkflowRunLogOutput, 0), Records: make([]domain.WorkflowRunLogRecord, 0),
}, },
} }
} }
@ -46,17 +47,17 @@ func (l *nodeLogger) GetLog(ctx context.Context) *domain.WorkflowRunLog {
return l.log return l.log
} }
func (l *nodeLogger) AddOutput(ctx context.Context, title, content string, err ...string) { func (l *nodeLogger) AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string) {
output := domain.WorkflowRunLogOutput{ record := domain.WorkflowRunLogRecord{
Time: time.Now().UTC().Format(time.RFC3339), Time: time.Now().UTC().Format(time.RFC3339),
Title: title, Level: level,
Content: content, Content: content,
} }
if len(err) > 0 { if len(err) > 0 {
output.Error = err[0] record.Error = err[0]
l.log.Error = err[0]
} }
l.log.Outputs = append(l.log.Outputs, output)
l.log.Records = append(l.log.Records, record)
} }
func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) { func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {

View File

@ -14,13 +14,13 @@ type startNode struct {
func NewStartNode(node *domain.WorkflowNode) *startNode { func NewStartNode(node *domain.WorkflowNode) *startNode {
return &startNode{ return &startNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
} }
} }
func (n *startNode) Process(ctx context.Context) error { func (n *startNode) Process(ctx context.Context) error {
// 此类型节点不需要执行任何操作,直接返回 // 此类型节点不需要执行任何操作,直接返回
n.AddOutput(ctx, n.node.Name, "完成") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入开始节点")
return nil return nil
} }

View File

@ -22,7 +22,7 @@ type uploadNode struct {
func NewUploadNode(node *domain.WorkflowNode) *uploadNode { func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
return &uploadNode{ return &uploadNode{
node: node, node: node,
nodeLogger: NewNodeLogger(node), nodeLogger: newNodeLogger(node),
certRepo: repository.NewCertificateRepository(), certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(), outputRepo: repository.NewWorkflowOutputRepository(),
@ -30,20 +30,20 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
} }
func (n *uploadNode) Process(ctx context.Context) error { func (n *uploadNode) Process(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "进入上传证书节点") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入上传证书节点")
nodeConfig := n.node.GetConfigForUpload() nodeConfig := n.node.GetConfigForUpload()
// 查询上次执行结果 // 查询上次执行结果
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id) lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
if err != nil && !domain.IsRecordNotFoundError(err) { if err != nil && !domain.IsRecordNotFoundError(err) {
n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error())
return err return err
} }
// 检测是否可以跳过本次执行 // 检测是否可以跳过本次执行
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
n.AddOutput(ctx, n.node.Name, skipReason) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
return nil return nil
} }
@ -51,11 +51,11 @@ func (n *uploadNode) Process(ctx context.Context) error {
// 如果证书过期,则直接返回错误 // 如果证书过期,则直接返回错误
certX509, err := certs.ParseCertificateFromPEM(nodeConfig.Certificate) certX509, err := certs.ParseCertificateFromPEM(nodeConfig.Certificate)
if err != nil { if err != nil {
n.AddOutput(ctx, n.node.Name, "解析证书失败") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败")
return err return err
} }
if time.Now().After(certX509.NotAfter) { if time.Now().After(certX509.NotAfter) {
n.AddOutput(ctx, n.node.Name, "证书已过期") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelWarn, "证书已过期")
return errors.New("certificate is expired") return errors.New("certificate is expired")
} }
@ -75,10 +75,10 @@ func (n *uploadNode) Process(ctx context.Context) error {
Outputs: n.node.Outputs, Outputs: n.node.Outputs,
} }
if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil { if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil {
n.AddOutput(ctx, n.node.Name, "保存上传记录失败", err.Error()) n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存上传记录失败", err.Error())
return err return err
} }
n.AddOutput(ctx, n.node.Name, "保存上传记录成功") n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存上传记录成功")
return nil return nil
} }

View File

@ -39,7 +39,7 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => {
<div key={i} className="flex flex-col space-y-2"> <div key={i} className="flex flex-col space-y-2">
<div className="font-semibold">{item.nodeName}</div> <div className="font-semibold">{item.nodeName}</div>
<div className="flex flex-col space-y-1"> <div className="flex flex-col space-y-1">
{item.outputs?.map((output, j) => { {item.records?.map((output, j) => {
return ( return (
<div key={j} className="flex space-x-2 text-sm" style={{ wordBreak: "break-word" }}> <div key={j} className="flex space-x-2 text-sm" style={{ wordBreak: "break-word" }}>
<div className="whitespace-nowrap">[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]</div> <div className="whitespace-nowrap">[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]</div>

View File

@ -16,13 +16,13 @@ export interface WorkflowRunModel extends BaseModel {
export type WorkflowRunLog = { export type WorkflowRunLog = {
nodeId: string; nodeId: string;
nodeName: string; nodeName: string;
outputs?: WorkflowRunLogOutput[]; records?: WorkflowRunLogRecord[];
error?: string; error?: string;
}; };
export type WorkflowRunLogOutput = { export type WorkflowRunLogRecord = {
time: ISO8601String; time: ISO8601String;
title: string; level: string;
content: string; content: string;
error?: string; error?: string;
}; };