From 0e013745376a5ad2854a9d6cf8587558218688b2 Mon Sep 17 00:00:00 2001 From: WithoutPants <53250216+WithoutPants@users.noreply.github.com> Date: Mon, 24 May 2021 14:24:18 +1000 Subject: [PATCH] Job queueing (#1379) --- graphql/documents/data/job.graphql | 10 + graphql/documents/mutations/job.graphql | 7 + graphql/documents/mutations/metadata.graphql | 4 - graphql/documents/queries/job.graphql | 11 + .../queries/settings/metadata.graphql | 8 - graphql/documents/subscriptions.graphql | 19 +- graphql/schema/schema.graphql | 32 +- graphql/schema/types/job.graphql | 33 + graphql/schema/types/metadata.graphql | 6 - pkg/api/resolver_mutation_job.go | 23 + pkg/api/resolver_mutation_metadata.go | 74 +- pkg/api/resolver_mutation_stash_box.go | 5 +- pkg/api/resolver_query_job.go | 52 ++ pkg/api/resolver_query_metadata.go | 11 - pkg/api/resolver_subscription_job.go | 64 ++ pkg/api/resolver_subscription_metadata.go | 40 - pkg/job/job.go | 82 ++ pkg/job/manager.go | 394 +++++++++ pkg/job/manager_test.go | 343 ++++++++ pkg/job/progress.go | 138 +++ pkg/job/progress_test.go | 145 ++++ pkg/job/subscribe.go | 36 + pkg/manager/job_status.go | 46 - pkg/manager/manager.go | 12 +- pkg/manager/manager_tasks.go | 797 +++--------------- pkg/manager/subscribe.go | 44 + pkg/manager/task.go | 2 +- pkg/manager/task_autotag.go | 316 ++++++- pkg/manager/task_export.go | 4 - pkg/manager/task_import.go | 4 +- pkg/manager/task_plugin.go | 42 +- pkg/manager/task_scan.go | 232 ++++- pkg/manager/task_stash_box_tag.go | 12 + .../src/components/Changelog/versions/v080.md | 1 + .../SettingsTasksPanel/GenerateButton.tsx | 2 +- .../Settings/SettingsTasksPanel/JobTable.tsx | 211 +++++ .../SettingsTasksPanel/SettingsTasksPanel.tsx | 163 +--- ui/v2.5/src/components/Settings/styles.scss | 48 ++ .../Tagger/performers/PerformerTagger.tsx | 160 ++-- ui/v2.5/src/core/StashService.ts | 21 +- ui/v2.5/src/core/createClient.ts | 18 +- ui/v2.5/src/styles/_theme.scss | 9 +- 42 files changed, 2571 insertions(+), 1110 deletions(-) create mode 100644 graphql/documents/data/job.graphql create mode 100644 graphql/documents/mutations/job.graphql create mode 100644 graphql/documents/queries/job.graphql create mode 100644 graphql/schema/types/job.graphql create mode 100644 pkg/api/resolver_mutation_job.go create mode 100644 pkg/api/resolver_query_job.go create mode 100644 pkg/api/resolver_subscription_job.go delete mode 100644 pkg/api/resolver_subscription_metadata.go create mode 100644 pkg/job/job.go create mode 100644 pkg/job/manager.go create mode 100644 pkg/job/manager_test.go create mode 100644 pkg/job/progress.go create mode 100644 pkg/job/progress_test.go create mode 100644 pkg/job/subscribe.go delete mode 100644 pkg/manager/job_status.go create mode 100644 pkg/manager/subscribe.go create mode 100644 ui/v2.5/src/components/Settings/SettingsTasksPanel/JobTable.tsx diff --git a/graphql/documents/data/job.graphql b/graphql/documents/data/job.graphql new file mode 100644 index 000000000..f1f7c8529 --- /dev/null +++ b/graphql/documents/data/job.graphql @@ -0,0 +1,10 @@ +fragment JobData on Job { + id + status + subTasks + description + progress + startTime + endTime + addTime +} \ No newline at end of file diff --git a/graphql/documents/mutations/job.graphql b/graphql/documents/mutations/job.graphql new file mode 100644 index 000000000..4d64b5d31 --- /dev/null +++ b/graphql/documents/mutations/job.graphql @@ -0,0 +1,7 @@ +mutation StopJob($job_id: ID!) 
{ + stopJob(job_id: $job_id) +} + +mutation StopAllJobs { + stopAllJobs +} \ No newline at end of file diff --git a/graphql/documents/mutations/metadata.graphql b/graphql/documents/mutations/metadata.graphql index 84902a87a..710a9aac9 100644 --- a/graphql/documents/mutations/metadata.graphql +++ b/graphql/documents/mutations/metadata.graphql @@ -34,10 +34,6 @@ mutation MigrateHashNaming { migrateHashNaming } -mutation StopJob { - stopJob -} - mutation BackupDatabase($input: BackupDatabaseInput!) { backupDatabase(input: $input) } diff --git a/graphql/documents/queries/job.graphql b/graphql/documents/queries/job.graphql new file mode 100644 index 000000000..2578c4cd9 --- /dev/null +++ b/graphql/documents/queries/job.graphql @@ -0,0 +1,11 @@ +query JobQueue { + jobQueue { + ...JobData + } +} + +query FindJob($input: FindJobInput!) { + findJob(input: $input) { + ...JobData + } +} diff --git a/graphql/documents/queries/settings/metadata.graphql b/graphql/documents/queries/settings/metadata.graphql index 05dd6d04c..c36c51299 100644 --- a/graphql/documents/queries/settings/metadata.graphql +++ b/graphql/documents/queries/settings/metadata.graphql @@ -1,11 +1,3 @@ -query JobStatus { - jobStatus { - progress - status - message - } -} - query SystemStatus { systemStatus { databaseSchema diff --git a/graphql/documents/subscriptions.graphql b/graphql/documents/subscriptions.graphql index a4c367ab1..fe5bcf3cc 100644 --- a/graphql/documents/subscriptions.graphql +++ b/graphql/documents/subscriptions.graphql @@ -1,8 +1,13 @@ -subscription MetadataUpdate { - metadataUpdate { - progress - status - message +subscription JobsSubscribe { + jobsSubscribe { + type + job { + id + status + subTasks + description + progress + } } } @@ -10,4 +15,8 @@ subscription LoggingSubscribe { loggingSubscribe { ...LogEntryData } +} + +subscription ScanCompleteSubscribe { + scanCompleteSubscribe } \ No newline at end of file diff --git a/graphql/schema/schema.graphql b/graphql/schema/schema.graphql index 63c274da1..993bfdd97 100644 --- a/graphql/schema/schema.graphql +++ b/graphql/schema/schema.graphql @@ -106,9 +106,12 @@ type Query { """Returns an array of paths for the given path""" directory(path: String): Directory! - # Metadata + # System status systemStatus: SystemStatus! - jobStatus: MetadataUpdateStatus! + + # Job status + jobQueue: [Job!] + findJob(input: FindJobInput!): Job dlnaStatus: DLNAStatus! @@ -207,31 +210,32 @@ type Mutation { exportObjects(input: ExportObjectsInput!): String """Performs an incremental import. Returns the job ID""" - importObjects(input: ImportObjectsInput!): String! + importObjects(input: ImportObjectsInput!): ID! """Start an full import. Completely wipes the database and imports from the metadata directory. Returns the job ID""" - metadataImport: String! + metadataImport: ID! """Start a full export. Outputs to the metadata directory. Returns the job ID""" - metadataExport: String! + metadataExport: ID! """Start a scan. Returns the job ID""" - metadataScan(input: ScanMetadataInput!): String! + metadataScan(input: ScanMetadataInput!): ID! """Start generating content. Returns the job ID""" - metadataGenerate(input: GenerateMetadataInput!): String! + metadataGenerate(input: GenerateMetadataInput!): ID! """Start auto-tagging. Returns the job ID""" - metadataAutoTag(input: AutoTagMetadataInput!): String! + metadataAutoTag(input: AutoTagMetadataInput!): ID! """Clean metadata. Returns the job ID""" - metadataClean(input: CleanMetadataInput!): String! 
+ metadataClean(input: CleanMetadataInput!): ID! """Migrate generated files for the current hash naming""" - migrateHashNaming: String! + migrateHashNaming: ID! """Reload scrapers""" reloadScrapers: Boolean! """Run plugin task. Returns the job ID""" - runPluginTask(plugin_id: ID!, task_name: String!, args: [PluginArgInput!]): String! + runPluginTask(plugin_id: ID!, task_name: String!, args: [PluginArgInput!]): ID! reloadPlugins: Boolean! - stopJob: Boolean! + stopJob(job_id: ID!): Boolean! + stopAllJobs: Boolean! """Submit fingerprints to stash-box instance""" submitStashBoxFingerprints(input: StashBoxFingerprintSubmissionInput!): Boolean! @@ -254,9 +258,11 @@ type Mutation { type Subscription { """Update from the metadata manager""" - metadataUpdate: MetadataUpdateStatus! + jobsSubscribe: JobStatusUpdate! loggingSubscribe: [LogEntry!]! + + scanCompleteSubscribe: Boolean! } schema { diff --git a/graphql/schema/types/job.graphql b/graphql/schema/types/job.graphql new file mode 100644 index 000000000..ba8b09dbf --- /dev/null +++ b/graphql/schema/types/job.graphql @@ -0,0 +1,33 @@ +enum JobStatus { + READY + RUNNING + FINISHED + STOPPING + CANCELLED +} + +type Job { + id: ID! + status: JobStatus! + subTasks: [String!] + description: String! + progress: Float + startTime: Time + endTime: Time + addTime: Time! +} + +input FindJobInput { + id: ID! +} + +enum JobStatusUpdateType { + ADD + REMOVE + UPDATE +} + +type JobStatusUpdate { + type: JobStatusUpdateType! + job: Job! +} diff --git a/graphql/schema/types/metadata.graphql b/graphql/schema/types/metadata.graphql index 6c492fdeb..567b77079 100644 --- a/graphql/schema/types/metadata.graphql +++ b/graphql/schema/types/metadata.graphql @@ -63,12 +63,6 @@ input AutoTagMetadataInput { tags: [String!] } -type MetadataUpdateStatus { - progress: Float! - status: String! - message: String! -} - input ExportObjectTypeInput { ids: [String!] 
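
[Editor's note, not part of the patch] The jobQueue/findJob queries, the stopJob/stopAllJobs mutations and the jobsSubscribe subscription above replace the old polling-style jobStatus/metadataUpdate API. As a rough illustration of how a client might drive the new surface — the server address, port and /graphql path are assumptions of this sketch — a minimal Go program that lists the current queue and cancels a job:

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
    )

    // gql posts a GraphQL request to a stash server. The address and the
    // /graphql path are assumptions for this sketch.
    func gql(query string, variables map[string]interface{}) (map[string]interface{}, error) {
        body, err := json.Marshal(map[string]interface{}{
            "query":     query,
            "variables": variables,
        })
        if err != nil {
            return nil, err
        }
        resp, err := http.Post("http://localhost:9999/graphql", "application/json", bytes.NewReader(body))
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()

        var out map[string]interface{}
        err = json.NewDecoder(resp.Body).Decode(&out)
        return out, err
    }

    func main() {
        // list the current job queue
        queue, _ := gql(`query { jobQueue { id status description progress } }`, nil)
        fmt.Println(queue)

        // cancel a specific job by ID (hypothetical ID "1")
        _, _ = gql(`mutation($id: ID!) { stopJob(job_id: $id) }`, map[string]interface{}{"id": "1"})
    }

Clients that want push updates rather than polling would use the jobsSubscribe subscription instead of repeatedly querying jobQueue.
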
all: Boolean diff --git a/pkg/api/resolver_mutation_job.go b/pkg/api/resolver_mutation_job.go new file mode 100644 index 000000000..61e17b14f --- /dev/null +++ b/pkg/api/resolver_mutation_job.go @@ -0,0 +1,23 @@ +package api + +import ( + "context" + "strconv" + + "github.com/stashapp/stash/pkg/manager" +) + +func (r *mutationResolver) StopJob(ctx context.Context, jobID string) (bool, error) { + idInt, err := strconv.Atoi(jobID) + if err != nil { + return false, err + } + manager.GetInstance().JobManager.CancelJob(idInt) + + return true, nil +} + +func (r *mutationResolver) StopAllJobs(ctx context.Context) (bool, error) { + manager.GetInstance().JobManager.CancelAll() + return true, nil +} diff --git a/pkg/api/resolver_mutation_metadata.go b/pkg/api/resolver_mutation_metadata.go index 1e63f5bac..e24d883cf 100644 --- a/pkg/api/resolver_mutation_metadata.go +++ b/pkg/api/resolver_mutation_metadata.go @@ -4,6 +4,8 @@ import ( "context" "io/ioutil" "path/filepath" + "strconv" + "sync" "time" "github.com/stashapp/stash/pkg/database" @@ -15,18 +17,22 @@ import ( ) func (r *mutationResolver) MetadataScan(ctx context.Context, input models.ScanMetadataInput) (string, error) { - if err := manager.GetInstance().Scan(input); err != nil { + jobID, err := manager.GetInstance().Scan(input) + + if err != nil { return "", err } - return "todo", nil + + return strconv.Itoa(jobID), nil } func (r *mutationResolver) MetadataImport(ctx context.Context) (string, error) { - if err := manager.GetInstance().Import(); err != nil { + jobID, err := manager.GetInstance().Import() + if err != nil { return "", err } - return "todo", nil + return strconv.Itoa(jobID), nil } func (r *mutationResolver) ImportObjects(ctx context.Context, input models.ImportObjectsInput) (string, error) { @@ -35,30 +41,26 @@ func (r *mutationResolver) ImportObjects(ctx context.Context, input models.Impor return "", err } - _, err = manager.GetInstance().RunSingleTask(t) + jobID := manager.GetInstance().RunSingleTask(t) + + return strconv.Itoa(jobID), nil +} + +func (r *mutationResolver) MetadataExport(ctx context.Context) (string, error) { + jobID, err := manager.GetInstance().Export() if err != nil { return "", err } - return "todo", nil -} - -func (r *mutationResolver) MetadataExport(ctx context.Context) (string, error) { - if err := manager.GetInstance().Export(); err != nil { - return "", err - } - - return "todo", nil + return strconv.Itoa(jobID), nil } func (r *mutationResolver) ExportObjects(ctx context.Context, input models.ExportObjectsInput) (*string, error) { t := manager.CreateExportTask(config.GetInstance().GetVideoFileNamingAlgorithm(), input) - wg, err := manager.GetInstance().RunSingleTask(t) - if err != nil { - return nil, err - } - wg.Wait() + var wg sync.WaitGroup + wg.Add(1) + t.Start(&wg) if t.DownloadHash != "" { baseURL, _ := ctx.Value(BaseURLCtxKey).(string) @@ -73,40 +75,28 @@ func (r *mutationResolver) ExportObjects(ctx context.Context, input models.Expor } func (r *mutationResolver) MetadataGenerate(ctx context.Context, input models.GenerateMetadataInput) (string, error) { - if err := manager.GetInstance().Generate(input); err != nil { + jobID, err := manager.GetInstance().Generate(input) + + if err != nil { return "", err } - return "todo", nil + + return strconv.Itoa(jobID), nil } func (r *mutationResolver) MetadataAutoTag(ctx context.Context, input models.AutoTagMetadataInput) (string, error) { - manager.GetInstance().AutoTag(input) - return "todo", nil + jobID := manager.GetInstance().AutoTag(input) + return 
strconv.Itoa(jobID), nil } func (r *mutationResolver) MetadataClean(ctx context.Context, input models.CleanMetadataInput) (string, error) { - manager.GetInstance().Clean(input) - return "todo", nil + jobID := manager.GetInstance().Clean(input) + return strconv.Itoa(jobID), nil } func (r *mutationResolver) MigrateHashNaming(ctx context.Context) (string, error) { - manager.GetInstance().MigrateHash() - return "todo", nil -} - -func (r *mutationResolver) JobStatus(ctx context.Context) (*models.MetadataUpdateStatus, error) { - status := manager.GetInstance().Status - ret := models.MetadataUpdateStatus{ - Progress: status.Progress, - Status: status.Status.String(), - Message: "", - } - - return &ret, nil -} - -func (r *mutationResolver) StopJob(ctx context.Context) (bool, error) { - return manager.GetInstance().Status.Stop(), nil + jobID := manager.GetInstance().MigrateHash() + return strconv.Itoa(jobID), nil } func (r *mutationResolver) BackupDatabase(ctx context.Context, input models.BackupDatabaseInput) (*string, error) { diff --git a/pkg/api/resolver_mutation_stash_box.go b/pkg/api/resolver_mutation_stash_box.go index 4161ec91c..6e9983523 100644 --- a/pkg/api/resolver_mutation_stash_box.go +++ b/pkg/api/resolver_mutation_stash_box.go @@ -3,6 +3,7 @@ package api import ( "context" "fmt" + "strconv" "github.com/stashapp/stash/pkg/manager" "github.com/stashapp/stash/pkg/manager/config" @@ -23,6 +24,6 @@ func (r *mutationResolver) SubmitStashBoxFingerprints(ctx context.Context, input } func (r *mutationResolver) StashBoxBatchPerformerTag(ctx context.Context, input models.StashBoxBatchPerformerTagInput) (string, error) { - manager.GetInstance().StashBoxBatchPerformerTag(input) - return "todo", nil + jobID := manager.GetInstance().StashBoxBatchPerformerTag(input) + return strconv.Itoa(jobID), nil } diff --git a/pkg/api/resolver_query_job.go b/pkg/api/resolver_query_job.go new file mode 100644 index 000000000..3a8f4de97 --- /dev/null +++ b/pkg/api/resolver_query_job.go @@ -0,0 +1,52 @@ +package api + +import ( + "context" + "strconv" + + "github.com/stashapp/stash/pkg/job" + "github.com/stashapp/stash/pkg/manager" + "github.com/stashapp/stash/pkg/models" +) + +func (r *queryResolver) JobQueue(ctx context.Context) ([]*models.Job, error) { + queue := manager.GetInstance().JobManager.GetQueue() + + var ret []*models.Job + for _, j := range queue { + ret = append(ret, jobToJobModel(j)) + } + + return ret, nil +} + +func (r *queryResolver) FindJob(ctx context.Context, input models.FindJobInput) (*models.Job, error) { + jobID, err := strconv.Atoi(input.ID) + if err != nil { + return nil, err + } + j := manager.GetInstance().JobManager.GetJob(jobID) + if j == nil { + return nil, nil + } + + return jobToJobModel(*j), nil +} + +func jobToJobModel(j job.Job) *models.Job { + ret := &models.Job{ + ID: strconv.Itoa(j.ID), + Status: models.JobStatus(j.Status), + Description: j.Description, + SubTasks: j.Details, + StartTime: j.StartTime, + EndTime: j.EndTime, + AddTime: j.AddTime, + } + + if j.Progress != -1 { + ret.Progress = &j.Progress + } + + return ret +} diff --git a/pkg/api/resolver_query_metadata.go b/pkg/api/resolver_query_metadata.go index cb12d96e7..a1bacebb0 100644 --- a/pkg/api/resolver_query_metadata.go +++ b/pkg/api/resolver_query_metadata.go @@ -7,17 +7,6 @@ import ( "github.com/stashapp/stash/pkg/models" ) -func (r *queryResolver) JobStatus(ctx context.Context) (*models.MetadataUpdateStatus, error) { - status := manager.GetInstance().Status - ret := models.MetadataUpdateStatus{ - Progress: 
status.Progress, - Status: status.Status.String(), - Message: "", - } - - return &ret, nil -} - func (r *queryResolver) SystemStatus(ctx context.Context) (*models.SystemStatus, error) { return manager.GetInstance().GetSystemStatus(), nil } diff --git a/pkg/api/resolver_subscription_job.go b/pkg/api/resolver_subscription_job.go new file mode 100644 index 000000000..c7b3176c0 --- /dev/null +++ b/pkg/api/resolver_subscription_job.go @@ -0,0 +1,64 @@ +package api + +import ( + "context" + "time" + + "github.com/stashapp/stash/pkg/job" + "github.com/stashapp/stash/pkg/manager" + "github.com/stashapp/stash/pkg/models" +) + +type throttledUpdate struct { + id int + pendingUpdate *job.Job + lastUpdate time.Time + broadcastTimer *time.Timer + killTimer *time.Timer +} + +func (tu *throttledUpdate) broadcast(output chan *models.JobStatusUpdate) { + tu.lastUpdate = time.Now() + output <- &models.JobStatusUpdate{ + Type: models.JobStatusUpdateTypeUpdate, + Job: jobToJobModel(*tu.pendingUpdate), + } + + tu.broadcastTimer = nil + tu.pendingUpdate = nil +} + +func makeJobStatusUpdate(t models.JobStatusUpdateType, j job.Job) *models.JobStatusUpdate { + return &models.JobStatusUpdate{ + Type: t, + Job: jobToJobModel(j), + } +} + +func (r *subscriptionResolver) JobsSubscribe(ctx context.Context) (<-chan *models.JobStatusUpdate, error) { + msg := make(chan *models.JobStatusUpdate, 100) + + subscription := manager.GetInstance().JobManager.Subscribe(ctx) + + go func() { + for { + select { + case j := <-subscription.NewJob: + msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeAdd, j) + case j := <-subscription.RemovedJob: + msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeRemove, j) + case j := <-subscription.UpdatedJob: + msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeUpdate, j) + case <-ctx.Done(): + close(msg) + return + } + } + }() + + return msg, nil +} + +func (r *subscriptionResolver) ScanCompleteSubscribe(ctx context.Context) (<-chan bool, error) { + return manager.GetInstance().ScanSubscribe(ctx), nil +} diff --git a/pkg/api/resolver_subscription_metadata.go b/pkg/api/resolver_subscription_metadata.go deleted file mode 100644 index b30b83892..000000000 --- a/pkg/api/resolver_subscription_metadata.go +++ /dev/null @@ -1,40 +0,0 @@ -package api - -import ( - "context" - "time" - - "github.com/stashapp/stash/pkg/manager" - "github.com/stashapp/stash/pkg/models" -) - -func (r *subscriptionResolver) MetadataUpdate(ctx context.Context) (<-chan *models.MetadataUpdateStatus, error) { - msg := make(chan *models.MetadataUpdateStatus, 1) - - ticker := time.NewTicker(5 * time.Second) - - go func() { - lastStatus := manager.TaskStatus{} - for { - select { - case _ = <-ticker.C: - thisStatus := manager.GetInstance().Status - if thisStatus != lastStatus { - ret := models.MetadataUpdateStatus{ - Progress: thisStatus.Progress, - Status: thisStatus.Status.String(), - Message: "", - } - msg <- &ret - } - lastStatus = thisStatus - case <-ctx.Done(): - ticker.Stop() - close(msg) - return - } - } - }() - - return msg, nil -} diff --git a/pkg/job/job.go b/pkg/job/job.go new file mode 100644 index 000000000..735f2d298 --- /dev/null +++ b/pkg/job/job.go @@ -0,0 +1,82 @@ +package job + +import ( + "context" + "time" +) + +// JobExec represents the implementation of a Job to be executed. 
+type JobExec interface { + Execute(ctx context.Context, progress *Progress) +} + +type jobExecImpl struct { + fn func(ctx context.Context, progress *Progress) +} + +func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) { + j.fn(ctx, progress) +} + +// MakeJobExec returns a simple JobExec implementation using the provided +// function. +func MakeJobExec(fn func(ctx context.Context, progress *Progress)) JobExec { + return &jobExecImpl{ + fn: fn, + } +} + +// Status is the status of a Job +type Status string + +const ( + // StatusReady means that the Job is not yet started. + StatusReady Status = "READY" + // StatusRunning means that the job is currently running. + StatusRunning Status = "RUNNING" + // StatusStopping means that the job is cancelled but is still running. + StatusStopping Status = "STOPPING" + // StatusFinished means that the job was completed. + StatusFinished Status = "FINISHED" + // StatusCancelled means that the job was cancelled and is now stopped. + StatusCancelled Status = "CANCELLED" +) + +// Job represents the status of a queued or running job. +type Job struct { + ID int + Status Status + // details of the current operations of the job + Details []string + Description string + // Progress in terms of 0 - 1. + Progress float64 + StartTime *time.Time + EndTime *time.Time + AddTime time.Time + + exec JobExec + cancelFunc context.CancelFunc +} + +func (j *Job) cancel() { + if j.Status == StatusReady { + j.Status = StatusCancelled + } else if j.Status == StatusRunning { + j.Status = StatusStopping + } + + if j.cancelFunc != nil { + j.cancelFunc() + } +} + +// IsCancelled returns true if cancel has been called on the context. +func IsCancelled(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} diff --git a/pkg/job/manager.go b/pkg/job/manager.go new file mode 100644 index 000000000..4b4a5334f --- /dev/null +++ b/pkg/job/manager.go @@ -0,0 +1,394 @@ +package job + +import ( + "context" + "sync" + "time" +) + +const maxGraveyardSize = 10 +const defaultThrottleLimit = time.Second + +// Manager maintains a queue of jobs. Jobs are executed one at a time. +type Manager struct { + queue []*Job + graveyard []*Job + + mutex sync.Mutex + notEmpty *sync.Cond + stop chan struct{} + + lastID int + + subscriptions []*ManagerSubscription + updateThrottleLimit time.Duration +} + +// NewManager initialises and returns a new Manager. +func NewManager() *Manager { + ret := &Manager{ + stop: make(chan struct{}), + updateThrottleLimit: defaultThrottleLimit, + } + + ret.notEmpty = sync.NewCond(&ret.mutex) + + go ret.dispatcher() + + return ret +} + +// Stop is used to stop the dispatcher thread. Once Stop is called, no +// more Jobs will be processed. +func (m *Manager) Stop() { + m.CancelAll() + close(m.stop) +} + +// Add queues a job. +func (m *Manager) Add(description string, e JobExec) int { + m.mutex.Lock() + defer m.mutex.Unlock() + + t := time.Now() + + j := Job{ + ID: m.nextID(), + Status: StatusReady, + Description: description, + AddTime: t, + exec: e, + } + + m.queue = append(m.queue, &j) + + if len(m.queue) == 1 { + // notify that there is now a job in the queue + m.notEmpty.Broadcast() + } + + m.notifyNewJob(&j) + + return j.ID +} + +// Start adds a job and starts it immediately, concurrently with any other +// jobs. 
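
[Editor's note, not part of the patch] For context on how the new package is meant to be driven, a minimal sketch assuming caller code outside this patch: a unit of work is wrapped in a JobExec — either by implementing the interface or via MakeJobExec — and queued on the Manager, which returns the integer job ID that the resolvers convert to a GraphQL ID.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/stashapp/stash/pkg/job"
    )

    func main() {
        m := job.NewManager()
        defer m.Stop()

        // wrap a plain function as a JobExec and queue it
        exec := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
            for i := 0; i < 10; i++ {
                // stop early if the job was cancelled via the manager
                if job.IsCancelled(ctx) {
                    return
                }
                time.Sleep(100 * time.Millisecond)
            }
        })

        id := m.Add("example job", exec)
        fmt.Println("queued job", id)
    }
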
+func (m *Manager) Start(description string, e JobExec) int { + m.mutex.Lock() + defer m.mutex.Unlock() + + t := time.Now() + + j := Job{ + ID: m.nextID(), + Status: StatusReady, + Description: description, + AddTime: t, + exec: e, + } + + m.queue = append(m.queue, &j) + + m.dispatch(&j) + + return j.ID +} + +func (m *Manager) notifyNewJob(j *Job) { + // assumes lock held + for _, s := range m.subscriptions { + // don't block if channel is full + select { + case s.newJob <- *j: + default: + } + } +} + +func (m *Manager) nextID() int { + m.lastID += 1 + return m.lastID +} + +func (m *Manager) getReadyJob() *Job { + // assumes lock held + for _, j := range m.queue { + if j.Status == StatusReady { + return j + } + } + + return nil +} + +func (m *Manager) dispatcher() { + m.mutex.Lock() + + for { + // wait until we have something to process + j := m.getReadyJob() + + for j == nil { + m.notEmpty.Wait() + + // it's possible that we have been stopped - check here + select { + case <-m.stop: + m.mutex.Unlock() + return + default: + // keep going + j = m.getReadyJob() + } + } + + done := m.dispatch(j) + + // unlock the mutex and wait for the job to finish + m.mutex.Unlock() + <-done + m.mutex.Lock() + + // remove the job from the queue + m.removeJob(j) + + // process next job + } +} + +func (m *Manager) newProgress(j *Job) *Progress { + return &Progress{ + updater: &updater{ + m: m, + job: j, + }, + percent: ProgressIndefinite, + } +} + +func (m *Manager) dispatch(j *Job) (done chan struct{}) { + // assumes lock held + t := time.Now() + j.StartTime = &t + j.Status = StatusRunning + + ctx, cancelFunc := context.WithCancel(context.Background()) + j.cancelFunc = cancelFunc + + done = make(chan struct{}) + go func() { + progress := m.newProgress(j) + j.exec.Execute(ctx, progress) + + m.onJobFinish(j) + + close(done) + }() + + m.notifyJobUpdate(j) + + return +} + +func (m *Manager) onJobFinish(job *Job) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if job.Status == StatusStopping { + job.Status = StatusCancelled + } else { + job.Status = StatusFinished + } + + t := time.Now() + job.EndTime = &t +} + +func (m *Manager) removeJob(job *Job) { + // assumes lock held + index, _ := m.getJob(m.queue, job.ID) + if index == -1 { + return + } + + // clear any subtasks + job.Details = nil + + m.queue = append(m.queue[:index], m.queue[index+1:]...) + + m.graveyard = append(m.graveyard, job) + if len(m.graveyard) > maxGraveyardSize { + m.graveyard = m.graveyard[1:] + } + + // notify job removed + for _, s := range m.subscriptions { + // don't block if channel is full + select { + case s.removedJob <- *job: + default: + } + } +} + +func (m *Manager) getJob(list []*Job, id int) (index int, job *Job) { + // assumes lock held + for i, j := range list { + if j.ID == id { + index = i + job = j + return + } + } + + return -1, nil +} + +// CancelJob cancels the job with the provided id. Jobs that have been started +// are notified that they are stopping. Jobs that have not yet started are +// removed from the queue. If no job exists with the provided id, then there is +// no effect. Likewise, if the job is already cancelled, there is no effect. +func (m *Manager) CancelJob(id int) { + m.mutex.Lock() + defer m.mutex.Unlock() + + _, j := m.getJob(m.queue, id) + if j != nil { + j.cancel() + + if j.Status == StatusCancelled { + // remove from the queue + m.removeJob(j) + } + } +} + +// CancelAll cancels all of the jobs in the queue. This is the same as +// calling CancelJob on all jobs in the queue. 
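
[Editor's note, not part of the patch] Add and Start have different semantics: Add serialises the job through the single dispatcher goroutine, while Start dispatches it immediately, concurrently with anything already running; CancelJob removes a job that has not yet started and cancels the context of one that has. A sketch under the same assumptions as the previous example:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/stashapp/stash/pkg/job"
    )

    func main() {
        m := job.NewManager()
        defer m.Stop()

        longExec := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
            // simulate long-running work that honours cancellation
            select {
            case <-ctx.Done():
            case <-time.After(time.Minute):
            }
        })

        // Add serialises jobs through the single dispatcher...
        first := m.Add("long job", longExec)
        second := m.Add("queued job", longExec)

        // ...while Start runs immediately, alongside the queue.
        m.Start("immediate job", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {}))

        // cancelling a job that has not started removes it from the queue;
        // cancelling a running job cancels its context (STOPPING -> CANCELLED)
        m.CancelJob(second)
        m.CancelJob(first)

        fmt.Println("cancelled jobs", first, second)
    }
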
+func (m *Manager) CancelAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + + // call cancel on all + for _, j := range m.queue { + j.cancel() + + if j.Status == StatusCancelled { + // add to graveyard + m.removeJob(j) + } + } +} + +// GetJob returns a copy of the Job for the provided id. Returns nil if the job +// does not exist. +func (m *Manager) GetJob(id int) *Job { + m.mutex.Lock() + defer m.mutex.Unlock() + + // get from the queue or graveyard + _, j := m.getJob(append(m.queue, m.graveyard...), id) + if j != nil { + // make a copy of the job and return the pointer + jCopy := *j + return &jCopy + } + + return nil +} + +// GetQueue returns a copy of the current job queue. +func (m *Manager) GetQueue() []Job { + m.mutex.Lock() + defer m.mutex.Unlock() + + var ret []Job + + for _, j := range m.queue { + jCopy := *j + ret = append(ret, jCopy) + } + + return ret +} + +// Subscribe subscribes to changes to jobs in the manager queue. +func (m *Manager) Subscribe(ctx context.Context) *ManagerSubscription { + m.mutex.Lock() + defer m.mutex.Unlock() + + ret := newSubscription() + + m.subscriptions = append(m.subscriptions, ret) + + go func() { + <-ctx.Done() + m.mutex.Lock() + defer m.mutex.Unlock() + + ret.close() + + // remove from the list + for i, s := range m.subscriptions { + if s == ret { + m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...) + break + } + } + }() + + return ret +} + +func (m *Manager) notifyJobUpdate(j *Job) { + // don't update if job is finished or cancelled - these are handled + // by removeJob + if j.Status == StatusCancelled || j.Status == StatusFinished { + return + } + + // assumes lock held + for _, s := range m.subscriptions { + // don't block if channel is full + select { + case s.updatedJob <- *j: + default: + } + } +} + +type updater struct { + m *Manager + job *Job + lastUpdate time.Time + updateTimer *time.Timer +} + +func (u *updater) notifyUpdate() { + // assumes lock held + u.m.notifyJobUpdate(u.job) + u.lastUpdate = time.Now() + u.updateTimer = nil +} + +func (u *updater) updateProgress(progress float64, details []string) { + u.m.mutex.Lock() + defer u.m.mutex.Unlock() + + u.job.Progress = progress + u.job.Details = details + + if time.Since(u.lastUpdate) < u.m.updateThrottleLimit { + if u.updateTimer == nil { + u.updateTimer = time.AfterFunc(u.m.updateThrottleLimit-time.Since(u.lastUpdate), func() { + u.m.mutex.Lock() + defer u.m.mutex.Unlock() + + u.notifyUpdate() + }) + } + } else { + u.notifyUpdate() + } +} diff --git a/pkg/job/manager_test.go b/pkg/job/manager_test.go new file mode 100644 index 000000000..b76d424a5 --- /dev/null +++ b/pkg/job/manager_test.go @@ -0,0 +1,343 @@ +package job + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const sleepTime time.Duration = 10 * time.Millisecond + +type testExec struct { + started chan struct{} + finish chan struct{} + cancelled bool + progress *Progress +} + +func newTestExec(finish chan struct{}) *testExec { + return &testExec{ + started: make(chan struct{}), + finish: finish, + } +} + +func (e *testExec) Execute(ctx context.Context, p *Progress) { + e.progress = p + close(e.started) + + if e.finish != nil { + <-e.finish + + select { + case <-ctx.Done(): + e.cancelled = true + default: + // fall through + } + } +} + +func TestAdd(t *testing.T) { + m := NewManager() + + const jobName = "test job" + exec1 := newTestExec(make(chan struct{})) + jobID := m.Add(jobName, exec1) + + // expect jobID to be the first ID + assert := assert.New(t) + 
assert.Equal(1, jobID) + + // wait a tiny bit + time.Sleep(sleepTime) + + // expect job to have started + select { + case <-exec1.started: + // ok + default: + t.Error("exec was not started") + } + + // expect status to be running + j := m.GetJob(jobID) + + assert.Equal(StatusRunning, j.Status) + + // expect description to be set + assert.Equal(jobName, j.Description) + + // expect startTime and addTime to be set + assert.NotNil(j.StartTime) + assert.NotNil(j.AddTime) + + // expect endTime to not be set + assert.Nil(j.EndTime) + + // add another job to the queue + const otherJobName = "other job name" + exec2 := newTestExec(make(chan struct{})) + job2ID := m.Add(otherJobName, exec2) + + // expect status to be ready + j2 := m.GetJob(job2ID) + + assert.Equal(StatusReady, j2.Status) + + // expect addTime to be set + assert.NotNil(j2.AddTime) + + // expect startTime and endTime to not be set + assert.Nil(j2.StartTime) + assert.Nil(j2.EndTime) + + // allow first job to finish + close(exec1.finish) + + // wait a tiny bit + time.Sleep(sleepTime) + + // expect first job to be finished + j = m.GetJob(jobID) + assert.Equal(StatusFinished, j.Status) + + // expect end time to be set + assert.NotNil(j.EndTime) + + // expect second job to have started + select { + case <-exec2.started: + // ok + default: + t.Error("exec was not started") + } + + // expect status to be running + j2 = m.GetJob(job2ID) + + assert.Equal(StatusRunning, j2.Status) + + // expect startTime to be set + assert.NotNil(j2.StartTime) +} + +func TestCancel(t *testing.T) { + m := NewManager() + + // add two jobs + const jobName = "test job" + exec1 := newTestExec(make(chan struct{})) + jobID := m.Add(jobName, exec1) + + const otherJobName = "other job" + exec2 := newTestExec(make(chan struct{})) + job2ID := m.Add(otherJobName, exec2) + + // wait a tiny bit + time.Sleep(sleepTime) + + m.CancelJob(job2ID) + + // expect job to be cancelled + assert := assert.New(t) + j := m.GetJob(job2ID) + assert.Equal(StatusCancelled, j.Status) + + // expect end time not to be set + assert.Nil(j.EndTime) + + // expect job to be removed from the queue + assert.Len(m.GetQueue(), 1) + + // expect job to have not have been started + select { + case <-exec2.started: + t.Error("cancelled exec was started") + default: + } + + // cancel running job + m.CancelJob(jobID) + + // wait a tiny bit + time.Sleep(sleepTime) + + // expect status to be stopping + j = m.GetJob(jobID) + assert.Equal(StatusStopping, j.Status) + + // expect job to still be in the queue + assert.Len(m.GetQueue(), 1) + + // allow first job to finish + close(exec1.finish) + + // wait a tiny bit + time.Sleep(sleepTime) + + // expect job to be removed from the queue + assert.Len(m.GetQueue(), 0) + + // expect job to be cancelled + j = m.GetJob(jobID) + assert.Equal(StatusCancelled, j.Status) + + // expect endtime to be set + assert.NotNil(j.EndTime) + + // expect job to have been cancelled via context + assert.True(exec1.cancelled) +} + +func TestCancelAll(t *testing.T) { + m := NewManager() + + // add two jobs + const jobName = "test job" + exec1 := newTestExec(make(chan struct{})) + jobID := m.Add(jobName, exec1) + + const otherJobName = "other job" + exec2 := newTestExec(make(chan struct{})) + job2ID := m.Add(otherJobName, exec2) + + // wait a tiny bit + time.Sleep(sleepTime) + + m.CancelAll() + + // allow first job to finish + close(exec1.finish) + + // wait a tiny bit + time.Sleep(sleepTime) + + // expect all jobs to be cancelled + assert := assert.New(t) + j := m.GetJob(job2ID) + 
assert.Equal(StatusCancelled, j.Status) + + j = m.GetJob(jobID) + assert.Equal(StatusCancelled, j.Status) + + // expect all jobs to be removed from the queue + assert.Len(m.GetQueue(), 0) + + // expect job to have not have been started + select { + case <-exec2.started: + t.Error("cancelled exec was started") + default: + } +} + +func TestSubscribe(t *testing.T) { + m := NewManager() + + m.updateThrottleLimit = time.Millisecond * 100 + + ctx, cancel := context.WithCancel(context.Background()) + + s := m.Subscribe(ctx) + + // add a job + const jobName = "test job" + exec1 := newTestExec(make(chan struct{})) + jobID := m.Add(jobName, exec1) + + assert := assert.New(t) + + select { + case newJob := <-s.NewJob: + assert.Equal(jobID, newJob.ID) + assert.Equal(jobName, newJob.Description) + assert.Equal(StatusReady, newJob.Status) + case <-time.After(time.Second): + t.Error("new job was not received") + } + + // should receive an update when the job begins to run + select { + case updatedJob := <-s.UpdatedJob: + assert.Equal(jobID, updatedJob.ID) + assert.Equal(jobName, updatedJob.Description) + assert.Equal(StatusRunning, updatedJob.Status) + case <-time.After(time.Second): + t.Error("updated job was not received") + } + + // wait for it to start + select { + case <-exec1.started: + // ok + case <-time.After(time.Second): + t.Error("exec was not started") + } + + // test update throttling + exec1.progress.SetPercent(0.1) + + // first update should be immediate + select { + case updatedJob := <-s.UpdatedJob: + assert.Equal(0.1, updatedJob.Progress) + case <-time.After(m.updateThrottleLimit): + t.Error("updated job was not received") + } + + exec1.progress.SetPercent(0.2) + exec1.progress.SetPercent(0.3) + + // should only receive a single update with the second status + select { + case updatedJob := <-s.UpdatedJob: + assert.Equal(0.3, updatedJob.Progress) + case <-time.After(time.Second): + t.Error("updated job was not received") + } + + select { + case <-s.UpdatedJob: + t.Error("received an additional updatedJob") + default: + } + + // allow job to finish + close(exec1.finish) + + select { + case removedJob := <-s.RemovedJob: + assert.Equal(jobID, removedJob.ID) + assert.Equal(jobName, removedJob.Description) + assert.Equal(StatusFinished, removedJob.Status) + case <-time.After(time.Second): + t.Error("removed job was not received") + } + + // should not receive another update + select { + case <-s.UpdatedJob: + t.Error("updated job was received after update") + case <-time.After(m.updateThrottleLimit): + } + + // add another job and cancel it + exec2 := newTestExec(make(chan struct{})) + jobID = m.Add(jobName, exec2) + + m.CancelJob(jobID) + + select { + case removedJob := <-s.RemovedJob: + assert.Equal(jobID, removedJob.ID) + assert.Equal(jobName, removedJob.Description) + assert.Equal(StatusCancelled, removedJob.Status) + case <-time.After(time.Second): + t.Error("cancelled job was not received") + } + + cancel() +} diff --git a/pkg/job/progress.go b/pkg/job/progress.go new file mode 100644 index 000000000..93752ce11 --- /dev/null +++ b/pkg/job/progress.go @@ -0,0 +1,138 @@ +package job + +import "sync" + +// ProgressIndefinite is the special percent value to indicate that the +// percent progress is not known. +const ProgressIndefinite float64 = -1 + +// Progress is used by JobExec to communicate updates to the job's progress to +// the JobManager. 
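
[Editor's note, not part of the patch] Outside the GraphQL resolver, the same subscription mechanism can be consumed directly. The sketch below (assumed caller code) drains the three channels and relies on context cancellation to end the subscription; note that UpdatedJob notifications are throttled to roughly one per second by the manager's default updateThrottleLimit.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/stashapp/stash/pkg/job"
    )

    func watchJobs(ctx context.Context, m *job.Manager) {
        sub := m.Subscribe(ctx)

        for {
            select {
            case j := <-sub.NewJob:
                fmt.Printf("queued #%d: %s\n", j.ID, j.Description)
            case j := <-sub.UpdatedJob:
                fmt.Printf("update #%d: %s %.0f%%\n", j.ID, j.Status, j.Progress*100)
            case j := <-sub.RemovedJob:
                fmt.Printf("done   #%d: %s\n", j.ID, j.Status)
            case <-ctx.Done():
                // cancelling the context closes the subscription channels
                return
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        m := job.NewManager()
        defer m.Stop()

        go watchJobs(ctx, m)

        m.Add("example job", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {}))
        time.Sleep(2 * time.Second) // give the watcher time to print in this sketch
    }
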
+type Progress struct { + processed int + total int + percent float64 + currentTasks []*task + + mutex sync.Mutex + updater *updater +} + +type task struct { + description string +} + +func (p *Progress) updated() { + var details []string + for _, t := range p.currentTasks { + details = append(details, t.description) + } + + p.updater.updateProgress(p.percent, details) +} + +// Indefinite sets the progress to an indefinite amount. +func (p *Progress) Indefinite() { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.total = 0 + p.calculatePercent() +} + +// SetTotal sets the total number of work units. This is used to calculate the +// progress percentage. +func (p *Progress) SetTotal(total int) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.total = total + p.calculatePercent() +} + +// SetProcessed sets the number of work units completed. This is used to +// calculate the progress percentage. +func (p *Progress) SetProcessed(processed int) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.processed = processed + p.calculatePercent() +} + +func (p *Progress) calculatePercent() { + if p.total <= 0 { + p.percent = ProgressIndefinite + } else if p.processed < 0 { + p.percent = 0 + } else { + p.percent = float64(p.processed) / float64(p.total) + if p.percent > 1 { + p.percent = 1 + } + } + + p.updated() +} + +// SetPercent sets the progress percent directly. This value will be +// overwritten if Indefinite, SetTotal, Increment or SetProcessed is called. +// Constrains the percent value between 0 and 1, inclusive. +func (p *Progress) SetPercent(percent float64) { + p.mutex.Lock() + defer p.mutex.Unlock() + + if percent < 0 { + percent = 0 + } else if percent > 1 { + percent = 1 + } + + p.percent = percent + p.updated() +} + +// Increment increments the number of processed work units, if this does not +// exceed the total units. This is used to calculate the percentage. +func (p *Progress) Increment() { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.processed < p.total { + p.processed += 1 + p.calculatePercent() + } +} + +func (p *Progress) addTask(t *task) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.currentTasks = append(p.currentTasks, t) + p.updated() +} + +func (p *Progress) removeTask(t *task) { + p.mutex.Lock() + defer p.mutex.Unlock() + + for i, tt := range p.currentTasks { + if tt == t { + p.currentTasks = append(p.currentTasks[:i], p.currentTasks[i+1:]...) + p.updated() + return + } + } +} + +// ExecuteTask executes a task as part of a job. The description is used to +// populate the Details slice in the parent Job. 
+func (p *Progress) ExecuteTask(description string, fn func()) { + t := &task{ + description: description, + } + + p.addTask(t) + defer p.removeTask(t) + fn() +} diff --git a/pkg/job/progress_test.go b/pkg/job/progress_test.go new file mode 100644 index 000000000..f2dbc0bc9 --- /dev/null +++ b/pkg/job/progress_test.go @@ -0,0 +1,145 @@ +package job + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func createProgress(m *Manager, j *Job) Progress { + return Progress{ + updater: &updater{ + m: m, + job: j, + }, + total: 100, + processed: 10, + percent: 10, + } +} + +func TestProgressIndefinite(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + p.Indefinite() + + assert := assert.New(t) + + // ensure job progress was updated + assert.Equal(ProgressIndefinite, j.Progress) +} + +func TestProgressSetTotal(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + p.SetTotal(50) + + assert := assert.New(t) + + // ensure job progress was updated + assert.Equal(0.2, j.Progress) + + p.SetTotal(0) + assert.Equal(ProgressIndefinite, j.Progress) + + p.SetTotal(-10) + assert.Equal(ProgressIndefinite, j.Progress) + + p.SetTotal(9) + assert.Equal(float64(1), j.Progress) +} + +func TestProgressSetProcessed(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + p.SetProcessed(30) + + assert := assert.New(t) + + // ensure job progress was updated + assert.Equal(0.3, j.Progress) + + p.SetProcessed(-10) + assert.Equal(float64(0), j.Progress) + + p.SetProcessed(200) + assert.Equal(float64(1), j.Progress) +} + +func TestProgressSetPercent(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + p.SetPercent(0.3) + + assert := assert.New(t) + + // ensure job progress was updated + assert.Equal(0.3, j.Progress) + + p.SetPercent(-10) + assert.Equal(float64(0), j.Progress) + + p.SetPercent(200) + assert.Equal(float64(1), j.Progress) +} + +func TestProgressIncrement(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + p.SetProcessed(49) + p.Increment() + + assert := assert.New(t) + + // ensure job progress was updated + assert.Equal(0.5, j.Progress) + + p.SetProcessed(100) + p.Increment() + assert.Equal(float64(1), j.Progress) +} + +func TestExecuteTask(t *testing.T) { + m := NewManager() + j := &Job{} + + p := createProgress(m, j) + + c := make(chan struct{}, 1) + const taskDesciption = "taskDescription" + + go p.ExecuteTask(taskDesciption, func() { + <-c + }) + + time.Sleep(sleepTime) + + assert := assert.New(t) + + // ensure task is added to the job details + assert.Equal(taskDesciption, j.Details[0]) + + // allow task to finish + close(c) + + time.Sleep(sleepTime) + + // ensure task is removed from the job details + assert.Len(j.Details, 0) +} diff --git a/pkg/job/subscribe.go b/pkg/job/subscribe.go new file mode 100644 index 000000000..1ade17383 --- /dev/null +++ b/pkg/job/subscribe.go @@ -0,0 +1,36 @@ +package job + +// ManagerSubscription is a collection of channels that will receive updates +// from the job manager. 
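
[Editor's note, not part of the patch] Tying the pieces together, a JobExec reports determinate progress by setting a total and incrementing per item, and surfaces per-item descriptions through ExecuteTask, which populate Job.Details (the subTasks field in GraphQL). A minimal sketch with hypothetical item names:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/stashapp/stash/pkg/job"
    )

    func main() {
        m := job.NewManager()
        defer m.Stop()

        items := []string{"a.mp4", "b.mp4", "c.mp4"}

        exec := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
            progress.SetTotal(len(items)) // percent = processed / total

            for _, item := range items {
                if job.IsCancelled(ctx) {
                    return
                }

                // the description appears in Job.Details while the closure runs
                progress.ExecuteTask(fmt.Sprintf("processing %s", item), func() {
                    // ... do the work for this item ...
                })

                progress.Increment()
            }
        })

        m.Add("processing items", exec)
        time.Sleep(time.Second) // allow the dispatcher to run the job in this sketch
    }
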
+type ManagerSubscription struct { + // new jobs are sent to this channel + NewJob <-chan Job + // removed jobs are sent to this channel + RemovedJob <-chan Job + // updated jobs are sent to this channel + UpdatedJob <-chan Job + + newJob chan Job + removedJob chan Job + updatedJob chan Job +} + +func newSubscription() *ManagerSubscription { + ret := &ManagerSubscription{ + newJob: make(chan Job, 100), + removedJob: make(chan Job, 100), + updatedJob: make(chan Job, 100), + } + + ret.NewJob = ret.newJob + ret.RemovedJob = ret.removedJob + ret.UpdatedJob = ret.updatedJob + + return ret +} + +func (s *ManagerSubscription) close() { + close(s.newJob) + close(s.removedJob) + close(s.updatedJob) +} diff --git a/pkg/manager/job_status.go b/pkg/manager/job_status.go deleted file mode 100644 index ef4dfad62..000000000 --- a/pkg/manager/job_status.go +++ /dev/null @@ -1,46 +0,0 @@ -package manager - -type JobStatus int - -const ( - Idle JobStatus = 0 - Import JobStatus = 1 - Export JobStatus = 2 - Scan JobStatus = 3 - Generate JobStatus = 4 - Clean JobStatus = 5 - Scrape JobStatus = 6 - AutoTag JobStatus = 7 - Migrate JobStatus = 8 - PluginOperation JobStatus = 9 - StashBoxBatchPerformer JobStatus = 10 -) - -func (s JobStatus) String() string { - statusMessage := "" - - switch s { - case Idle: - statusMessage = "Idle" - case Import: - statusMessage = "Import" - case Export: - statusMessage = "Export" - case Scan: - statusMessage = "Scan" - case Generate: - statusMessage = "Generate" - case AutoTag: - statusMessage = "Auto Tag" - case Migrate: - statusMessage = "Migrate" - case Clean: - statusMessage = "Clean" - case PluginOperation: - statusMessage = "Plugin Operation" - case StashBoxBatchPerformer: - statusMessage = "Stash-Box Performer Batch Operation" - } - - return statusMessage -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 704ba353d..a47512eb5 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -12,6 +12,7 @@ import ( "github.com/stashapp/stash/pkg/database" "github.com/stashapp/stash/pkg/dlna" "github.com/stashapp/stash/pkg/ffmpeg" + "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/manager/config" "github.com/stashapp/stash/pkg/manager/paths" @@ -25,12 +26,13 @@ import ( type singleton struct { Config *config.Instance - Status TaskStatus - Paths *paths.Paths + Paths *paths.Paths FFMPEGPath string FFProbePath string + JobManager *job.Manager + PluginCache *plugin.Cache ScraperCache *scraper.Cache @@ -39,6 +41,8 @@ type singleton struct { DLNAService *dlna.Service TxnManager models.TransactionManager + + scanSubs *subscriptionManager } var instance *singleton @@ -63,10 +67,12 @@ func Initialize() *singleton { instance = &singleton{ Config: cfg, - Status: TaskStatus{Status: Idle, Progress: -1}, + JobManager: job.NewManager(), DownloadStore: NewDownloadStore(), TxnManager: sqlite.NewTransactionManager(), + + scanSubs: &subscriptionManager{}, } sceneServer := SceneServer{ diff --git a/pkg/manager/manager_tasks.go b/pkg/manager/manager_tasks.go index a0b64e0e4..74d8643bf 100644 --- a/pkg/manager/manager_tasks.go +++ b/pkg/manager/manager_tasks.go @@ -4,14 +4,13 @@ import ( "context" "errors" "fmt" - "os" "strconv" "sync" "time" "github.com/remeh/sizedwaitgroup" - "github.com/stashapp/stash/pkg/autotag" + "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/manager/config" "github.com/stashapp/stash/pkg/models" @@ -33,56 +32,6 @@ func isImage(pathname 
string) bool { return matchExtension(pathname, imgExt) } -type TaskStatus struct { - Status JobStatus - Progress float64 - LastUpdate time.Time - stopping bool - upTo int - total int -} - -func (t *TaskStatus) Stop() bool { - t.stopping = true - t.updated() - return true -} - -func (t *TaskStatus) SetStatus(s JobStatus) { - t.Status = s - t.updated() -} - -func (t *TaskStatus) setProgress(upTo int, total int) { - if total == 0 { - t.Progress = 1 - } - t.upTo = upTo - t.total = total - t.Progress = float64(upTo) / float64(total) - t.updated() -} - -func (t *TaskStatus) setProgressPercent(progress float64) { - if progress != t.Progress { - t.Progress = progress - t.updated() - } -} - -func (t *TaskStatus) incrementProgress() { - t.setProgress(t.upTo+1, t.total) -} - -func (t *TaskStatus) indefiniteProgress() { - t.Progress = -1 - t.updated() -} - -func (t *TaskStatus) updated() { - t.LastUpdate = time.Now() -} - func getScanPaths(inputPaths []string) []*models.StashConfig { if len(inputPaths) == 0 { return config.GetInstance().GetStashPaths() @@ -105,189 +54,34 @@ func getScanPaths(inputPaths []string) []*models.StashConfig { return ret } -func (s *singleton) neededScan(paths []*models.StashConfig) (total *int, newFiles *int) { - const timeout = 90 * time.Second - - // create a control channel through which to signal the counting loop when the timeout is reached - chTimeout := time.After(timeout) - - logger.Infof("Counting files to scan...") - - t := 0 - n := 0 - - timeoutErr := errors.New("timed out") - - for _, sp := range paths { - err := walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error { - t++ - task := ScanTask{FilePath: path, TxnManager: s.TxnManager} - if !task.doesPathExist() { - n++ - } - - //check for timeout - select { - case <-chTimeout: - return timeoutErr - default: - } - - // check stop - if s.Status.stopping { - return timeoutErr - } - - return nil - }) - - if err == timeoutErr { - // timeout should return nil counts - return nil, nil - } - - if err != nil { - logger.Errorf("Error encountered counting files to scan: %s", err.Error()) - return nil, nil - } - } - - return &t, &n +// ScanSubscribe subscribes to a notification that is triggered when a +// scan or clean is complete. +func (s *singleton) ScanSubscribe(ctx context.Context) <-chan bool { + return s.scanSubs.subscribe(ctx) } -func (s *singleton) Scan(input models.ScanMetadataInput) error { +func (s *singleton) Scan(input models.ScanMetadataInput) (int, error) { if err := s.validateFFMPEG(); err != nil { - return err + return 0, err } - if s.Status.Status != Idle { - return nil + scanJob := ScanJob{ + txnManager: s.TxnManager, + input: input, + subscriptions: s.scanSubs, } - s.Status.SetStatus(Scan) - s.Status.indefiniteProgress() - go func() { - defer s.returnToIdleState() - - paths := getScanPaths(input.Paths) - - total, newFiles := s.neededScan(paths) - - if s.Status.stopping { - logger.Info("Stopping due to user request") - return - } - - if total == nil || newFiles == nil { - logger.Infof("Taking too long to count content. Skipping...") - logger.Infof("Starting scan") - } else { - logger.Infof("Starting scan of %d files. 
%d New files found", *total, *newFiles) - } - - start := time.Now() - config := config.GetInstance() - parallelTasks := config.GetParallelTasksWithAutoDetection() - logger.Infof("Scan started with %d parallel tasks", parallelTasks) - wg := sizedwaitgroup.New(parallelTasks) - - s.Status.Progress = 0 - fileNamingAlgo := config.GetVideoFileNamingAlgorithm() - calculateMD5 := config.IsCalculateMD5() - - i := 0 - stoppingErr := errors.New("stopping") - var err error - - var galleries []string - - for _, sp := range paths { - err = walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error { - if total != nil { - s.Status.setProgress(i, *total) - i++ - } - - if s.Status.stopping { - return stoppingErr - } - - if isGallery(path) { - galleries = append(galleries, path) - } - - instance.Paths.Generated.EnsureTmpDir() - - wg.Add() - task := ScanTask{ - TxnManager: s.TxnManager, - FilePath: path, - UseFileMetadata: utils.IsTrue(input.UseFileMetadata), - StripFileExtension: utils.IsTrue(input.StripFileExtension), - fileNamingAlgorithm: fileNamingAlgo, - calculateMD5: calculateMD5, - GeneratePreview: utils.IsTrue(input.ScanGeneratePreviews), - GenerateImagePreview: utils.IsTrue(input.ScanGenerateImagePreviews), - GenerateSprite: utils.IsTrue(input.ScanGenerateSprites), - GeneratePhash: utils.IsTrue(input.ScanGeneratePhashes), - } - go task.Start(&wg) - - return nil - }) - - if err == stoppingErr { - logger.Info("Stopping due to user request") - break - } - - if err != nil { - logger.Errorf("Error encountered scanning files: %s", err.Error()) - break - } - } - - wg.Wait() - instance.Paths.Generated.EmptyTmpDir() - elapsed := time.Since(start) - logger.Info(fmt.Sprintf("Scan finished (%s)", elapsed)) - - if s.Status.stopping || err != nil { - return - } - - for _, path := range galleries { - wg.Add() - task := ScanTask{ - TxnManager: s.TxnManager, - FilePath: path, - UseFileMetadata: false, - } - go task.associateGallery(&wg) - wg.Wait() - } - logger.Info("Finished gallery association") - }() - - return nil + return s.JobManager.Add("Scanning...", &scanJob), nil } -func (s *singleton) Import() error { +func (s *singleton) Import() (int, error) { config := config.GetInstance() metadataPath := config.GetMetadataPath() if metadataPath == "" { - return errors.New("metadata path must be set in config") + return 0, errors.New("metadata path must be set in config") } - if s.Status.Status != Idle { - return nil - } - s.Status.SetStatus(Import) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { var wg sync.WaitGroup wg.Add(1) @@ -299,29 +93,20 @@ func (s *singleton) Import() error { MissingRefBehaviour: models.ImportMissingRefEnumFail, fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(), } - go task.Start(&wg) - wg.Wait() - }() + task.Start(&wg) + }) - return nil + return s.JobManager.Add("Importing...", j), nil } -func (s *singleton) Export() error { +func (s *singleton) Export() (int, error) { config := config.GetInstance() metadataPath := config.GetMetadataPath() if metadataPath == "" { - return errors.New("metadata path must be set in config") + return 0, errors.New("metadata path must be set in config") } - if s.Status.Status != Idle { - return nil - } - s.Status.SetStatus(Export) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { var wg sync.WaitGroup wg.Add(1) task := ExportTask{ @@ 
-329,31 +114,21 @@ func (s *singleton) Export() error { full: true, fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(), } - go task.Start(&wg) - wg.Wait() - }() + task.Start(&wg) + }) - return nil + return s.JobManager.Add("Exporting...", j), nil } -func (s *singleton) RunSingleTask(t Task) (*sync.WaitGroup, error) { - if s.Status.Status != Idle { - return nil, errors.New("task already running") - } - - s.Status.SetStatus(t.GetStatus()) - s.Status.indefiniteProgress() +func (s *singleton) RunSingleTask(t Task) int { var wg sync.WaitGroup wg.Add(1) - go func() { - defer s.returnToIdleState() + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + t.Start(&wg) + }) - go t.Start(&wg) - wg.Wait() - }() - - return &wg, nil + return s.JobManager.Add(t.GetDescription(), j) } func setGeneratePreviewOptionsInput(optionsInput *models.GeneratePreviewOptionsInput) { @@ -384,18 +159,10 @@ func setGeneratePreviewOptionsInput(optionsInput *models.GeneratePreviewOptionsI } } -func (s *singleton) Generate(input models.GenerateMetadataInput) error { +func (s *singleton) Generate(input models.GenerateMetadataInput) (int, error) { if err := s.validateFFMPEG(); err != nil { - return err + return 0, err } - - if s.Status.Status != Idle { - return nil - } - s.Status.SetStatus(Generate) - s.Status.indefiniteProgress() - - //this.job.total = await ObjectionUtils.getCount(Scene); instance.Paths.Generated.EnsureTmpDir() sceneIDs, err := utils.StringSliceToIntSlice(input.SceneIDs) @@ -407,9 +174,8 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { logger.Error(err.Error()) } - go func() { - defer s.returnToIdleState() - + // TODO - formalise this + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { var scenes []*models.Scene var err error var markers []*models.SceneMarker @@ -445,22 +211,29 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { logger.Infof("Generate started with %d parallel tasks", parallelTasks) wg := sizedwaitgroup.New(parallelTasks) - s.Status.Progress = 0 lenScenes := len(scenes) total := lenScenes + len(markers) + progress.SetTotal(total) - if s.Status.stopping { + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } - totalsNeeded := s.neededGenerate(scenes, input) - if totalsNeeded == nil { - logger.Infof("Taking too long to count content. Skipping...") - logger.Infof("Generating content") - } else { - logger.Infof("Generating %d sprites %d previews %d image previews %d markers %d transcodes %d phashes", totalsNeeded.sprites, totalsNeeded.previews, totalsNeeded.imagePreviews, totalsNeeded.markers, totalsNeeded.transcodes, totalsNeeded.phashes) - } + // TODO - consider removing this. Even though we're only waiting a maximum of + // 90 seconds for this, it is all for a simple log message, and probably not worth + // waiting for + var totalsNeeded *totalsGenerate + progress.ExecuteTask("Calculating content to generate...", func() { + totalsNeeded = s.neededGenerate(scenes, input) + + if totalsNeeded == nil { + logger.Infof("Taking too long to count content. 
Skipping...") + logger.Infof("Generating content") + } else { + logger.Infof("Generating %d sprites %d previews %d image previews %d markers %d transcodes %d phashes", totalsNeeded.sprites, totalsNeeded.previews, totalsNeeded.imagePreviews, totalsNeeded.markers, totalsNeeded.transcodes, totalsNeeded.phashes) + } + }) fileNamingAlgo := config.GetVideoFileNamingAlgorithm() @@ -479,9 +252,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { start := time.Now() instance.Paths.Generated.EnsureTmpDir() - for i, scene := range scenes { - s.Status.setProgress(i, total) - if s.Status.stopping { + for _, scene := range scenes { + progress.Increment() + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") wg.Wait() instance.Paths.Generated.EmptyTmpDir() @@ -500,7 +273,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { fileNamingAlgorithm: fileNamingAlgo, } wg.Add() - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating sprites for %s", scene.Path), func() { + task.Start(&wg) + }) } if input.Previews { @@ -512,7 +287,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { fileNamingAlgorithm: fileNamingAlgo, } wg.Add() - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating preview for %s", scene.Path), func() { + task.Start(&wg) + }) } if input.Markers { @@ -523,7 +300,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { Overwrite: overwrite, fileNamingAlgorithm: fileNamingAlgo, } - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating markers for %s", scene.Path), func() { + task.Start(&wg) + }) } if input.Transcodes { @@ -533,7 +312,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { Overwrite: overwrite, fileNamingAlgorithm: fileNamingAlgo, } - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating transcode for %s", scene.Path), func() { + task.Start(&wg) + }) } if input.Phashes { @@ -543,15 +324,17 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { txnManager: s.TxnManager, } wg.Add() - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating phash for %s", scene.Path), func() { + task.Start(&wg) + }) } } wg.Wait() - for i, marker := range markers { - s.Status.setProgress(lenScenes+i, total) - if s.Status.stopping { + for _, marker := range markers { + progress.Increment() + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") wg.Wait() instance.Paths.Generated.EmptyTmpDir() @@ -572,7 +355,9 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { Overwrite: overwrite, fileNamingAlgorithm: fileNamingAlgo, } - go task.Start(&wg) + go progress.ExecuteTask(fmt.Sprintf("Generating marker preview for marker ID %d", marker.ID), func() { + task.Start(&wg) + }) } wg.Wait() @@ -580,32 +365,24 @@ func (s *singleton) Generate(input models.GenerateMetadataInput) error { instance.Paths.Generated.EmptyTmpDir() elapsed := time.Since(start) logger.Info(fmt.Sprintf("Generate finished (%s)", elapsed)) - }() + }) - return nil + return s.JobManager.Add("Generating...", j), nil } -func (s *singleton) GenerateDefaultScreenshot(sceneId string) { - s.generateScreenshot(sceneId, nil) +func (s *singleton) GenerateDefaultScreenshot(sceneId string) int { + return s.generateScreenshot(sceneId, nil) } -func (s *singleton) GenerateScreenshot(sceneId string, at float64) { - s.generateScreenshot(sceneId, &at) +func (s *singleton) GenerateScreenshot(sceneId 
string, at float64) int { + return s.generateScreenshot(sceneId, &at) } // generate default screenshot if at is nil -func (s *singleton) generateScreenshot(sceneId string, at *float64) { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(Generate) - s.Status.indefiniteProgress() - +func (s *singleton) generateScreenshot(sceneId string, at *float64) int { instance.Paths.Generated.EnsureTmpDir() - go func() { - defer s.returnToIdleState() - + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { sceneIdInt, err := strconv.Atoi(sceneId) if err != nil { logger.Errorf("Error parsing scene id %s: %s", sceneId, err.Error()) @@ -631,314 +408,25 @@ func (s *singleton) generateScreenshot(sceneId string, at *float64) { var wg sync.WaitGroup wg.Add(1) - go task.Start(&wg) - - wg.Wait() + task.Start(&wg) logger.Infof("Generate screenshot finished") - }() + }) + + return s.JobManager.Add(fmt.Sprintf("Generating screenshot for scene id %s", sceneId), j) } -func (s *singleton) isFileBasedAutoTag(input models.AutoTagMetadataInput) bool { - const wildcard = "*" - performerIds := input.Performers - studioIds := input.Studios - tagIds := input.Tags - - return (len(performerIds) == 0 || performerIds[0] == wildcard) && (len(studioIds) == 0 || studioIds[0] == wildcard) && (len(tagIds) == 0 || tagIds[0] == wildcard) -} - -func (s *singleton) AutoTag(input models.AutoTagMetadataInput) { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(AutoTag) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - - if s.isFileBasedAutoTag(input) { - // doing file-based auto-tag - s.autoTagFiles(input.Paths, len(input.Performers) > 0, len(input.Studios) > 0, len(input.Tags) > 0) - } else { - // doing specific performer/studio/tag auto-tag - s.autoTagSpecific(input) - } - }() -} - -func (s *singleton) autoTagFiles(paths []string, performers, studios, tags bool) { - t := autoTagFilesTask{ - paths: paths, - performers: performers, - studios: studios, - tags: tags, +func (s *singleton) AutoTag(input models.AutoTagMetadataInput) int { + j := autoTagJob{ txnManager: s.TxnManager, - status: &s.Status, + input: input, } - t.process() + return s.JobManager.Add("Auto-tagging...", &j) } -func (s *singleton) autoTagSpecific(input models.AutoTagMetadataInput) { - performerIds := input.Performers - studioIds := input.Studios - tagIds := input.Tags - - performerCount := len(performerIds) - studioCount := len(studioIds) - tagCount := len(tagIds) - - if err := s.TxnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { - performerQuery := r.Performer() - studioQuery := r.Studio() - tagQuery := r.Tag() - - const wildcard = "*" - var err error - if performerCount == 1 && performerIds[0] == wildcard { - performerCount, err = performerQuery.Count() - if err != nil { - return fmt.Errorf("error getting performer count: %s", err.Error()) - } - } - if studioCount == 1 && studioIds[0] == wildcard { - studioCount, err = studioQuery.Count() - if err != nil { - return fmt.Errorf("error getting studio count: %s", err.Error()) - } - } - if tagCount == 1 && tagIds[0] == wildcard { - tagCount, err = tagQuery.Count() - if err != nil { - return fmt.Errorf("error getting tag count: %s", err.Error()) - } - } - - return nil - }); err != nil { - logger.Error(err.Error()) - return - } - - total := performerCount + studioCount + tagCount - s.Status.setProgress(0, total) - - logger.Infof("Starting autotag of %d performers, %d studios, %d tags", performerCount, studioCount, 
tagCount) - - s.autoTagPerformers(input.Paths, performerIds) - s.autoTagStudios(input.Paths, studioIds) - s.autoTagTags(input.Paths, tagIds) - - logger.Info("Finished autotag") -} - -func (s *singleton) autoTagPerformers(paths []string, performerIds []string) { - if s.Status.stopping { - return - } - - for _, performerId := range performerIds { - var performers []*models.Performer - - if err := s.TxnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { - performerQuery := r.Performer() - - if performerId == "*" { - var err error - performers, err = performerQuery.All() - if err != nil { - return fmt.Errorf("error querying performers: %s", err.Error()) - } - } else { - performerIdInt, err := strconv.Atoi(performerId) - if err != nil { - return fmt.Errorf("error parsing performer id %s: %s", performerId, err.Error()) - } - - performer, err := performerQuery.Find(performerIdInt) - if err != nil { - return fmt.Errorf("error finding performer id %s: %s", performerId, err.Error()) - } - - if performer == nil { - return fmt.Errorf("performer with id %s not found", performerId) - } - performers = append(performers, performer) - } - - for _, performer := range performers { - if s.Status.stopping { - logger.Info("Stopping due to user request") - return nil - } - - if err := s.TxnManager.WithTxn(context.TODO(), func(r models.Repository) error { - if err := autotag.PerformerScenes(performer, paths, r.Scene()); err != nil { - return err - } - if err := autotag.PerformerImages(performer, paths, r.Image()); err != nil { - return err - } - if err := autotag.PerformerGalleries(performer, paths, r.Gallery()); err != nil { - return err - } - - return nil - }); err != nil { - return fmt.Errorf("error auto-tagging performer '%s': %s", performer.Name.String, err.Error()) - } - - s.Status.incrementProgress() - } - - return nil - }); err != nil { - logger.Error(err.Error()) - continue - } - } -} - -func (s *singleton) autoTagStudios(paths []string, studioIds []string) { - if s.Status.stopping { - return - } - - for _, studioId := range studioIds { - var studios []*models.Studio - - if err := s.TxnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { - studioQuery := r.Studio() - if studioId == "*" { - var err error - studios, err = studioQuery.All() - if err != nil { - return fmt.Errorf("error querying studios: %s", err.Error()) - } - } else { - studioIdInt, err := strconv.Atoi(studioId) - if err != nil { - return fmt.Errorf("error parsing studio id %s: %s", studioId, err.Error()) - } - - studio, err := studioQuery.Find(studioIdInt) - if err != nil { - return fmt.Errorf("error finding studio id %s: %s", studioId, err.Error()) - } - - if studio == nil { - return fmt.Errorf("studio with id %s not found", studioId) - } - - studios = append(studios, studio) - } - - for _, studio := range studios { - if s.Status.stopping { - logger.Info("Stopping due to user request") - return nil - } - - if err := s.TxnManager.WithTxn(context.TODO(), func(r models.Repository) error { - if err := autotag.StudioScenes(studio, paths, r.Scene()); err != nil { - return err - } - if err := autotag.StudioImages(studio, paths, r.Image()); err != nil { - return err - } - if err := autotag.StudioGalleries(studio, paths, r.Gallery()); err != nil { - return err - } - - return nil - }); err != nil { - return fmt.Errorf("error auto-tagging studio '%s': %s", studio.Name.String, err.Error()) - } - - s.Status.incrementProgress() - } - - return nil - }); err != nil { - logger.Error(err.Error()) - continue - 
} - } -} - -func (s *singleton) autoTagTags(paths []string, tagIds []string) { - if s.Status.stopping { - return - } - - for _, tagId := range tagIds { - var tags []*models.Tag - if err := s.TxnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { - tagQuery := r.Tag() - if tagId == "*" { - var err error - tags, err = tagQuery.All() - if err != nil { - return fmt.Errorf("error querying tags: %s", err.Error()) - } - } else { - tagIdInt, err := strconv.Atoi(tagId) - if err != nil { - return fmt.Errorf("error parsing tag id %s: %s", tagId, err.Error()) - } - - tag, err := tagQuery.Find(tagIdInt) - if err != nil { - return fmt.Errorf("error finding tag id %s: %s", tagId, err.Error()) - } - tags = append(tags, tag) - } - - for _, tag := range tags { - if s.Status.stopping { - logger.Info("Stopping due to user request") - return nil - } - - if err := s.TxnManager.WithTxn(context.TODO(), func(r models.Repository) error { - if err := autotag.TagScenes(tag, paths, r.Scene()); err != nil { - return err - } - if err := autotag.TagImages(tag, paths, r.Image()); err != nil { - return err - } - if err := autotag.TagGalleries(tag, paths, r.Gallery()); err != nil { - return err - } - - return nil - }); err != nil { - return fmt.Errorf("error auto-tagging tag '%s': %s", tag.Name, err.Error()) - } - - s.Status.incrementProgress() - } - - return nil - }); err != nil { - logger.Error(err.Error()) - continue - } - } -} - -func (s *singleton) Clean(input models.CleanMetadataInput) { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(Clean) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - +func (s *singleton) Clean(input models.CleanMetadataInput) int { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { var scenes []*models.Scene var images []*models.Image var galleries []*models.Gallery @@ -976,18 +464,18 @@ func (s *singleton) Clean(input models.CleanMetadataInput) { return } - if s.Status.stopping { + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } var wg sync.WaitGroup - s.Status.Progress = 0 total := len(scenes) + len(images) + len(galleries) + progress.SetTotal(total) fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() - for i, scene := range scenes { - s.Status.setProgress(i, total) - if s.Status.stopping { + for _, scene := range scenes { + progress.Increment() + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } @@ -1004,13 +492,16 @@ func (s *singleton) Clean(input models.CleanMetadataInput) { Scene: scene, fileNamingAlgorithm: fileNamingAlgo, } - go task.Start(&wg, input.DryRun) + go progress.ExecuteTask(fmt.Sprintf("Assessing scene %s for clean", scene.Path), func() { + task.Start(&wg, input.DryRun) + }) + wg.Wait() } - for i, img := range images { - s.Status.setProgress(len(scenes)+i, total) - if s.Status.stopping { + for _, img := range images { + progress.Increment() + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } @@ -1026,13 +517,15 @@ func (s *singleton) Clean(input models.CleanMetadataInput) { TxnManager: s.TxnManager, Image: img, } - go task.Start(&wg, input.DryRun) + go progress.ExecuteTask(fmt.Sprintf("Assessing image %s for clean", img.Path), func() { + task.Start(&wg, input.DryRun) + }) wg.Wait() } - for i, gallery := range galleries { - s.Status.setProgress(len(scenes)+len(galleries)+i, total) - if s.Status.stopping { + for _, gallery := range galleries { + progress.Increment() + if 
job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } @@ -1048,24 +541,22 @@ func (s *singleton) Clean(input models.CleanMetadataInput) { TxnManager: s.TxnManager, Gallery: gallery, } - go task.Start(&wg, input.DryRun) + go progress.ExecuteTask(fmt.Sprintf("Assessing gallery %s for clean", gallery.GetTitle()), func() { + task.Start(&wg, input.DryRun) + }) wg.Wait() } logger.Info("Finished Cleaning") - }() + + s.scanSubs.notify() + }) + + return s.JobManager.Add("Cleaning...", j) } -func (s *singleton) MigrateHash() { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(Migrate) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - +func (s *singleton) MigrateHash() int { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() logger.Infof("Migrating generated files for %s naming hash", fileNamingAlgo.String()) @@ -1080,12 +571,12 @@ func (s *singleton) MigrateHash() { } var wg sync.WaitGroup - s.Status.Progress = 0 total := len(scenes) + progress.SetTotal(total) - for i, scene := range scenes { - s.Status.setProgress(i, total) - if s.Status.stopping { + for _, scene := range scenes { + progress.Increment() + if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return } @@ -1103,20 +594,9 @@ func (s *singleton) MigrateHash() { } logger.Info("Finished migrating") - }() -} + }) -func (s *singleton) returnToIdleState() { - if r := recover(); r != nil { - logger.Info("recovered from ", r) - } - - if s.Status.Status == Generate { - instance.Paths.Generated.RemoveTmpDir() - } - s.Status.SetStatus(Idle) - s.Status.indefiniteProgress() - s.Status.stopping = false + return s.JobManager.Add("Migrating scene hashes...", j) } type totalsGenerate struct { @@ -1222,15 +702,8 @@ func (s *singleton) neededGenerate(scenes []*models.Scene, input models.Generate return &totals } -func (s *singleton) StashBoxBatchPerformerTag(input models.StashBoxBatchPerformerTagInput) { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(StashBoxBatchPerformer) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() +func (s *singleton) StashBoxBatchPerformerTag(input models.StashBoxBatchPerformerTagInput) int { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { logger.Infof("Initiating stash-box batch performer tag") boxes := config.GetInstance().GetStashBoxes() @@ -1289,7 +762,7 @@ func (s *singleton) StashBoxBatchPerformerTag(input models.StashBoxBatchPerforme performers, err = performerQuery.FindByStashIDStatus(false, box.Endpoint) } if err != nil { - return fmt.Errorf("Error querying performers: %s", err.Error()) + return fmt.Errorf("error querying performers: %s", err.Error()) } for _, performer := range performers { @@ -1309,21 +782,23 @@ func (s *singleton) StashBoxBatchPerformerTag(input models.StashBoxBatchPerforme } if len(tasks) == 0 { - s.returnToIdleState() return } - s.Status.setProgress(0, len(tasks)) + progress.SetTotal(len(tasks)) logger.Infof("Starting stash-box batch operation for %d performers", len(tasks)) var wg sync.WaitGroup for _, task := range tasks { wg.Add(1) - go task.Start(&wg) - wg.Wait() + progress.ExecuteTask(task.Description(), func() { + task.Start(&wg) + }) - s.Status.incrementProgress() + progress.Increment() } - }() + }) + + return s.JobManager.Add("Batch stash-box performer tag...", j) } diff --git a/pkg/manager/subscribe.go b/pkg/manager/subscribe.go new file mode 
100644 index 000000000..ebb743858 --- /dev/null +++ b/pkg/manager/subscribe.go @@ -0,0 +1,44 @@ +package manager + +import ( + "context" + "sync" +) + +type subscriptionManager struct { + subscriptions []chan bool + mutex sync.Mutex +} + +func (m *subscriptionManager) subscribe(ctx context.Context) <-chan bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + c := make(chan bool, 10) + m.subscriptions = append(m.subscriptions, c) + + go func() { + <-ctx.Done() + m.mutex.Lock() + defer m.mutex.Unlock() + close(c) + + for i, s := range m.subscriptions { + if s == c { + m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...) + break + } + } + }() + + return c +} + +func (m *subscriptionManager) notify() { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, s := range m.subscriptions { + s <- true + } +} diff --git a/pkg/manager/task.go b/pkg/manager/task.go index 1694c5f39..c8ab4af67 100644 --- a/pkg/manager/task.go +++ b/pkg/manager/task.go @@ -4,5 +4,5 @@ import "sync" type Task interface { Start(wg *sync.WaitGroup) - GetStatus() JobStatus + GetDescription() string } diff --git a/pkg/manager/task_autotag.go b/pkg/manager/task_autotag.go index 8cb3f80cf..884b8e2e1 100644 --- a/pkg/manager/task_autotag.go +++ b/pkg/manager/task_autotag.go @@ -2,23 +2,315 @@ package manager import ( "context" + "fmt" "path/filepath" + "strconv" "strings" "sync" "github.com/stashapp/stash/pkg/autotag" + "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/models" ) +type autoTagJob struct { + txnManager models.TransactionManager + input models.AutoTagMetadataInput +} + +func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) { + input := j.input + if j.isFileBasedAutoTag(input) { + // doing file-based auto-tag + j.autoTagFiles(ctx, progress, input.Paths, len(input.Performers) > 0, len(input.Studios) > 0, len(input.Tags) > 0) + } else { + // doing specific performer/studio/tag auto-tag + j.autoTagSpecific(ctx, progress) + } +} + +func (j *autoTagJob) isFileBasedAutoTag(input models.AutoTagMetadataInput) bool { + const wildcard = "*" + performerIds := input.Performers + studioIds := input.Studios + tagIds := input.Tags + + return (len(performerIds) == 0 || performerIds[0] == wildcard) && (len(studioIds) == 0 || studioIds[0] == wildcard) && (len(tagIds) == 0 || tagIds[0] == wildcard) +} + +func (j *autoTagJob) autoTagFiles(ctx context.Context, progress *job.Progress, paths []string, performers, studios, tags bool) { + t := autoTagFilesTask{ + paths: paths, + performers: performers, + studios: studios, + tags: tags, + ctx: ctx, + progress: progress, + txnManager: j.txnManager, + } + + t.process() +} + +func (j *autoTagJob) autoTagSpecific(ctx context.Context, progress *job.Progress) { + input := j.input + performerIds := input.Performers + studioIds := input.Studios + tagIds := input.Tags + + performerCount := len(performerIds) + studioCount := len(studioIds) + tagCount := len(tagIds) + + if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { + performerQuery := r.Performer() + studioQuery := r.Studio() + tagQuery := r.Tag() + + const wildcard = "*" + var err error + if performerCount == 1 && performerIds[0] == wildcard { + performerCount, err = performerQuery.Count() + if err != nil { + return fmt.Errorf("error getting performer count: %s", err.Error()) + } + } + if studioCount == 1 && studioIds[0] == wildcard { + studioCount, err = studioQuery.Count() + if err != nil { + return 
fmt.Errorf("error getting studio count: %s", err.Error()) + } + } + if tagCount == 1 && tagIds[0] == wildcard { + tagCount, err = tagQuery.Count() + if err != nil { + return fmt.Errorf("error getting tag count: %s", err.Error()) + } + } + + return nil + }); err != nil { + logger.Error(err.Error()) + return + } + + total := performerCount + studioCount + tagCount + progress.SetTotal(total) + + logger.Infof("Starting autotag of %d performers, %d studios, %d tags", performerCount, studioCount, tagCount) + + j.autoTagPerformers(ctx, progress, input.Paths, performerIds) + j.autoTagStudios(ctx, progress, input.Paths, studioIds) + j.autoTagTags(ctx, progress, input.Paths, tagIds) + + logger.Info("Finished autotag") +} + +func (j *autoTagJob) autoTagPerformers(ctx context.Context, progress *job.Progress, paths []string, performerIds []string) { + if job.IsCancelled(ctx) { + return + } + + for _, performerId := range performerIds { + var performers []*models.Performer + + if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { + performerQuery := r.Performer() + + if performerId == "*" { + var err error + performers, err = performerQuery.All() + if err != nil { + return fmt.Errorf("error querying performers: %s", err.Error()) + } + } else { + performerIdInt, err := strconv.Atoi(performerId) + if err != nil { + return fmt.Errorf("error parsing performer id %s: %s", performerId, err.Error()) + } + + performer, err := performerQuery.Find(performerIdInt) + if err != nil { + return fmt.Errorf("error finding performer id %s: %s", performerId, err.Error()) + } + + if performer == nil { + return fmt.Errorf("performer with id %s not found", performerId) + } + performers = append(performers, performer) + } + + for _, performer := range performers { + if job.IsCancelled(ctx) { + logger.Info("Stopping due to user request") + return nil + } + + if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error { + if err := autotag.PerformerScenes(performer, paths, r.Scene()); err != nil { + return err + } + if err := autotag.PerformerImages(performer, paths, r.Image()); err != nil { + return err + } + if err := autotag.PerformerGalleries(performer, paths, r.Gallery()); err != nil { + return err + } + + return nil + }); err != nil { + return fmt.Errorf("error auto-tagging performer '%s': %s", performer.Name.String, err.Error()) + } + + progress.Increment() + } + + return nil + }); err != nil { + logger.Error(err.Error()) + continue + } + } +} + +func (j *autoTagJob) autoTagStudios(ctx context.Context, progress *job.Progress, paths []string, studioIds []string) { + if job.IsCancelled(ctx) { + return + } + + for _, studioId := range studioIds { + var studios []*models.Studio + + if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { + studioQuery := r.Studio() + if studioId == "*" { + var err error + studios, err = studioQuery.All() + if err != nil { + return fmt.Errorf("error querying studios: %s", err.Error()) + } + } else { + studioIdInt, err := strconv.Atoi(studioId) + if err != nil { + return fmt.Errorf("error parsing studio id %s: %s", studioId, err.Error()) + } + + studio, err := studioQuery.Find(studioIdInt) + if err != nil { + return fmt.Errorf("error finding studio id %s: %s", studioId, err.Error()) + } + + if studio == nil { + return fmt.Errorf("studio with id %s not found", studioId) + } + + studios = append(studios, studio) + } + + for _, studio := range studios { + if job.IsCancelled(ctx) { + logger.Info("Stopping due 
to user request") + return nil + } + + if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error { + if err := autotag.StudioScenes(studio, paths, r.Scene()); err != nil { + return err + } + if err := autotag.StudioImages(studio, paths, r.Image()); err != nil { + return err + } + if err := autotag.StudioGalleries(studio, paths, r.Gallery()); err != nil { + return err + } + + return nil + }); err != nil { + return fmt.Errorf("error auto-tagging studio '%s': %s", studio.Name.String, err.Error()) + } + + progress.Increment() + } + + return nil + }); err != nil { + logger.Error(err.Error()) + continue + } + } +} + +func (j *autoTagJob) autoTagTags(ctx context.Context, progress *job.Progress, paths []string, tagIds []string) { + if job.IsCancelled(ctx) { + return + } + + for _, tagId := range tagIds { + var tags []*models.Tag + if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error { + tagQuery := r.Tag() + if tagId == "*" { + var err error + tags, err = tagQuery.All() + if err != nil { + return fmt.Errorf("error querying tags: %s", err.Error()) + } + } else { + tagIdInt, err := strconv.Atoi(tagId) + if err != nil { + return fmt.Errorf("error parsing tag id %s: %s", tagId, err.Error()) + } + + tag, err := tagQuery.Find(tagIdInt) + if err != nil { + return fmt.Errorf("error finding tag id %s: %s", tagId, err.Error()) + } + tags = append(tags, tag) + } + + for _, tag := range tags { + if job.IsCancelled(ctx) { + logger.Info("Stopping due to user request") + return nil + } + + if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error { + if err := autotag.TagScenes(tag, paths, r.Scene()); err != nil { + return err + } + if err := autotag.TagImages(tag, paths, r.Image()); err != nil { + return err + } + if err := autotag.TagGalleries(tag, paths, r.Gallery()); err != nil { + return err + } + + return nil + }); err != nil { + return fmt.Errorf("error auto-tagging tag '%s': %s", tag.Name, err.Error()) + } + + progress.Increment() + } + + return nil + }); err != nil { + logger.Error(err.Error()) + continue + } + } +} + type autoTagFilesTask struct { paths []string performers bool studios bool tags bool + ctx context.Context + progress *job.Progress txnManager models.TransactionManager - status *TaskStatus } func (t *autoTagFilesTask) makeSceneFilter() *models.SceneFilterType { @@ -144,7 +436,7 @@ func (t *autoTagFilesTask) batchFindFilter(batchSize int) *models.FindFilterType } func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -161,7 +453,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error { } for _, ss := range scenes { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -178,7 +470,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error { go tt.Start(&wg) wg.Wait() - t.status.incrementProgress() + t.progress.Increment() } if len(scenes) != batchSize { @@ -192,7 +484,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error { } func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -209,7 +501,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error { } for _, ss := range images { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -226,7 +518,7 @@ func (t *autoTagFilesTask) processImages(r 
models.ReaderRepository) error { go tt.Start(&wg) wg.Wait() - t.status.incrementProgress() + t.progress.Increment() } if len(images) != batchSize { @@ -240,7 +532,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error { } func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -257,7 +549,7 @@ func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error { } for _, ss := range galleries { - if t.status.stopping { + if job.IsCancelled(t.ctx) { return nil } @@ -274,7 +566,7 @@ func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error { go tt.Start(&wg) wg.Wait() - t.status.incrementProgress() + t.progress.Increment() } if len(galleries) != batchSize { @@ -294,7 +586,7 @@ func (t *autoTagFilesTask) process() { return err } - t.status.total = total + t.progress.SetTotal(total) logger.Infof("Starting autotag of %d files", total) @@ -310,7 +602,7 @@ func (t *autoTagFilesTask) process() { return err } - if t.status.stopping { + if job.IsCancelled(t.ctx) { logger.Info("Stopping due to user request") } diff --git a/pkg/manager/task_export.go b/pkg/manager/task_export.go index 59bacca24..65d925d1b 100644 --- a/pkg/manager/task_export.go +++ b/pkg/manager/task_export.go @@ -93,10 +93,6 @@ func CreateExportTask(a models.HashAlgorithm, input models.ExportObjectsInput) * } } -func (t *ExportTask) GetStatus() JobStatus { - return Export -} - func (t *ExportTask) Start(wg *sync.WaitGroup) { defer wg.Done() // @manager.total = Scene.count + Gallery.count + Performer.count + Studio.count + Movie.count diff --git a/pkg/manager/task_import.go b/pkg/manager/task_import.go index 76a9a2954..84acaaeef 100644 --- a/pkg/manager/task_import.go +++ b/pkg/manager/task_import.go @@ -75,8 +75,8 @@ func CreateImportTask(a models.HashAlgorithm, input models.ImportObjectsInput) ( }, nil } -func (t *ImportTask) GetStatus() JobStatus { - return Import +func (t *ImportTask) GetDescription() string { + return "Importing..." 
} func (t *ImportTask) Start(wg *sync.WaitGroup) { diff --git a/pkg/manager/task_plugin.go b/pkg/manager/task_plugin.go index 1f8be17cb..f2a408b53 100644 --- a/pkg/manager/task_plugin.go +++ b/pkg/manager/task_plugin.go @@ -1,25 +1,19 @@ package manager import ( - "time" + "context" + "fmt" + "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/models" "github.com/stashapp/stash/pkg/plugin/common" ) -func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*models.PluginArgInput, serverConnection common.StashServerConnection) { - if s.Status.Status != Idle { - return - } - s.Status.SetStatus(PluginOperation) - s.Status.indefiniteProgress() - - go func() { - defer s.returnToIdleState() - - progress := make(chan float64) - task, err := s.PluginCache.CreateTask(pluginID, taskName, serverConnection, args, progress) +func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*models.PluginArgInput, serverConnection common.StashServerConnection) int { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + pluginProgress := make(chan float64) + task, err := s.PluginCache.CreateTask(pluginID, taskName, serverConnection, args, pluginProgress) if err != nil { logger.Errorf("Error creating plugin task: %s", err.Error()) return @@ -48,24 +42,20 @@ func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*mode } }() - // TODO - refactor stop to use channels - // check for stop every five seconds - pollingTime := time.Second * 5 - stopPoller := time.Tick(pollingTime) for { select { case <-done: return - case p := <-progress: - s.Status.setProgressPercent(p) - case <-stopPoller: - if s.Status.stopping { - if err := task.Stop(); err != nil { - logger.Errorf("Error stopping plugin operation: %s", err.Error()) - } - return + case p := <-pluginProgress: + progress.SetPercent(p) + case <-ctx.Done(): + if err := task.Stop(); err != nil { + logger.Errorf("Error stopping plugin operation: %s", err.Error()) } + return } } - }() + }) + + return s.JobManager.Add(fmt.Sprintf("Running plugin task: %s", taskName), j) } diff --git a/pkg/manager/task_scan.go b/pkg/manager/task_scan.go index 963f9ab48..285b2b92f 100644 --- a/pkg/manager/task_scan.go +++ b/pkg/manager/task_scan.go @@ -4,6 +4,7 @@ import ( "archive/zip" "context" "database/sql" + "errors" "fmt" "os" "path/filepath" @@ -16,6 +17,7 @@ import ( "github.com/stashapp/stash/pkg/ffmpeg" "github.com/stashapp/stash/pkg/gallery" "github.com/stashapp/stash/pkg/image" + "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/manager/config" "github.com/stashapp/stash/pkg/models" @@ -23,6 +25,175 @@ import ( "github.com/stashapp/stash/pkg/utils" ) +type ScanJob struct { + txnManager models.TransactionManager + input models.ScanMetadataInput + subscriptions *subscriptionManager +} + +func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) { + input := j.input + paths := getScanPaths(input.Paths) + + var total *int + var newFiles *int + progress.ExecuteTask("Counting files to scan...", func() { + total, newFiles = j.neededScan(ctx, paths) + }) + + if job.IsCancelled(ctx) { + logger.Info("Stopping due to user request") + return + } + + if total == nil || newFiles == nil { + logger.Infof("Taking too long to count content. Skipping...") + logger.Infof("Starting scan") + } else { + logger.Infof("Starting scan of %d files. 
%d New files found", *total, *newFiles) + } + + start := time.Now() + config := config.GetInstance() + parallelTasks := config.GetParallelTasksWithAutoDetection() + logger.Infof("Scan started with %d parallel tasks", parallelTasks) + wg := sizedwaitgroup.New(parallelTasks) + + if total != nil { + progress.SetTotal(*total) + } + + fileNamingAlgo := config.GetVideoFileNamingAlgorithm() + calculateMD5 := config.IsCalculateMD5() + + stoppingErr := errors.New("stopping") + var err error + + var galleries []string + + for _, sp := range paths { + err = walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error { + if job.IsCancelled(ctx) { + return stoppingErr + } + + if isGallery(path) { + galleries = append(galleries, path) + } + + instance.Paths.Generated.EnsureTmpDir() + + wg.Add() + task := ScanTask{ + TxnManager: j.txnManager, + FilePath: path, + UseFileMetadata: utils.IsTrue(input.UseFileMetadata), + StripFileExtension: utils.IsTrue(input.StripFileExtension), + fileNamingAlgorithm: fileNamingAlgo, + calculateMD5: calculateMD5, + GeneratePreview: utils.IsTrue(input.ScanGeneratePreviews), + GenerateImagePreview: utils.IsTrue(input.ScanGenerateImagePreviews), + GenerateSprite: utils.IsTrue(input.ScanGenerateSprites), + GeneratePhash: utils.IsTrue(input.ScanGeneratePhashes), + progress: progress, + } + + go func() { + task.Start(&wg) + progress.Increment() + }() + + return nil + }) + + if err == stoppingErr { + logger.Info("Stopping due to user request") + break + } + + if err != nil { + logger.Errorf("Error encountered scanning files: %s", err.Error()) + break + } + } + + wg.Wait() + instance.Paths.Generated.EmptyTmpDir() + elapsed := time.Since(start) + logger.Info(fmt.Sprintf("Scan finished (%s)", elapsed)) + + if job.IsCancelled(ctx) || err != nil { + return + } + + progress.ExecuteTask("Associating galleries", func() { + for _, path := range galleries { + wg.Add() + task := ScanTask{ + TxnManager: j.txnManager, + FilePath: path, + UseFileMetadata: false, + } + + go task.associateGallery(&wg) + wg.Wait() + } + logger.Info("Finished gallery association") + }) + + j.subscriptions.notify() +} + +func (j *ScanJob) neededScan(ctx context.Context, paths []*models.StashConfig) (total *int, newFiles *int) { + const timeout = 90 * time.Second + + // create a control channel through which to signal the counting loop when the timeout is reached + chTimeout := time.After(timeout) + + logger.Infof("Counting files to scan...") + + t := 0 + n := 0 + + timeoutErr := errors.New("timed out") + + for _, sp := range paths { + err := walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error { + t++ + task := ScanTask{FilePath: path, TxnManager: j.txnManager} + if !task.doesPathExist() { + n++ + } + + //check for timeout + select { + case <-chTimeout: + return timeoutErr + default: + } + + // check stop + if job.IsCancelled(ctx) { + return timeoutErr + } + + return nil + }) + + if err == timeoutErr { + // timeout should return nil counts + return nil, nil + } + + if err != nil { + logger.Errorf("Error encountered counting files to scan: %s", err.Error()) + return nil, nil + } + } + + return &t, &n +} + type ScanTask struct { TxnManager models.TransactionManager FilePath string @@ -35,40 +206,57 @@ type ScanTask struct { GeneratePreview bool GenerateImagePreview bool zipGallery *models.Gallery + progress *job.Progress } func (t *ScanTask) Start(wg *sizedwaitgroup.SizedWaitGroup) { - if isGallery(t.FilePath) { - t.scanGallery() - } else if isVideo(t.FilePath) { - s := t.scanScene() + 
defer wg.Done() - if s != nil { - iwg := sizedwaitgroup.New(2) + var s *models.Scene - if t.GenerateSprite { - iwg.Add() + t.progress.ExecuteTask("Scanning "+t.FilePath, func() { + if isGallery(t.FilePath) { + t.scanGallery() + } else if isVideo(t.FilePath) { + s = t.scanScene() + } else if isImage(t.FilePath) { + t.scanImage() + } + }) + + if s != nil { + iwg := sizedwaitgroup.New(2) + + if t.GenerateSprite { + iwg.Add() + + go t.progress.ExecuteTask(fmt.Sprintf("Generating sprites for %s", t.FilePath), func() { taskSprite := GenerateSpriteTask{ Scene: *s, Overwrite: false, fileNamingAlgorithm: t.fileNamingAlgorithm, } - go taskSprite.Start(&iwg) - } + taskSprite.Start(&iwg) + }) + } - if t.GeneratePhash { - iwg.Add() + if t.GeneratePhash { + iwg.Add() + + go t.progress.ExecuteTask(fmt.Sprintf("Generating phash for %s", t.FilePath), func() { taskPhash := GeneratePhashTask{ Scene: *s, fileNamingAlgorithm: t.fileNamingAlgorithm, txnManager: t.TxnManager, } - go taskPhash.Start(&iwg) - } + taskPhash.Start(&iwg) + }) + } - if t.GeneratePreview { - iwg.Add() + if t.GeneratePreview { + iwg.Add() + go t.progress.ExecuteTask(fmt.Sprintf("Generating preview for %s", t.FilePath), func() { config := config.GetInstance() var previewSegmentDuration = config.GetPreviewSegmentDuration() var previewSegments = config.GetPreviewSegments() @@ -92,16 +280,12 @@ func (t *ScanTask) Start(wg *sizedwaitgroup.SizedWaitGroup) { Overwrite: false, fileNamingAlgorithm: t.fileNamingAlgorithm, } - go taskPreview.Start(&iwg) - } - - iwg.Wait() + taskPreview.Start(wg) + }) } - } else if isImage(t.FilePath) { - t.scanImage() - } - wg.Done() + iwg.Wait() + } } func (t *ScanTask) scanGallery() { diff --git a/pkg/manager/task_stash_box_tag.go b/pkg/manager/task_stash_box_tag.go index ed049bc92..f7d9888ba 100644 --- a/pkg/manager/task_stash_box_tag.go +++ b/pkg/manager/task_stash_box_tag.go @@ -3,6 +3,7 @@ package manager import ( "context" "database/sql" + "fmt" "sync" "time" @@ -27,6 +28,17 @@ func (t *StashBoxPerformerTagTask) Start(wg *sync.WaitGroup) { t.stashBoxPerformerTag() } +func (t *StashBoxPerformerTagTask) Description() string { + var name string + if t.name != nil { + name = *t.name + } else if t.performer != nil { + name = t.performer.Name.String + } + + return fmt.Sprintf("Tagging performer %s from stash-box", name) +} + func (t *StashBoxPerformerTagTask) stashBoxPerformerTag() { var performer *models.ScrapedScenePerformer var err error diff --git a/ui/v2.5/src/components/Changelog/versions/v080.md b/ui/v2.5/src/components/Changelog/versions/v080.md index 899cfa56e..6df3f220f 100644 --- a/ui/v2.5/src/components/Changelog/versions/v080.md +++ b/ui/v2.5/src/components/Changelog/versions/v080.md @@ -1,4 +1,5 @@ ### ✨ New Features +* Revamped job management: tasks can now be queued. ([#1379](https://github.com/stashapp/stash/pull/1379)) * Added Handy/Funscript support. ([#1377](https://github.com/stashapp/stash/pull/1377)) * Added Performers tab to Studio page. ([#1405](https://github.com/stashapp/stash/pull/1405)) * Added [DLNA server](/settings?tab=dlna). 
([#1364](https://github.com/stashapp/stash/pull/1364)) diff --git a/ui/v2.5/src/components/Settings/SettingsTasksPanel/GenerateButton.tsx b/ui/v2.5/src/components/Settings/SettingsTasksPanel/GenerateButton.tsx index 69d8f46cd..8cec5ffbc 100644 --- a/ui/v2.5/src/components/Settings/SettingsTasksPanel/GenerateButton.tsx +++ b/ui/v2.5/src/components/Settings/SettingsTasksPanel/GenerateButton.tsx @@ -22,7 +22,7 @@ export const GenerateButton: React.FC = () => { markers, transcodes, }); - Toast.success({ content: "Started generating" }); + Toast.success({ content: "Added generation job to queue" }); } catch (e) { Toast.error(e); } diff --git a/ui/v2.5/src/components/Settings/SettingsTasksPanel/JobTable.tsx b/ui/v2.5/src/components/Settings/SettingsTasksPanel/JobTable.tsx new file mode 100644 index 000000000..8c9f5fd50 --- /dev/null +++ b/ui/v2.5/src/components/Settings/SettingsTasksPanel/JobTable.tsx @@ -0,0 +1,211 @@ +import React, { useState, useEffect } from "react"; +import { Button, ProgressBar } from "react-bootstrap"; +import { + mutateStopJob, + useJobQueue, + useJobsSubscribe, +} from "src/core/StashService"; +import * as GQL from "src/core/generated-graphql"; +import { Icon } from "src/components/Shared"; +import { IconProp } from "@fortawesome/fontawesome-svg-core"; + +type JobFragment = Pick< + GQL.Job, + "id" | "status" | "subTasks" | "description" | "progress" +>; + +interface IJob { + job: JobFragment; +} + +const Task: React.FC = ({ job }) => { + const [stopping, setStopping] = useState(false); + const [className, setClassName] = useState(""); + + useEffect(() => { + setTimeout(() => setClassName("fade-in")); + }, []); + + useEffect(() => { + if ( + job.status === GQL.JobStatus.Cancelled || + job.status === GQL.JobStatus.Finished + ) { + // fade out around 10 seconds + setTimeout(() => { + setClassName("fade-out"); + }, 9800); + } + }, [job]); + + async function stopJob() { + setStopping(true); + await mutateStopJob(job.id); + } + + function canStop() { + return ( + !stopping && + (job.status === GQL.JobStatus.Ready || + job.status === GQL.JobStatus.Running) + ); + } + + function getStatusClass() { + switch (job.status) { + case GQL.JobStatus.Ready: + return "ready"; + case GQL.JobStatus.Running: + return "running"; + case GQL.JobStatus.Stopping: + return "stopping"; + case GQL.JobStatus.Finished: + return "finished"; + case GQL.JobStatus.Cancelled: + return "cancelled"; + } + } + + function getStatusIcon() { + let icon: IconProp = "circle"; + let iconClass = ""; + switch (job.status) { + case GQL.JobStatus.Ready: + icon = "hourglass-start"; + break; + case GQL.JobStatus.Running: + icon = "cog"; + iconClass = "fa-spin"; + break; + case GQL.JobStatus.Stopping: + icon = "cog"; + iconClass = "fa-spin"; + break; + case GQL.JobStatus.Finished: + icon = "check"; + break; + case GQL.JobStatus.Cancelled: + icon = "ban"; + break; + } + + return ; + } + + function maybeRenderProgress() { + if ( + job.status === GQL.JobStatus.Running && + job.progress !== undefined && + job.progress !== null + ) { + const progress = job.progress * 100; + return ( + + ); + } + } + + function maybeRenderSubTasks() { + if ( + job.status === GQL.JobStatus.Running || + job.status === GQL.JobStatus.Stopping + ) { + return ( +
+      <div>
+        {/* eslint-disable react/no-array-index-key */}
+        {(job.subTasks ?? []).map((t, i) => (
+          <div key={i}>{t}</div>
+        ))}
+        {/* eslint-enable react/no-array-index-key */}
+      </div>
+ ); + } + } + + return ( +
+    <li className={className}>
+      <div>
+        <Button size="sm" className="minimal stop" onClick={() => stopJob()} disabled={!canStop()}>
+          <Icon icon="times" />
+        </Button>
+        <div className={`job-status ${getStatusClass()}`}>
+          <div>
+            {getStatusIcon()}
+            <span>{job.description}</span>
+          </div>
+          <div>{maybeRenderProgress()}</div>
+          {maybeRenderSubTasks()}
+        </div>
+      </div>
+    </li>
  • + ); +}; + +export const JobTable: React.FC = () => { + const jobStatus = useJobQueue(); + const jobsSubscribe = useJobsSubscribe(); + + const [queue, setQueue] = useState([]); + + useEffect(() => { + setQueue(jobStatus.data?.jobQueue ?? []); + }, [jobStatus]); + + useEffect(() => { + if (!jobsSubscribe.data) { + return; + } + + const event = jobsSubscribe.data.jobsSubscribe; + + function updateJob() { + setQueue((q) => + q.map((j) => { + if (j.id === event.job.id) { + return event.job; + } + + return j; + }) + ); + } + + switch (event.type) { + case GQL.JobStatusUpdateType.Add: + // add to the end of the queue + setQueue((q) => q.concat([event.job])); + break; + case GQL.JobStatusUpdateType.Remove: + // update the job then remove after a timeout + updateJob(); + setTimeout(() => { + setQueue((q) => q.filter((j) => j.id !== event.job.id)); + }, 10000); + break; + case GQL.JobStatusUpdateType.Update: + updateJob(); + break; + } + }, [jobsSubscribe.data]); + + return ( +
+    <div className="job-table">
+      <ul>
+        {(queue ?? []).map((j) => (
+          <Task job={j} key={j.id} />
+        ))}
+      </ul>
+    </div>
    + ); +}; diff --git a/ui/v2.5/src/components/Settings/SettingsTasksPanel/SettingsTasksPanel.tsx b/ui/v2.5/src/components/Settings/SettingsTasksPanel/SettingsTasksPanel.tsx index 6a24a72bc..8d0a130ec 100644 --- a/ui/v2.5/src/components/Settings/SettingsTasksPanel/SettingsTasksPanel.tsx +++ b/ui/v2.5/src/components/Settings/SettingsTasksPanel/SettingsTasksPanel.tsx @@ -1,15 +1,12 @@ -import React, { useState, useEffect } from "react"; -import { Button, Form, ProgressBar } from "react-bootstrap"; +import React, { useState } from "react"; +import { Button, Form } from "react-bootstrap"; import { - useJobStatus, - useMetadataUpdate, mutateMetadataImport, mutateMetadataClean, mutateMetadataScan, mutateMetadataAutoTag, mutateMetadataExport, mutateMigrateHashNaming, - mutateStopJob, usePlugins, mutateRunPluginTask, mutateBackupDatabase, @@ -21,6 +18,7 @@ import { downloadFile } from "src/utils"; import { GenerateButton } from "./GenerateButton"; import { ImportDialog } from "./ImportDialog"; import { DirectorySelectionDialog } from "./DirectorySelectionDialog"; +import { JobTable } from "./JobTable"; type Plugin = Pick; type PluginTask = Pick; @@ -52,78 +50,20 @@ export const SettingsTasksPanel: React.FC = () => { setScanGenerateImagePreviews, ] = useState(false); - const [status, setStatus] = useState(""); - const [progress, setProgress] = useState(0); - const [autoTagPerformers, setAutoTagPerformers] = useState(true); const [autoTagStudios, setAutoTagStudios] = useState(true); const [autoTagTags, setAutoTagTags] = useState(true); - const jobStatus = useJobStatus(); - const metadataUpdate = useMetadataUpdate(); - const plugins = usePlugins(); - function statusToText(s: string) { - switch (s) { - case "Idle": - return "Idle"; - case "Scan": - return "Scanning for new content"; - case "Generate": - return "Generating supporting files"; - case "Clean": - return "Cleaning the database"; - case "Export": - return "Exporting to JSON"; - case "Import": - return "Importing from JSON"; - case "Auto Tag": - return "Auto tagging scenes"; - case "Plugin Operation": - return "Running Plugin Operation"; - case "Migrate": - return "Migrating"; - case "Stash-Box Performer Batch Operation": - return "Tagging performers from Stash-Box instance"; - default: - return "Idle"; - } - } - - useEffect(() => { - if (jobStatus?.data?.jobStatus) { - setStatus(statusToText(jobStatus.data.jobStatus.status)); - const newProgress = jobStatus.data.jobStatus.progress; - if (newProgress < 0) { - setProgress(-1); - } else { - setProgress(newProgress * 100); - } - } - }, [jobStatus]); - - useEffect(() => { - if (metadataUpdate?.data?.metadataUpdate) { - setStatus(statusToText(metadataUpdate.data.metadataUpdate.status)); - const newProgress = metadataUpdate.data.metadataUpdate.progress; - if (newProgress < 0) { - setProgress(-1); - } else { - setProgress(newProgress * 100); - } - } - }, [metadataUpdate]); - - function onImport() { + async function onImport() { setIsImportAlertOpen(false); - mutateMetadataImport() - .then(() => { - jobStatus.refetch(); - }) - .catch((e) => { - Toast.error(e); - }); + try { + await mutateMetadataImport(); + Toast.success({ content: "Added import task to queue" }); + } catch (e) { + Toast.error(e); + } } function renderImportAlert() { @@ -146,8 +86,6 @@ export const SettingsTasksPanel: React.FC = () => { setIsCleanAlertOpen(false); mutateMetadataClean({ dryRun: cleanDryRun, - }).then(() => { - jobStatus.refetch(); }); } @@ -216,8 +154,7 @@ export const SettingsTasksPanel: React.FC = () => { 
scanGenerateSprites, scanGeneratePhashes, }); - Toast.success({ content: "Started scan" }); - jobStatus.refetch(); + Toast.success({ content: "Added scan to job queue" }); } catch (e) { Toast.error(e); } @@ -252,53 +189,15 @@ export const SettingsTasksPanel: React.FC = () => { async function onAutoTag(paths?: string[]) { try { await mutateMetadataAutoTag(getAutoTagInput(paths)); - Toast.success({ content: "Started auto tagging" }); - jobStatus.refetch(); + Toast.success({ content: "Added Auto tagging job to queue" }); } catch (e) { Toast.error(e); } } - function maybeRenderStop() { - if (!status || status === "Idle") { - return undefined; - } - - return ( - - - - ); - } - - function renderJobStatus() { - return ( - <> - -
-        <h5>Status: {status}</h5>
-        {!!status && status !== "Idle" ? (
-          <ProgressBar
-            animated
-            now={progress > -1 ? progress : 100}
-            label={progress > -1 ? `${progress.toFixed(0)}%` : ""}
-          />
-        ) : (
-          ""
-        )}
    - {maybeRenderStop()} - - ); - } - async function onPluginTaskClicked(plugin: Plugin, operation: PluginTask) { await mutateRunPluginTask(plugin.id, operation.name); + Toast.success({ content: `Added ${operation.name} job to queue` }); } function renderPluginTasks(plugin: Plugin, pluginTasks: PluginTask[]) { @@ -366,6 +265,24 @@ export const SettingsTasksPanel: React.FC = () => { ); } + async function onMigrateHashNaming() { + try { + await mutateMigrateHashNaming(); + Toast.success({ content: "Added hash migration task to queue" }); + } catch (err) { + Toast.error(err); + } + } + + async function onExport() { + try { + await mutateMetadataExport(); + Toast.success({ content: "Added export task to queue" }); + } catch (err) { + Toast.error(err); + } + } + if (isBackupRunning) { return ; } @@ -378,9 +295,9 @@ export const SettingsTasksPanel: React.FC = () => { {renderScanDialog()} {renderAutoTagDialog()} -

-      <h4>Running Jobs</h4>

    +

+      <h4>Job Queue</h4>

    - {renderJobStatus()} +
    @@ -533,13 +450,7 @@ export const SettingsTasksPanel: React.FC = () => { id="export" variant="secondary" type="submit" - onClick={() => - mutateMetadataExport() - .then(() => { - jobStatus.refetch(); - }) - .catch((e) => Toast.error(e)) - } + onClick={() => onExport()} > Full Export @@ -619,11 +530,7 @@ export const SettingsTasksPanel: React.FC = () => { diff --git a/ui/v2.5/src/components/Settings/styles.scss b/ui/v2.5/src/components/Settings/styles.scss index dd6c2e723..04ace918d 100644 --- a/ui/v2.5/src/components/Settings/styles.scss +++ b/ui/v2.5/src/components/Settings/styles.scss @@ -71,6 +71,54 @@ } } +.job-table { + height: 10em; + overflow-y: auto; + + ul { + list-style: none; + padding-inline-start: 0; + } + + li { + opacity: 0; + transition: opacity 0.25s; + + &.fade-in { + opacity: 1; + } + + > div { + align-items: flex-start; + display: flex; + } + } + + .job-status { + width: 100%; + } + + .stop:not(:disabled), + .stopping .fa-icon, + .cancelled .fa-icon { + color: $danger; + } + + .running .fa-icon, + .finished .fa-icon { + color: $success; + } + + .ready .fa-icon { + color: $warning; + } + + .cancelled, + .finished { + color: $text-muted; + } +} + #temp-enable-duration .duration-control:disabled { opacity: 0.5; } diff --git a/ui/v2.5/src/components/Tagger/performers/PerformerTagger.tsx b/ui/v2.5/src/components/Tagger/performers/PerformerTagger.tsx index 988633ea9..4d6c721fd 100755 --- a/ui/v2.5/src/components/Tagger/performers/PerformerTagger.tsx +++ b/ui/v2.5/src/components/Tagger/performers/PerformerTagger.tsx @@ -1,4 +1,4 @@ -import React, { useRef, useState } from "react"; +import React, { useEffect, useRef, useState } from "react"; import { Button, Card, Form, InputGroup, ProgressBar } from "react-bootstrap"; import { Link } from "react-router-dom"; import { HashLink } from "react-router-hash-link"; @@ -9,7 +9,8 @@ import { LoadingIndicator, Modal } from "src/components/Shared"; import { stashBoxPerformerQuery, useConfiguration, - useMetadataUpdate, + useJobsSubscribe, + mutateStashBoxBatchPerformerTag, } from "src/core/StashService"; import { Manual } from "src/components/Help/Manual"; @@ -24,6 +25,11 @@ import { import PerformerModal from "../PerformerModal"; import { useUpdatePerformer } from "../queries"; +type JobFragment = Pick< + GQL.Job, + "id" | "status" | "subTasks" | "description" | "progress" +>; + const CLASSNAME = "PerformerTagger"; interface IPerformerTaggerListProps { @@ -32,6 +38,8 @@ interface IPerformerTaggerListProps { isIdle: boolean; config: ITaggerConfig; stashBoxes?: GQL.StashBox[]; + onBatchAdd: (performerInput: string) => void; + onBatchUpdate: (ids: string[] | undefined, refresh: boolean) => void; } const PerformerTaggerList: React.FC = ({ @@ -40,6 +48,8 @@ const PerformerTaggerList: React.FC = ({ isIdle, config, stashBoxes, + onBatchAdd, + onBatchUpdate, }) => { const [loading, setLoading] = useState(false); const [searchResults, setSearchResults] = useState< @@ -73,7 +83,6 @@ const PerformerTaggerList: React.FC = ({ const [showBatchAdd, setShowBatchAdd] = useState(false); const [showBatchUpdate, setShowBatchUpdate] = useState(false); const performerInput = useRef(null); - const [doBatchQuery] = GQL.useStashBoxBatchPerformerTagMutation(); const [error, setError] = useState< Record @@ -138,40 +147,15 @@ const PerformerTaggerList: React.FC = ({ .finally(() => setLoadingUpdate(undefined)); }; - const handleBatchAdd = () => { + async function handleBatchAdd() { if (performerInput.current) { - const names = performerInput.current.value - 
.split(",") - .map((n) => n.trim()) - .filter((n) => n.length > 0); - - if (names.length > 0) { - doBatchQuery({ - variables: { - input: { - performer_names: names, - endpoint: selectedEndpoint.index, - refresh: false, - }, - }, - }); - } + onBatchAdd(performerInput.current.value); } setShowBatchAdd(false); - }; + } const handleBatchUpdate = () => { - const ids = !queryAll ? performers.map((p) => p.id) : undefined; - doBatchQuery({ - variables: { - input: { - performer_ids: ids, - endpoint: selectedEndpoint.index, - refresh, - exclude_fields: config.excludedPerformerFields ?? [], - }, - }, - }); + onBatchUpdate(!queryAll ? performers.map((p) => p.id) : undefined, refresh); setShowBatchUpdate(false); }; @@ -506,7 +490,7 @@ interface ITaggerProps { } export const PerformerTagger: React.FC = ({ performers }) => { - const jobStatus = useMetadataUpdate(); + const jobsSubscribe = useJobsSubscribe(); const stashConfig = useConfiguration(); const [{ data: config }, setConfig] = useLocalForage( LOCAL_FORAGE_KEY, @@ -515,6 +499,28 @@ export const PerformerTagger: React.FC = ({ performers }) => { const [showConfig, setShowConfig] = useState(false); const [showManual, setShowManual] = useState(false); + const [batchJobID, setBatchJobID] = useState(); + const [batchJob, setBatchJob] = useState(); + + // monitor batch operation + useEffect(() => { + if (!jobsSubscribe.data) { + return; + } + + const event = jobsSubscribe.data.jobsSubscribe; + if (event.job.id !== batchJobID) { + return; + } + + if (event.type !== GQL.JobStatusUpdateType.Remove) { + setBatchJob(event.job); + } else { + setBatchJob(undefined); + setBatchJobID(undefined); + } + }, [jobsSubscribe, batchJobID]); + if (!config) return ; const savedEndpointIndex = @@ -529,12 +535,73 @@ export const PerformerTagger: React.FC = ({ performers }) => { const selectedEndpoint = stashConfig.data?.configuration.general.stashBoxes[selectedEndpointIndex]; - const progress = - jobStatus.data?.metadataUpdate.status === - "Stash-Box Performer Batch Operation" && - jobStatus.data.metadataUpdate.progress >= 0 - ? jobStatus.data.metadataUpdate.progress * 100 - : null; + async function batchAdd(performerInput: string) { + if (performerInput && selectedEndpoint) { + const names = performerInput + .split(",") + .map((n) => n.trim()) + .filter((n) => n.length > 0); + + if (names.length > 0) { + const ret = await mutateStashBoxBatchPerformerTag({ + performer_names: names, + endpoint: selectedEndpointIndex, + refresh: false, + }); + + setBatchJobID(ret.data?.stashBoxBatchPerformerTag); + } + } + } + + async function batchUpdate(ids: string[] | undefined, refresh: boolean) { + if (config && selectedEndpoint) { + const ret = await mutateStashBoxBatchPerformerTag({ + performer_ids: ids, + endpoint: selectedEndpointIndex, + refresh, + exclude_fields: config.excludedPerformerFields ?? [], + }); + + setBatchJobID(ret.data?.stashBoxBatchPerformerTag); + } + } + + // const progress = + // jobStatus.data?.metadataUpdate.status === + // "Stash-Box Performer Batch Operation" && + // jobStatus.data.metadataUpdate.progress >= 0 + // ? jobStatus.data.metadataUpdate.progress * 100 + // : null; + + function renderStatus() { + if (batchJob) { + const progress = + batchJob.progress !== undefined && batchJob.progress !== null + ? batchJob.progress * 100 + : undefined; + return ( + +
    Status: Tagging performers
    + {progress !== undefined && ( + + )} +
    + ); + } + + if (batchJobID !== undefined) { + return ( + +
    Status: Tagging job queued
    +
    + ); + } + } return ( <> @@ -543,16 +610,7 @@ export const PerformerTagger: React.FC = ({ performers }) => { onClose={() => setShowManual(false)} defaultActiveTab="Tagger.md" /> - {progress !== null && ( - -
    Status: Tagging performers
    - -
    - )} + {renderStatus()}
    {selectedEndpointIndex !== -1 && selectedEndpoint ? ( <> @@ -581,9 +639,11 @@ export const PerformerTagger: React.FC = ({ performers }) => { endpoint: selectedEndpoint.endpoint, index: selectedEndpointIndex, }} - isIdle={progress === null} + isIdle={batchJobID === undefined} config={config} stashBoxes={stashConfig.data?.configuration.general.stashBoxes} + onBatchAdd={batchAdd} + onBatchUpdate={batchUpdate} /> ) : ( diff --git a/ui/v2.5/src/core/StashService.ts b/ui/v2.5/src/core/StashService.ts index 90a987f42..9601f10a1 100644 --- a/ui/v2.5/src/core/StashService.ts +++ b/ui/v2.5/src/core/StashService.ts @@ -701,6 +701,8 @@ export const useGenerateAPIKey = () => update: deleteCache([GQL.ConfigurationDocument]), }); +export const useJobsSubscribe = () => GQL.useJobsSubscribeSubscription(); + export const useConfigureDLNA = () => GQL.useConfigureDlnaMutation({ refetchQueries: getQueryNames([GQL.ConfigurationDocument]), @@ -715,8 +717,6 @@ export const useAddTempDLNAIP = () => GQL.useAddTempDlnaipMutation(); export const useRemoveTempDLNAIP = () => GQL.useRemoveTempDlnaipMutation(); -export const useMetadataUpdate = () => GQL.useMetadataUpdateSubscription(); - export const useLoggingSubscribe = () => GQL.useLoggingSubscribeSubscription(); export const querySystemStatus = () => @@ -735,14 +735,17 @@ export const useLogs = () => fetchPolicy: "no-cache", }); -export const useJobStatus = () => - GQL.useJobStatusQuery({ +export const useJobQueue = () => + GQL.useJobQueueQuery({ fetchPolicy: "no-cache", }); -export const mutateStopJob = () => +export const mutateStopJob = (jobID: string) => client.mutate({ mutation: GQL.StopJobDocument, + variables: { + job_id: jobID, + }, }); export const useDLNAStatus = () => @@ -942,6 +945,14 @@ export const mutateBackupDatabase = (input: GQL.BackupDatabaseInput) => variables: { input }, }); +export const mutateStashBoxBatchPerformerTag = ( + input: GQL.StashBoxBatchPerformerTagInput +) => + client.mutate({ + mutation: GQL.StashBoxBatchPerformerTagDocument, + variables: { input }, + }); + export const querySceneByPathRegex = (filter: GQL.FindFilterType) => client.query({ query: GQL.FindScenesByPathRegexDocument, diff --git a/ui/v2.5/src/core/createClient.ts b/ui/v2.5/src/core/createClient.ts index 913222e5a..b1b8eba66 100644 --- a/ui/v2.5/src/core/createClient.ts +++ b/ui/v2.5/src/core/createClient.ts @@ -142,23 +142,13 @@ export const createClient = () => { }); // Watch for scan/clean tasks and reset cache when they complete - let prevStatus = "Idle"; client - .subscribe({ - query: GQL.MetadataUpdateDocument, + .subscribe({ + query: GQL.ScanCompleteSubscribeDocument, }) .subscribe({ - next: (res) => { - const currentStatus = res.data?.metadataUpdate.status; - if (currentStatus) { - if ( - currentStatus === "Idle" && - (prevStatus === "Scan" || prevStatus === "Clean") - ) { - client.resetStore(); - } - prevStatus = currentStatus; - } + next: () => { + client.resetStore(); }, }); diff --git a/ui/v2.5/src/styles/_theme.scss b/ui/v2.5/src/styles/_theme.scss index 5963b10f0..6b2e07f11 100644 --- a/ui/v2.5/src/styles/_theme.scss +++ b/ui/v2.5/src/styles/_theme.scss @@ -47,12 +47,17 @@ button.minimal { color: $text-color; transition: none; - &:hover { + &:disabled { + background: none; + opacity: 0.5; + } + + &:hover:not(:disabled) { background: rgba(138, 155, 168, 0.15); color: $text-color; } - &:active { + &:active:not(:disabled) { background: rgba(138, 155, 168, 0.3); color: $text-color; }
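
For context, a minimal sketch, not part of the patch, of how a consumer could use the job-queue helpers this change adds to ui/v2.5/src/core/StashService.ts (useJobQueue, useJobsSubscribe, mutateStopJob). The JobQueueSummary component name and its rendering are illustrative assumptions; the actual UI shipped in this PR is JobTable.tsx above.

```tsx
// Illustrative only: consumes the initial jobQueue query, then keeps the list
// in sync from jobsSubscribe events, and allows stopping individual jobs.
import React, { useEffect, useState } from "react";
import * as GQL from "src/core/generated-graphql";
import {
  mutateStopJob,
  useJobQueue,
  useJobsSubscribe,
} from "src/core/StashService";

export const JobQueueSummary: React.FC = () => {
  const jobQueue = useJobQueue();
  const jobsSubscribe = useJobsSubscribe();
  const [jobIDs, setJobIDs] = useState<string[]>([]);

  // seed the list from the one-off jobQueue query
  useEffect(() => {
    setJobIDs((jobQueue.data?.jobQueue ?? []).map((j) => j.id));
  }, [jobQueue.data]);

  // apply Add/Remove events from the jobsSubscribe subscription
  useEffect(() => {
    const event = jobsSubscribe.data?.jobsSubscribe;
    if (!event) return;

    if (event.type === GQL.JobStatusUpdateType.Add) {
      setJobIDs((ids) => ids.concat(event.job.id));
    } else if (event.type === GQL.JobStatusUpdateType.Remove) {
      setJobIDs((ids) => ids.filter((id) => id !== event.job.id));
    }
  }, [jobsSubscribe.data]);

  return (
    <div>
      {jobIDs.length} job(s) in queue
      {jobIDs.map((id) => (
        <button key={id} onClick={() => mutateStopJob(id)}>
          Stop job {id}
        </button>
      ))}
    </div>
  );
};
```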