Stop tasks and show task progress (#181)

* Add job status to tasks page

* Add support for stopping task

* Show progress of some tasks
This commit is contained in:
WithoutPants
2019-11-07 15:35:04 +11:00
committed by Leopere
parent d1ea2fffa5
commit c0911f1626
13 changed files with 283 additions and 68 deletions

View File

@@ -31,3 +31,18 @@ func (r *queryResolver) MetadataClean(ctx context.Context) (string, error) {
manager.GetInstance().Clean()
return "todo", nil
}
func (r *queryResolver) JobStatus(ctx context.Context) (*models.MetadataUpdateStatus, error) {
status := manager.GetInstance().Status
ret := models.MetadataUpdateStatus{
Progress: status.Progress,
Status: status.Status.String(),
Message: "",
}
return &ret, nil
}
func (r *queryResolver) StopJob(ctx context.Context) (bool, error) {
return manager.GetInstance().Status.Stop(), nil
}

View File

@@ -2,20 +2,32 @@ package api
import (
"context"
"github.com/stashapp/stash/pkg/manager"
"time"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
)
func (r *subscriptionResolver) MetadataUpdate(ctx context.Context) (<-chan string, error) {
msg := make(chan string, 1)
func (r *subscriptionResolver) MetadataUpdate(ctx context.Context) (<-chan *models.MetadataUpdateStatus, error) {
msg := make(chan *models.MetadataUpdateStatus, 1)
ticker := time.NewTicker(5 * time.Second)
go func() {
lastStatus := manager.TaskStatus{}
for {
select {
case _ = <-ticker.C:
manager.GetInstance().HandleMetadataUpdateSubscriptionTick(msg)
thisStatus := manager.GetInstance().Status
if thisStatus != lastStatus {
ret := models.MetadataUpdateStatus{
Progress: thisStatus.Progress,
Status: thisStatus.Status.String(),
Message: "",
}
msg <- &ret
}
lastStatus = thisStatus
case <-ctx.Done():
ticker.Stop()
close(msg)

View File

@@ -11,3 +11,22 @@ const (
Clean JobStatus = 5
Scrape JobStatus = 6
)
func (s JobStatus) String() string {
statusMessage := ""
switch s {
case Idle:
statusMessage = "Idle"
case Import:
statusMessage = "Import"
case Export:
statusMessage = "Export"
case Scan:
statusMessage = "Scan"
case Generate:
statusMessage = "Generate"
}
return statusMessage
}

View File

@@ -14,7 +14,7 @@ import (
)
type singleton struct {
Status JobStatus
Status TaskStatus
Paths *paths.Paths
JSON *jsonUtils
@@ -38,7 +38,7 @@ func Initialize() *singleton {
initFlags()
initEnvs()
instance = &singleton{
Status: Idle,
Status: TaskStatus{Status: Idle, Progress: -1},
Paths: paths.NewPaths(),
JSON: &jsonUtils{},
}

View File

@@ -1,36 +0,0 @@
package manager
import (
"encoding/json"
"github.com/stashapp/stash/pkg/logger"
)
type metadataUpdatePayload struct {
Progress float64 `json:"progress"`
Message string `json:"message"`
Logs []logger.LogItem `json:"logs"`
}
func (s *singleton) HandleMetadataUpdateSubscriptionTick(msg chan string) {
var statusMessage string
switch instance.Status {
case Idle:
statusMessage = "Idle"
case Import:
statusMessage = "Import"
case Export:
statusMessage = "Export"
case Scan:
statusMessage = "Scan"
case Generate:
statusMessage = "Generate"
}
payload := &metadataUpdatePayload{
Progress: 0, // TODO
Message: statusMessage,
Logs: logger.LogCache,
}
payloadJSON, _ := json.Marshal(payload)
msg <- string(payloadJSON)
}

View File

@@ -3,6 +3,7 @@ package manager
import (
"path/filepath"
"sync"
"time"
"github.com/bmatcuk/doublestar"
"github.com/stashapp/stash/pkg/logger"
@@ -11,11 +12,47 @@ import (
"github.com/stashapp/stash/pkg/utils"
)
type TaskStatus struct {
Status JobStatus
Progress float64
LastUpdate time.Time
stopping bool
}
func (t *TaskStatus) Stop() bool {
t.stopping = true
t.updated()
return true
}
func (t *TaskStatus) SetStatus(s JobStatus) {
t.Status = s
t.updated()
}
func (t *TaskStatus) setProgress(upTo int, total int) {
if total == 0 {
t.Progress = 1
}
t.Progress = float64(upTo) / float64(total)
t.updated()
}
func (t *TaskStatus) indefiniteProgress() {
t.Progress = -1
t.updated()
}
func (t *TaskStatus) updated() {
t.LastUpdate = time.Now()
}
func (s *singleton) Scan(nameFromMetadata bool) {
if s.Status != Idle {
if s.Status.Status != Idle {
return
}
s.Status = Scan
s.Status.SetStatus(Scan)
s.Status.indefiniteProgress()
go func() {
defer s.returnToIdleState()
@@ -26,10 +63,23 @@ func (s *singleton) Scan(nameFromMetadata bool) {
globResults, _ := doublestar.Glob(globPath)
results = append(results, globResults...)
}
logger.Infof("Starting scan of %d files", len(results))
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
total := len(results)
logger.Infof("Starting scan of %d files", total)
var wg sync.WaitGroup
for _, path := range results {
s.Status.Progress = 0
for i, path := range results {
s.Status.setProgress(i, total)
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
wg.Add(1)
task := ScanTask{FilePath: path, NameFromMetadata: nameFromMetadata}
go task.Start(&wg)
@@ -41,10 +91,11 @@ func (s *singleton) Scan(nameFromMetadata bool) {
}
func (s *singleton) Import() {
if s.Status != Idle {
if s.Status.Status != Idle {
return
}
s.Status = Import
s.Status.SetStatus(Import)
s.Status.indefiniteProgress()
go func() {
defer s.returnToIdleState()
@@ -58,10 +109,11 @@ func (s *singleton) Import() {
}
func (s *singleton) Export() {
if s.Status != Idle {
if s.Status.Status != Idle {
return
}
s.Status = Export
s.Status.SetStatus(Export)
s.Status.indefiniteProgress()
go func() {
defer s.returnToIdleState()
@@ -75,10 +127,11 @@ func (s *singleton) Export() {
}
func (s *singleton) Generate(sprites bool, previews bool, markers bool, transcodes bool) {
if s.Status != Idle {
if s.Status.Status != Idle {
return
}
s.Status = Generate
s.Status.SetStatus(Generate)
s.Status.indefiniteProgress()
qb := models.NewSceneQueryBuilder()
//this.job.total = await ObjectionUtils.getCount(Scene);
@@ -95,7 +148,21 @@ func (s *singleton) Generate(sprites bool, previews bool, markers bool, transcod
delta := utils.Btoi(sprites) + utils.Btoi(previews) + utils.Btoi(markers) + utils.Btoi(transcodes)
var wg sync.WaitGroup
for _, scene := range scenes {
s.Status.Progress = 0
total := len(scenes)
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
for i, scene := range scenes {
s.Status.setProgress(i, total)
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
if scene == nil {
logger.Errorf("nil scene, skipping generate")
continue
@@ -134,10 +201,11 @@ func (s *singleton) Generate(sprites bool, previews bool, markers bool, transcod
}
func (s *singleton) Clean() {
if s.Status != Idle {
if s.Status.Status != Idle {
return
}
s.Status = Clean
s.Status.SetStatus(Clean)
s.Status.indefiniteProgress()
qb := models.NewSceneQueryBuilder()
go func() {
@@ -150,8 +218,21 @@ func (s *singleton) Clean() {
return
}
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
var wg sync.WaitGroup
for _, scene := range scenes {
s.Status.Progress = 0
total := len(scenes)
for i, scene := range scenes {
s.Status.setProgress(i, total)
if s.Status.stopping {
logger.Info("Stopping due to user request")
return
}
if scene == nil {
logger.Errorf("nil scene, skipping generate")
continue
@@ -173,8 +254,10 @@ func (s *singleton) returnToIdleState() {
logger.Info("recovered from ", r)
}
if s.Status == Generate {
if s.Status.Status == Generate {
instance.Paths.Generated.RemoveTmpDir()
}
s.Status = Idle
s.Status.SetStatus(Idle)
s.Status.indefiniteProgress()
s.Status.stopping = false
}

View File

@@ -1,14 +1,15 @@
package manager
import (
"github.com/stashapp/stash/pkg/ffmpeg"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/utils"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/stashapp/stash/pkg/ffmpeg"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/utils"
)
type GenerateMarkersTask struct {