Adds mutex protection around dms.Eventing (#5042)

It's possible for concurrent map read/write panic in the Eventing.Subscribe function.
This commit is contained in:
WithoutPants
2024-07-04 10:52:04 +10:00
committed by GitHub
parent 15a7b8a859
commit ec23b26c60

View File

@@ -40,6 +40,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/anacrolix/dms/soap" "github.com/anacrolix/dms/soap"
@@ -274,6 +275,8 @@ type Server struct {
sceneServer sceneServer sceneServer sceneServer
ipWhitelistManager *ipWhitelistManager ipWhitelistManager *ipWhitelistManager
VideoSortOrder string VideoSortOrder string
subscribeLock sync.Mutex
} }
// UPnP SOAP service. // UPnP SOAP service.
@@ -537,13 +540,14 @@ func (me *Server) contentDirectoryEventSubHandler(w http.ResponseWriter, r *http
// The following code is a work in progress. It partially implements // The following code is a work in progress. It partially implements
// the spec on eventing but hasn't been completed as I have nothing to // the spec on eventing but hasn't been completed as I have nothing to
// test it with. // test it with.
service := me.services["ContentDirectory"]
switch { switch {
case r.Method == "SUBSCRIBE" && r.Header.Get("SID") == "": case r.Method == "SUBSCRIBE" && r.Header.Get("SID") == "":
urls := upnp.ParseCallbackURLs(r.Header.Get("CALLBACK")) urls := upnp.ParseCallbackURLs(r.Header.Get("CALLBACK"))
var timeout int var timeout int
_, _ = fmt.Sscanf(r.Header.Get("TIMEOUT"), "Second-%d", &timeout) _, _ = fmt.Sscanf(r.Header.Get("TIMEOUT"), "Second-%d", &timeout)
sid, timeout, _ := service.Subscribe(urls, timeout)
sid, timeout, _ := me.subscribe(urls, timeout)
w.Header()["SID"] = []string{sid} w.Header()["SID"] = []string{sid}
w.Header()["TIMEOUT"] = []string{fmt.Sprintf("Second-%d", timeout)} w.Header()["TIMEOUT"] = []string{fmt.Sprintf("Second-%d", timeout)}
// TODO: Shouldn't have to do this to get headers logged. // TODO: Shouldn't have to do this to get headers logged.
@@ -559,6 +563,16 @@ func (me *Server) contentDirectoryEventSubHandler(w http.ResponseWriter, r *http
} }
} }
// wrapper around service.Subscribe which requires concurrency protection
// TODO - this should be addressed upstream
func (me *Server) subscribe(urls []*url.URL, timeout int) (sid string, actualTimeout int, err error) {
me.subscribeLock.Lock()
defer me.subscribeLock.Unlock()
service := me.services["ContentDirectory"]
return service.Subscribe(urls, timeout)
}
func (me *Server) initMux(mux *http.ServeMux) { func (me *Server) initMux(mux *http.ServeMux) {
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) { mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
resp.Header().Set("content-type", "text/html") resp.Header().Set("content-type", "text/html")