mirror of
https://github.com/woodchen-ink/certimate.git
synced 2025-07-18 09:21:56 +08:00
refactor: clean code
This commit is contained in:
parent
dea4106569
commit
8ecb71fb55
@ -17,21 +17,21 @@ const (
|
|||||||
defaultExpireMessage = "有 ${COUNT} 张证书即将过期,域名分别为 ${DOMAINS},请保持关注!"
|
defaultExpireMessage = "有 ${COUNT} 张证书即将过期,域名分别为 ${DOMAINS},请保持关注!"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CertificateRepository interface {
|
type certificateRepository interface {
|
||||||
ListExpireSoon(ctx context.Context) ([]domain.Certificate, error)
|
ListExpireSoon(ctx context.Context) ([]*domain.Certificate, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type certificateService struct {
|
type CertificateService struct {
|
||||||
repo CertificateRepository
|
repo certificateRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCertificateService(repo CertificateRepository) *certificateService {
|
func NewCertificateService(repo certificateRepository) *CertificateService {
|
||||||
return &certificateService{
|
return &CertificateService{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *certificateService) InitSchedule(ctx context.Context) error {
|
func (s *CertificateService) InitSchedule(ctx context.Context) error {
|
||||||
scheduler := app.GetScheduler()
|
scheduler := app.GetScheduler()
|
||||||
err := scheduler.Add("certificate", "0 0 * * *", func() {
|
err := scheduler.Add("certificate", "0 0 * * *", func() {
|
||||||
certs, err := s.repo.ListExpireSoon(context.Background())
|
certs, err := s.repo.ListExpireSoon(context.Background())
|
||||||
@ -58,13 +58,11 @@ func (s *certificateService) InitSchedule(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type certificateNotification struct {
|
func buildExpireSoonNotification(certificates []*domain.Certificate) *struct {
|
||||||
Subject string `json:"subject"`
|
Subject string
|
||||||
Message string `json:"message"`
|
Message string
|
||||||
}
|
} {
|
||||||
|
if len(certificates) == 0 {
|
||||||
func buildExpireSoonNotification(records []domain.Certificate) *certificateNotification {
|
|
||||||
if len(records) == 0 {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,9 +83,9 @@ func buildExpireSoonNotification(records []domain.Certificate) *certificateNotif
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 替换变量
|
// 替换变量
|
||||||
count := len(records)
|
count := len(certificates)
|
||||||
domains := make([]string, count)
|
domains := make([]string, count)
|
||||||
for i, record := range records {
|
for i, record := range certificates {
|
||||||
domains[i] = record.SubjectAltNames
|
domains[i] = record.SubjectAltNames
|
||||||
}
|
}
|
||||||
countStr := strconv.Itoa(count)
|
countStr := strconv.Itoa(count)
|
||||||
@ -98,8 +96,8 @@ func buildExpireSoonNotification(records []domain.Certificate) *certificateNotif
|
|||||||
message = strings.ReplaceAll(message, "${DOMAINS}", domainStr)
|
message = strings.ReplaceAll(message, "${DOMAINS}", domainStr)
|
||||||
|
|
||||||
// 返回消息
|
// 返回消息
|
||||||
return &certificateNotification{
|
return &struct {
|
||||||
Subject: subject,
|
Subject string
|
||||||
Message: message,
|
Message string
|
||||||
}
|
}{Subject: subject, Message: message}
|
||||||
}
|
}
|
||||||
|
@ -7,11 +7,11 @@ import (
|
|||||||
|
|
||||||
type Access struct {
|
type Access struct {
|
||||||
Meta
|
Meta
|
||||||
Name string `json:"name" db:"name"`
|
Name string `json:"name" db:"name"`
|
||||||
Provider string `json:"provider" db:"provider"`
|
Provider string `json:"provider" db:"provider"`
|
||||||
Config string `json:"config" db:"config"`
|
Config string `json:"config" db:"config"`
|
||||||
Usage string `json:"usage" db:"usage"`
|
Usage string `json:"usage" db:"usage"`
|
||||||
DeletedAt time.Time `json:"deleted" db:"deleted"`
|
DeletedAt *time.Time `json:"deleted" db:"deleted"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Access) UnmarshalConfigToMap() (map[string]any, error) {
|
func (a *Access) UnmarshalConfigToMap() (map[string]any, error) {
|
||||||
|
@ -23,4 +23,5 @@ type Certificate struct {
|
|||||||
WorkflowId string `json:"workflowId" db:"workflowId"`
|
WorkflowId string `json:"workflowId" db:"workflowId"`
|
||||||
WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"`
|
WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"`
|
||||||
WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"`
|
WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"`
|
||||||
|
DeletedAt *time.Time `json:"deleted" db:"deleted"`
|
||||||
}
|
}
|
||||||
|
@ -1,33 +0,0 @@
|
|||||||
package domain
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrInvalidParams = NewXError(400, "invalid params")
|
|
||||||
ErrRecordNotFound = NewXError(404, "record not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
func IsRecordNotFound(err error) bool {
|
|
||||||
if e, ok := err.(*XError); ok {
|
|
||||||
return e.GetCode() == ErrRecordNotFound.GetCode()
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
type XError struct {
|
|
||||||
Code int `json:"code"`
|
|
||||||
Msg string `json:"msg"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewXError(code int, msg string) *XError {
|
|
||||||
return &XError{code, msg}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *XError) Error() string {
|
|
||||||
return e.Msg
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *XError) GetCode() int {
|
|
||||||
if e.Code == 0 {
|
|
||||||
return 100
|
|
||||||
}
|
|
||||||
return e.Code
|
|
||||||
}
|
|
30
internal/domain/error.go
Normal file
30
internal/domain/error.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package domain
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrInvalidParams = NewError(400, "invalid params")
|
||||||
|
ErrRecordNotFound = NewError(404, "record not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Error struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewError(code int, msg string) *Error {
|
||||||
|
if code == 0 {
|
||||||
|
code = -1
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Error{code, msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Error) Error() string {
|
||||||
|
return e.Msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsRecordNotFoundError(err error) bool {
|
||||||
|
if e, ok := err.(*Error); ok {
|
||||||
|
return e.Code == ErrRecordNotFound.Code
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
@ -12,15 +12,15 @@ const (
|
|||||||
notifyTestBody = "欢迎使用 Certimate ,这是一条测试通知。"
|
notifyTestBody = "欢迎使用 Certimate ,这是一条测试通知。"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SettingsRepository interface {
|
type settingsRepository interface {
|
||||||
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type NotifyService struct {
|
type NotifyService struct {
|
||||||
settingRepo SettingsRepository
|
settingRepo settingsRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNotifyService(settingRepo SettingsRepository) *NotifyService {
|
func NewNotifyService(settingRepo settingsRepository) *NotifyService {
|
||||||
return &NotifyService{
|
return &NotifyService{
|
||||||
settingRepo: settingRepo,
|
settingRepo: settingRepo,
|
||||||
}
|
}
|
||||||
|
25
internal/pkg/utils/types/types.go
Normal file
25
internal/pkg/utils/types/types.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import "reflect"
|
||||||
|
|
||||||
|
// 判断对象是否为 nil。
|
||||||
|
//
|
||||||
|
// 入参:
|
||||||
|
// - value:待判断的对象。
|
||||||
|
//
|
||||||
|
// 出参:
|
||||||
|
// - 如果对象值为 nil,则返回 true;否则返回 false。
|
||||||
|
func IsNil(obj any) bool {
|
||||||
|
if obj == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
v := reflect.ValueOf(obj)
|
||||||
|
if v.Kind() == reflect.Ptr {
|
||||||
|
return v.IsNil()
|
||||||
|
} else if v.Kind() == reflect.Interface {
|
||||||
|
return v.Elem().IsNil()
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
@ -4,9 +4,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/pocketbase/pocketbase/models"
|
||||||
"github.com/usual2970/certimate/internal/app"
|
"github.com/usual2970/certimate/internal/app"
|
||||||
"github.com/usual2970/certimate/internal/domain"
|
"github.com/usual2970/certimate/internal/domain"
|
||||||
|
"github.com/usual2970/certimate/internal/pkg/utils/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AccessRepository struct{}
|
type AccessRepository struct{}
|
||||||
@ -15,7 +18,7 @@ func NewAccessRepository() *AccessRepository {
|
|||||||
return &AccessRepository{}
|
return &AccessRepository{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AccessRepository) GetById(ctx context.Context, id string) (*domain.Access, error) {
|
func (r *AccessRepository) GetById(ctx context.Context, id string) (*domain.Access, error) {
|
||||||
record, err := app.GetApp().Dao().FindRecordById("access", id)
|
record, err := app.GetApp().Dao().FindRecordById("access", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
@ -24,6 +27,18 @@ func (a *AccessRepository) GetById(ctx context.Context, id string) (*domain.Acce
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !types.IsNil(record.Get("deleted")) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.castRecordToModel(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AccessRepository) castRecordToModel(record *models.Record) (*domain.Access, error) {
|
||||||
|
if record == nil {
|
||||||
|
return nil, fmt.Errorf("record is nil")
|
||||||
|
}
|
||||||
|
|
||||||
access := &domain.Access{
|
access := &domain.Access{
|
||||||
Meta: domain.Meta{
|
Meta: domain.Meta{
|
||||||
Id: record.GetId(),
|
Id: record.GetId(),
|
||||||
|
@ -22,7 +22,11 @@ var g singleflight.Group
|
|||||||
|
|
||||||
func (r *AcmeAccountRepository) GetByCAAndEmail(ca, email string) (*domain.AcmeAccount, error) {
|
func (r *AcmeAccountRepository) GetByCAAndEmail(ca, email string) (*domain.AcmeAccount, error) {
|
||||||
resp, err, _ := g.Do(fmt.Sprintf("acme_account_%s_%s", ca, email), func() (interface{}, error) {
|
resp, err, _ := g.Do(fmt.Sprintf("acme_account_%s_%s", ca, email), func() (interface{}, error) {
|
||||||
resp, err := app.GetApp().Dao().FindFirstRecordByFilter("acme_accounts", "ca={:ca} && email={:email}", dbx.Params{"ca": ca, "email": email})
|
resp, err := app.GetApp().Dao().FindFirstRecordByFilter(
|
||||||
|
"acme_accounts",
|
||||||
|
"ca={:ca} && email={:email}",
|
||||||
|
dbx.Params{"ca": ca, "email": email},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -33,30 +37,15 @@ func (r *AcmeAccountRepository) GetByCAAndEmail(ca, email string) (*domain.AcmeA
|
|||||||
}
|
}
|
||||||
|
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return nil, fmt.Errorf("acme account not found")
|
return nil, domain.ErrRecordNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
record, ok := resp.(*models.Record)
|
record, ok := resp.(*models.Record)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("acme account not found")
|
return nil, domain.ErrRecordNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
resource := ®istration.Resource{}
|
return r.castRecordToModel(record)
|
||||||
if err := record.UnmarshalJSONField("resource", resource); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &domain.AcmeAccount{
|
|
||||||
Meta: domain.Meta{
|
|
||||||
Id: record.GetId(),
|
|
||||||
CreatedAt: record.GetCreated().Time(),
|
|
||||||
UpdatedAt: record.GetUpdated().Time(),
|
|
||||||
},
|
|
||||||
CA: record.GetString("ca"),
|
|
||||||
Email: record.GetString("email"),
|
|
||||||
Key: record.GetString("key"),
|
|
||||||
Resource: resource,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *AcmeAccountRepository) Save(ca, email, key string, resource *registration.Resource) error {
|
func (r *AcmeAccountRepository) Save(ca, email, key string, resource *registration.Resource) error {
|
||||||
@ -72,3 +61,27 @@ func (r *AcmeAccountRepository) Save(ca, email, key string, resource *registrati
|
|||||||
record.Set("resource", resource)
|
record.Set("resource", resource)
|
||||||
return app.GetApp().Dao().Save(record)
|
return app.GetApp().Dao().Save(record)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *AcmeAccountRepository) castRecordToModel(record *models.Record) (*domain.AcmeAccount, error) {
|
||||||
|
if record == nil {
|
||||||
|
return nil, fmt.Errorf("record is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
resource := ®istration.Resource{}
|
||||||
|
if err := record.UnmarshalJSONField("resource", resource); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
acmeAccount := &domain.AcmeAccount{
|
||||||
|
Meta: domain.Meta{
|
||||||
|
Id: record.GetId(),
|
||||||
|
CreatedAt: record.GetCreated().Time(),
|
||||||
|
UpdatedAt: record.GetUpdated().Time(),
|
||||||
|
},
|
||||||
|
CA: record.GetString("ca"),
|
||||||
|
Email: record.GetString("email"),
|
||||||
|
Key: record.GetString("key"),
|
||||||
|
Resource: resource,
|
||||||
|
}
|
||||||
|
return acmeAccount, nil
|
||||||
|
}
|
||||||
|
@ -2,9 +2,15 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
|
"github.com/pocketbase/pocketbase/models"
|
||||||
"github.com/usual2970/certimate/internal/app"
|
"github.com/usual2970/certimate/internal/app"
|
||||||
"github.com/usual2970/certimate/internal/domain"
|
"github.com/usual2970/certimate/internal/domain"
|
||||||
|
"github.com/usual2970/certimate/internal/pkg/utils/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CertificateRepository struct{}
|
type CertificateRepository struct{}
|
||||||
@ -13,14 +19,89 @@ func NewCertificateRepository() *CertificateRepository {
|
|||||||
return &CertificateRepository{}
|
return &CertificateRepository{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CertificateRepository) ListExpireSoon(ctx context.Context) ([]domain.Certificate, error) {
|
func (r *CertificateRepository) ListExpireSoon(ctx context.Context) ([]*domain.Certificate, error) {
|
||||||
certificates := []domain.Certificate{}
|
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
||||||
err := app.GetApp().Dao().DB().
|
"certificate",
|
||||||
NewQuery("SELECT * FROM certificate WHERE expireAt > DATETIME('now') AND expireAt < DATETIME('now', '+20 days')").
|
"expireAt>DATETIME('now') && expireAt<DATETIME('now', '+20 days') && deleted=null",
|
||||||
All(&certificates)
|
"-created",
|
||||||
|
0, 0,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
certificates := make([]*domain.Certificate, 0)
|
||||||
|
for _, record := range records {
|
||||||
|
certificate, err := r.castRecordToModel(record)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
certificates = append(certificates, certificate)
|
||||||
|
}
|
||||||
|
|
||||||
return certificates, nil
|
return certificates, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *CertificateRepository) GetById(ctx context.Context, id string) (*domain.Certificate, error) {
|
||||||
|
record, err := app.GetApp().Dao().FindRecordById("certificate", id)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !types.IsNil(record.Get("deleted")) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.castRecordToModel(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *CertificateRepository) GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error) {
|
||||||
|
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
||||||
|
"certificate",
|
||||||
|
"workflowNodeId={:workflowNodeId} && deleted=null",
|
||||||
|
"-created", 1, 0,
|
||||||
|
dbx.Params{"workflowNodeId": workflowNodeId},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(records) == 0 {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.castRecordToModel(records[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *CertificateRepository) castRecordToModel(record *models.Record) (*domain.Certificate, error) {
|
||||||
|
if record == nil {
|
||||||
|
return nil, fmt.Errorf("record is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
certificate := &domain.Certificate{
|
||||||
|
Meta: domain.Meta{
|
||||||
|
Id: record.GetId(),
|
||||||
|
CreatedAt: record.GetCreated().Time(),
|
||||||
|
UpdatedAt: record.GetUpdated().Time(),
|
||||||
|
},
|
||||||
|
Source: domain.CertificateSourceType(record.GetString("source")),
|
||||||
|
SubjectAltNames: record.GetString("subjectAltNames"),
|
||||||
|
Certificate: record.GetString("certificate"),
|
||||||
|
PrivateKey: record.GetString("privateKey"),
|
||||||
|
IssuerCertificate: record.GetString("issuerCertificate"),
|
||||||
|
EffectAt: record.GetDateTime("effectAt").Time(),
|
||||||
|
ExpireAt: record.GetDateTime("expireAt").Time(),
|
||||||
|
ACMECertUrl: record.GetString("acmeCertUrl"),
|
||||||
|
ACMECertStableUrl: record.GetString("acmeCertStableUrl"),
|
||||||
|
WorkflowId: record.GetString("workflowId"),
|
||||||
|
WorkflowNodeId: record.GetString("workflowNodeId"),
|
||||||
|
WorkflowOutputId: record.GetString("workflowOutputId"),
|
||||||
|
}
|
||||||
|
return certificate, nil
|
||||||
|
}
|
||||||
|
@ -2,6 +2,8 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
"github.com/usual2970/certimate/internal/app"
|
"github.com/usual2970/certimate/internal/app"
|
||||||
@ -14,9 +16,16 @@ func NewSettingsRepository() *SettingsRepository {
|
|||||||
return &SettingsRepository{}
|
return &SettingsRepository{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SettingsRepository) GetByName(ctx context.Context, name string) (*domain.Settings, error) {
|
func (r *SettingsRepository) GetByName(ctx context.Context, name string) (*domain.Settings, error) {
|
||||||
record, err := app.GetApp().Dao().FindFirstRecordByFilter("settings", "name={:name}", dbx.Params{"name": name})
|
record, err := app.GetApp().Dao().FindFirstRecordByFilter(
|
||||||
|
"settings",
|
||||||
|
"name={:name}",
|
||||||
|
dbx.Params{"name": name},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/pocketbase/dbx"
|
"github.com/pocketbase/dbx"
|
||||||
"github.com/pocketbase/pocketbase/daos"
|
"github.com/pocketbase/pocketbase/daos"
|
||||||
@ -18,29 +19,44 @@ func NewWorkflowRepository() *WorkflowRepository {
|
|||||||
return &WorkflowRepository{}
|
return &WorkflowRepository{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]domain.Workflow, error) {
|
func (r *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) {
|
||||||
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
||||||
"workflow",
|
"workflow",
|
||||||
"enabled={:enabled} && trigger={:trigger}",
|
"enabled={:enabled} && trigger={:trigger}",
|
||||||
"-created", 1000, 0, dbx.Params{"enabled": true, "trigger": domain.WorkflowTriggerTypeAuto},
|
"-created",
|
||||||
|
0, 0,
|
||||||
|
dbx.Params{"enabled": true, "trigger": domain.WorkflowTriggerTypeAuto},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rs := make([]domain.Workflow, 0)
|
workflows := make([]*domain.Workflow, 0)
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
workflow, err := record2Workflow(record)
|
workflow, err := r.castRecordToModel(record)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rs = append(rs, *workflow)
|
|
||||||
|
workflows = append(workflows, workflow)
|
||||||
}
|
}
|
||||||
|
|
||||||
return rs, nil
|
return workflows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) error {
|
func (r *WorkflowRepository) GetById(ctx context.Context, id string) (*domain.Workflow, error) {
|
||||||
|
record, err := app.GetApp().Dao().FindRecordById("workflow", id)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, domain.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.castRecordToModel(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) error {
|
||||||
collection, err := app.GetApp().Dao().FindCollectionByNameOrId(workflow.Table())
|
collection, err := app.GetApp().Dao().FindCollectionByNameOrId(workflow.Table())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -73,7 +89,7 @@ func (w *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow
|
|||||||
return app.GetApp().Dao().SaveRecord(record)
|
return app.GetApp().Dao().SaveRecord(record)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRun) error {
|
func (r *WorkflowRepository) SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error {
|
||||||
collection, err := app.GetApp().Dao().FindCollectionByNameOrId("workflow_run")
|
collection, err := app.GetApp().Dao().FindCollectionByNameOrId("workflow_run")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -81,20 +97,20 @@ func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRu
|
|||||||
|
|
||||||
err = app.GetApp().Dao().RunInTransaction(func(txDao *daos.Dao) error {
|
err = app.GetApp().Dao().RunInTransaction(func(txDao *daos.Dao) error {
|
||||||
record := models.NewRecord(collection)
|
record := models.NewRecord(collection)
|
||||||
record.Set("workflowId", run.WorkflowId)
|
record.Set("workflowId", workflowRun.WorkflowId)
|
||||||
record.Set("trigger", string(run.Trigger))
|
record.Set("trigger", string(workflowRun.Trigger))
|
||||||
record.Set("status", string(run.Status))
|
record.Set("status", string(workflowRun.Status))
|
||||||
record.Set("startedAt", run.StartedAt)
|
record.Set("startedAt", workflowRun.StartedAt)
|
||||||
record.Set("endedAt", run.EndedAt)
|
record.Set("endedAt", workflowRun.EndedAt)
|
||||||
record.Set("logs", run.Logs)
|
record.Set("logs", workflowRun.Logs)
|
||||||
record.Set("error", run.Error)
|
record.Set("error", workflowRun.Error)
|
||||||
err = txDao.SaveRecord(record)
|
err = txDao.SaveRecord(record)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// unable trigger sse using DB()
|
// unable trigger sse using DB()
|
||||||
workflowRecord, err := txDao.FindRecordById("workflow", run.WorkflowId)
|
workflowRecord, err := txDao.FindRecordById("workflow", workflowRun.WorkflowId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -112,19 +128,11 @@ func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRu
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowRepository) GetById(ctx context.Context, id string) (*domain.Workflow, error) {
|
func (r *WorkflowRepository) castRecordToModel(record *models.Record) (*domain.Workflow, error) {
|
||||||
record, err := app.GetApp().Dao().FindRecordById("workflow", id)
|
if record == nil {
|
||||||
if err != nil {
|
return nil, fmt.Errorf("record is nil")
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, domain.ErrRecordNotFound
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return record2Workflow(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
func record2Workflow(record *models.Record) (*domain.Workflow, error) {
|
|
||||||
content := &domain.WorkflowNode{}
|
content := &domain.WorkflowNode{}
|
||||||
if err := record.UnmarshalJSONField("content", content); err != nil {
|
if err := record.UnmarshalJSONField("content", content); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -17,8 +17,14 @@ func NewWorkflowOutputRepository() *WorkflowOutputRepository {
|
|||||||
return &WorkflowOutputRepository{}
|
return &WorkflowOutputRepository{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowOutputRepository) GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error) {
|
func (r *WorkflowOutputRepository) GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error) {
|
||||||
records, err := app.GetApp().Dao().FindRecordsByFilter("workflow_output", "nodeId={:nodeId}", "-created", 1, 0, dbx.Params{"nodeId": nodeId})
|
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
||||||
|
"workflow_output",
|
||||||
|
"nodeId={:nodeId}",
|
||||||
|
"-created",
|
||||||
|
1, 0,
|
||||||
|
dbx.Params{"nodeId": nodeId},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return nil, domain.ErrRecordNotFound
|
return nil, domain.ErrRecordNotFound
|
||||||
@ -56,44 +62,8 @@ func (w *WorkflowOutputRepository) GetByNodeId(ctx context.Context, nodeId strin
|
|||||||
return rs, nil
|
return rs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkflowOutputRepository) GetCertificateByNodeId(ctx context.Context, nodeId string) (*domain.Certificate, error) {
|
|
||||||
records, err := app.GetApp().Dao().FindRecordsByFilter("certificate", "workflowNodeId={:workflowNodeId}", "-created", 1, 0, dbx.Params{"workflowNodeId": nodeId})
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, domain.ErrRecordNotFound
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(records) == 0 {
|
|
||||||
return nil, domain.ErrRecordNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
record := records[0]
|
|
||||||
|
|
||||||
rs := &domain.Certificate{
|
|
||||||
Meta: domain.Meta{
|
|
||||||
Id: record.GetId(),
|
|
||||||
CreatedAt: record.GetCreated().Time(),
|
|
||||||
UpdatedAt: record.GetUpdated().Time(),
|
|
||||||
},
|
|
||||||
Source: domain.CertificateSourceType(record.GetString("source")),
|
|
||||||
SubjectAltNames: record.GetString("subjectAltNames"),
|
|
||||||
Certificate: record.GetString("certificate"),
|
|
||||||
PrivateKey: record.GetString("privateKey"),
|
|
||||||
IssuerCertificate: record.GetString("issuerCertificate"),
|
|
||||||
EffectAt: record.GetDateTime("effectAt").Time(),
|
|
||||||
ExpireAt: record.GetDateTime("expireAt").Time(),
|
|
||||||
ACMECertUrl: record.GetString("acmeCertUrl"),
|
|
||||||
ACMECertStableUrl: record.GetString("acmeCertStableUrl"),
|
|
||||||
WorkflowId: record.GetString("workflowId"),
|
|
||||||
WorkflowNodeId: record.GetString("workflowNodeId"),
|
|
||||||
WorkflowOutputId: record.GetString("workflowOutputId"),
|
|
||||||
}
|
|
||||||
return rs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 保存节点输出
|
// 保存节点输出
|
||||||
func (w *WorkflowOutputRepository) Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error {
|
func (r *WorkflowOutputRepository) Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error {
|
||||||
var record *models.Record
|
var record *models.Record
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -37,5 +37,5 @@ func (handler *notifyHandler) test(c echo.Context) error {
|
|||||||
return resp.Err(c, err)
|
return resp.Err(c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.Succ(c, nil)
|
return resp.Ok(c, nil)
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ type Response struct {
|
|||||||
Data interface{} `json:"data"`
|
Data interface{} `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func Succ(e echo.Context, data interface{}) error {
|
func Ok(e echo.Context, data interface{}) error {
|
||||||
rs := &Response{
|
rs := &Response{
|
||||||
Code: 0,
|
Code: 0,
|
||||||
Msg: "success",
|
Msg: "success",
|
||||||
@ -24,10 +24,11 @@ func Succ(e echo.Context, data interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Err(e echo.Context, err error) error {
|
func Err(e echo.Context, err error) error {
|
||||||
xerr, ok := err.(*domain.XError)
|
code := 500
|
||||||
code := 100
|
|
||||||
|
xerr, ok := err.(*domain.Error)
|
||||||
if ok {
|
if ok {
|
||||||
code = xerr.GetCode()
|
code = xerr.Code
|
||||||
}
|
}
|
||||||
|
|
||||||
rs := &Response{
|
rs := &Response{
|
||||||
|
@ -30,6 +30,6 @@ func (handler *statisticsHandler) get(c echo.Context) error {
|
|||||||
if statistics, err := handler.service.Get(c.Request().Context()); err != nil {
|
if statistics, err := handler.service.Get(c.Request().Context()); err != nil {
|
||||||
return resp.Err(c, err)
|
return resp.Err(c, err)
|
||||||
} else {
|
} else {
|
||||||
return resp.Succ(c, statistics)
|
return resp.Ok(c, statistics)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,5 +36,5 @@ func (handler *workflowHandler) run(c echo.Context) error {
|
|||||||
if err := handler.service.Run(c.Request().Context(), req); err != nil {
|
if err := handler.service.Run(c.Request().Context(), req); err != nil {
|
||||||
return resp.Err(c, err)
|
return resp.Err(c, err)
|
||||||
}
|
}
|
||||||
return resp.Succ(c, nil)
|
return resp.Ok(c, nil)
|
||||||
}
|
}
|
||||||
|
@ -6,15 +6,15 @@ import (
|
|||||||
"github.com/usual2970/certimate/internal/domain"
|
"github.com/usual2970/certimate/internal/domain"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StatisticsRepository interface {
|
type statisticsRepository interface {
|
||||||
Get(ctx context.Context) (*domain.Statistics, error)
|
Get(ctx context.Context) (*domain.Statistics, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StatisticsService struct {
|
type StatisticsService struct {
|
||||||
repo StatisticsRepository
|
repo statisticsRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStatisticsService(repo StatisticsRepository) *StatisticsService {
|
func NewStatisticsService(repo statisticsRepository) *StatisticsService {
|
||||||
return &StatisticsService{
|
return &StatisticsService{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
}
|
}
|
||||||
|
@ -13,44 +13,35 @@ import (
|
|||||||
|
|
||||||
type applyNode struct {
|
type applyNode struct {
|
||||||
node *domain.WorkflowNode
|
node *domain.WorkflowNode
|
||||||
outputRepo WorkflowOutputRepository
|
certRepo certificateRepository
|
||||||
*Logger
|
outputRepo workflowOutputRepository
|
||||||
|
*nodeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
var validityDuration = time.Hour * 24 * 10
|
|
||||||
|
|
||||||
func NewApplyNode(node *domain.WorkflowNode) *applyNode {
|
func NewApplyNode(node *domain.WorkflowNode) *applyNode {
|
||||||
return &applyNode{
|
return &applyNode{
|
||||||
node: node,
|
node: node,
|
||||||
Logger: NewLogger(node),
|
nodeLogger: NewNodeLogger(node),
|
||||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||||
|
certRepo: repository.NewCertificateRepository(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkflowOutputRepository interface {
|
|
||||||
// 查询节点输出
|
|
||||||
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
|
|
||||||
|
|
||||||
// 查询申请节点的证书
|
|
||||||
GetCertificateByNodeId(ctx context.Context, nodeId string) (*domain.Certificate, error)
|
|
||||||
|
|
||||||
// 保存节点输出
|
|
||||||
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// 申请节点根据申请类型执行不同的操作
|
// 申请节点根据申请类型执行不同的操作
|
||||||
func (a *applyNode) Run(ctx context.Context) error {
|
func (a *applyNode) Run(ctx context.Context) error {
|
||||||
|
const validityDuration = time.Hour * 24 * 10
|
||||||
|
|
||||||
a.AddOutput(ctx, a.node.Name, "开始执行")
|
a.AddOutput(ctx, a.node.Name, "开始执行")
|
||||||
// 查询是否申请过,已申请过则直接返回
|
// 查询是否申请过,已申请过则直接返回
|
||||||
// TODO: 先保持和 v0.2 一致,后续增加是否强制申请的参数
|
// TODO: 先保持和 v0.2 一致,后续增加是否强制申请的参数
|
||||||
output, err := a.outputRepo.GetByNodeId(ctx, a.node.Id)
|
output, err := a.outputRepo.GetByNodeId(ctx, a.node.Id)
|
||||||
if err != nil && !domain.IsRecordNotFound(err) {
|
if err != nil && !domain.IsRecordNotFoundError(err) {
|
||||||
a.AddOutput(ctx, a.node.Name, "查询申请记录失败", err.Error())
|
a.AddOutput(ctx, a.node.Name, "查询申请记录失败", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if output != nil && output.Succeeded {
|
if output != nil && output.Succeeded {
|
||||||
lastCertificate, _ := a.outputRepo.GetCertificateByNodeId(ctx, a.node.Id)
|
lastCertificate, _ := a.certRepo.GetByWorkflowNodeId(ctx, a.node.Id)
|
||||||
if lastCertificate != nil {
|
if lastCertificate != nil {
|
||||||
if time.Until(lastCertificate.ExpireAt) > validityDuration {
|
if time.Until(lastCertificate.ExpireAt) > validityDuration {
|
||||||
a.AddOutput(ctx, a.node.Name, "已申请过证书,且证书在有效期内")
|
a.AddOutput(ctx, a.node.Name, "已申请过证书,且证书在有效期内")
|
||||||
|
@ -8,13 +8,13 @@ import (
|
|||||||
|
|
||||||
type conditionNode struct {
|
type conditionNode struct {
|
||||||
node *domain.WorkflowNode
|
node *domain.WorkflowNode
|
||||||
*Logger
|
*nodeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
|
func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
|
||||||
return &conditionNode{
|
return &conditionNode{
|
||||||
node: node,
|
node: node,
|
||||||
Logger: NewLogger(node),
|
nodeLogger: NewNodeLogger(node),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,15 +12,17 @@ import (
|
|||||||
|
|
||||||
type deployNode struct {
|
type deployNode struct {
|
||||||
node *domain.WorkflowNode
|
node *domain.WorkflowNode
|
||||||
outputRepo WorkflowOutputRepository
|
certRepo certificateRepository
|
||||||
*Logger
|
outputRepo workflowOutputRepository
|
||||||
|
*nodeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDeployNode(node *domain.WorkflowNode) *deployNode {
|
func NewDeployNode(node *domain.WorkflowNode) *deployNode {
|
||||||
return &deployNode{
|
return &deployNode{
|
||||||
node: node,
|
node: node,
|
||||||
Logger: NewLogger(node),
|
nodeLogger: NewNodeLogger(node),
|
||||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||||
|
certRepo: repository.NewCertificateRepository(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,7 +30,7 @@ func (d *deployNode) Run(ctx context.Context) error {
|
|||||||
d.AddOutput(ctx, d.node.Name, "开始执行")
|
d.AddOutput(ctx, d.node.Name, "开始执行")
|
||||||
// 检查是否部署过(部署过则直接返回,和 v0.2 暂时保持一致)
|
// 检查是否部署过(部署过则直接返回,和 v0.2 暂时保持一致)
|
||||||
output, err := d.outputRepo.GetByNodeId(ctx, d.node.Id)
|
output, err := d.outputRepo.GetByNodeId(ctx, d.node.Id)
|
||||||
if err != nil && !domain.IsRecordNotFound(err) {
|
if err != nil && !domain.IsRecordNotFoundError(err) {
|
||||||
d.AddOutput(ctx, d.node.Name, "查询部署记录失败", err.Error())
|
d.AddOutput(ctx, d.node.Name, "查询部署记录失败", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -42,7 +44,7 @@ func (d *deployNode) Run(ctx context.Context) error {
|
|||||||
return fmt.Errorf("证书来源配置错误: %s", certSource)
|
return fmt.Errorf("证书来源配置错误: %s", certSource)
|
||||||
}
|
}
|
||||||
|
|
||||||
cert, err := d.outputRepo.GetCertificateByNodeId(ctx, certSourceSlice[0])
|
cert, err := d.certRepo.GetByWorkflowNodeId(ctx, certSourceSlice[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error())
|
d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error())
|
||||||
return err
|
return err
|
||||||
|
@ -8,20 +8,17 @@ import (
|
|||||||
"github.com/usual2970/certimate/internal/repository"
|
"github.com/usual2970/certimate/internal/repository"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SettingRepository interface {
|
|
||||||
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
|
||||||
}
|
|
||||||
type notifyNode struct {
|
type notifyNode struct {
|
||||||
node *domain.WorkflowNode
|
node *domain.WorkflowNode
|
||||||
settingRepo SettingRepository
|
settingsRepo settingRepository
|
||||||
*Logger
|
*nodeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
|
func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
|
||||||
return ¬ifyNode{
|
return ¬ifyNode{
|
||||||
node: node,
|
node: node,
|
||||||
Logger: NewLogger(node),
|
nodeLogger: NewNodeLogger(node),
|
||||||
settingRepo: repository.NewSettingsRepository(),
|
settingsRepo: repository.NewSettingsRepository(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,7 +26,7 @@ func (n *notifyNode) Run(ctx context.Context) error {
|
|||||||
n.AddOutput(ctx, n.node.Name, "开始执行")
|
n.AddOutput(ctx, n.node.Name, "开始执行")
|
||||||
|
|
||||||
// 获取通知配置
|
// 获取通知配置
|
||||||
setting, err := n.settingRepo.GetByName(ctx, "notifyChannels")
|
setting, err := n.settingsRepo.GetByName(ctx, "notifyChannels")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error())
|
n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error())
|
||||||
return err
|
return err
|
||||||
|
@ -8,18 +8,18 @@ import (
|
|||||||
"github.com/usual2970/certimate/internal/domain"
|
"github.com/usual2970/certimate/internal/domain"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NodeProcessor interface {
|
type nodeProcessor interface {
|
||||||
Run(ctx context.Context) error
|
Run(ctx context.Context) error
|
||||||
Log(ctx context.Context) *domain.WorkflowRunLog
|
Log(ctx context.Context) *domain.WorkflowRunLog
|
||||||
AddOutput(ctx context.Context, title, content string, err ...string)
|
AddOutput(ctx context.Context, title, content string, err ...string)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Logger struct {
|
type nodeLogger struct {
|
||||||
log *domain.WorkflowRunLog
|
log *domain.WorkflowRunLog
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogger(node *domain.WorkflowNode) *Logger {
|
func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger {
|
||||||
return &Logger{
|
return &nodeLogger{
|
||||||
log: &domain.WorkflowRunLog{
|
log: &domain.WorkflowRunLog{
|
||||||
NodeId: node.Id,
|
NodeId: node.Id,
|
||||||
NodeName: node.Name,
|
NodeName: node.Name,
|
||||||
@ -28,11 +28,11 @@ func NewLogger(node *domain.WorkflowNode) *Logger {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) Log(ctx context.Context) *domain.WorkflowRunLog {
|
func (l *nodeLogger) Log(ctx context.Context) *domain.WorkflowRunLog {
|
||||||
return l.log
|
return l.log
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) AddOutput(ctx context.Context, title, content string, err ...string) {
|
func (l *nodeLogger) AddOutput(ctx context.Context, title, content string, err ...string) {
|
||||||
output := domain.WorkflowRunLogOutput{
|
output := domain.WorkflowRunLogOutput{
|
||||||
Time: time.Now().UTC().Format(time.RFC3339),
|
Time: time.Now().UTC().Format(time.RFC3339),
|
||||||
Title: title,
|
Title: title,
|
||||||
@ -45,7 +45,7 @@ func (l *Logger) AddOutput(ctx context.Context, title, content string, err ...st
|
|||||||
l.log.Outputs = append(l.log.Outputs, output)
|
l.log.Outputs = append(l.log.Outputs, output)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
|
func GetProcessor(node *domain.WorkflowNode) (nodeProcessor, error) {
|
||||||
switch node.Type {
|
switch node.Type {
|
||||||
case domain.WorkflowNodeTypeStart:
|
case domain.WorkflowNodeTypeStart:
|
||||||
return NewStartNode(node), nil
|
return NewStartNode(node), nil
|
||||||
@ -60,3 +60,16 @@ func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
|
|||||||
}
|
}
|
||||||
return nil, errors.New("not implemented")
|
return nil, errors.New("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type certificateRepository interface {
|
||||||
|
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type workflowOutputRepository interface {
|
||||||
|
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
|
||||||
|
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type settingRepository interface {
|
||||||
|
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
||||||
|
}
|
||||||
|
@ -8,13 +8,13 @@ import (
|
|||||||
|
|
||||||
type startNode struct {
|
type startNode struct {
|
||||||
node *domain.WorkflowNode
|
node *domain.WorkflowNode
|
||||||
*Logger
|
*nodeLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStartNode(node *domain.WorkflowNode) *startNode {
|
func NewStartNode(node *domain.WorkflowNode) *startNode {
|
||||||
return &startNode{
|
return &startNode{
|
||||||
node: node,
|
node: node,
|
||||||
Logger: NewLogger(node),
|
nodeLogger: NewNodeLogger(node),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,21 +19,21 @@ type workflowRunData struct {
|
|||||||
Options *domain.WorkflowRunReq
|
Options *domain.WorkflowRunReq
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkflowRepository interface {
|
type workflowRepository interface {
|
||||||
|
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
|
||||||
GetById(ctx context.Context, id string) (*domain.Workflow, error)
|
GetById(ctx context.Context, id string) (*domain.Workflow, error)
|
||||||
SaveRun(ctx context.Context, run *domain.WorkflowRun) error
|
|
||||||
Save(ctx context.Context, workflow *domain.Workflow) error
|
Save(ctx context.Context, workflow *domain.Workflow) error
|
||||||
ListEnabledAuto(ctx context.Context) ([]domain.Workflow, error)
|
SaveRun(ctx context.Context, run *domain.WorkflowRun) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkflowService struct {
|
type WorkflowService struct {
|
||||||
ch chan *workflowRunData
|
ch chan *workflowRunData
|
||||||
repo WorkflowRepository
|
repo workflowRepository
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorkflowService(repo WorkflowRepository) *WorkflowService {
|
func NewWorkflowService(repo workflowRepository) *WorkflowService {
|
||||||
rs := &WorkflowService{
|
rs := &WorkflowService{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
ch: make(chan *workflowRunData, 1),
|
ch: make(chan *workflowRunData, 1),
|
||||||
|
54
migrations/1737019549_updated_certificate.go
Normal file
54
migrations/1737019549_updated_certificate.go
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package migrations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/pocketbase/dbx"
|
||||||
|
"github.com/pocketbase/pocketbase/daos"
|
||||||
|
m "github.com/pocketbase/pocketbase/migrations"
|
||||||
|
"github.com/pocketbase/pocketbase/models/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
m.Register(func(db dbx.Builder) error {
|
||||||
|
dao := daos.New(db);
|
||||||
|
|
||||||
|
collection, err := dao.FindCollectionByNameOrId("4szxr9x43tpj6np")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// add
|
||||||
|
new_deleted := &schema.SchemaField{}
|
||||||
|
if err := json.Unmarshal([]byte(`{
|
||||||
|
"system": false,
|
||||||
|
"id": "klyf4nlq",
|
||||||
|
"name": "deleted",
|
||||||
|
"type": "date",
|
||||||
|
"required": false,
|
||||||
|
"presentable": false,
|
||||||
|
"unique": false,
|
||||||
|
"options": {
|
||||||
|
"min": "",
|
||||||
|
"max": ""
|
||||||
|
}
|
||||||
|
}`), new_deleted); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
collection.Schema.AddField(new_deleted)
|
||||||
|
|
||||||
|
return dao.SaveCollection(collection)
|
||||||
|
}, func(db dbx.Builder) error {
|
||||||
|
dao := daos.New(db);
|
||||||
|
|
||||||
|
collection, err := dao.FindCollectionByNameOrId("4szxr9x43tpj6np")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove
|
||||||
|
collection.Schema.RemoveField("klyf4nlq")
|
||||||
|
|
||||||
|
return dao.SaveCollection(collection)
|
||||||
|
})
|
||||||
|
}
|
@ -5,7 +5,7 @@ import { getPocketBase } from "@/repository/pocketbase";
|
|||||||
export const notifyTest = async (channel: string) => {
|
export const notifyTest = async (channel: string) => {
|
||||||
const pb = getPocketBase();
|
const pb = getPocketBase();
|
||||||
|
|
||||||
const resp = await pb.send("/api/notify/test", {
|
const resp = await pb.send<BaseResponse>("/api/notify/test", {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: {
|
headers: {
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
|
@ -6,7 +6,7 @@ import { getPocketBase } from "@/repository/pocketbase";
|
|||||||
export const get = async () => {
|
export const get = async () => {
|
||||||
const pb = getPocketBase();
|
const pb = getPocketBase();
|
||||||
|
|
||||||
const resp = await pb.send("/api/statistics/get", {
|
const resp = await pb.send<BaseResponse<Statistics>>("/api/statistics/get", {
|
||||||
method: "GET",
|
method: "GET",
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -14,5 +14,5 @@ export const get = async () => {
|
|||||||
throw new ClientResponseError({ status: resp.code, response: resp, data: {} });
|
throw new ClientResponseError({ status: resp.code, response: resp, data: {} });
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.data as Statistics;
|
return resp;
|
||||||
};
|
};
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
import { ClientResponseError, type RecordSubscription } from "pocketbase";
|
import { ClientResponseError } from "pocketbase";
|
||||||
|
|
||||||
import { WORKFLOW_TRIGGERS, type WorkflowModel } from "@/domain/workflow";
|
import { WORKFLOW_TRIGGERS } from "@/domain/workflow";
|
||||||
import { getPocketBase } from "@/repository/pocketbase";
|
import { getPocketBase } from "@/repository/pocketbase";
|
||||||
|
|
||||||
export const run = async (id: string) => {
|
export const run = async (id: string) => {
|
||||||
const pb = getPocketBase();
|
const pb = getPocketBase();
|
||||||
|
|
||||||
const resp = await pb.send("/api/workflow/run", {
|
const resp = await pb.send<BaseResponse>("/api/workflow/run", {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: {
|
headers: {
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
@ -23,15 +23,3 @@ export const run = async (id: string) => {
|
|||||||
|
|
||||||
return resp;
|
return resp;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const subscribe = async (id: string, cb: (e: RecordSubscription<WorkflowModel>) => void) => {
|
|
||||||
const pb = getPocketBase();
|
|
||||||
|
|
||||||
return pb.collection("workflow").subscribe(id, cb);
|
|
||||||
};
|
|
||||||
|
|
||||||
export const unsubscribe = async (id: string) => {
|
|
||||||
const pb = getPocketBase();
|
|
||||||
|
|
||||||
return pb.collection("workflow").unsubscribe(id);
|
|
||||||
};
|
|
||||||
|
@ -139,9 +139,9 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
refreshDeps: [workflowId, page, pageSize],
|
refreshDeps: [workflowId, page, pageSize],
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setTableData(data.items);
|
setTableData(res.items);
|
||||||
setTableTotal(data.totalItems);
|
setTableTotal(res.totalItems);
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
if (err instanceof ClientResponseError && err.isAbort) {
|
if (err instanceof ClientResponseError && err.isAbort) {
|
||||||
|
@ -8,7 +8,7 @@ export interface CertificateModel extends BaseModel {
|
|||||||
effectAt: ISO8601String;
|
effectAt: ISO8601String;
|
||||||
expireAt: ISO8601String;
|
expireAt: ISO8601String;
|
||||||
workflowId: string;
|
workflowId: string;
|
||||||
expand: {
|
expand?: {
|
||||||
workflowId?: WorkflowModel; // TODO: ugly, maybe to use an alias?
|
workflowId?: WorkflowModel; // TODO: ugly, maybe to use an alias?
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -142,9 +142,9 @@ const AccessList = () => {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
refreshDeps: [accesses, page, pageSize],
|
refreshDeps: [accesses, page, pageSize],
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setTableData(data.items);
|
setTableData(res.items);
|
||||||
setTableTotal(data.totalItems);
|
setTableTotal(res.totalItems);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -204,9 +204,9 @@ const CertificateList = () => {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
refreshDeps: [filters, page, pageSize],
|
refreshDeps: [filters, page, pageSize],
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setTableData(data.items);
|
setTableData(res.items);
|
||||||
setTableTotal(data.totalItems);
|
setTableTotal(res.totalItems);
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
if (err instanceof ClientResponseError && err.isAbort) {
|
if (err instanceof ClientResponseError && err.isAbort) {
|
||||||
|
@ -164,9 +164,9 @@ const Dashboard = () => {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
refreshDeps: [page, pageSize],
|
refreshDeps: [page, pageSize],
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setTableData(data.items);
|
setTableData(res.items);
|
||||||
setTableTotal(data.totalItems > 3 ? 3 : data.totalItems);
|
setTableTotal(res.totalItems > 3 ? 3 : res.totalItems);
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
if (err instanceof ClientResponseError && err.isAbort) {
|
if (err instanceof ClientResponseError && err.isAbort) {
|
||||||
@ -193,8 +193,8 @@ const Dashboard = () => {
|
|||||||
return getStatistics();
|
return getStatistics();
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setStatistics(data);
|
setStatistics(res.data);
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
if (err instanceof ClientResponseError && err.isAbort) {
|
if (err instanceof ClientResponseError && err.isAbort) {
|
||||||
|
@ -16,7 +16,7 @@ import { createSchemaFieldRule } from "antd-zod";
|
|||||||
import { isEqual } from "radash";
|
import { isEqual } from "radash";
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
|
|
||||||
import { run as runWorkflow, subscribe as subscribeWorkflow, unsubscribe as unsubscribeWorkflow } from "@/api/workflow";
|
import { run as runWorkflow } from "@/api/workflow";
|
||||||
import ModalForm from "@/components/ModalForm";
|
import ModalForm from "@/components/ModalForm";
|
||||||
import Show from "@/components/Show";
|
import Show from "@/components/Show";
|
||||||
import WorkflowElements from "@/components/workflow/WorkflowElements";
|
import WorkflowElements from "@/components/workflow/WorkflowElements";
|
||||||
@ -24,7 +24,7 @@ import WorkflowRuns from "@/components/workflow/WorkflowRuns";
|
|||||||
import { isAllNodesValidated } from "@/domain/workflow";
|
import { isAllNodesValidated } from "@/domain/workflow";
|
||||||
import { WORKFLOW_RUN_STATUSES } from "@/domain/workflowRun";
|
import { WORKFLOW_RUN_STATUSES } from "@/domain/workflowRun";
|
||||||
import { useAntdForm, useZustandShallowSelector } from "@/hooks";
|
import { useAntdForm, useZustandShallowSelector } from "@/hooks";
|
||||||
import { remove as removeWorkflow } from "@/repository/workflow";
|
import { remove as removeWorkflow, subscribe as subscribeWorkflow, unsubscribe as unsubscribeWorkflow } from "@/repository/workflow";
|
||||||
import { useWorkflowStore } from "@/stores/workflow";
|
import { useWorkflowStore } from "@/stores/workflow";
|
||||||
import { getErrMsg } from "@/utils/error";
|
import { getErrMsg } from "@/utils/error";
|
||||||
|
|
||||||
|
@ -250,9 +250,9 @@ const WorkflowList = () => {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
refreshDeps: [filters, page, pageSize],
|
refreshDeps: [filters, page, pageSize],
|
||||||
onSuccess: (data) => {
|
onSuccess: (res) => {
|
||||||
setTableData(data.items);
|
setTableData(res.items);
|
||||||
setTableTotal(data.totalItems);
|
setTableTotal(res.totalItems);
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
if (err instanceof ClientResponseError && err.isAbort) {
|
if (err instanceof ClientResponseError && err.isAbort) {
|
||||||
|
@ -19,17 +19,18 @@ export const list = async (request: ListCertificateRequest) => {
|
|||||||
const perPage = request.perPage || 10;
|
const perPage = request.perPage || 10;
|
||||||
|
|
||||||
const options: RecordListOptions = {
|
const options: RecordListOptions = {
|
||||||
sort: "-created",
|
|
||||||
expand: "workflowId",
|
expand: "workflowId",
|
||||||
|
filter: "deleted=null",
|
||||||
|
sort: "-created",
|
||||||
requestKey: null,
|
requestKey: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (request.state === "expireSoon") {
|
if (request.state === "expireSoon") {
|
||||||
options.filter = pb.filter("expireAt<{:expiredAt}", {
|
options.filter = pb.filter("expireAt<{:expiredAt} && deleted=null", {
|
||||||
expiredAt: dayjs().add(15, "d").toDate(),
|
expiredAt: dayjs().add(20, "d").toDate(),
|
||||||
});
|
});
|
||||||
} else if (request.state === "expired") {
|
} else if (request.state === "expired") {
|
||||||
options.filter = pb.filter("expireAt<={:expiredAt}", {
|
options.filter = pb.filter("expireAt<={:expiredAt} && deleted=null", {
|
||||||
expiredAt: new Date(),
|
expiredAt: new Date(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import { type RecordListOptions } from "pocketbase";
|
import { type RecordListOptions, type RecordSubscription } from "pocketbase";
|
||||||
|
|
||||||
import { type WorkflowModel } from "@/domain/workflow";
|
import { type WorkflowModel } from "@/domain/workflow";
|
||||||
import { getPocketBase } from "./pocketbase";
|
import { getPocketBase } from "./pocketbase";
|
||||||
@ -48,3 +48,15 @@ export const save = async (record: MaybeModelRecord<WorkflowModel>) => {
|
|||||||
export const remove = async (record: MaybeModelRecordWithId<WorkflowModel>) => {
|
export const remove = async (record: MaybeModelRecordWithId<WorkflowModel>) => {
|
||||||
return await getPocketBase().collection(COLLECTION_NAME).delete(record.id);
|
return await getPocketBase().collection(COLLECTION_NAME).delete(record.id);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const subscribe = async (id: string, cb: (e: RecordSubscription<WorkflowModel>) => void) => {
|
||||||
|
const pb = getPocketBase();
|
||||||
|
|
||||||
|
return pb.collection("workflow").subscribe(id, cb);
|
||||||
|
};
|
||||||
|
|
||||||
|
export const unsubscribe = async (id: string) => {
|
||||||
|
const pb = getPocketBase();
|
||||||
|
|
||||||
|
return pb.collection("workflow").unsubscribe(id);
|
||||||
|
};
|
||||||
|
6
ui/types/global.d.ts
vendored
6
ui/types/global.d.ts
vendored
@ -12,6 +12,12 @@ declare global {
|
|||||||
declare type MaybeModelRecord<T extends BaseModel = BaseModel> = T | Omit<T, "id" | "created" | "updated" | "deleted">;
|
declare type MaybeModelRecord<T extends BaseModel = BaseModel> = T | Omit<T, "id" | "created" | "updated" | "deleted">;
|
||||||
|
|
||||||
declare type MaybeModelRecordWithId<T extends BaseModel = BaseModel> = T | Pick<T, "id">;
|
declare type MaybeModelRecordWithId<T extends BaseModel = BaseModel> = T | Pick<T, "id">;
|
||||||
|
|
||||||
|
declare interface BaseResponse<T = any> {
|
||||||
|
code: number;
|
||||||
|
msg: string;
|
||||||
|
data: T;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export {};
|
export {};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user