pool certificate issuance requests

This commit is contained in:
yoan 2025-01-16 14:42:54 +08:00
parent 2218be5d34
commit 2dd8fb2ee2
2 changed files with 73 additions and 1 deletions

View File

@ -15,10 +15,24 @@ import (
"github.com/go-acme/lego/v4/lego" "github.com/go-acme/lego/v4/lego"
"github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/pkg/utils/pool"
"github.com/usual2970/certimate/internal/pkg/utils/slices" "github.com/usual2970/certimate/internal/pkg/utils/slices"
"github.com/usual2970/certimate/internal/repository" "github.com/usual2970/certimate/internal/repository"
) )
const defaultPoolSize = 8
var poolInstance *pool.Pool[proxyApplicant, applicantResult]
type applicantResult struct {
result *ApplyCertResult
err error
}
func init() {
poolInstance = pool.NewPool[proxyApplicant, applicantResult](defaultPoolSize)
}
type ApplyCertResult struct { type ApplyCertResult struct {
CertificateFullChain string CertificateFullChain string
IssuerCertificate string IssuerCertificate string
@ -84,6 +98,17 @@ func NewWithApplyNode(node *domain.WorkflowNode) (Applicant, error) {
}, nil }, nil
} }
func applyAsync(applicant challenge.Provider, options *applicantOptions) <-chan applicantResult {
return poolInstance.Submit(context.Background(), func(p proxyApplicant) applicantResult {
rs := applicantResult{}
rs.result, rs.err = apply(p.applicant, p.options)
return rs
}, proxyApplicant{
applicant: applicant,
options: options,
})
}
func apply(challengeProvider challenge.Provider, options *applicantOptions) (*ApplyCertResult, error) { func apply(challengeProvider challenge.Provider, options *applicantOptions) (*ApplyCertResult, error) {
settingsRepo := repository.NewSettingsRepository() settingsRepo := repository.NewSettingsRepository()
settings, _ := settingsRepo.GetByName(context.Background(), "sslProvider") settings, _ := settingsRepo.GetByName(context.Background(), "sslProvider")
@ -185,5 +210,6 @@ type proxyApplicant struct {
} }
func (d *proxyApplicant) Apply() (*ApplyCertResult, error) { func (d *proxyApplicant) Apply() (*ApplyCertResult, error) {
return apply(d.applicant, d.options) rs := <-applyAsync(d.applicant, d.options)
return rs.result, rs.err
} }

View File

@ -0,0 +1,46 @@
package pool
import (
"context"
)
type Task[I, O any] func(I) O
type Pool[I, O any] struct {
ch chan struct{}
size int
}
func NewPool[I, O any](size int) *Pool[I, O] {
return &Pool[I, O]{
ch: make(chan struct{}, size),
size: size,
}
}
func (p *Pool[I, O]) Submit(ctx context.Context, task Task[I, O], input I) <-chan O {
resultChan := make(chan O, 1)
go func() {
select {
case p.ch <- struct{}{}:
defer func() {
<-p.ch
close(resultChan)
}()
result := task(input)
select {
case <-ctx.Done():
return
case resultChan <- result:
}
case <-ctx.Done():
close(resultChan)
return
}
}()
return resultChan
}