diff --git a/core/core.go b/core/core.go index aefcd3a20..bf43ebb42 100644 --- a/core/core.go +++ b/core/core.go @@ -78,7 +78,7 @@ func Start() error { rtmpPort := data.GetRTMPPortNumber() log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort) - webhooks.InitWorkerPool() + webhooks.SetupWebhooks(GetStatus) notifications.Setup(data.GetStore()) diff --git a/core/webhooks/stream.go b/core/webhooks/stream.go index a0184ec59..b44dcb07c 100644 --- a/core/webhooks/stream.go +++ b/core/webhooks/stream.go @@ -21,6 +21,7 @@ func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time "name": data.GetServerName(), "summary": data.GetServerSummary(), "streamTitle": data.GetStreamTitle(), + "status": getStatus(), "timestamp": timestamp, }, }) diff --git a/core/webhooks/stream_test.go b/core/webhooks/stream_test.go index a9802623d..3da067812 100644 --- a/core/webhooks/stream_test.go +++ b/core/webhooks/stream_test.go @@ -21,6 +21,16 @@ func TestSendStreamStatusEvent(t *testing.T) { "name": "my server", "streamTitle": "my stream", "summary": "my server where I stream", - "timestamp": "1970-01-01T00:01:12.000000006Z" + "timestamp": "1970-01-01T00:01:12.000000006Z", + "status": { + "lastConnectTime": null, + "lastDisconnectTime": null, + "online": true, + "overallMaxViewerCount": 420, + "sessionMaxViewerCount": 69, + "streamTitle": "my stream", + "versionNumber": "1.2.3", + "viewerCount": 5 + } }`) } diff --git a/core/webhooks/webhooks_test.go b/core/webhooks/webhooks_test.go index cbd29f959..07f879c02 100644 --- a/core/webhooks/webhooks_test.go +++ b/core/webhooks/webhooks_test.go @@ -17,6 +17,17 @@ import ( jsonpatch "gopkg.in/evanphx/json-patch.v5" ) +func fakeGetStatus() models.Status { + return models.Status{ + Online: true, + ViewerCount: 5, + OverallMaxViewerCount: 420, + SessionMaxViewerCount: 69, + StreamTitle: "my stream", + VersionNumber: "1.2.3", + } +} + func TestMain(m *testing.M) { dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db") if err != nil { @@ -29,7 +40,8 @@ func TestMain(m *testing.M) { panic(err) } - InitWorkerPool() + SetupWebhooks(fakeGetStatus) + defer close(queue) m.Run() diff --git a/core/webhooks/workerpool.go b/core/webhooks/workerpool.go index 838688340..1c5c6858c 100644 --- a/core/webhooks/workerpool.go +++ b/core/webhooks/workerpool.go @@ -24,10 +24,19 @@ type Job struct { wg *sync.WaitGroup } -var queue chan Job +var ( + queue chan Job + getStatus func() models.Status +) -// InitWorkerPool starts n go routines that await webhook jobs. -func InitWorkerPool() { +// SetupWebhooks initializes the webhook worker pool and sets the function to get the current status. +func SetupWebhooks(getStatusFunc func() models.Status) { + getStatus = getStatusFunc + initWorkerPool() +} + +// initWorkerPool starts n go routines that await webhook jobs. +func initWorkerPool() { queue = make(chan Job) // start workers