diff --git a/internal/api/byterange.go b/internal/api/byterange.go deleted file mode 100644 index 564444579..000000000 --- a/internal/api/byterange.go +++ /dev/null @@ -1,51 +0,0 @@ -package api - -import ( - "strconv" - "strings" -) - -type byteRange struct { - Start int64 - End *int64 - RawString string -} - -func createByteRange(s string) byteRange { - // strip bytes= - r := strings.TrimPrefix(s, "bytes=") - e := strings.Split(r, "-") - - ret := byteRange{ - RawString: s, - } - if len(e) > 0 { - ret.Start, _ = strconv.ParseInt(e[0], 10, 64) - } - if len(e) > 1 && e[1] != "" { - end, _ := strconv.ParseInt(e[1], 10, 64) - ret.End = &end - } - - return ret -} - -func (r byteRange) toHeaderValue(fileLength int64) string { - if r.End == nil { - return "" - } - end := *r.End - return "bytes " + strconv.FormatInt(r.Start, 10) + "-" + strconv.FormatInt(end, 10) + "/" + strconv.FormatInt(fileLength, 10) -} - -func (r byteRange) apply(bytes []byte) []byte { - if r.End == nil { - return bytes[r.Start:] - } - - end := *r.End + 1 - if int(end) > len(bytes) { - end = int64(len(bytes)) - } - return bytes[r.Start:end] -} diff --git a/internal/api/resolver_mutation_configure.go b/internal/api/resolver_mutation_configure.go index 3b56d75bd..56c14867b 100644 --- a/internal/api/resolver_mutation_configure.go +++ b/internal/api/resolver_mutation_configure.go @@ -123,6 +123,7 @@ func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input ConfigGen c.Set(config.Metadata, input.MetadataPath) } + refreshStreamManager := false existingCachePath := c.GetCachePath() if input.CachePath != nil && existingCachePath != *input.CachePath { if err := validateDir(config.Cache, *input.CachePath, true); err != nil { @@ -130,6 +131,7 @@ func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input ConfigGen } c.Set(config.Cache, input.CachePath) + refreshStreamManager = true } if input.VideoFileNamingAlgorithm != nil && *input.VideoFileNamingAlgorithm != c.GetVideoFileNamingAlgorithm() { @@ -328,6 +330,9 @@ func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input ConfigGen if refreshScraperCache { manager.GetInstance().RefreshScraperCache() } + if refreshStreamManager { + manager.GetInstance().RefreshStreamManager() + } return makeConfigGeneralResult(), nil } diff --git a/internal/api/routes_scene.go b/internal/api/routes_scene.go index 62c693d61..e4597b6a3 100644 --- a/internal/api/routes_scene.go +++ b/internal/api/routes_scene.go @@ -56,11 +56,11 @@ func (rs sceneRoutes) Routes() chi.Router { // streaming endpoints r.Get("/stream", rs.StreamDirect) - r.Get("/stream.mkv", rs.StreamMKV) - r.Get("/stream.webm", rs.StreamWebM) - r.Get("/stream.m3u8", rs.StreamHLS) - r.Get("/stream.ts", rs.StreamTS) r.Get("/stream.mp4", rs.StreamMp4) + r.Get("/stream.webm", rs.StreamWebM) + r.Get("/stream.mkv", rs.StreamMKV) + r.Get("/stream.m3u8", rs.StreamHLS) + r.Get("/stream.m3u8/{segment}.ts", rs.StreamHLSSegment) r.Get("/screenshot", rs.Screenshot) r.Get("/preview", rs.Preview) @@ -85,11 +85,25 @@ func (rs sceneRoutes) Routes() chi.Router { func (rs sceneRoutes) StreamDirect(w http.ResponseWriter, r *http.Request) { scene := r.Context().Value(sceneKey).(*models.Scene) - ss := manager.SceneServer{ - TxnManager: rs.txnManager, - SceneCoverGetter: rs.sceneFinder, - } - ss.StreamSceneDirect(scene, w, r) + fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() + hash := scene.GetHash(fileNamingAlgo) + + filepath := manager.GetInstance().Paths.Scene.GetStreamPath(scene.Path, hash) + streamRequestCtx := ffmpeg.NewStreamRequestContext(w, r) + + // #2579 - hijacking and closing the connection here causes video playback to fail in Safari + // We trust that the request context will be closed, so we don't need to call Cancel on the + // returned context here. + _ = manager.GetInstance().ReadLockManager.ReadLock(streamRequestCtx, filepath) + http.ServeFile(w, r, filepath) +} + +func (rs sceneRoutes) StreamMp4(w http.ResponseWriter, r *http.Request) { + rs.streamTranscode(w, r, ffmpeg.StreamTypeMP4) +} + +func (rs sceneRoutes) StreamWebM(w http.ResponseWriter, r *http.Request) { + rs.streamTranscode(w, r, ffmpeg.StreamTypeWEBM) } func (rs sceneRoutes) StreamMKV(w http.ResponseWriter, r *http.Request) { @@ -114,122 +128,107 @@ func (rs sceneRoutes) StreamMKV(w http.ResponseWriter, r *http.Request) { return } - rs.streamTranscode(w, r, ffmpeg.StreamFormatMKVAudio) + rs.streamTranscode(w, r, ffmpeg.StreamTypeMKV) } -func (rs sceneRoutes) StreamWebM(w http.ResponseWriter, r *http.Request) { - rs.streamTranscode(w, r, ffmpeg.StreamFormatVP9) -} - -func (rs sceneRoutes) StreamMp4(w http.ResponseWriter, r *http.Request) { - rs.streamTranscode(w, r, ffmpeg.StreamFormatH264) -} - -func (rs sceneRoutes) StreamHLS(w http.ResponseWriter, r *http.Request) { +func (rs sceneRoutes) streamTranscode(w http.ResponseWriter, r *http.Request, streamType ffmpeg.StreamFormat) { scene := r.Context().Value(sceneKey).(*models.Scene) - pf := scene.Files.Primary() - if pf == nil { + streamManager := manager.GetInstance().StreamManager + if streamManager == nil { + http.Error(w, "Live transcoding disabled", http.StatusServiceUnavailable) return } - logger.Debug("Returning HLS playlist") - - // getting the playlist manifest only - w.Header().Set("Content-Type", ffmpeg.MimeHLS) - var str strings.Builder - - ffmpeg.WriteHLSPlaylist(pf.Duration, r.URL.String(), &str) - - requestByteRange := createByteRange(r.Header.Get("Range")) - if requestByteRange.RawString != "" { - logger.Debugf("Requested range: %s", requestByteRange.RawString) - } - - ret := requestByteRange.apply([]byte(str.String())) - rangeStr := requestByteRange.toHeaderValue(int64(str.Len())) - w.Header().Set("Content-Range", rangeStr) - - if n, err := w.Write(ret); err != nil { - logger.Warnf("[stream] error writing stream (wrote %v bytes): %v", n, err) - } -} - -func (rs sceneRoutes) StreamTS(w http.ResponseWriter, r *http.Request) { - rs.streamTranscode(w, r, ffmpeg.StreamFormatHLS) -} - -func (rs sceneRoutes) streamTranscode(w http.ResponseWriter, r *http.Request, streamFormat ffmpeg.StreamFormat) { - scene := r.Context().Value(sceneKey).(*models.Scene) - f := scene.Files.Primary() if f == nil { return } - logger.Debugf("Streaming as %s", streamFormat.MimeType) - // start stream based on query param, if provided if err := r.ParseForm(); err != nil { - logger.Warnf("[stream] error parsing query form: %v", err) + logger.Warnf("[transcode] error parsing query form: %v", err) } startTime := r.Form.Get("start") ss, _ := strconv.ParseFloat(startTime, 64) - requestedSize := r.Form.Get("resolution") + resolution := r.Form.Get("resolution") - audioCodec := ffmpeg.MissingUnsupported - if f.AudioCodec != "" { - audioCodec = ffmpeg.ProbeAudioCodec(f.AudioCodec) + options := ffmpeg.TranscodeOptions{ + StreamType: streamType, + VideoFile: f, + Resolution: resolution, + StartTime: ss, } - width := f.Width - height := f.Height + logger.Debugf("[transcode] streaming scene %d as %s", scene.ID, streamType.MimeType) + streamManager.ServeTranscode(w, r, options) +} - config := config.GetInstance() +func (rs sceneRoutes) StreamHLS(w http.ResponseWriter, r *http.Request) { + rs.streamManifest(w, r, ffmpeg.StreamTypeHLS, "HLS") +} - options := ffmpeg.TranscodeStreamOptions{ - Input: f.Path, - Codec: streamFormat, - VideoOnly: audioCodec == ffmpeg.MissingUnsupported, +func (rs sceneRoutes) streamManifest(w http.ResponseWriter, r *http.Request, streamType *ffmpeg.StreamType, logName string) { + scene := r.Context().Value(sceneKey).(*models.Scene) - VideoWidth: width, - VideoHeight: height, - - StartTime: ss, - MaxTranscodeSize: config.GetMaxStreamingTranscodeSize().GetMaxResolution(), - ExtraInputArgs: config.GetLiveTranscodeInputArgs(), - ExtraOutputArgs: config.GetLiveTranscodeOutputArgs(), - } - - if requestedSize != "" { - options.MaxTranscodeSize = models.StreamingResolutionEnum(requestedSize).GetMaxResolution() - } - - encoder := manager.GetInstance().FFMPEG - - lm := manager.GetInstance().ReadLockManager - streamRequestCtx := manager.NewStreamRequestContext(w, r) - lockCtx := lm.ReadLock(streamRequestCtx, f.Path) - - // hijacking and closing the connection here causes video playback to hang in Chrome - // due to ERR_INCOMPLETE_CHUNKED_ENCODING - // We trust that the request context will be closed, so we don't need to call Cancel on the returned context here. - - stream, err := encoder.GetTranscodeStream(lockCtx, options) - - if err != nil { - logger.Errorf("[stream] error transcoding video file: %v", err) - w.WriteHeader(http.StatusBadRequest) - if _, err := w.Write([]byte(err.Error())); err != nil { - logger.Warnf("[stream] error writing response: %v", err) - } + streamManager := manager.GetInstance().StreamManager + if streamManager == nil { + http.Error(w, "Live transcoding disabled", http.StatusServiceUnavailable) return } - lockCtx.AttachCommand(stream.Cmd) + f := scene.Files.Primary() + if f == nil { + return + } - stream.Serve(w, r) - w.(http.Flusher).Flush() + if err := r.ParseForm(); err != nil { + logger.Warnf("[transcode] error parsing query form: %v", err) + } + + resolution := r.Form.Get("resolution") + + logger.Debugf("[transcode] returning %s manifest for scene %d", logName, scene.ID) + streamManager.ServeManifest(w, r, streamType, f, resolution) +} + +func (rs sceneRoutes) StreamHLSSegment(w http.ResponseWriter, r *http.Request) { + rs.streamSegment(w, r, ffmpeg.StreamTypeHLS) +} + +func (rs sceneRoutes) streamSegment(w http.ResponseWriter, r *http.Request, streamType *ffmpeg.StreamType) { + scene := r.Context().Value(sceneKey).(*models.Scene) + + streamManager := manager.GetInstance().StreamManager + if streamManager == nil { + http.Error(w, "Live transcoding disabled", http.StatusServiceUnavailable) + return + } + + f := scene.Files.Primary() + if f == nil { + return + } + + if err := r.ParseForm(); err != nil { + logger.Warnf("[transcode] error parsing query form: %v", err) + } + + fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() + hash := scene.GetHash(fileNamingAlgo) + + segment := chi.URLParam(r, "segment") + resolution := r.Form.Get("resolution") + + options := ffmpeg.StreamOptions{ + StreamType: streamType, + VideoFile: f, + Resolution: resolution, + Hash: hash, + Segment: segment, + } + + streamManager.ServeSegment(w, r, options) } func (rs sceneRoutes) Screenshot(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/server.go b/internal/api/server.go index 0f6e26f52..5e862bfcf 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -503,7 +503,7 @@ func SecurityHeadersMiddleware(next http.Handler) http.Handler { } connectableOrigins += "; " - cspDirectives := "default-src data: 'self' 'unsafe-inline';" + connectableOrigins + "img-src data: *; script-src 'self' https://cdn.jsdelivr.net 'unsafe-inline' 'unsafe-eval'; style-src 'self' https://cdn.jsdelivr.net 'unsafe-inline'; style-src-elem 'self' https://cdn.jsdelivr.net 'unsafe-inline'; media-src 'self' blob:; child-src 'none'; object-src 'none'; form-action 'self'" + cspDirectives := "default-src data: 'self' 'unsafe-inline';" + connectableOrigins + "img-src data: *; script-src 'self' https://cdn.jsdelivr.net 'unsafe-inline' 'unsafe-eval'; style-src 'self' https://cdn.jsdelivr.net 'unsafe-inline'; style-src-elem 'self' https://cdn.jsdelivr.net 'unsafe-inline'; media-src 'self' blob:; child-src 'none'; worker-src blob:; object-src 'none'; form-action 'self'" w.Header().Set("Referrer-Policy", "same-origin") w.Header().Set("X-Content-Type-Options", "nosniff") diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 554d36cf9..53a00c1ba 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -108,8 +108,9 @@ type Manager struct { Paths *paths.Paths - FFMPEG ffmpeg.FFMpeg - FFProbe ffmpeg.FFProbe + FFMPEG ffmpeg.FFMpeg + FFProbe ffmpeg.FFProbe + StreamManager *ffmpeg.StreamManager ReadLockManager *fsutil.ReadLockManager @@ -430,6 +431,7 @@ func initFFMPEG(ctx context.Context) error { instance.FFMPEG = ffmpeg.FFMpeg(ffmpegPath) instance.FFProbe = ffmpeg.FFProbe(ffprobePath) + instance.RefreshStreamManager() } return nil @@ -564,6 +566,19 @@ func (s *Manager) RefreshScraperCache() { s.ScraperCache = s.initScraperCache() } +// RefreshStreamManager refreshes the stream manager. Call this when cache directory +// changes. +func (s *Manager) RefreshStreamManager() { + // shutdown existing manager if needed + if s.StreamManager != nil { + s.StreamManager.Shutdown() + s.StreamManager = nil + } + + cacheDir := s.Config.GetCachePath() + s.StreamManager = ffmpeg.NewStreamManager(cacheDir, s.FFMPEG, s.FFProbe, s.Config, s.ReadLockManager) +} + func setSetupDefaults(input *SetupInput) { if input.ConfigLocation == "" { input.ConfigLocation = filepath.Join(fsutil.GetHomeDirectory(), ".stash", "config.yml") @@ -735,6 +750,11 @@ func (s *Manager) Shutdown(code int) { // stop any profiling at exit pprof.StopCPUProfile() + if s.StreamManager != nil { + s.StreamManager.Shutdown() + s.StreamManager = nil + } + // TODO: Each part of the manager needs to gracefully stop at some point // for now, we just close the database. err := s.Database.Close() diff --git a/internal/manager/running_streams.go b/internal/manager/running_streams.go index b715c0c5c..83f93c2ea 100644 --- a/internal/manager/running_streams.go +++ b/internal/manager/running_streams.go @@ -5,10 +5,10 @@ import ( "errors" "io" "net/http" - "time" "github.com/stashapp/stash/internal/manager/config" "github.com/stashapp/stash/internal/static" + "github.com/stashapp/stash/pkg/ffmpeg" "github.com/stashapp/stash/pkg/fsutil" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/models" @@ -16,58 +16,6 @@ import ( "github.com/stashapp/stash/pkg/utils" ) -type StreamRequestContext struct { - context.Context - ResponseWriter http.ResponseWriter -} - -func NewStreamRequestContext(w http.ResponseWriter, r *http.Request) *StreamRequestContext { - return &StreamRequestContext{ - Context: r.Context(), - ResponseWriter: w, - } -} - -func (c *StreamRequestContext) Cancel() { - hj, ok := (c.ResponseWriter).(http.Hijacker) - if !ok { - return - } - - // hijack and close the connection - conn, bw, _ := hj.Hijack() - if conn != nil { - if bw != nil { - // notify end of stream - _, err := bw.WriteString("0\r\n") - if err != nil { - logger.Warnf("unable to write end of stream: %v", err) - } - _, err = bw.WriteString("\r\n") - if err != nil { - logger.Warnf("unable to write end of stream: %v", err) - } - - // flush the buffer, but don't wait indefinitely - timeout := make(chan struct{}, 1) - go func() { - _ = bw.Flush() - close(timeout) - }() - - const waitTime = time.Second - - select { - case <-timeout: - case <-time.After(waitTime): - logger.Warnf("unable to flush buffer - closing connection") - } - } - - conn.Close() - } -} - func KillRunningStreams(scene *models.Scene, fileNamingAlgo models.HashAlgorithm) { instance.ReadLockManager.Cancel(scene.Path) @@ -94,7 +42,7 @@ func (s *SceneServer) StreamSceneDirect(scene *models.Scene, w http.ResponseWrit fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() filepath := GetInstance().Paths.Scene.GetStreamPath(scene.Path, scene.GetHash(fileNamingAlgo)) - streamRequestCtx := NewStreamRequestContext(w, r) + streamRequestCtx := ffmpeg.NewStreamRequestContext(w, r) // #2579 - hijacking and closing the connection here causes video playback to fail in Safari // We trust that the request context will be closed, so we don't need to call Cancel on the diff --git a/internal/manager/scene.go b/internal/manager/scene.go index 30d1948d8..6abb192b4 100644 --- a/internal/manager/scene.go +++ b/internal/manager/scene.go @@ -11,6 +11,47 @@ import ( "github.com/stashapp/stash/pkg/models" ) +type SceneStreamEndpoint struct { + URL string `json:"url"` + MimeType *string `json:"mime_type"` + Label *string `json:"label"` +} + +type endpointType struct { + label string + mimeType string + extension string +} + +var ( + directEndpointType = endpointType{ + label: "Direct stream", + mimeType: ffmpeg.MimeMp4Video, + extension: "", + } + mp4EndpointType = endpointType{ + label: "MP4", + mimeType: ffmpeg.MimeMp4Video, + extension: ".mp4", + } + mkvEndpointType = endpointType{ + label: "MKV", + // use mp4 mimetype to trick the client, since many clients won't try mkv + mimeType: ffmpeg.MimeMp4Video, + extension: ".mkv", + } + webmEndpointType = endpointType{ + label: "WEBM", + mimeType: ffmpeg.MimeWebmVideo, + extension: ".webm", + } + hlsEndpointType = endpointType{ + label: "HLS", + mimeType: ffmpeg.MimeHLS, + extension: ".m3u8", + } +) + func GetVideoFileContainer(file *file.VideoFile) (ffmpeg.Container, error) { var container ffmpeg.Container format := file.Format @@ -30,48 +71,6 @@ func GetVideoFileContainer(file *file.VideoFile) (ffmpeg.Container, error) { return container, nil } -func includeSceneStreamPath(f *file.VideoFile, streamingResolution models.StreamingResolutionEnum, maxStreamingTranscodeSize models.StreamingResolutionEnum) bool { - // convert StreamingResolutionEnum to ResolutionEnum so we can get the min - // resolution - convertedRes := models.ResolutionEnum(streamingResolution) - - minResolution := convertedRes.GetMinResolution() - sceneResolution := f.GetMinResolution() - - // don't include if scene resolution is smaller than the streamingResolution - if sceneResolution != 0 && sceneResolution < minResolution { - return false - } - - // if we always allow everything, then return true - if maxStreamingTranscodeSize == models.StreamingResolutionEnumOriginal { - return true - } - - // convert StreamingResolutionEnum to ResolutionEnum - maxStreamingResolution := models.ResolutionEnum(maxStreamingTranscodeSize) - return maxStreamingResolution.GetMinResolution() >= minResolution -} - -type SceneStreamEndpoint struct { - URL string `json:"url"` - MimeType *string `json:"mime_type"` - Label *string `json:"label"` -} - -func makeStreamEndpoint(streamURL *url.URL, streamingResolution models.StreamingResolutionEnum, mimeType, label string) *SceneStreamEndpoint { - urlCopy := *streamURL - v := urlCopy.Query() - v.Set("resolution", streamingResolution.String()) - urlCopy.RawQuery = v.Encode() - - return &SceneStreamEndpoint{ - URL: urlCopy.String(), - MimeType: &mimeType, - Label: &label, - } -} - func GetSceneStreamPaths(scene *models.Scene, directStreamURL *url.URL, maxStreamingTranscodeSize models.StreamingResolutionEnum) ([]*SceneStreamEndpoint, error) { if scene == nil { return nil, fmt.Errorf("nil scene") @@ -82,13 +81,66 @@ func GetSceneStreamPaths(scene *models.Scene, directStreamURL *url.URL, maxStrea return nil, nil } - var ret []*SceneStreamEndpoint - mimeWebm := ffmpeg.MimeWebm - mimeHLS := ffmpeg.MimeHLS - mimeMp4 := ffmpeg.MimeMp4 + // convert StreamingResolutionEnum to ResolutionEnum + maxStreamingResolution := models.ResolutionEnum(maxStreamingTranscodeSize) + sceneResolution := pf.GetMinResolution() + includeSceneStreamPath := func(streamingResolution models.StreamingResolutionEnum) bool { + var minResolution int + if streamingResolution == models.StreamingResolutionEnumOriginal { + minResolution = sceneResolution + } else { + // convert StreamingResolutionEnum to ResolutionEnum so we can get the min + // resolution + convertedRes := models.ResolutionEnum(streamingResolution) + minResolution = convertedRes.GetMinResolution() - labelWebm := "webm" - labelHLS := "HLS" + // don't include if scene resolution is smaller than the streamingResolution + if sceneResolution != 0 && sceneResolution < minResolution { + return false + } + } + + // if we always allow everything, then return true + if maxStreamingTranscodeSize == models.StreamingResolutionEnumOriginal { + return true + } + + return maxStreamingResolution.GetMinResolution() >= minResolution + } + + makeStreamEndpoint := func(t endpointType, resolution models.StreamingResolutionEnum) *SceneStreamEndpoint { + url := *directStreamURL + url.Path += t.extension + + label := t.label + + if resolution != "" { + v := url.Query() + v.Set("resolution", resolution.String()) + url.RawQuery = v.Encode() + + switch resolution { + case models.StreamingResolutionEnumFourK: + label += " 4K (2160p)" + case models.StreamingResolutionEnumFullHd: + label += " Full HD (1080p)" + case models.StreamingResolutionEnumStandardHd: + label += " HD (720p)" + case models.StreamingResolutionEnumStandard: + label += " Standard (480p)" + case models.StreamingResolutionEnumLow: + label += " Low (240p)" + } + } + + return &SceneStreamEndpoint{ + URL: url.String(), + MimeType: &t.mimeType, + Label: &label, + } + } + + var endpoints []*SceneStreamEndpoint // direct stream should only apply when the audio codec is supported audioCodec := ffmpeg.MissingUnsupported @@ -99,99 +151,60 @@ func GetSceneStreamPaths(scene *models.Scene, directStreamURL *url.URL, maxStrea // don't care if we can't get the container container, _ := GetVideoFileContainer(pf) - replaceSuffix := func(suffix string) *url.URL { - urlCopy := *directStreamURL - urlCopy.Path += suffix - return &urlCopy - } - if HasTranscode(scene, config.GetInstance().GetVideoFileNamingAlgorithm()) || ffmpeg.IsValidAudioForContainer(audioCodec, container) { - label := "Direct stream" - ret = append(ret, &SceneStreamEndpoint{ - URL: directStreamURL.String(), - MimeType: &mimeMp4, - Label: &label, - }) + endpoints = append(endpoints, makeStreamEndpoint(directEndpointType, "")) } // only add mkv stream endpoint if the scene container is an mkv already if container == ffmpeg.Matroska { - label := "mkv" - ret = append(ret, &SceneStreamEndpoint{ - URL: replaceSuffix(".mkv").String(), - // set mkv to mp4 to trick the client, since many clients won't try mkv - MimeType: &mimeMp4, - Label: &label, - }) + endpoints = append(endpoints, makeStreamEndpoint(mkvEndpointType, "")) } - // WEBM quality transcoding options - // Note: These have the wrong mime type intentionally to allow jwplayer to selection between mp4/webm - webmLabelFourK := "WEBM 4K (2160p)" // "FOUR_K" - webmLabelFullHD := "WEBM Full HD (1080p)" // "FULL_HD" - webmLabelStandardHD := "WEBM HD (720p)" // "STANDARD_HD" - webmLabelStandard := "WEBM Standard (480p)" // "STANDARD" - webmLabelLow := "WEBM Low (240p)" // "LOW" + mp4Streams := []*SceneStreamEndpoint{} + webmStreams := []*SceneStreamEndpoint{} + hlsStreams := []*SceneStreamEndpoint{} - // Setup up lower quality transcoding options (MP4) - mp4LabelFourK := "MP4 4K (2160p)" // "FOUR_K" - mp4LabelFullHD := "MP4 Full HD (1080p)" // "FULL_HD" - mp4LabelStandardHD := "MP4 HD (720p)" // "STANDARD_HD" - mp4LabelStandard := "MP4 Standard (480p)" // "STANDARD" - mp4LabelLow := "MP4 Low (240p)" // "LOW" - - var webmStreams []*SceneStreamEndpoint - var mp4Streams []*SceneStreamEndpoint - - webmURL := replaceSuffix(".webm") - mp4URL := replaceSuffix(".mp4") - - if includeSceneStreamPath(pf, models.StreamingResolutionEnumFourK, maxStreamingTranscodeSize) { - webmStreams = append(webmStreams, makeStreamEndpoint(webmURL, models.StreamingResolutionEnumFourK, mimeMp4, webmLabelFourK)) - mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4URL, models.StreamingResolutionEnumFourK, mimeMp4, mp4LabelFourK)) + if includeSceneStreamPath(models.StreamingResolutionEnumOriginal) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumOriginal)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumOriginal)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumOriginal)) } - if includeSceneStreamPath(pf, models.StreamingResolutionEnumFullHd, maxStreamingTranscodeSize) { - webmStreams = append(webmStreams, makeStreamEndpoint(webmURL, models.StreamingResolutionEnumFullHd, mimeMp4, webmLabelFullHD)) - mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4URL, models.StreamingResolutionEnumFullHd, mimeMp4, mp4LabelFullHD)) + if includeSceneStreamPath(models.StreamingResolutionEnumFourK) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumFourK)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumFourK)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumFourK)) } - if includeSceneStreamPath(pf, models.StreamingResolutionEnumStandardHd, maxStreamingTranscodeSize) { - webmStreams = append(webmStreams, makeStreamEndpoint(webmURL, models.StreamingResolutionEnumStandardHd, mimeMp4, webmLabelStandardHD)) - mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4URL, models.StreamingResolutionEnumStandardHd, mimeMp4, mp4LabelStandardHD)) + if includeSceneStreamPath(models.StreamingResolutionEnumFullHd) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumFullHd)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumFullHd)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumFullHd)) } - if includeSceneStreamPath(pf, models.StreamingResolutionEnumStandard, maxStreamingTranscodeSize) { - webmStreams = append(webmStreams, makeStreamEndpoint(webmURL, models.StreamingResolutionEnumStandard, mimeMp4, webmLabelStandard)) - mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4URL, models.StreamingResolutionEnumStandard, mimeMp4, mp4LabelStandard)) + if includeSceneStreamPath(models.StreamingResolutionEnumStandardHd) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumStandardHd)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumStandardHd)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumStandardHd)) } - if includeSceneStreamPath(pf, models.StreamingResolutionEnumLow, maxStreamingTranscodeSize) { - webmStreams = append(webmStreams, makeStreamEndpoint(webmURL, models.StreamingResolutionEnumLow, mimeMp4, webmLabelLow)) - mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4URL, models.StreamingResolutionEnumLow, mimeMp4, mp4LabelLow)) + if includeSceneStreamPath(models.StreamingResolutionEnumStandard) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumStandard)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumStandard)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumStandard)) } - ret = append(ret, webmStreams...) - ret = append(ret, mp4Streams...) - - defaultStreams := []*SceneStreamEndpoint{ - { - URL: replaceSuffix(".webm").String(), - MimeType: &mimeWebm, - Label: &labelWebm, - }, + if includeSceneStreamPath(models.StreamingResolutionEnumLow) { + mp4Streams = append(mp4Streams, makeStreamEndpoint(mp4EndpointType, models.StreamingResolutionEnumLow)) + webmStreams = append(webmStreams, makeStreamEndpoint(webmEndpointType, models.StreamingResolutionEnumLow)) + hlsStreams = append(hlsStreams, makeStreamEndpoint(hlsEndpointType, models.StreamingResolutionEnumLow)) } - ret = append(ret, defaultStreams...) + endpoints = append(endpoints, mp4Streams...) + endpoints = append(endpoints, webmStreams...) + endpoints = append(endpoints, hlsStreams...) - hls := SceneStreamEndpoint{ - URL: replaceSuffix(".m3u8").String(), - MimeType: &mimeHLS, - Label: &labelHLS, - } - ret = append(ret, &hls) - - return ret, nil + return endpoints, nil } // HasTranscode returns true if a transcoded video exists for the provided diff --git a/pkg/ffmpeg/hls.go b/pkg/ffmpeg/hls.go deleted file mode 100644 index f3b421c52..000000000 --- a/pkg/ffmpeg/hls.go +++ /dev/null @@ -1,40 +0,0 @@ -package ffmpeg - -import ( - "fmt" - "io" - "strings" -) - -const hlsSegmentLength = 10.0 - -// WriteHLSPlaylist writes a HLS playlist to w using baseUrl as the base URL for TS streams. -func WriteHLSPlaylist(duration float64, baseUrl string, w io.Writer) { - fmt.Fprint(w, "#EXTM3U\n") - fmt.Fprint(w, "#EXT-X-VERSION:3\n") - fmt.Fprint(w, "#EXT-X-MEDIA-SEQUENCE:0\n") - fmt.Fprint(w, "#EXT-X-ALLOW-CACHE:YES\n") - fmt.Fprintf(w, "#EXT-X-TARGETDURATION:%d\n", int(hlsSegmentLength)) - fmt.Fprint(w, "#EXT-X-PLAYLIST-TYPE:VOD\n") - - leftover := duration - upTo := 0.0 - - i := strings.LastIndex(baseUrl, ".m3u8") - tsURL := baseUrl[0:i] + ".ts" - - for leftover > 0 { - thisLength := hlsSegmentLength - if leftover < thisLength { - thisLength = leftover - } - - fmt.Fprintf(w, "#EXTINF: %f,\n", thisLength) - fmt.Fprintf(w, "%s?start=%f\n", tsURL, upTo) - - leftover -= thisLength - upTo += thisLength - } - - fmt.Fprint(w, "#EXT-X-ENDLIST\n") -} diff --git a/pkg/ffmpeg/stream.go b/pkg/ffmpeg/stream.go index 5b248b664..baf7a018a 100644 --- a/pkg/ffmpeg/stream.go +++ b/pkg/ffmpeg/stream.go @@ -2,237 +2,129 @@ package ffmpeg import ( "context" - "errors" - "io" "net/http" - "os/exec" - "strings" - "syscall" + "sync" + "time" + "github.com/stashapp/stash/pkg/fsutil" "github.com/stashapp/stash/pkg/logger" + "github.com/stashapp/stash/pkg/models" ) const ( - MimeWebm string = "video/webm" - MimeMkv string = "video/x-matroska" - MimeMp4 string = "video/mp4" - MimeHLS string = "application/vnd.apple.mpegurl" - MimeMpegts string = "video/MP2T" + MimeWebmVideo string = "video/webm" + MimeWebmAudio string = "audio/webm" + MimeMkvVideo string = "video/x-matroska" + MimeMkvAudio string = "audio/x-matroska" + MimeMp4Video string = "video/mp4" + MimeMp4Audio string = "audio/mp4" ) -// Stream represents an ongoing transcoded stream. -type Stream struct { - Stdout io.ReadCloser - Cmd *exec.Cmd - mimeType string +type StreamManager struct { + cacheDir string + encoder FFMpeg + ffprobe FFProbe + + config StreamManagerConfig + lockManager *fsutil.ReadLockManager + + context context.Context + cancelFunc context.CancelFunc + + runningStreams map[string]*runningStream + streamsMutex sync.Mutex } -// Serve is an http handler function that serves the stream. -func (s *Stream) Serve(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", s.mimeType) - w.WriteHeader(http.StatusOK) - - logger.Infof("[stream] transcoding video file to %s", s.mimeType) - - // process killing should be handled by command context - - _, err := io.Copy(w, s.Stdout) - if err != nil && !errors.Is(err, syscall.EPIPE) { - logger.Errorf("[stream] error serving transcoded video file: %v", err) - } +type StreamManagerConfig interface { + GetMaxStreamingTranscodeSize() models.StreamingResolutionEnum } -// StreamFormat represents a transcode stream format. -type StreamFormat struct { - MimeType string - codec VideoCodec - format Format - extraArgs []string - hls bool -} - -var ( - StreamFormatHLS = StreamFormat{ - codec: VideoCodecLibX264, - format: FormatMpegTS, - MimeType: MimeMpegts, - extraArgs: []string{ - "-acodec", "aac", - "-pix_fmt", "yuv420p", - "-preset", "veryfast", - "-crf", "25", - }, - hls: true, +func NewStreamManager(cacheDir string, encoder FFMpeg, ffprobe FFProbe, config StreamManagerConfig, lockManager *fsutil.ReadLockManager) *StreamManager { + if cacheDir == "" { + logger.Warn("cache directory is not set. Live HLS transcoding will be disabled") } - StreamFormatH264 = StreamFormat{ - codec: VideoCodecLibX264, - format: FormatMP4, - MimeType: MimeMp4, - extraArgs: []string{ - "-movflags", "frag_keyframe+empty_moov", - "-pix_fmt", "yuv420p", - "-preset", "veryfast", - "-crf", "25", - }, + ctx, cancel := context.WithCancel(context.Background()) + + ret := &StreamManager{ + cacheDir: cacheDir, + encoder: encoder, + ffprobe: ffprobe, + config: config, + lockManager: lockManager, + context: ctx, + cancelFunc: cancel, + runningStreams: make(map[string]*runningStream), } - StreamFormatVP9 = StreamFormat{ - codec: VideoCodecVP9, - format: FormatWebm, - MimeType: MimeWebm, - extraArgs: []string{ - "-deadline", "realtime", - "-cpu-used", "5", - "-row-mt", "1", - "-crf", "30", - "-b:v", "0", - "-pix_fmt", "yuv420p", - }, - } - - StreamFormatVP8 = StreamFormat{ - codec: VideoCodecVPX, - format: FormatWebm, - MimeType: MimeWebm, - extraArgs: []string{ - "-deadline", "realtime", - "-cpu-used", "5", - "-crf", "12", - "-b:v", "3M", - "-pix_fmt", "yuv420p", - }, - } - - StreamFormatHEVC = StreamFormat{ - codec: VideoCodecLibX265, - format: FormatMP4, - MimeType: MimeMp4, - extraArgs: []string{ - "-movflags", "frag_keyframe", - "-preset", "veryfast", - "-crf", "30", - }, - } - - // it is very common in MKVs to have just the audio codec unsupported - // copy the video stream, transcode the audio and serve as Matroska - StreamFormatMKVAudio = StreamFormat{ - codec: VideoCodecCopy, - format: FormatMatroska, - MimeType: MimeMkv, - extraArgs: []string{ - "-c:a", "libopus", - "-b:a", "96k", - "-vbr", "on", - }, - } -) - -// TranscodeStreamOptions represents options for live transcoding a video file. -type TranscodeStreamOptions struct { - Input string - Codec StreamFormat - StartTime float64 - MaxTranscodeSize int - - // original video dimensions - VideoWidth int - VideoHeight int - - // transcode the video, remove the audio - // in some videos where the audio codec is not supported by ffmpeg - // ffmpeg fails if you try to transcode the audio - VideoOnly bool - - // arguments added before the input argument - ExtraInputArgs []string - // arguments added before the output argument - ExtraOutputArgs []string -} - -func (o TranscodeStreamOptions) getStreamArgs() Args { - var args Args - args = append(args, "-hide_banner") - args = append(args, o.ExtraInputArgs...) - args = args.LogLevel(LogLevelError) - - if o.StartTime != 0 { - args = args.Seek(o.StartTime) - } - - if o.Codec.hls { - // we only serve a fixed segment length - args = args.Duration(hlsSegmentLength) - } - - args = args.Input(o.Input) - - if o.VideoOnly { - args = args.SkipAudio() - } - - args = args.VideoCodec(o.Codec.codec) - - // don't set scale when copying video stream - if o.Codec.codec != VideoCodecCopy { - var videoFilter VideoFilter - videoFilter = videoFilter.ScaleMax(o.VideoWidth, o.VideoHeight, o.MaxTranscodeSize) - args = args.VideoFilter(videoFilter) - } - - if len(o.Codec.extraArgs) > 0 { - args = append(args, o.Codec.extraArgs...) - } - - args = append(args, - // this is needed for 5-channel ac3 files - "-ac", "2", - ) - - args = append(args, o.ExtraOutputArgs...) - - args = args.Format(o.Codec.format) - args = args.Output("pipe:") - - return args -} - -// GetTranscodeStream starts the live transcoding process using ffmpeg and returns a stream. -func (f *FFMpeg) GetTranscodeStream(ctx context.Context, options TranscodeStreamOptions) (*Stream, error) { - args := options.getStreamArgs() - cmd := f.Command(ctx, args) - logger.Debugf("Streaming via: %s", strings.Join(cmd.Args, " ")) - - stdout, err := cmd.StdoutPipe() - if nil != err { - logger.Error("FFMPEG stdout not available: " + err.Error()) - return nil, err - } - - stderr, err := cmd.StderrPipe() - if nil != err { - logger.Error("FFMPEG stderr not available: " + err.Error()) - return nil, err - } - - if err = cmd.Start(); err != nil { - return nil, err - } - - // stderr must be consumed or the process deadlocks go func() { - stderrData, _ := io.ReadAll(stderr) - stderrString := string(stderrData) - if len(stderrString) > 0 { - logger.Debugf("[stream] ffmpeg stderr: %s", stderrString) + for { + select { + case <-time.After(monitorInterval): + ret.monitorStreams() + case <-ctx.Done(): + return + } } }() - ret := &Stream{ - Stdout: stdout, - Cmd: cmd, - mimeType: options.Codec.MimeType, - } - return ret, nil + return ret +} + +// Shutdown shuts down the stream manager, killing any running transcoding processes and removing all cached files. +func (sm *StreamManager) Shutdown() { + sm.cancelFunc() + sm.stopAndRemoveAll() +} + +type StreamRequestContext struct { + context.Context + ResponseWriter http.ResponseWriter +} + +func NewStreamRequestContext(w http.ResponseWriter, r *http.Request) *StreamRequestContext { + return &StreamRequestContext{ + Context: r.Context(), + ResponseWriter: w, + } +} + +func (c *StreamRequestContext) Cancel() { + hj, ok := (c.ResponseWriter).(http.Hijacker) + if !ok { + return + } + + // hijack and close the connection + conn, bw, _ := hj.Hijack() + if conn != nil { + if bw != nil { + // notify end of stream + _, err := bw.WriteString("0\r\n") + if err != nil { + logger.Warnf("unable to write end of stream: %v", err) + } + _, err = bw.WriteString("\r\n") + if err != nil { + logger.Warnf("unable to write end of stream: %v", err) + } + + // flush the buffer, but don't wait indefinitely + timeout := make(chan struct{}, 1) + go func() { + _ = bw.Flush() + close(timeout) + }() + + const waitTime = time.Second + + select { + case <-timeout: + case <-time.After(waitTime): + logger.Warnf("unable to flush buffer - closing connection") + } + } + + conn.Close() + } } diff --git a/pkg/ffmpeg/stream_segmented.go b/pkg/ffmpeg/stream_segmented.go new file mode 100644 index 000000000..cbcdcc9d7 --- /dev/null +++ b/pkg/ffmpeg/stream_segmented.go @@ -0,0 +1,639 @@ +package ffmpeg + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "math" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/stashapp/stash/pkg/file" + "github.com/stashapp/stash/pkg/fsutil" + "github.com/stashapp/stash/pkg/logger" + "github.com/stashapp/stash/pkg/models" +) + +const ( + MimeHLS string = "application/vnd.apple.mpegurl" + MimeMpegTS string = "video/MP2T" + + segmentLength = 2 + + maxSegmentWait = 15 * time.Second + monitorInterval = 200 * time.Millisecond + + // segment gap before counting a request as a seek and + // restarting the transcode process at the requested segment + maxSegmentGap = 5 + + // maximum number of segments to generate + // ahead of the currently streaming segment + maxSegmentBuffer = 15 + + // maximum idle time between segment requests before + // stopping transcode and deleting cache folder + maxIdleTime = 30 * time.Second +) + +type StreamType struct { + Name string + SegmentType *SegmentType + ServeManifest func(sm *StreamManager, w http.ResponseWriter, r *http.Request, vf *file.VideoFile, resolution string) + Args func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) Args +} + +var ( + StreamTypeHLS = &StreamType{ + Name: "hls", + SegmentType: SegmentTypeTS, + ServeManifest: serveHLSManifest, + Args: func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) (args Args) { + args = append(args, + "-c:v", "libx264", + "-pix_fmt", "yuv420p", + "-preset", "veryfast", + "-crf", "25", + "-flags", "+cgop", + "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", segmentLength), + "-sc_threshold", "0", + ) + args = args.VideoFilter(videoFilter) + if videoOnly { + args = append(args, "-an") + } else { + args = append(args, + "-c:a", "aac", + "-ac", "2", + ) + } + args = append(args, + "-sn", + "-copyts", + "-avoid_negative_ts", "disabled", + "-f", "hls", + "-start_number", fmt.Sprint(segment), + "-hls_time", "2", + "-hls_segment_type", "mpegts", + "-hls_playlist_type", "vod", + "-hls_segment_filename", filepath.Join(outputDir, ".%d.ts"), + filepath.Join(outputDir, "manifest.m3u8"), + ) + return + }, + } + StreamTypeHLSCopy = &StreamType{ + Name: "hls-copy", + SegmentType: SegmentTypeTS, + ServeManifest: serveHLSManifest, + Args: func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) (args Args) { + args = append(args, + "-c:v", "copy", + ) + if videoOnly { + args = append(args, "-an") + } else { + args = append(args, + "-c:a", "aac", + "-ac", "2", + ) + } + args = append(args, + "-sn", + "-copyts", + "-avoid_negative_ts", "disabled", + "-f", "hls", + "-start_number", fmt.Sprint(segment), + "-hls_time", "2", + "-hls_segment_type", "mpegts", + "-hls_playlist_type", "vod", + "-hls_segment_filename", filepath.Join(outputDir, ".%d.ts"), + filepath.Join(outputDir, "manifest.m3u8"), + ) + return + }, + } +) + +type SegmentType struct { + Format string + MimeType string + MakeFilename func(segment int) string + ParseSegment func(str string) (int, error) +} + +var ( + SegmentTypeTS = &SegmentType{ + Format: "%d.ts", + MimeType: MimeMpegTS, + MakeFilename: func(segment int) string { + return fmt.Sprintf("%d.ts", segment) + }, + ParseSegment: func(str string) (int, error) { + segment, err := strconv.Atoi(str) + if err != nil || segment < 0 { + err = ErrInvalidSegment + } + return segment, err + }, + } +) + +var ErrInvalidSegment = errors.New("invalid segment") + +type StreamOptions struct { + StreamType *StreamType + VideoFile *file.VideoFile + Resolution string + Hash string + Segment string +} + +type transcodeProcess struct { + cmd *exec.Cmd + context context.Context + cancel context.CancelFunc + cancelled bool + outputDir string + segmentType *SegmentType + segment int +} + +type waitingSegment struct { + segmentType *SegmentType + idx int + file string + path string + accessed time.Time + available chan error + done atomic.Bool +} + +type runningStream struct { + dir string + streamType *StreamType + vf *file.VideoFile + maxTranscodeSize int + outputDir string + + waitingSegments []*waitingSegment + tp *transcodeProcess + lastAccessed time.Time + lastSegment int +} + +func (t StreamType) String() string { + return t.Name +} + +func (t StreamType) FileDir(hash string, maxTranscodeSize int) string { + if maxTranscodeSize == 0 { + return fmt.Sprintf("%s_%s", hash, t) + } else { + return fmt.Sprintf("%s_%s_%d", hash, t, maxTranscodeSize) + } +} + +func (s *runningStream) makeStreamArgs(segment int) Args { + args := Args{"-hide_banner"} + args = args.LogLevel(LogLevelError) + + if segment > 0 { + args = args.Seek(float64(segment * segmentLength)) + } + + args = args.Input(s.vf.Path) + + videoOnly := ProbeAudioCodec(s.vf.AudioCodec) == MissingUnsupported + + var videoFilter VideoFilter + videoFilter = videoFilter.ScaleMax(s.vf.Width, s.vf.Height, s.maxTranscodeSize) + + args = append(args, s.streamType.Args(segment, videoFilter, videoOnly, s.outputDir)...) + + return args +} + +// checkSegments renames temp segments that have been completely generated. +// existing segments are not replaced - if a segment is generated +// multiple times, then only the first one is kept. +func (tp *transcodeProcess) checkSegments() { + doSegment := func(filename string) { + if filename != "" { + oldPath := filepath.Join(tp.outputDir, filename) + newPath := filepath.Join(tp.outputDir, filename[1:]) + if !segmentExists(newPath) { + _ = os.Rename(oldPath, newPath) + } else { + os.Remove(oldPath) + } + } + } + + processState := tp.cmd.ProcessState + var lastFilename string + for i := tp.segment; ; i++ { + filename := fmt.Sprintf("."+tp.segmentType.Format, i) + if segmentExists(filepath.Join(tp.outputDir, filename)) { + // this segment exists so the previous segment is valid + doSegment(lastFilename) + } else { + // if the transcode process has exited then + // we need to do something with the last segment + if processState != nil { + if processState.Success() { + // if the process exited successfully then + // count the last segment as valid + doSegment(lastFilename) + } else if lastFilename != "" { + // if the process exited unsuccessfully then just delete + // the last segment, it's probably incomplete + os.Remove(filepath.Join(tp.outputDir, lastFilename)) + } + } + break + } + + lastFilename = filename + tp.segment = i + } +} + +func lastSegment(vf *file.VideoFile) int { + return int(math.Ceil(vf.Duration/segmentLength)) - 1 +} + +func segmentExists(path string) bool { + exists, _ := fsutil.FileExists(path) + return exists +} + +// serveHLSManifest serves a generated HLS playlist. The URLs for the segments +// are of the form {r.URL}/%d.ts{?urlQuery} where %d is the segment index. +func serveHLSManifest(sm *StreamManager, w http.ResponseWriter, r *http.Request, vf *file.VideoFile, resolution string) { + if sm.cacheDir == "" { + logger.Error("[transcode] cannot live transcode with HLS because cache dir is unset") + http.Error(w, "cannot live transcode with HLS because cache dir is unset", http.StatusServiceUnavailable) + return + } + + probeResult, err := sm.ffprobe.NewVideoFile(vf.Path) + if err != nil { + logger.Warnf("[transcode] error generating HLS manifest: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + baseUrl := *r.URL + baseUrl.RawQuery = "" + baseURL := baseUrl.String() + + var urlQuery string + if resolution != "" { + urlQuery = fmt.Sprintf("?resolution=%s", resolution) + } + + var buf bytes.Buffer + + fmt.Fprint(&buf, "#EXTM3U\n") + + fmt.Fprint(&buf, "#EXT-X-VERSION:3\n") + fmt.Fprint(&buf, "#EXT-X-MEDIA-SEQUENCE:0\n") + fmt.Fprintf(&buf, "#EXT-X-TARGETDURATION:%d\n", segmentLength) + fmt.Fprint(&buf, "#EXT-X-PLAYLIST-TYPE:VOD\n") + + leftover := probeResult.FileDuration + segment := 0 + + for leftover > 0 { + thisLength := float64(segmentLength) + if leftover < thisLength { + thisLength = leftover + } + + fmt.Fprintf(&buf, "#EXTINF:%f,\n", thisLength) + fmt.Fprintf(&buf, "%s/%d.ts%s\n", baseURL, segment, urlQuery) + + leftover -= thisLength + segment++ + } + + fmt.Fprint(&buf, "#EXT-X-ENDLIST\n") + + w.Header().Set("Content-Type", MimeHLS) + http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(buf.Bytes())) +} + +func (sm *StreamManager) ServeManifest(w http.ResponseWriter, r *http.Request, streamType *StreamType, vf *file.VideoFile, resolution string) { + streamType.ServeManifest(sm, w, r, vf, resolution) +} + +func (sm *StreamManager) serveWaitingSegment(w http.ResponseWriter, r *http.Request, segment *waitingSegment) { + select { + case <-r.Context().Done(): + break + case err := <-segment.available: + if err == nil { + logger.Tracef("[transcode] streaming segment file %s", segment.file) + w.Header().Set("Content-Type", segment.segmentType.MimeType) + // Prevent caching as segments are generated on the fly + w.Header().Add("Cache-Control", "no-cache") + http.ServeFile(w, r, segment.path) + } else if !errors.Is(err, context.Canceled) { + logger.Errorf("[transcode] %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } + segment.done.Store(true) +} + +func (sm *StreamManager) ServeSegment(w http.ResponseWriter, r *http.Request, options StreamOptions) { + if sm.cacheDir == "" { + logger.Error("[transcode] cannot live transcode files because cache dir is unset") + http.Error(w, "cannot live transcode files because cache dir is unset", http.StatusServiceUnavailable) + return + } + + if options.Hash == "" { + http.Error(w, "invalid hash", http.StatusBadRequest) + return + } + + streamType := options.StreamType + + segment, err := streamType.SegmentType.ParseSegment(options.Segment) + // error if segment is past the end of the video + if err != nil || segment > lastSegment(options.VideoFile) { + http.Error(w, "invalid segment", http.StatusBadRequest) + return + } + + maxTranscodeSize := sm.config.GetMaxStreamingTranscodeSize().GetMaxResolution() + if options.Resolution != "" { + maxTranscodeSize = models.StreamingResolutionEnum(options.Resolution).GetMaxResolution() + } + + dir := options.StreamType.FileDir(options.Hash, maxTranscodeSize) + outputDir := filepath.Join(sm.cacheDir, dir) + + name := streamType.SegmentType.MakeFilename(segment) + file := filepath.Join(dir, name) + + sm.streamsMutex.Lock() + + stream := sm.runningStreams[dir] + if stream == nil { + stream = &runningStream{ + dir: dir, + streamType: options.StreamType, + vf: options.VideoFile, + maxTranscodeSize: maxTranscodeSize, + outputDir: outputDir, + + // initialize to cap 10 to avoid reallocations + waitingSegments: make([]*waitingSegment, 0, 10), + } + sm.runningStreams[dir] = stream + } + + now := time.Now() + stream.lastAccessed = now + if segment != -1 { + stream.lastSegment = segment + } + + waitingSegment := &waitingSegment{ + segmentType: streamType.SegmentType, + idx: segment, + file: file, + path: filepath.Join(sm.cacheDir, file), + accessed: now, + available: make(chan error, 1), + } + stream.waitingSegments = append(stream.waitingSegments, waitingSegment) + + sm.streamsMutex.Unlock() + + sm.serveWaitingSegment(w, r, waitingSegment) +} + +// assume lock is held +func (sm *StreamManager) startTranscode(stream *runningStream, segment int, done chan<- error) { + // generate segment 0 if init segment requested + if segment == -1 { + segment = 0 + } + + logger.Debugf("[transcode] starting transcode for %s at segment #%d", stream.dir, segment) + + if err := os.MkdirAll(stream.outputDir, os.ModePerm); err != nil { + done <- err + return + } + + lockCtx := sm.lockManager.ReadLock(sm.context, stream.vf.Path) + + args := stream.makeStreamArgs(segment) + cmd := sm.encoder.Command(lockCtx, args) + + stderr, err := cmd.StderrPipe() + if err != nil { + logger.Errorf("[transcode] ffmpeg stderr not available: %v", err) + } + + stdout, err := cmd.StdoutPipe() + if nil != err { + logger.Errorf("[transcode] ffmpeg stdout not available: %v", err) + } + + logger.Tracef("[transcode] running %s", cmd) + if err := cmd.Start(); err != nil { + lockCtx.Cancel() + done <- fmt.Errorf("error starting transcode process: %w", err) + return + } + + tp := &transcodeProcess{ + cmd: cmd, + context: lockCtx, + cancel: lockCtx.Cancel, + outputDir: stream.outputDir, + segmentType: stream.streamType.SegmentType, + segment: segment, + } + stream.tp = tp + + go func() { + errStr, _ := io.ReadAll(stderr) + outStr, _ := io.ReadAll(stdout) + + errCmd := cmd.Wait() + + var err error + + // don't log error if cancelled + if !tp.cancelled { + e := string(errStr) + if e == "" { + e = string(outStr) + } + if e != "" { + err = errors.New(e) + } else { + err = errCmd + } + + if err != nil { + err = fmt.Errorf("[transcode] ffmpeg error when running command <%s>: %w", strings.Join(cmd.Args, " "), err) + } + } + + sm.streamsMutex.Lock() + + // make sure that cancel is called to prevent memory leaks + tp.cancel() + + // clear remaining segments after ffmpeg exit + tp.checkSegments() + + if stream.tp == tp { + stream.tp = nil + } + + sm.streamsMutex.Unlock() + + done <- err + }() +} + +// assume lock is held +func (sm *StreamManager) stopTranscode(stream *runningStream) { + tp := stream.tp + if tp != nil { + tp.cancel() + tp.cancelled = true + } +} + +func (sm *StreamManager) checkTranscode(stream *runningStream, now time.Time) { + if len(stream.waitingSegments) == 0 && stream.lastAccessed.Add(maxIdleTime).Before(now) { + // Stream expired. Cancel the transcode process and delete the files + logger.Debugf("[transcode] stream for %s not accessed recently. Cancelling transcode and removing files", stream.dir) + + sm.stopTranscode(stream) + sm.removeTranscodeFiles(stream) + + delete(sm.runningStreams, stream.dir) + return + } + + if stream.tp != nil { + segmentType := stream.streamType.SegmentType + segment := stream.lastSegment + // if all segments up to maxSegmentBuffer exist, stop transcode + for i := segment; i < segment+maxSegmentBuffer; i++ { + if !segmentExists(filepath.Join(stream.outputDir, segmentType.MakeFilename(i))) { + return + } + } + + logger.Debugf("[transcode] stopping transcode for %s, buffer is full", stream.dir) + sm.stopTranscode(stream) + } +} + +func (s *waitingSegment) checkAvailable(now time.Time) bool { + if segmentExists(s.path) { + s.available <- nil + return true + } else if s.accessed.Add(maxSegmentWait).Before(now) { + s.available <- fmt.Errorf("timed out waiting for segment file %s to be generated", s.file) + return true + } + return false +} + +// ensureTranscode will start a new transcode process if the transcode +// is more than maxSegmentGap behind the requested segment +func (sm *StreamManager) ensureTranscode(stream *runningStream, segment *waitingSegment) bool { + segmentIdx := segment.idx + tp := stream.tp + if tp == nil { + sm.startTranscode(stream, segmentIdx, segment.available) + return true + } else if segmentIdx < tp.segment || tp.segment+maxSegmentGap < segmentIdx { + // only stop the transcode process here - it will be restarted only + // after the old process exits as stream.tp will then be nil. + sm.stopTranscode(stream) + return true + } + return false +} + +// runs every monitorInterval +func (sm *StreamManager) monitorStreams() { + sm.streamsMutex.Lock() + defer sm.streamsMutex.Unlock() + + now := time.Now() + + for _, stream := range sm.runningStreams { + if stream.tp != nil { + stream.tp.checkSegments() + } + + transcodeStarted := false + temp := stream.waitingSegments[:0] + for _, segment := range stream.waitingSegments { + remove := false + if segment.done.Load() || segment.checkAvailable(now) { + remove = true + } else if !transcodeStarted { + transcodeStarted = sm.ensureTranscode(stream, segment) + } + if !remove { + temp = append(temp, segment) + } + } + stream.waitingSegments = temp + + if !transcodeStarted { + sm.checkTranscode(stream, now) + } + } +} + +// assume lock is held +func (sm *StreamManager) removeTranscodeFiles(stream *runningStream) { + path := stream.outputDir + if err := os.RemoveAll(path); err != nil { + logger.Warnf("[transcode] error removing segment directory %s: %v", path, err) + } +} + +// stopAndRemoveAll stops all current streams and removes all cache files +func (sm *StreamManager) stopAndRemoveAll() { + sm.streamsMutex.Lock() + defer sm.streamsMutex.Unlock() + + for _, stream := range sm.runningStreams { + for _, segment := range stream.waitingSegments { + if len(segment.available) == 0 { + segment.available <- context.Canceled + } + } + sm.stopTranscode(stream) + sm.removeTranscodeFiles(stream) + } + + // ensure nothing else can use the map + sm.runningStreams = nil +} diff --git a/pkg/ffmpeg/stream_transcode.go b/pkg/ffmpeg/stream_transcode.go new file mode 100644 index 000000000..2596fe04e --- /dev/null +++ b/pkg/ffmpeg/stream_transcode.go @@ -0,0 +1,199 @@ +package ffmpeg + +import ( + "errors" + "io" + "net/http" + "os/exec" + "strings" + "syscall" + + "github.com/stashapp/stash/pkg/file" + "github.com/stashapp/stash/pkg/fsutil" + "github.com/stashapp/stash/pkg/logger" + "github.com/stashapp/stash/pkg/models" +) + +type StreamFormat struct { + MimeType string + Args func(videoFilter VideoFilter, videoOnly bool) Args +} + +var ( + StreamTypeMP4 = StreamFormat{ + MimeType: MimeMp4Video, + Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) { + args = args.VideoCodec(VideoCodecLibX264) + args = append(args, + "-movflags", "frag_keyframe+empty_moov", + "-pix_fmt", "yuv420p", + "-preset", "veryfast", + "-crf", "25", + ) + args = args.VideoFilter(videoFilter) + if videoOnly { + args = args.SkipAudio() + } else { + args = append(args, "-ac", "2") + } + args = args.Format(FormatMP4) + return + }, + } + StreamTypeWEBM = StreamFormat{ + MimeType: MimeWebmVideo, + Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) { + args = args.VideoCodec(VideoCodecVP9) + args = append(args, + "-pix_fmt", "yuv420p", + "-deadline", "realtime", + "-cpu-used", "5", + "-row-mt", "1", + "-crf", "30", + "-b:v", "0", + ) + args = args.VideoFilter(videoFilter) + if videoOnly { + args = args.SkipAudio() + } else { + args = append(args, "-ac", "2") + } + args = args.Format(FormatWebm) + return + }, + } + StreamTypeMKV = StreamFormat{ + MimeType: MimeMkvVideo, + Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) { + args = args.VideoCodec(VideoCodecCopy) + if videoOnly { + args = args.SkipAudio() + } else { + args = args.AudioCodec(AudioCodecLibOpus) + args = append(args, + "-b:a", "96k", + "-vbr", "on", + "-ac", "2", + ) + } + args = args.Format(FormatMatroska) + return + }, + } +) + +type TranscodeOptions struct { + StreamType StreamFormat + VideoFile *file.VideoFile + Resolution string + StartTime float64 +} + +func (o TranscodeOptions) makeStreamArgs(vf *file.VideoFile, maxScale int, startTime float64) Args { + args := Args{"-hide_banner"} + args = args.LogLevel(LogLevelError) + + if startTime != 0 { + args = args.Seek(startTime) + } + + args = args.Input(vf.Path) + + videoOnly := ProbeAudioCodec(vf.AudioCodec) == MissingUnsupported + + var videoFilter VideoFilter + videoFilter = videoFilter.ScaleMax(vf.Width, vf.Height, maxScale) + + args = append(args, o.StreamType.Args(videoFilter, videoOnly)...) + + args = args.Output("pipe:") + + return args +} + +func (sm *StreamManager) ServeTranscode(w http.ResponseWriter, r *http.Request, options TranscodeOptions) { + streamRequestCtx := NewStreamRequestContext(w, r) + lockCtx := sm.lockManager.ReadLock(streamRequestCtx, options.VideoFile.Path) + + // hijacking and closing the connection here causes video playback to hang in Chrome + // due to ERR_INCOMPLETE_CHUNKED_ENCODING + // We trust that the request context will be closed, so we don't need to call Cancel on the returned context here. + + handler, err := sm.getTranscodeStream(lockCtx, options) + + if err != nil { + logger.Errorf("[transcode] error transcoding video file: %v", err) + w.WriteHeader(http.StatusBadRequest) + if _, err := w.Write([]byte(err.Error())); err != nil { + logger.Warnf("[transcode] error writing response: %v", err) + } + return + } + + handler(w, r) +} + +func (sm *StreamManager) getTranscodeStream(ctx *fsutil.LockContext, options TranscodeOptions) (http.HandlerFunc, error) { + maxTranscodeSize := sm.config.GetMaxStreamingTranscodeSize().GetMaxResolution() + if options.Resolution != "" { + maxTranscodeSize = models.StreamingResolutionEnum(options.Resolution).GetMaxResolution() + } + + args := options.makeStreamArgs(options.VideoFile, maxTranscodeSize, options.StartTime) + cmd := sm.encoder.Command(ctx, args) + + stdout, err := cmd.StdoutPipe() + if nil != err { + logger.Errorf("[transcode] ffmpeg stdout not available: %v", err) + return nil, err + } + + stderr, err := cmd.StderrPipe() + if nil != err { + logger.Errorf("[transcode] ffmpeg stderr not available: %v", err) + return nil, err + } + + if err = cmd.Start(); err != nil { + return nil, err + } + ctx.AttachCommand(cmd) + + // stderr must be consumed or the process deadlocks + go func() { + errStr, _ := io.ReadAll(stderr) + + errCmd := cmd.Wait() + + var err error + + e := string(errStr) + if e != "" { + err = errors.New(e) + } else { + err = errCmd + } + + // ignore ExitErrors, the process is always forcibly killed + var exitError *exec.ExitError + if err != nil && !errors.As(err, &exitError) { + logger.Errorf("[transcode] ffmpeg error when running command <%s>: %v", strings.Join(cmd.Args, " "), err) + } + }() + + mimeType := options.StreamType.MimeType + handler := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", mimeType) + w.WriteHeader(http.StatusOK) + + // process killing should be handled by command context + + _, err := io.Copy(w, stdout) + if err != nil && !errors.Is(err, syscall.EPIPE) { + logger.Errorf("[transcode] error serving transcoded video file: %v", err) + } + + w.(http.Flusher).Flush() + } + return handler, nil +} diff --git a/ui/v2.5/src/docs/en/Changelog/v0200.md b/ui/v2.5/src/docs/en/Changelog/v0200.md index 3314abd69..fda0c0cd0 100644 --- a/ui/v2.5/src/docs/en/Changelog/v0200.md +++ b/ui/v2.5/src/docs/en/Changelog/v0200.md @@ -1,3 +1,5 @@ +##### 💥 Note: The cache directory is now required if using HLS streaming. Please set the cache directory in the System Settings page. + ### ✨ New Features * Add configuration option to perform generation operations sequentially after scanning a new video file. ([#3378](https://github.com/stashapp/stash/pull/3378)) * Optionally show range in generated funscript heatmaps. ([#3373](https://github.com/stashapp/stash/pull/3373)) @@ -6,6 +8,9 @@ * Added tenth-place rating precision option. ([#3343](https://github.com/stashapp/stash/pull/3343)) * Added toggleable favorite button to Performer cards. ([#3369](https://github.com/stashapp/stash/pull/3369)) +### 🎨 Improvements +* Overhauled and improved HLS streaming. ([#3274](https://github.com/stashapp/stash/pull/3274)) + ### 🐛 Bug fixes * Fixed URL not being during stash-box scrape if the Studio URL is not set. ([#3439](https://github.com/stashapp/stash/pull/3439)) * Fixed generating previews for variable frame rate videos. ([#3376](https://github.com/stashapp/stash/pull/3376)) diff --git a/ui/v2.5/src/docs/en/Manual/Configuration.md b/ui/v2.5/src/docs/en/Manual/Configuration.md index 1cb85ddce..a1154f26d 100644 --- a/ui/v2.5/src/docs/en/Manual/Configuration.md +++ b/ui/v2.5/src/docs/en/Manual/Configuration.md @@ -77,6 +77,10 @@ This setting can be used to increase/decrease overall CPU utilisation in two sce Note: If this is set too high it will decrease overall performance and causes failures (out of memory). +## HLS Streaming + +If using HLS streaming (such as on Apple devices), the Cache path must be set. This directory is used to store temporary files during the live-transcoding process. The Cache path can be set in the System settings page. + ## ffmpeg arguments Additional arguments can be injected into ffmpeg when generating previews and sprites, and when live-transcoding videos. diff --git a/ui/v2.5/src/docs/en/ReleaseNotes/index.ts b/ui/v2.5/src/docs/en/ReleaseNotes/index.ts index 87a22e709..2cb77853e 100644 --- a/ui/v2.5/src/docs/en/ReleaseNotes/index.ts +++ b/ui/v2.5/src/docs/en/ReleaseNotes/index.ts @@ -1,4 +1,5 @@ import v0170 from "./v0170.md"; +import v0200 from "./v0200.md"; interface IReleaseNotes { // handle should be in the form of YYYYMMDD @@ -11,4 +12,8 @@ export const releaseNotes: IReleaseNotes[] = [ date: 20220906, content: v0170, }, + { + date: 20230224, + content: v0200, + }, ]; diff --git a/ui/v2.5/src/docs/en/ReleaseNotes/v0200.md b/ui/v2.5/src/docs/en/ReleaseNotes/v0200.md new file mode 100644 index 000000000..1b408fc4c --- /dev/null +++ b/ui/v2.5/src/docs/en/ReleaseNotes/v0200.md @@ -0,0 +1 @@ +The cache directory is now required if using HLS streaming. Please set the cache directory in the System Settings page. \ No newline at end of file diff --git a/ui/v2.5/src/locales/en-GB.json b/ui/v2.5/src/locales/en-GB.json index a6c8b96de..559af9e7b 100644 --- a/ui/v2.5/src/locales/en-GB.json +++ b/ui/v2.5/src/locales/en-GB.json @@ -256,7 +256,7 @@ "description": "Directory location for SQLite database file backups", "heading": "Backup Directory Path" }, - "cache_location": "Directory location of the cache", + "cache_location": "Directory location of the cache. Required if streaming using HLS (such as on Apple devices).", "cache_path_head": "Cache Path", "calculate_md5_and_ohash_desc": "Calculate MD5 checksum in addition to oshash. Enabling will cause initial scans to be slower. File naming hash must be set to oshash to disable MD5 calculation.", "calculate_md5_and_ohash_label": "Calculate MD5 for videos",