From 2dd8fb2ee26252ae0112ed5d8f79ffff6dd5476b Mon Sep 17 00:00:00 2001 From: yoan <536464346@qq.com> Date: Thu, 16 Jan 2025 14:42:54 +0800 Subject: [PATCH] pool certificate issuance requests --- internal/applicant/applicant.go | 28 +++++++++++++++++++- internal/pkg/utils/pool/pool.go | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 internal/pkg/utils/pool/pool.go diff --git a/internal/applicant/applicant.go b/internal/applicant/applicant.go index 5b77c95f..a0e67fee 100644 --- a/internal/applicant/applicant.go +++ b/internal/applicant/applicant.go @@ -15,10 +15,24 @@ import ( "github.com/go-acme/lego/v4/lego" "github.com/usual2970/certimate/internal/domain" + "github.com/usual2970/certimate/internal/pkg/utils/pool" "github.com/usual2970/certimate/internal/pkg/utils/slices" "github.com/usual2970/certimate/internal/repository" ) +const defaultPoolSize = 8 + +var poolInstance *pool.Pool[proxyApplicant, applicantResult] + +type applicantResult struct { + result *ApplyCertResult + err error +} + +func init() { + poolInstance = pool.NewPool[proxyApplicant, applicantResult](defaultPoolSize) +} + type ApplyCertResult struct { CertificateFullChain string IssuerCertificate string @@ -84,6 +98,17 @@ func NewWithApplyNode(node *domain.WorkflowNode) (Applicant, error) { }, nil } +func applyAsync(applicant challenge.Provider, options *applicantOptions) <-chan applicantResult { + return poolInstance.Submit(context.Background(), func(p proxyApplicant) applicantResult { + rs := applicantResult{} + rs.result, rs.err = apply(p.applicant, p.options) + return rs + }, proxyApplicant{ + applicant: applicant, + options: options, + }) +} + func apply(challengeProvider challenge.Provider, options *applicantOptions) (*ApplyCertResult, error) { settingsRepo := repository.NewSettingsRepository() settings, _ := settingsRepo.GetByName(context.Background(), "sslProvider") @@ -185,5 +210,6 @@ type proxyApplicant struct { } func (d *proxyApplicant) Apply() (*ApplyCertResult, error) { - return apply(d.applicant, d.options) + rs := <-applyAsync(d.applicant, d.options) + return rs.result, rs.err } diff --git a/internal/pkg/utils/pool/pool.go b/internal/pkg/utils/pool/pool.go new file mode 100644 index 00000000..6f9ede08 --- /dev/null +++ b/internal/pkg/utils/pool/pool.go @@ -0,0 +1,46 @@ +package pool + +import ( + "context" +) + +type Task[I, O any] func(I) O + +type Pool[I, O any] struct { + ch chan struct{} + size int +} + +func NewPool[I, O any](size int) *Pool[I, O] { + return &Pool[I, O]{ + ch: make(chan struct{}, size), + size: size, + } +} + +func (p *Pool[I, O]) Submit(ctx context.Context, task Task[I, O], input I) <-chan O { + resultChan := make(chan O, 1) + + go func() { + select { + case p.ch <- struct{}{}: + defer func() { + <-p.ch + close(resultChan) + }() + + result := task(input) + select { + case <-ctx.Done(): + return + case resultChan <- result: + } + + case <-ctx.Done(): + close(resultChan) + return + } + }() + + return resultChan +}