Overhaul HLS streaming (#3274)

* Overhaul HLS streaming
* Fix streaming transcode ffmpeg zombie processes
* Add changelog and release notes
* Documentation
---------
Co-authored-by: WithoutPants <53250216+WithoutPants@users.noreply.github.com>
This commit is contained in:
DingDongSoLong4
2023-02-24 05:55:46 +02:00
committed by GitHub
parent f767635080
commit 05669f5503
16 changed files with 1219 additions and 580 deletions

View File

@@ -1,40 +0,0 @@
package ffmpeg
import (
"fmt"
"io"
"strings"
)
const hlsSegmentLength = 10.0
// WriteHLSPlaylist writes a HLS playlist to w using baseUrl as the base URL for TS streams.
func WriteHLSPlaylist(duration float64, baseUrl string, w io.Writer) {
fmt.Fprint(w, "#EXTM3U\n")
fmt.Fprint(w, "#EXT-X-VERSION:3\n")
fmt.Fprint(w, "#EXT-X-MEDIA-SEQUENCE:0\n")
fmt.Fprint(w, "#EXT-X-ALLOW-CACHE:YES\n")
fmt.Fprintf(w, "#EXT-X-TARGETDURATION:%d\n", int(hlsSegmentLength))
fmt.Fprint(w, "#EXT-X-PLAYLIST-TYPE:VOD\n")
leftover := duration
upTo := 0.0
i := strings.LastIndex(baseUrl, ".m3u8")
tsURL := baseUrl[0:i] + ".ts"
for leftover > 0 {
thisLength := hlsSegmentLength
if leftover < thisLength {
thisLength = leftover
}
fmt.Fprintf(w, "#EXTINF: %f,\n", thisLength)
fmt.Fprintf(w, "%s?start=%f\n", tsURL, upTo)
leftover -= thisLength
upTo += thisLength
}
fmt.Fprint(w, "#EXT-X-ENDLIST\n")
}

View File

@@ -2,237 +2,129 @@ package ffmpeg
import (
"context"
"errors"
"io"
"net/http"
"os/exec"
"strings"
"syscall"
"sync"
"time"
"github.com/stashapp/stash/pkg/fsutil"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
)
const (
MimeWebm string = "video/webm"
MimeMkv string = "video/x-matroska"
MimeMp4 string = "video/mp4"
MimeHLS string = "application/vnd.apple.mpegurl"
MimeMpegts string = "video/MP2T"
MimeWebmVideo string = "video/webm"
MimeWebmAudio string = "audio/webm"
MimeMkvVideo string = "video/x-matroska"
MimeMkvAudio string = "audio/x-matroska"
MimeMp4Video string = "video/mp4"
MimeMp4Audio string = "audio/mp4"
)
// Stream represents an ongoing transcoded stream.
type Stream struct {
Stdout io.ReadCloser
Cmd *exec.Cmd
mimeType string
type StreamManager struct {
cacheDir string
encoder FFMpeg
ffprobe FFProbe
config StreamManagerConfig
lockManager *fsutil.ReadLockManager
context context.Context
cancelFunc context.CancelFunc
runningStreams map[string]*runningStream
streamsMutex sync.Mutex
}
// Serve is an http handler function that serves the stream.
func (s *Stream) Serve(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", s.mimeType)
w.WriteHeader(http.StatusOK)
logger.Infof("[stream] transcoding video file to %s", s.mimeType)
// process killing should be handled by command context
_, err := io.Copy(w, s.Stdout)
if err != nil && !errors.Is(err, syscall.EPIPE) {
logger.Errorf("[stream] error serving transcoded video file: %v", err)
}
type StreamManagerConfig interface {
GetMaxStreamingTranscodeSize() models.StreamingResolutionEnum
}
// StreamFormat represents a transcode stream format.
type StreamFormat struct {
MimeType string
codec VideoCodec
format Format
extraArgs []string
hls bool
}
var (
StreamFormatHLS = StreamFormat{
codec: VideoCodecLibX264,
format: FormatMpegTS,
MimeType: MimeMpegts,
extraArgs: []string{
"-acodec", "aac",
"-pix_fmt", "yuv420p",
"-preset", "veryfast",
"-crf", "25",
},
hls: true,
func NewStreamManager(cacheDir string, encoder FFMpeg, ffprobe FFProbe, config StreamManagerConfig, lockManager *fsutil.ReadLockManager) *StreamManager {
if cacheDir == "" {
logger.Warn("cache directory is not set. Live HLS transcoding will be disabled")
}
StreamFormatH264 = StreamFormat{
codec: VideoCodecLibX264,
format: FormatMP4,
MimeType: MimeMp4,
extraArgs: []string{
"-movflags", "frag_keyframe+empty_moov",
"-pix_fmt", "yuv420p",
"-preset", "veryfast",
"-crf", "25",
},
ctx, cancel := context.WithCancel(context.Background())
ret := &StreamManager{
cacheDir: cacheDir,
encoder: encoder,
ffprobe: ffprobe,
config: config,
lockManager: lockManager,
context: ctx,
cancelFunc: cancel,
runningStreams: make(map[string]*runningStream),
}
StreamFormatVP9 = StreamFormat{
codec: VideoCodecVP9,
format: FormatWebm,
MimeType: MimeWebm,
extraArgs: []string{
"-deadline", "realtime",
"-cpu-used", "5",
"-row-mt", "1",
"-crf", "30",
"-b:v", "0",
"-pix_fmt", "yuv420p",
},
}
StreamFormatVP8 = StreamFormat{
codec: VideoCodecVPX,
format: FormatWebm,
MimeType: MimeWebm,
extraArgs: []string{
"-deadline", "realtime",
"-cpu-used", "5",
"-crf", "12",
"-b:v", "3M",
"-pix_fmt", "yuv420p",
},
}
StreamFormatHEVC = StreamFormat{
codec: VideoCodecLibX265,
format: FormatMP4,
MimeType: MimeMp4,
extraArgs: []string{
"-movflags", "frag_keyframe",
"-preset", "veryfast",
"-crf", "30",
},
}
// it is very common in MKVs to have just the audio codec unsupported
// copy the video stream, transcode the audio and serve as Matroska
StreamFormatMKVAudio = StreamFormat{
codec: VideoCodecCopy,
format: FormatMatroska,
MimeType: MimeMkv,
extraArgs: []string{
"-c:a", "libopus",
"-b:a", "96k",
"-vbr", "on",
},
}
)
// TranscodeStreamOptions represents options for live transcoding a video file.
type TranscodeStreamOptions struct {
Input string
Codec StreamFormat
StartTime float64
MaxTranscodeSize int
// original video dimensions
VideoWidth int
VideoHeight int
// transcode the video, remove the audio
// in some videos where the audio codec is not supported by ffmpeg
// ffmpeg fails if you try to transcode the audio
VideoOnly bool
// arguments added before the input argument
ExtraInputArgs []string
// arguments added before the output argument
ExtraOutputArgs []string
}
func (o TranscodeStreamOptions) getStreamArgs() Args {
var args Args
args = append(args, "-hide_banner")
args = append(args, o.ExtraInputArgs...)
args = args.LogLevel(LogLevelError)
if o.StartTime != 0 {
args = args.Seek(o.StartTime)
}
if o.Codec.hls {
// we only serve a fixed segment length
args = args.Duration(hlsSegmentLength)
}
args = args.Input(o.Input)
if o.VideoOnly {
args = args.SkipAudio()
}
args = args.VideoCodec(o.Codec.codec)
// don't set scale when copying video stream
if o.Codec.codec != VideoCodecCopy {
var videoFilter VideoFilter
videoFilter = videoFilter.ScaleMax(o.VideoWidth, o.VideoHeight, o.MaxTranscodeSize)
args = args.VideoFilter(videoFilter)
}
if len(o.Codec.extraArgs) > 0 {
args = append(args, o.Codec.extraArgs...)
}
args = append(args,
// this is needed for 5-channel ac3 files
"-ac", "2",
)
args = append(args, o.ExtraOutputArgs...)
args = args.Format(o.Codec.format)
args = args.Output("pipe:")
return args
}
// GetTranscodeStream starts the live transcoding process using ffmpeg and returns a stream.
func (f *FFMpeg) GetTranscodeStream(ctx context.Context, options TranscodeStreamOptions) (*Stream, error) {
args := options.getStreamArgs()
cmd := f.Command(ctx, args)
logger.Debugf("Streaming via: %s", strings.Join(cmd.Args, " "))
stdout, err := cmd.StdoutPipe()
if nil != err {
logger.Error("FFMPEG stdout not available: " + err.Error())
return nil, err
}
stderr, err := cmd.StderrPipe()
if nil != err {
logger.Error("FFMPEG stderr not available: " + err.Error())
return nil, err
}
if err = cmd.Start(); err != nil {
return nil, err
}
// stderr must be consumed or the process deadlocks
go func() {
stderrData, _ := io.ReadAll(stderr)
stderrString := string(stderrData)
if len(stderrString) > 0 {
logger.Debugf("[stream] ffmpeg stderr: %s", stderrString)
for {
select {
case <-time.After(monitorInterval):
ret.monitorStreams()
case <-ctx.Done():
return
}
}
}()
ret := &Stream{
Stdout: stdout,
Cmd: cmd,
mimeType: options.Codec.MimeType,
}
return ret, nil
return ret
}
// Shutdown shuts down the stream manager, killing any running transcoding processes and removing all cached files.
func (sm *StreamManager) Shutdown() {
sm.cancelFunc()
sm.stopAndRemoveAll()
}
type StreamRequestContext struct {
context.Context
ResponseWriter http.ResponseWriter
}
func NewStreamRequestContext(w http.ResponseWriter, r *http.Request) *StreamRequestContext {
return &StreamRequestContext{
Context: r.Context(),
ResponseWriter: w,
}
}
func (c *StreamRequestContext) Cancel() {
hj, ok := (c.ResponseWriter).(http.Hijacker)
if !ok {
return
}
// hijack and close the connection
conn, bw, _ := hj.Hijack()
if conn != nil {
if bw != nil {
// notify end of stream
_, err := bw.WriteString("0\r\n")
if err != nil {
logger.Warnf("unable to write end of stream: %v", err)
}
_, err = bw.WriteString("\r\n")
if err != nil {
logger.Warnf("unable to write end of stream: %v", err)
}
// flush the buffer, but don't wait indefinitely
timeout := make(chan struct{}, 1)
go func() {
_ = bw.Flush()
close(timeout)
}()
const waitTime = time.Second
select {
case <-timeout:
case <-time.After(waitTime):
logger.Warnf("unable to flush buffer - closing connection")
}
}
conn.Close()
}
}

View File

@@ -0,0 +1,639 @@
package ffmpeg
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/stashapp/stash/pkg/file"
"github.com/stashapp/stash/pkg/fsutil"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
)
const (
MimeHLS string = "application/vnd.apple.mpegurl"
MimeMpegTS string = "video/MP2T"
segmentLength = 2
maxSegmentWait = 15 * time.Second
monitorInterval = 200 * time.Millisecond
// segment gap before counting a request as a seek and
// restarting the transcode process at the requested segment
maxSegmentGap = 5
// maximum number of segments to generate
// ahead of the currently streaming segment
maxSegmentBuffer = 15
// maximum idle time between segment requests before
// stopping transcode and deleting cache folder
maxIdleTime = 30 * time.Second
)
type StreamType struct {
Name string
SegmentType *SegmentType
ServeManifest func(sm *StreamManager, w http.ResponseWriter, r *http.Request, vf *file.VideoFile, resolution string)
Args func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) Args
}
var (
StreamTypeHLS = &StreamType{
Name: "hls",
SegmentType: SegmentTypeTS,
ServeManifest: serveHLSManifest,
Args: func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) (args Args) {
args = append(args,
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
"-preset", "veryfast",
"-crf", "25",
"-flags", "+cgop",
"-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", segmentLength),
"-sc_threshold", "0",
)
args = args.VideoFilter(videoFilter)
if videoOnly {
args = append(args, "-an")
} else {
args = append(args,
"-c:a", "aac",
"-ac", "2",
)
}
args = append(args,
"-sn",
"-copyts",
"-avoid_negative_ts", "disabled",
"-f", "hls",
"-start_number", fmt.Sprint(segment),
"-hls_time", "2",
"-hls_segment_type", "mpegts",
"-hls_playlist_type", "vod",
"-hls_segment_filename", filepath.Join(outputDir, ".%d.ts"),
filepath.Join(outputDir, "manifest.m3u8"),
)
return
},
}
StreamTypeHLSCopy = &StreamType{
Name: "hls-copy",
SegmentType: SegmentTypeTS,
ServeManifest: serveHLSManifest,
Args: func(segment int, videoFilter VideoFilter, videoOnly bool, outputDir string) (args Args) {
args = append(args,
"-c:v", "copy",
)
if videoOnly {
args = append(args, "-an")
} else {
args = append(args,
"-c:a", "aac",
"-ac", "2",
)
}
args = append(args,
"-sn",
"-copyts",
"-avoid_negative_ts", "disabled",
"-f", "hls",
"-start_number", fmt.Sprint(segment),
"-hls_time", "2",
"-hls_segment_type", "mpegts",
"-hls_playlist_type", "vod",
"-hls_segment_filename", filepath.Join(outputDir, ".%d.ts"),
filepath.Join(outputDir, "manifest.m3u8"),
)
return
},
}
)
type SegmentType struct {
Format string
MimeType string
MakeFilename func(segment int) string
ParseSegment func(str string) (int, error)
}
var (
SegmentTypeTS = &SegmentType{
Format: "%d.ts",
MimeType: MimeMpegTS,
MakeFilename: func(segment int) string {
return fmt.Sprintf("%d.ts", segment)
},
ParseSegment: func(str string) (int, error) {
segment, err := strconv.Atoi(str)
if err != nil || segment < 0 {
err = ErrInvalidSegment
}
return segment, err
},
}
)
var ErrInvalidSegment = errors.New("invalid segment")
type StreamOptions struct {
StreamType *StreamType
VideoFile *file.VideoFile
Resolution string
Hash string
Segment string
}
type transcodeProcess struct {
cmd *exec.Cmd
context context.Context
cancel context.CancelFunc
cancelled bool
outputDir string
segmentType *SegmentType
segment int
}
type waitingSegment struct {
segmentType *SegmentType
idx int
file string
path string
accessed time.Time
available chan error
done atomic.Bool
}
type runningStream struct {
dir string
streamType *StreamType
vf *file.VideoFile
maxTranscodeSize int
outputDir string
waitingSegments []*waitingSegment
tp *transcodeProcess
lastAccessed time.Time
lastSegment int
}
func (t StreamType) String() string {
return t.Name
}
func (t StreamType) FileDir(hash string, maxTranscodeSize int) string {
if maxTranscodeSize == 0 {
return fmt.Sprintf("%s_%s", hash, t)
} else {
return fmt.Sprintf("%s_%s_%d", hash, t, maxTranscodeSize)
}
}
func (s *runningStream) makeStreamArgs(segment int) Args {
args := Args{"-hide_banner"}
args = args.LogLevel(LogLevelError)
if segment > 0 {
args = args.Seek(float64(segment * segmentLength))
}
args = args.Input(s.vf.Path)
videoOnly := ProbeAudioCodec(s.vf.AudioCodec) == MissingUnsupported
var videoFilter VideoFilter
videoFilter = videoFilter.ScaleMax(s.vf.Width, s.vf.Height, s.maxTranscodeSize)
args = append(args, s.streamType.Args(segment, videoFilter, videoOnly, s.outputDir)...)
return args
}
// checkSegments renames temp segments that have been completely generated.
// existing segments are not replaced - if a segment is generated
// multiple times, then only the first one is kept.
func (tp *transcodeProcess) checkSegments() {
doSegment := func(filename string) {
if filename != "" {
oldPath := filepath.Join(tp.outputDir, filename)
newPath := filepath.Join(tp.outputDir, filename[1:])
if !segmentExists(newPath) {
_ = os.Rename(oldPath, newPath)
} else {
os.Remove(oldPath)
}
}
}
processState := tp.cmd.ProcessState
var lastFilename string
for i := tp.segment; ; i++ {
filename := fmt.Sprintf("."+tp.segmentType.Format, i)
if segmentExists(filepath.Join(tp.outputDir, filename)) {
// this segment exists so the previous segment is valid
doSegment(lastFilename)
} else {
// if the transcode process has exited then
// we need to do something with the last segment
if processState != nil {
if processState.Success() {
// if the process exited successfully then
// count the last segment as valid
doSegment(lastFilename)
} else if lastFilename != "" {
// if the process exited unsuccessfully then just delete
// the last segment, it's probably incomplete
os.Remove(filepath.Join(tp.outputDir, lastFilename))
}
}
break
}
lastFilename = filename
tp.segment = i
}
}
func lastSegment(vf *file.VideoFile) int {
return int(math.Ceil(vf.Duration/segmentLength)) - 1
}
func segmentExists(path string) bool {
exists, _ := fsutil.FileExists(path)
return exists
}
// serveHLSManifest serves a generated HLS playlist. The URLs for the segments
// are of the form {r.URL}/%d.ts{?urlQuery} where %d is the segment index.
func serveHLSManifest(sm *StreamManager, w http.ResponseWriter, r *http.Request, vf *file.VideoFile, resolution string) {
if sm.cacheDir == "" {
logger.Error("[transcode] cannot live transcode with HLS because cache dir is unset")
http.Error(w, "cannot live transcode with HLS because cache dir is unset", http.StatusServiceUnavailable)
return
}
probeResult, err := sm.ffprobe.NewVideoFile(vf.Path)
if err != nil {
logger.Warnf("[transcode] error generating HLS manifest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
baseUrl := *r.URL
baseUrl.RawQuery = ""
baseURL := baseUrl.String()
var urlQuery string
if resolution != "" {
urlQuery = fmt.Sprintf("?resolution=%s", resolution)
}
var buf bytes.Buffer
fmt.Fprint(&buf, "#EXTM3U\n")
fmt.Fprint(&buf, "#EXT-X-VERSION:3\n")
fmt.Fprint(&buf, "#EXT-X-MEDIA-SEQUENCE:0\n")
fmt.Fprintf(&buf, "#EXT-X-TARGETDURATION:%d\n", segmentLength)
fmt.Fprint(&buf, "#EXT-X-PLAYLIST-TYPE:VOD\n")
leftover := probeResult.FileDuration
segment := 0
for leftover > 0 {
thisLength := float64(segmentLength)
if leftover < thisLength {
thisLength = leftover
}
fmt.Fprintf(&buf, "#EXTINF:%f,\n", thisLength)
fmt.Fprintf(&buf, "%s/%d.ts%s\n", baseURL, segment, urlQuery)
leftover -= thisLength
segment++
}
fmt.Fprint(&buf, "#EXT-X-ENDLIST\n")
w.Header().Set("Content-Type", MimeHLS)
http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(buf.Bytes()))
}
func (sm *StreamManager) ServeManifest(w http.ResponseWriter, r *http.Request, streamType *StreamType, vf *file.VideoFile, resolution string) {
streamType.ServeManifest(sm, w, r, vf, resolution)
}
func (sm *StreamManager) serveWaitingSegment(w http.ResponseWriter, r *http.Request, segment *waitingSegment) {
select {
case <-r.Context().Done():
break
case err := <-segment.available:
if err == nil {
logger.Tracef("[transcode] streaming segment file %s", segment.file)
w.Header().Set("Content-Type", segment.segmentType.MimeType)
// Prevent caching as segments are generated on the fly
w.Header().Add("Cache-Control", "no-cache")
http.ServeFile(w, r, segment.path)
} else if !errors.Is(err, context.Canceled) {
logger.Errorf("[transcode] %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
segment.done.Store(true)
}
func (sm *StreamManager) ServeSegment(w http.ResponseWriter, r *http.Request, options StreamOptions) {
if sm.cacheDir == "" {
logger.Error("[transcode] cannot live transcode files because cache dir is unset")
http.Error(w, "cannot live transcode files because cache dir is unset", http.StatusServiceUnavailable)
return
}
if options.Hash == "" {
http.Error(w, "invalid hash", http.StatusBadRequest)
return
}
streamType := options.StreamType
segment, err := streamType.SegmentType.ParseSegment(options.Segment)
// error if segment is past the end of the video
if err != nil || segment > lastSegment(options.VideoFile) {
http.Error(w, "invalid segment", http.StatusBadRequest)
return
}
maxTranscodeSize := sm.config.GetMaxStreamingTranscodeSize().GetMaxResolution()
if options.Resolution != "" {
maxTranscodeSize = models.StreamingResolutionEnum(options.Resolution).GetMaxResolution()
}
dir := options.StreamType.FileDir(options.Hash, maxTranscodeSize)
outputDir := filepath.Join(sm.cacheDir, dir)
name := streamType.SegmentType.MakeFilename(segment)
file := filepath.Join(dir, name)
sm.streamsMutex.Lock()
stream := sm.runningStreams[dir]
if stream == nil {
stream = &runningStream{
dir: dir,
streamType: options.StreamType,
vf: options.VideoFile,
maxTranscodeSize: maxTranscodeSize,
outputDir: outputDir,
// initialize to cap 10 to avoid reallocations
waitingSegments: make([]*waitingSegment, 0, 10),
}
sm.runningStreams[dir] = stream
}
now := time.Now()
stream.lastAccessed = now
if segment != -1 {
stream.lastSegment = segment
}
waitingSegment := &waitingSegment{
segmentType: streamType.SegmentType,
idx: segment,
file: file,
path: filepath.Join(sm.cacheDir, file),
accessed: now,
available: make(chan error, 1),
}
stream.waitingSegments = append(stream.waitingSegments, waitingSegment)
sm.streamsMutex.Unlock()
sm.serveWaitingSegment(w, r, waitingSegment)
}
// assume lock is held
func (sm *StreamManager) startTranscode(stream *runningStream, segment int, done chan<- error) {
// generate segment 0 if init segment requested
if segment == -1 {
segment = 0
}
logger.Debugf("[transcode] starting transcode for %s at segment #%d", stream.dir, segment)
if err := os.MkdirAll(stream.outputDir, os.ModePerm); err != nil {
done <- err
return
}
lockCtx := sm.lockManager.ReadLock(sm.context, stream.vf.Path)
args := stream.makeStreamArgs(segment)
cmd := sm.encoder.Command(lockCtx, args)
stderr, err := cmd.StderrPipe()
if err != nil {
logger.Errorf("[transcode] ffmpeg stderr not available: %v", err)
}
stdout, err := cmd.StdoutPipe()
if nil != err {
logger.Errorf("[transcode] ffmpeg stdout not available: %v", err)
}
logger.Tracef("[transcode] running %s", cmd)
if err := cmd.Start(); err != nil {
lockCtx.Cancel()
done <- fmt.Errorf("error starting transcode process: %w", err)
return
}
tp := &transcodeProcess{
cmd: cmd,
context: lockCtx,
cancel: lockCtx.Cancel,
outputDir: stream.outputDir,
segmentType: stream.streamType.SegmentType,
segment: segment,
}
stream.tp = tp
go func() {
errStr, _ := io.ReadAll(stderr)
outStr, _ := io.ReadAll(stdout)
errCmd := cmd.Wait()
var err error
// don't log error if cancelled
if !tp.cancelled {
e := string(errStr)
if e == "" {
e = string(outStr)
}
if e != "" {
err = errors.New(e)
} else {
err = errCmd
}
if err != nil {
err = fmt.Errorf("[transcode] ffmpeg error when running command <%s>: %w", strings.Join(cmd.Args, " "), err)
}
}
sm.streamsMutex.Lock()
// make sure that cancel is called to prevent memory leaks
tp.cancel()
// clear remaining segments after ffmpeg exit
tp.checkSegments()
if stream.tp == tp {
stream.tp = nil
}
sm.streamsMutex.Unlock()
done <- err
}()
}
// assume lock is held
func (sm *StreamManager) stopTranscode(stream *runningStream) {
tp := stream.tp
if tp != nil {
tp.cancel()
tp.cancelled = true
}
}
func (sm *StreamManager) checkTranscode(stream *runningStream, now time.Time) {
if len(stream.waitingSegments) == 0 && stream.lastAccessed.Add(maxIdleTime).Before(now) {
// Stream expired. Cancel the transcode process and delete the files
logger.Debugf("[transcode] stream for %s not accessed recently. Cancelling transcode and removing files", stream.dir)
sm.stopTranscode(stream)
sm.removeTranscodeFiles(stream)
delete(sm.runningStreams, stream.dir)
return
}
if stream.tp != nil {
segmentType := stream.streamType.SegmentType
segment := stream.lastSegment
// if all segments up to maxSegmentBuffer exist, stop transcode
for i := segment; i < segment+maxSegmentBuffer; i++ {
if !segmentExists(filepath.Join(stream.outputDir, segmentType.MakeFilename(i))) {
return
}
}
logger.Debugf("[transcode] stopping transcode for %s, buffer is full", stream.dir)
sm.stopTranscode(stream)
}
}
func (s *waitingSegment) checkAvailable(now time.Time) bool {
if segmentExists(s.path) {
s.available <- nil
return true
} else if s.accessed.Add(maxSegmentWait).Before(now) {
s.available <- fmt.Errorf("timed out waiting for segment file %s to be generated", s.file)
return true
}
return false
}
// ensureTranscode will start a new transcode process if the transcode
// is more than maxSegmentGap behind the requested segment
func (sm *StreamManager) ensureTranscode(stream *runningStream, segment *waitingSegment) bool {
segmentIdx := segment.idx
tp := stream.tp
if tp == nil {
sm.startTranscode(stream, segmentIdx, segment.available)
return true
} else if segmentIdx < tp.segment || tp.segment+maxSegmentGap < segmentIdx {
// only stop the transcode process here - it will be restarted only
// after the old process exits as stream.tp will then be nil.
sm.stopTranscode(stream)
return true
}
return false
}
// runs every monitorInterval
func (sm *StreamManager) monitorStreams() {
sm.streamsMutex.Lock()
defer sm.streamsMutex.Unlock()
now := time.Now()
for _, stream := range sm.runningStreams {
if stream.tp != nil {
stream.tp.checkSegments()
}
transcodeStarted := false
temp := stream.waitingSegments[:0]
for _, segment := range stream.waitingSegments {
remove := false
if segment.done.Load() || segment.checkAvailable(now) {
remove = true
} else if !transcodeStarted {
transcodeStarted = sm.ensureTranscode(stream, segment)
}
if !remove {
temp = append(temp, segment)
}
}
stream.waitingSegments = temp
if !transcodeStarted {
sm.checkTranscode(stream, now)
}
}
}
// assume lock is held
func (sm *StreamManager) removeTranscodeFiles(stream *runningStream) {
path := stream.outputDir
if err := os.RemoveAll(path); err != nil {
logger.Warnf("[transcode] error removing segment directory %s: %v", path, err)
}
}
// stopAndRemoveAll stops all current streams and removes all cache files
func (sm *StreamManager) stopAndRemoveAll() {
sm.streamsMutex.Lock()
defer sm.streamsMutex.Unlock()
for _, stream := range sm.runningStreams {
for _, segment := range stream.waitingSegments {
if len(segment.available) == 0 {
segment.available <- context.Canceled
}
}
sm.stopTranscode(stream)
sm.removeTranscodeFiles(stream)
}
// ensure nothing else can use the map
sm.runningStreams = nil
}

View File

@@ -0,0 +1,199 @@
package ffmpeg
import (
"errors"
"io"
"net/http"
"os/exec"
"strings"
"syscall"
"github.com/stashapp/stash/pkg/file"
"github.com/stashapp/stash/pkg/fsutil"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
)
type StreamFormat struct {
MimeType string
Args func(videoFilter VideoFilter, videoOnly bool) Args
}
var (
StreamTypeMP4 = StreamFormat{
MimeType: MimeMp4Video,
Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) {
args = args.VideoCodec(VideoCodecLibX264)
args = append(args,
"-movflags", "frag_keyframe+empty_moov",
"-pix_fmt", "yuv420p",
"-preset", "veryfast",
"-crf", "25",
)
args = args.VideoFilter(videoFilter)
if videoOnly {
args = args.SkipAudio()
} else {
args = append(args, "-ac", "2")
}
args = args.Format(FormatMP4)
return
},
}
StreamTypeWEBM = StreamFormat{
MimeType: MimeWebmVideo,
Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) {
args = args.VideoCodec(VideoCodecVP9)
args = append(args,
"-pix_fmt", "yuv420p",
"-deadline", "realtime",
"-cpu-used", "5",
"-row-mt", "1",
"-crf", "30",
"-b:v", "0",
)
args = args.VideoFilter(videoFilter)
if videoOnly {
args = args.SkipAudio()
} else {
args = append(args, "-ac", "2")
}
args = args.Format(FormatWebm)
return
},
}
StreamTypeMKV = StreamFormat{
MimeType: MimeMkvVideo,
Args: func(videoFilter VideoFilter, videoOnly bool) (args Args) {
args = args.VideoCodec(VideoCodecCopy)
if videoOnly {
args = args.SkipAudio()
} else {
args = args.AudioCodec(AudioCodecLibOpus)
args = append(args,
"-b:a", "96k",
"-vbr", "on",
"-ac", "2",
)
}
args = args.Format(FormatMatroska)
return
},
}
)
type TranscodeOptions struct {
StreamType StreamFormat
VideoFile *file.VideoFile
Resolution string
StartTime float64
}
func (o TranscodeOptions) makeStreamArgs(vf *file.VideoFile, maxScale int, startTime float64) Args {
args := Args{"-hide_banner"}
args = args.LogLevel(LogLevelError)
if startTime != 0 {
args = args.Seek(startTime)
}
args = args.Input(vf.Path)
videoOnly := ProbeAudioCodec(vf.AudioCodec) == MissingUnsupported
var videoFilter VideoFilter
videoFilter = videoFilter.ScaleMax(vf.Width, vf.Height, maxScale)
args = append(args, o.StreamType.Args(videoFilter, videoOnly)...)
args = args.Output("pipe:")
return args
}
func (sm *StreamManager) ServeTranscode(w http.ResponseWriter, r *http.Request, options TranscodeOptions) {
streamRequestCtx := NewStreamRequestContext(w, r)
lockCtx := sm.lockManager.ReadLock(streamRequestCtx, options.VideoFile.Path)
// hijacking and closing the connection here causes video playback to hang in Chrome
// due to ERR_INCOMPLETE_CHUNKED_ENCODING
// We trust that the request context will be closed, so we don't need to call Cancel on the returned context here.
handler, err := sm.getTranscodeStream(lockCtx, options)
if err != nil {
logger.Errorf("[transcode] error transcoding video file: %v", err)
w.WriteHeader(http.StatusBadRequest)
if _, err := w.Write([]byte(err.Error())); err != nil {
logger.Warnf("[transcode] error writing response: %v", err)
}
return
}
handler(w, r)
}
func (sm *StreamManager) getTranscodeStream(ctx *fsutil.LockContext, options TranscodeOptions) (http.HandlerFunc, error) {
maxTranscodeSize := sm.config.GetMaxStreamingTranscodeSize().GetMaxResolution()
if options.Resolution != "" {
maxTranscodeSize = models.StreamingResolutionEnum(options.Resolution).GetMaxResolution()
}
args := options.makeStreamArgs(options.VideoFile, maxTranscodeSize, options.StartTime)
cmd := sm.encoder.Command(ctx, args)
stdout, err := cmd.StdoutPipe()
if nil != err {
logger.Errorf("[transcode] ffmpeg stdout not available: %v", err)
return nil, err
}
stderr, err := cmd.StderrPipe()
if nil != err {
logger.Errorf("[transcode] ffmpeg stderr not available: %v", err)
return nil, err
}
if err = cmd.Start(); err != nil {
return nil, err
}
ctx.AttachCommand(cmd)
// stderr must be consumed or the process deadlocks
go func() {
errStr, _ := io.ReadAll(stderr)
errCmd := cmd.Wait()
var err error
e := string(errStr)
if e != "" {
err = errors.New(e)
} else {
err = errCmd
}
// ignore ExitErrors, the process is always forcibly killed
var exitError *exec.ExitError
if err != nil && !errors.As(err, &exitError) {
logger.Errorf("[transcode] ffmpeg error when running command <%s>: %v", strings.Join(cmd.Args, " "), err)
}
}()
mimeType := options.StreamType.MimeType
handler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", mimeType)
w.WriteHeader(http.StatusOK)
// process killing should be handled by command context
_, err := io.Copy(w, stdout)
if err != nil && !errors.Is(err, syscall.EPIPE) {
logger.Errorf("[transcode] error serving transcoded video file: %v", err)
}
w.(http.Flusher).Flush()
}
return handler, nil
}