From c295e4f21564c436d635f2bcc5a6c3142a4ebfa8 Mon Sep 17 00:00:00 2001
From: Gabe Kangas
Date: Wed, 31 May 2023 11:10:04 -0700
Subject: [PATCH] Update segment cleanup to support object storage (#2876)

* Add support for S3 cleanup + standardize firing cleanup. Closes #2646

* fix: manually fix post-merge
---
 core/storageproviders/local.go          |  96 ++++++++++++++++------
 core/storageproviders/s3Storage.go      | 105 ++++++++++++++++++++++--
 core/streamState.go                     |   4 +-
 core/transcoder/hlsFilesystemCleanup.go |  78 ------------------
 models/storageProvider.go               |   2 +
 5 files changed, 174 insertions(+), 111 deletions(-)
 delete mode 100644 core/transcoder/hlsFilesystemCleanup.go

diff --git a/core/storageproviders/local.go b/core/storageproviders/local.go
index 11d5c3b31..f71306ca6 100644
--- a/core/storageproviders/local.go
+++ b/core/storageproviders/local.go
@@ -1,21 +1,19 @@
 package storageproviders
 
 import (
-	"time"
+	"os"
+	"path/filepath"
+	"sort"
 
+	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/owncast/owncast/config"
 	"github.com/owncast/owncast/core/data"
-	"github.com/owncast/owncast/core/transcoder"
 )
 
 // LocalStorage represents an instance of the local storage provider for HLS video.
-type LocalStorage struct {
-	// Cleanup old public HLS content every N min from the webroot.
-	onlineCleanupTicker *time.Ticker
-	customVideoServingEndpoint string
-}
+type LocalStorage struct{}
 
 // NewLocalStorage returns a new LocalStorage instance.
 func NewLocalStorage() *LocalStorage {
@@ -24,18 +22,6 @@ func NewLocalStorage() *LocalStorage {
 
 // Setup configures this storage provider.
 func (s *LocalStorage) Setup() error {
-	if data.GetVideoServingEndpoint() != "" {
-		s.customVideoServingEndpoint = data.GetVideoServingEndpoint()
-	}
-
-	// NOTE: This cleanup timer will have to be disabled to support recordings in the future
-	// as all HLS segments have to be publicly available on disk to keep a recording of them.
-	s.onlineCleanupTicker = time.NewTicker(1 * time.Minute)
-	go func() {
-		for range s.onlineCleanupTicker.C {
-			transcoder.CleanupOldContent(config.HLSStoragePath)
-		}
-	}()
 	return nil
 }
 
@@ -56,12 +42,7 @@ func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) {
 
 // MasterPlaylistWritten is called when the master hls playlist is written.
 func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
-	if s.customVideoServingEndpoint != "" {
-		// Rewrite the playlist to use custom absolute remote URLs
-		if err := rewriteRemotePlaylist(localFilePath, s.customVideoServingEndpoint); err != nil {
-			log.Warnln(err)
-		}
-	} else if _, err := s.Save(localFilePath, 0); err != nil {
+	if _, err := s.Save(localFilePath, 0); err != nil {
 		log.Warnln(err)
 	}
 }
@@ -70,3 +51,68 @@ func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
 func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
 	return filePath, nil
 }
+
+func (s *LocalStorage) Cleanup() error {
+	// Determine how many files we should keep on disk
+	maxNumber := data.GetStreamLatencyLevel().SegmentCount
+	buffer := 10
+	baseDirectory := config.HLSStoragePath
+
+	files, err := getAllFilesRecursive(baseDirectory)
+	if err != nil {
+		return errors.Wrap(err, "unable to find old video files for cleanup")
+	}
+
+	// Delete old private HLS files on disk
+	for directory := range files {
+		files := files[directory]
+		if len(files) < maxNumber+buffer {
+			continue
+		}
+
+		filesToDelete := files[maxNumber+buffer:]
+		log.Traceln("Deleting", len(filesToDelete), "old files from", baseDirectory, "for video variant", directory)
+
+		for _, file := range filesToDelete {
+			fileToDelete := filepath.Join(baseDirectory, directory, file.Name())
+			err := os.Remove(fileToDelete)
+			if err != nil {
+				return errors.Wrap(err, "unable to delete old video files")
+			}
+		}
+	}
+	return nil
+}
+
+func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) {
+	files := make(map[string][]os.FileInfo)
+
+	var directory string
+	err := filepath.Walk(baseDirectory, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if info.IsDir() {
+			directory = info.Name()
+		}
+
+		if filepath.Ext(info.Name()) == ".ts" {
+			files[directory] = append(files[directory], info)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// Sort by date so we can delete old files
+	for directory := range files {
+		sort.Slice(files[directory], func(i, j int) bool {
+			return files[directory][i].ModTime().UnixNano() > files[directory][j].ModTime().UnixNano()
+		})
+	}
+
+	return files, nil
+}
diff --git a/core/storageproviders/s3Storage.go b/core/storageproviders/s3Storage.go
index 4b6b26f28..6e3177053 100644
--- a/core/storageproviders/s3Storage.go
+++ b/core/storageproviders/s3Storage.go
@@ -6,16 +6,19 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"sort"
 	"strings"
 	"time"
 
 	"github.com/owncast/owncast/core/data"
 	"github.com/owncast/owncast/utils"
+	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 
 	"github.com/owncast/owncast/config"
 )
 
 // S3Storage is the s3 implementation of a storage provider.
 type S3Storage struct {
-	sess *session.Session
-
-	// If we try to upload a playlist but it is not yet on disk
-	// then keep a reference to it here.
-	queuedPlaylistUpdates map[string]string
-
-	uploader *s3manager.Uploader
+	sess     *session.Session
+	s3Client *s3.S3
 
 	host string
 	s3Endpoint        string
@@ -40,6 +38,12 @@ type S3Storage struct {
 	s3Secret          string
 	s3ACL             string
 	s3ForcePathStyle  bool
+
+	// If we try to upload a playlist but it is not yet on disk
+	// then keep a reference to it here.
+	queuedPlaylistUpdates map[string]string
+
+	uploader *s3manager.Uploader
 }
 
 // NewS3Storage returns a new S3Storage instance.
@@ -55,6 +59,7 @@ func (s *S3Storage) Setup() error {
 
 	s3Config := data.GetS3Config()
 	customVideoServingEndpoint := data.GetVideoServingEndpoint()
+
 	if customVideoServingEndpoint != "" {
 		s.host = customVideoServingEndpoint
 	} else {
@@ -71,6 +76,7 @@ func (s *S3Storage) Setup() error {
 	s.s3ForcePathStyle = s3Config.ForcePathStyle
 
 	s.sess = s.connectAWS()
+	s.s3Client = s3.New(s.sess)
 
 	s.uploader = s3manager.NewUploader(s.sess)
 
@@ -184,6 +190,21 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
 	return response.Location, nil
 }
 
+func (s *S3Storage) Cleanup() error {
+	// Determine how many files we should keep on S3 storage
+	maxNumber := data.GetStreamLatencyLevel().SegmentCount
+	buffer := 20
+
+	keys, err := s.getDeletableVideoSegmentsWithOffset(maxNumber + buffer)
+	if err != nil {
+		return err
+	}
+
+	s.deleteObjects(keys)
+
+	return nil
+}
+
 func (s *S3Storage) connectAWS() *session.Session {
 	t := http.DefaultTransport.(*http.Transport).Clone()
 	t.MaxIdleConnsPerHost = 100
@@ -213,3 +234,73 @@ func (s *S3Storage) connectAWS() *session.Session {
 	}
 	return sess
 }
+
+func (s *S3Storage) getDeletableVideoSegmentsWithOffset(offset int) ([]s3object, error) {
+	objectsToDelete, err := s.retrieveAllVideoSegments()
+	if err != nil {
+		return nil, err
+	}
+
+	objectsToDelete = objectsToDelete[offset : len(objectsToDelete)-1]
+
+	return objectsToDelete, nil
+}
+
+func (s *S3Storage) deleteObjects(objects []s3object) {
+	keys := make([]*s3.ObjectIdentifier, len(objects))
+	for i, object := range objects {
+		keys[i] = &s3.ObjectIdentifier{Key: aws.String(object.key)}
+	}
+
+	log.Debugln("Deleting", len(keys), "objects from S3 bucket:", s.s3Bucket)
+
+	deleteObjectsRequest := &s3.DeleteObjectsInput{
+		Bucket: aws.String(s.s3Bucket),
+		Delete: &s3.Delete{
+			Objects: keys,
+			Quiet:   aws.Bool(true),
+		},
+	}
+
+	_, err := s.s3Client.DeleteObjects(deleteObjectsRequest)
+	if err != nil {
+		log.Errorf("Unable to delete objects from bucket %q, %v\n", s.s3Bucket, err)
+	}
+}
+
+func (s *S3Storage) retrieveAllVideoSegments() ([]s3object, error) {
+	allObjectsListRequest := &s3.ListObjectsInput{
+		Bucket: aws.String(s.s3Bucket),
+	}
+
+	// Fetch all objects in the bucket
+	allObjectsListResponse, err := s.s3Client.ListObjects(allObjectsListRequest)
+	if err != nil {
+		return nil, errors.Wrap(err, "Unable to fetch list of items in bucket for cleanup")
+	}
+
+	// Filter out non-video segments
+	allObjects := []s3object{}
+	for _, item := range allObjectsListResponse.Contents {
+		if !strings.HasSuffix(*item.Key, ".ts") {
+			continue
+		}
+
+		allObjects = append(allObjects, s3object{
+			key:          *item.Key,
+			lastModified: *item.LastModified,
+		})
+	}
+
+	// Sort the results by timestamp
+	sort.Slice(allObjects, func(i, j int) bool {
+		return allObjects[i].lastModified.After(allObjects[j].lastModified)
+	})
+
+	return allObjects, nil
+}
+
+type s3object struct {
+	key          string
+	lastModified time.Time
+}
diff --git a/core/streamState.go b/core/streamState.go
index 8013c18da..99db86f76 100644
--- a/core/streamState.go
+++ b/core/streamState.go
@@ -151,7 +151,9 @@ func startOnlineCleanupTimer() {
 	_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
 	go func() {
 		for range _onlineCleanupTicker.C {
-			transcoder.CleanupOldContent(config.HLSStoragePath)
+			if err := _storage.Cleanup(); err != nil {
+				log.Errorln(err)
+			}
 		}
 	}()
 }
diff --git a/core/transcoder/hlsFilesystemCleanup.go b/core/transcoder/hlsFilesystemCleanup.go
deleted file mode 100644
index c9071f716..000000000
--- a/core/transcoder/hlsFilesystemCleanup.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package transcoder
-
-import (
-	log "github.com/sirupsen/logrus"
-
-	"os"
-	"path/filepath"
-	"sort"
-
-	"github.com/owncast/owncast/core/data"
-)
-
-// CleanupOldContent will delete old files from the private dir that are no longer being referenced
-// in the stream.
-func CleanupOldContent(baseDirectory string) {
-	// Determine how many files we should keep on disk
-	maxNumber := data.GetStreamLatencyLevel().SegmentCount
-	buffer := 10
-
-	files, err := getAllFilesRecursive(baseDirectory)
-	if err != nil {
-		log.Debugln("Unable to cleanup old video files", err)
-		return
-	}
-
-	// Delete old private HLS files on disk
-	for directory := range files {
-		files := files[directory]
-		if len(files) < maxNumber+buffer {
-			continue
-		}
-
-		filesToDelete := files[maxNumber+buffer:]
-		log.Traceln("Deleting", len(filesToDelete), "old files from", baseDirectory, "for video variant", directory)
-
-		for _, file := range filesToDelete {
-			fileToDelete := filepath.Join(baseDirectory, directory, file.Name())
-			err := os.Remove(fileToDelete)
-			if err != nil {
-				log.Debugln(err)
-			}
-		}
-	}
-}
-
-func getAllFilesRecursive(baseDirectory string) (map[string][]os.FileInfo, error) {
-	var files = make(map[string][]os.FileInfo)
-
-	var directory string
-	err := filepath.Walk(baseDirectory, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if info.IsDir() {
-			directory = info.Name()
-		}
-
-		if filepath.Ext(info.Name()) == ".ts" {
-			files[directory] = append(files[directory], info)
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	// Sort by date so we can delete old files
-	for directory := range files {
-		sort.Slice(files[directory], func(i, j int) bool {
-			return files[directory][i].ModTime().UnixNano() > files[directory][j].ModTime().UnixNano()
-		})
-	}
-
-	return files, nil
-}
diff --git a/models/storageProvider.go b/models/storageProvider.go
index ee874b9ba..44961d5c2 100644
--- a/models/storageProvider.go
+++ b/models/storageProvider.go
@@ -8,4 +8,6 @@ type StorageProvider interface {
 	SegmentWritten(localFilePath string)
 	VariantPlaylistWritten(localFilePath string)
 	MasterPlaylistWritten(localFilePath string)
+
+	Cleanup() error
 }
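
Both Cleanup implementations in this patch apply the same retention rule: sort a variant's segments newest-first and delete everything past SegmentCount plus a buffer (10 for local disk, 20 for S3), fired once a minute from startOnlineCleanupTimer. The following standalone Go sketch illustrates only that selection step; the segment names and the segmentCount value of 4 are made up for illustration, and it does not use Owncast's real config or data packages.

package main

import (
	"fmt"
	"sort"
	"time"
)

// segment stands in for the os.FileInfo / S3 object metadata the patch works with.
type segment struct {
	name     string
	modified time.Time
}

func main() {
	// Hypothetical values: the real code reads SegmentCount from the configured
	// stream latency level and uses a buffer of 10 (local disk) or 20 (S3).
	segmentCount := 4
	buffer := 10
	keep := segmentCount + buffer

	// Fake segment list; lower indexes are newer.
	now := time.Now()
	segments := make([]segment, 0, 20)
	for i := 0; i < 20; i++ {
		segments = append(segments, segment{
			name:     fmt.Sprintf("stream-%d.ts", i),
			modified: now.Add(-time.Duration(i) * time.Second),
		})
	}

	// Sort newest-first, mirroring the sort.Slice calls in local.go and s3Storage.go.
	sort.Slice(segments, func(i, j int) bool {
		return segments[i].modified.After(segments[j].modified)
	})

	// Everything past the newest `keep` entries is eligible for deletion.
	if len(segments) > keep {
		for _, old := range segments[keep:] {
			fmt.Println("would delete", old.name)
		}
	}
}

The local provider skips a variant entirely while it holds fewer than maxNumber+buffer files, and the S3 provider slices its sorted object list at the same offset, so in both cases only the newest segments survive a cleanup pass.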