From d0376cdc75cc422952c3880aed319e98c6370e19 Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Tue, 18 Jul 2023 20:26:44 -0700 Subject: [PATCH] fix: limit the different worker pools to available CPU cores. Should resolve #3189 --- activitypub/inbox/workerpool.go | 10 +++++----- activitypub/workerpool/outbound.go | 9 ++++----- core/webhooks/webhooks_test.go | 4 ++-- core/webhooks/workerpool.go | 7 +++---- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/activitypub/inbox/workerpool.go b/activitypub/inbox/workerpool.go index ff275503e..38bc2100e 100644 --- a/activitypub/inbox/workerpool.go +++ b/activitypub/inbox/workerpool.go @@ -1,14 +1,14 @@ package inbox import ( + "runtime" + "github.com/owncast/owncast/activitypub/apmodels" log "github.com/sirupsen/logrus" ) -const ( - // InboxWorkerPoolSize defines the number of concurrent ActivityPub handlers. - InboxWorkerPoolSize = 10 -) +// workerPoolSize defines the number of concurrent ActivityPub handlers. +var workerPoolSize = runtime.GOMAXPROCS(0) // Job struct bundling the ActivityPub and the payload in one struct. type Job struct { @@ -22,7 +22,7 @@ func InitInboxWorkerPool() { queue = make(chan Job) // start workers - for i := 1; i <= InboxWorkerPoolSize; i++ { + for i := 1; i <= workerPoolSize; i++ { go worker(i, queue) } } diff --git a/activitypub/workerpool/outbound.go b/activitypub/workerpool/outbound.go index f72cc1be7..a3d0fadea 100644 --- a/activitypub/workerpool/outbound.go +++ b/activitypub/workerpool/outbound.go @@ -2,14 +2,13 @@ package workerpool import ( "net/http" + "runtime" log "github.com/sirupsen/logrus" ) -const ( - // ActivityPubWorkerPoolSize defines the number of concurrent HTTP ActivityPub requests. - ActivityPubWorkerPoolSize = 10 -) +// workerPoolSize defines the number of concurrent HTTP ActivityPub requests. +var workerPoolSize = runtime.GOMAXPROCS(0) // Job struct bundling the ActivityPub and the payload in one struct. type Job struct { @@ -23,7 +22,7 @@ func InitOutboundWorkerPool() { queue = make(chan Job) // start workers - for i := 1; i <= ActivityPubWorkerPoolSize; i++ { + for i := 1; i <= workerPoolSize; i++ { go worker(i, queue) } } diff --git a/core/webhooks/webhooks_test.go b/core/webhooks/webhooks_test.go index 07f879c02..b79f75b0e 100644 --- a/core/webhooks/webhooks_test.go +++ b/core/webhooks/webhooks_test.go @@ -51,7 +51,7 @@ func TestMain(m *testing.M) { // this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic. func TestPublicSend(t *testing.T) { // Send enough events to be sure at least one worker delivers a second event. - const eventsCount = webhookWorkerPoolSize + 1 + eventsCount := webhookWorkerPoolSize + 1 var wg sync.WaitGroup wg.Add(eventsCount) @@ -267,7 +267,7 @@ func TestParallel(t *testing.T) { myId := atomic.AddUint32(&calls, 1) // We made it to the pool size + 1 event, so we're done with the test. - if myId == webhookWorkerPoolSize+1 { + if myId == uint32(webhookWorkerPoolSize)+1 { close(finished) return } diff --git a/core/webhooks/workerpool.go b/core/webhooks/workerpool.go index 1c5c6858c..df808866b 100644 --- a/core/webhooks/workerpool.go +++ b/core/webhooks/workerpool.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "net/http" + "runtime" "sync" log "github.com/sirupsen/logrus" @@ -12,10 +13,8 @@ import ( "github.com/owncast/owncast/models" ) -const ( - // webhookWorkerPoolSize defines the number of concurrent HTTP webhook requests. - webhookWorkerPoolSize = 10 -) +// webhookWorkerPoolSize defines the number of concurrent HTTP webhook requests. +var webhookWorkerPoolSize = runtime.GOMAXPROCS(0) // Job struct bundling the webhook and the payload in one struct. type Job struct {