Job queueing (#1379)

This commit is contained in:
WithoutPants
2021-05-24 14:24:18 +10:00
committed by GitHub
parent 9aa2dfd96c
commit 0e01374537
42 changed files with 2571 additions and 1110 deletions

View File

@@ -0,0 +1,10 @@
fragment JobData on Job {
id
status
subTasks
description
progress
startTime
endTime
addTime
}

View File

@@ -0,0 +1,7 @@
mutation StopJob($job_id: ID!) {
stopJob(job_id: $job_id)
}
mutation StopAllJobs {
stopAllJobs
}

View File

@@ -34,10 +34,6 @@ mutation MigrateHashNaming {
migrateHashNaming
}
mutation StopJob {
stopJob
}
mutation BackupDatabase($input: BackupDatabaseInput!) {
backupDatabase(input: $input)
}

View File

@@ -0,0 +1,11 @@
query JobQueue {
jobQueue {
...JobData
}
}
query FindJob($input: FindJobInput!) {
findJob(input: $input) {
...JobData
}
}

View File

@@ -1,11 +1,3 @@
query JobStatus {
jobStatus {
progress
status
message
}
}
query SystemStatus {
systemStatus {
databaseSchema

View File

@@ -1,8 +1,13 @@
subscription MetadataUpdate {
metadataUpdate {
progress
status
message
subscription JobsSubscribe {
jobsSubscribe {
type
job {
id
status
subTasks
description
progress
}
}
}
@@ -10,4 +15,8 @@ subscription LoggingSubscribe {
loggingSubscribe {
...LogEntryData
}
}
subscription ScanCompleteSubscribe {
scanCompleteSubscribe
}

View File

@@ -106,9 +106,12 @@ type Query {
"""Returns an array of paths for the given path"""
directory(path: String): Directory!
# Metadata
# System status
systemStatus: SystemStatus!
jobStatus: MetadataUpdateStatus!
# Job status
jobQueue: [Job!]
findJob(input: FindJobInput!): Job
dlnaStatus: DLNAStatus!
@@ -207,31 +210,32 @@ type Mutation {
exportObjects(input: ExportObjectsInput!): String
"""Performs an incremental import. Returns the job ID"""
importObjects(input: ImportObjectsInput!): String!
importObjects(input: ImportObjectsInput!): ID!
"""Start an full import. Completely wipes the database and imports from the metadata directory. Returns the job ID"""
metadataImport: String!
metadataImport: ID!
"""Start a full export. Outputs to the metadata directory. Returns the job ID"""
metadataExport: String!
metadataExport: ID!
"""Start a scan. Returns the job ID"""
metadataScan(input: ScanMetadataInput!): String!
metadataScan(input: ScanMetadataInput!): ID!
"""Start generating content. Returns the job ID"""
metadataGenerate(input: GenerateMetadataInput!): String!
metadataGenerate(input: GenerateMetadataInput!): ID!
"""Start auto-tagging. Returns the job ID"""
metadataAutoTag(input: AutoTagMetadataInput!): String!
metadataAutoTag(input: AutoTagMetadataInput!): ID!
"""Clean metadata. Returns the job ID"""
metadataClean(input: CleanMetadataInput!): String!
metadataClean(input: CleanMetadataInput!): ID!
"""Migrate generated files for the current hash naming"""
migrateHashNaming: String!
migrateHashNaming: ID!
"""Reload scrapers"""
reloadScrapers: Boolean!
"""Run plugin task. Returns the job ID"""
runPluginTask(plugin_id: ID!, task_name: String!, args: [PluginArgInput!]): String!
runPluginTask(plugin_id: ID!, task_name: String!, args: [PluginArgInput!]): ID!
reloadPlugins: Boolean!
stopJob: Boolean!
stopJob(job_id: ID!): Boolean!
stopAllJobs: Boolean!
"""Submit fingerprints to stash-box instance"""
submitStashBoxFingerprints(input: StashBoxFingerprintSubmissionInput!): Boolean!
@@ -254,9 +258,11 @@ type Mutation {
type Subscription {
"""Update from the metadata manager"""
metadataUpdate: MetadataUpdateStatus!
jobsSubscribe: JobStatusUpdate!
loggingSubscribe: [LogEntry!]!
scanCompleteSubscribe: Boolean!
}
schema {

View File

@@ -0,0 +1,33 @@
enum JobStatus {
READY
RUNNING
FINISHED
STOPPING
CANCELLED
}
type Job {
id: ID!
status: JobStatus!
subTasks: [String!]
description: String!
progress: Float
startTime: Time
endTime: Time
addTime: Time!
}
input FindJobInput {
id: ID!
}
enum JobStatusUpdateType {
ADD
REMOVE
UPDATE
}
type JobStatusUpdate {
type: JobStatusUpdateType!
job: Job!
}

View File

@@ -63,12 +63,6 @@ input AutoTagMetadataInput {
tags: [String!]
}
type MetadataUpdateStatus {
progress: Float!
status: String!
message: String!
}
input ExportObjectTypeInput {
ids: [String!]
all: Boolean

View File

@@ -0,0 +1,23 @@
package api
import (
"context"
"strconv"
"github.com/stashapp/stash/pkg/manager"
)
func (r *mutationResolver) StopJob(ctx context.Context, jobID string) (bool, error) {
idInt, err := strconv.Atoi(jobID)
if err != nil {
return false, err
}
manager.GetInstance().JobManager.CancelJob(idInt)
return true, nil
}
func (r *mutationResolver) StopAllJobs(ctx context.Context) (bool, error) {
manager.GetInstance().JobManager.CancelAll()
return true, nil
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"io/ioutil"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/stashapp/stash/pkg/database"
@@ -15,18 +17,22 @@ import (
)
func (r *mutationResolver) MetadataScan(ctx context.Context, input models.ScanMetadataInput) (string, error) {
if err := manager.GetInstance().Scan(input); err != nil {
jobID, err := manager.GetInstance().Scan(input)
if err != nil {
return "", err
}
return "todo", nil
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) MetadataImport(ctx context.Context) (string, error) {
if err := manager.GetInstance().Import(); err != nil {
jobID, err := manager.GetInstance().Import()
if err != nil {
return "", err
}
return "todo", nil
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) ImportObjects(ctx context.Context, input models.ImportObjectsInput) (string, error) {
@@ -35,30 +41,26 @@ func (r *mutationResolver) ImportObjects(ctx context.Context, input models.Impor
return "", err
}
_, err = manager.GetInstance().RunSingleTask(t)
jobID := manager.GetInstance().RunSingleTask(t)
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) MetadataExport(ctx context.Context) (string, error) {
jobID, err := manager.GetInstance().Export()
if err != nil {
return "", err
}
return "todo", nil
}
func (r *mutationResolver) MetadataExport(ctx context.Context) (string, error) {
if err := manager.GetInstance().Export(); err != nil {
return "", err
}
return "todo", nil
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) ExportObjects(ctx context.Context, input models.ExportObjectsInput) (*string, error) {
t := manager.CreateExportTask(config.GetInstance().GetVideoFileNamingAlgorithm(), input)
wg, err := manager.GetInstance().RunSingleTask(t)
if err != nil {
return nil, err
}
wg.Wait()
var wg sync.WaitGroup
wg.Add(1)
t.Start(&wg)
if t.DownloadHash != "" {
baseURL, _ := ctx.Value(BaseURLCtxKey).(string)
@@ -73,40 +75,28 @@ func (r *mutationResolver) ExportObjects(ctx context.Context, input models.Expor
}
func (r *mutationResolver) MetadataGenerate(ctx context.Context, input models.GenerateMetadataInput) (string, error) {
if err := manager.GetInstance().Generate(input); err != nil {
jobID, err := manager.GetInstance().Generate(input)
if err != nil {
return "", err
}
return "todo", nil
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) MetadataAutoTag(ctx context.Context, input models.AutoTagMetadataInput) (string, error) {
manager.GetInstance().AutoTag(input)
return "todo", nil
jobID := manager.GetInstance().AutoTag(input)
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) MetadataClean(ctx context.Context, input models.CleanMetadataInput) (string, error) {
manager.GetInstance().Clean(input)
return "todo", nil
jobID := manager.GetInstance().Clean(input)
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) MigrateHashNaming(ctx context.Context) (string, error) {
manager.GetInstance().MigrateHash()
return "todo", nil
}
func (r *mutationResolver) JobStatus(ctx context.Context) (*models.MetadataUpdateStatus, error) {
status := manager.GetInstance().Status
ret := models.MetadataUpdateStatus{
Progress: status.Progress,
Status: status.Status.String(),
Message: "",
}
return &ret, nil
}
func (r *mutationResolver) StopJob(ctx context.Context) (bool, error) {
return manager.GetInstance().Status.Stop(), nil
jobID := manager.GetInstance().MigrateHash()
return strconv.Itoa(jobID), nil
}
func (r *mutationResolver) BackupDatabase(ctx context.Context, input models.BackupDatabaseInput) (*string, error) {

View File

@@ -3,6 +3,7 @@ package api
import (
"context"
"fmt"
"strconv"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/manager/config"
@@ -23,6 +24,6 @@ func (r *mutationResolver) SubmitStashBoxFingerprints(ctx context.Context, input
}
func (r *mutationResolver) StashBoxBatchPerformerTag(ctx context.Context, input models.StashBoxBatchPerformerTagInput) (string, error) {
manager.GetInstance().StashBoxBatchPerformerTag(input)
return "todo", nil
jobID := manager.GetInstance().StashBoxBatchPerformerTag(input)
return strconv.Itoa(jobID), nil
}

View File

@@ -0,0 +1,52 @@
package api
import (
"context"
"strconv"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
)
func (r *queryResolver) JobQueue(ctx context.Context) ([]*models.Job, error) {
queue := manager.GetInstance().JobManager.GetQueue()
var ret []*models.Job
for _, j := range queue {
ret = append(ret, jobToJobModel(j))
}
return ret, nil
}
func (r *queryResolver) FindJob(ctx context.Context, input models.FindJobInput) (*models.Job, error) {
jobID, err := strconv.Atoi(input.ID)
if err != nil {
return nil, err
}
j := manager.GetInstance().JobManager.GetJob(jobID)
if j == nil {
return nil, nil
}
return jobToJobModel(*j), nil
}
func jobToJobModel(j job.Job) *models.Job {
ret := &models.Job{
ID: strconv.Itoa(j.ID),
Status: models.JobStatus(j.Status),
Description: j.Description,
SubTasks: j.Details,
StartTime: j.StartTime,
EndTime: j.EndTime,
AddTime: j.AddTime,
}
if j.Progress != -1 {
ret.Progress = &j.Progress
}
return ret
}

View File

@@ -7,17 +7,6 @@ import (
"github.com/stashapp/stash/pkg/models"
)
func (r *queryResolver) JobStatus(ctx context.Context) (*models.MetadataUpdateStatus, error) {
status := manager.GetInstance().Status
ret := models.MetadataUpdateStatus{
Progress: status.Progress,
Status: status.Status.String(),
Message: "",
}
return &ret, nil
}
func (r *queryResolver) SystemStatus(ctx context.Context) (*models.SystemStatus, error) {
return manager.GetInstance().GetSystemStatus(), nil
}

View File

@@ -0,0 +1,64 @@
package api
import (
"context"
"time"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
)
type throttledUpdate struct {
id int
pendingUpdate *job.Job
lastUpdate time.Time
broadcastTimer *time.Timer
killTimer *time.Timer
}
func (tu *throttledUpdate) broadcast(output chan *models.JobStatusUpdate) {
tu.lastUpdate = time.Now()
output <- &models.JobStatusUpdate{
Type: models.JobStatusUpdateTypeUpdate,
Job: jobToJobModel(*tu.pendingUpdate),
}
tu.broadcastTimer = nil
tu.pendingUpdate = nil
}
func makeJobStatusUpdate(t models.JobStatusUpdateType, j job.Job) *models.JobStatusUpdate {
return &models.JobStatusUpdate{
Type: t,
Job: jobToJobModel(j),
}
}
func (r *subscriptionResolver) JobsSubscribe(ctx context.Context) (<-chan *models.JobStatusUpdate, error) {
msg := make(chan *models.JobStatusUpdate, 100)
subscription := manager.GetInstance().JobManager.Subscribe(ctx)
go func() {
for {
select {
case j := <-subscription.NewJob:
msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeAdd, j)
case j := <-subscription.RemovedJob:
msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeRemove, j)
case j := <-subscription.UpdatedJob:
msg <- makeJobStatusUpdate(models.JobStatusUpdateTypeUpdate, j)
case <-ctx.Done():
close(msg)
return
}
}
}()
return msg, nil
}
func (r *subscriptionResolver) ScanCompleteSubscribe(ctx context.Context) (<-chan bool, error) {
return manager.GetInstance().ScanSubscribe(ctx), nil
}

View File

@@ -1,40 +0,0 @@
package api
import (
"context"
"time"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
)
func (r *subscriptionResolver) MetadataUpdate(ctx context.Context) (<-chan *models.MetadataUpdateStatus, error) {
msg := make(chan *models.MetadataUpdateStatus, 1)
ticker := time.NewTicker(5 * time.Second)
go func() {
lastStatus := manager.TaskStatus{}
for {
select {
case _ = <-ticker.C:
thisStatus := manager.GetInstance().Status
if thisStatus != lastStatus {
ret := models.MetadataUpdateStatus{
Progress: thisStatus.Progress,
Status: thisStatus.Status.String(),
Message: "",
}
msg <- &ret
}
lastStatus = thisStatus
case <-ctx.Done():
ticker.Stop()
close(msg)
return
}
}
}()
return msg, nil
}

82
pkg/job/job.go Normal file
View File

@@ -0,0 +1,82 @@
package job
import (
"context"
"time"
)
// JobExec represents the implementation of a Job to be executed.
type JobExec interface {
Execute(ctx context.Context, progress *Progress)
}
type jobExecImpl struct {
fn func(ctx context.Context, progress *Progress)
}
func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) {
j.fn(ctx, progress)
}
// MakeJobExec returns a simple JobExec implementation using the provided
// function.
func MakeJobExec(fn func(ctx context.Context, progress *Progress)) JobExec {
return &jobExecImpl{
fn: fn,
}
}
// Status is the status of a Job
type Status string
const (
// StatusReady means that the Job is not yet started.
StatusReady Status = "READY"
// StatusRunning means that the job is currently running.
StatusRunning Status = "RUNNING"
// StatusStopping means that the job is cancelled but is still running.
StatusStopping Status = "STOPPING"
// StatusFinished means that the job was completed.
StatusFinished Status = "FINISHED"
// StatusCancelled means that the job was cancelled and is now stopped.
StatusCancelled Status = "CANCELLED"
)
// Job represents the status of a queued or running job.
type Job struct {
ID int
Status Status
// details of the current operations of the job
Details []string
Description string
// Progress in terms of 0 - 1.
Progress float64
StartTime *time.Time
EndTime *time.Time
AddTime time.Time
exec JobExec
cancelFunc context.CancelFunc
}
func (j *Job) cancel() {
if j.Status == StatusReady {
j.Status = StatusCancelled
} else if j.Status == StatusRunning {
j.Status = StatusStopping
}
if j.cancelFunc != nil {
j.cancelFunc()
}
}
// IsCancelled returns true if cancel has been called on the context.
func IsCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

394
pkg/job/manager.go Normal file
View File

@@ -0,0 +1,394 @@
package job
import (
"context"
"sync"
"time"
)
const maxGraveyardSize = 10
const defaultThrottleLimit = time.Second
// Manager maintains a queue of jobs. Jobs are executed one at a time.
type Manager struct {
queue []*Job
graveyard []*Job
mutex sync.Mutex
notEmpty *sync.Cond
stop chan struct{}
lastID int
subscriptions []*ManagerSubscription
updateThrottleLimit time.Duration
}
// NewManager initialises and returns a new Manager.
func NewManager() *Manager {
ret := &Manager{
stop: make(chan struct{}),
updateThrottleLimit: defaultThrottleLimit,
}
ret.notEmpty = sync.NewCond(&ret.mutex)
go ret.dispatcher()
return ret
}
// Stop is used to stop the dispatcher thread. Once Stop is called, no
// more Jobs will be processed.
func (m *Manager) Stop() {
m.CancelAll()
close(m.stop)
}
// Add queues a job.
func (m *Manager) Add(description string, e JobExec) int {
m.mutex.Lock()
defer m.mutex.Unlock()
t := time.Now()
j := Job{
ID: m.nextID(),
Status: StatusReady,
Description: description,
AddTime: t,
exec: e,
}
m.queue = append(m.queue, &j)
if len(m.queue) == 1 {
// notify that there is now a job in the queue
m.notEmpty.Broadcast()
}
m.notifyNewJob(&j)
return j.ID
}
// Start adds a job and starts it immediately, concurrently with any other
// jobs.
func (m *Manager) Start(description string, e JobExec) int {
m.mutex.Lock()
defer m.mutex.Unlock()
t := time.Now()
j := Job{
ID: m.nextID(),
Status: StatusReady,
Description: description,
AddTime: t,
exec: e,
}
m.queue = append(m.queue, &j)
m.dispatch(&j)
return j.ID
}
func (m *Manager) notifyNewJob(j *Job) {
// assumes lock held
for _, s := range m.subscriptions {
// don't block if channel is full
select {
case s.newJob <- *j:
default:
}
}
}
func (m *Manager) nextID() int {
m.lastID += 1
return m.lastID
}
func (m *Manager) getReadyJob() *Job {
// assumes lock held
for _, j := range m.queue {
if j.Status == StatusReady {
return j
}
}
return nil
}
func (m *Manager) dispatcher() {
m.mutex.Lock()
for {
// wait until we have something to process
j := m.getReadyJob()
for j == nil {
m.notEmpty.Wait()
// it's possible that we have been stopped - check here
select {
case <-m.stop:
m.mutex.Unlock()
return
default:
// keep going
j = m.getReadyJob()
}
}
done := m.dispatch(j)
// unlock the mutex and wait for the job to finish
m.mutex.Unlock()
<-done
m.mutex.Lock()
// remove the job from the queue
m.removeJob(j)
// process next job
}
}
func (m *Manager) newProgress(j *Job) *Progress {
return &Progress{
updater: &updater{
m: m,
job: j,
},
percent: ProgressIndefinite,
}
}
func (m *Manager) dispatch(j *Job) (done chan struct{}) {
// assumes lock held
t := time.Now()
j.StartTime = &t
j.Status = StatusRunning
ctx, cancelFunc := context.WithCancel(context.Background())
j.cancelFunc = cancelFunc
done = make(chan struct{})
go func() {
progress := m.newProgress(j)
j.exec.Execute(ctx, progress)
m.onJobFinish(j)
close(done)
}()
m.notifyJobUpdate(j)
return
}
func (m *Manager) onJobFinish(job *Job) {
m.mutex.Lock()
defer m.mutex.Unlock()
if job.Status == StatusStopping {
job.Status = StatusCancelled
} else {
job.Status = StatusFinished
}
t := time.Now()
job.EndTime = &t
}
func (m *Manager) removeJob(job *Job) {
// assumes lock held
index, _ := m.getJob(m.queue, job.ID)
if index == -1 {
return
}
// clear any subtasks
job.Details = nil
m.queue = append(m.queue[:index], m.queue[index+1:]...)
m.graveyard = append(m.graveyard, job)
if len(m.graveyard) > maxGraveyardSize {
m.graveyard = m.graveyard[1:]
}
// notify job removed
for _, s := range m.subscriptions {
// don't block if channel is full
select {
case s.removedJob <- *job:
default:
}
}
}
func (m *Manager) getJob(list []*Job, id int) (index int, job *Job) {
// assumes lock held
for i, j := range list {
if j.ID == id {
index = i
job = j
return
}
}
return -1, nil
}
// CancelJob cancels the job with the provided id. Jobs that have been started
// are notified that they are stopping. Jobs that have not yet started are
// removed from the queue. If no job exists with the provided id, then there is
// no effect. Likewise, if the job is already cancelled, there is no effect.
func (m *Manager) CancelJob(id int) {
m.mutex.Lock()
defer m.mutex.Unlock()
_, j := m.getJob(m.queue, id)
if j != nil {
j.cancel()
if j.Status == StatusCancelled {
// remove from the queue
m.removeJob(j)
}
}
}
// CancelAll cancels all of the jobs in the queue. This is the same as
// calling CancelJob on all jobs in the queue.
func (m *Manager) CancelAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
// call cancel on all
for _, j := range m.queue {
j.cancel()
if j.Status == StatusCancelled {
// add to graveyard
m.removeJob(j)
}
}
}
// GetJob returns a copy of the Job for the provided id. Returns nil if the job
// does not exist.
func (m *Manager) GetJob(id int) *Job {
m.mutex.Lock()
defer m.mutex.Unlock()
// get from the queue or graveyard
_, j := m.getJob(append(m.queue, m.graveyard...), id)
if j != nil {
// make a copy of the job and return the pointer
jCopy := *j
return &jCopy
}
return nil
}
// GetQueue returns a copy of the current job queue.
func (m *Manager) GetQueue() []Job {
m.mutex.Lock()
defer m.mutex.Unlock()
var ret []Job
for _, j := range m.queue {
jCopy := *j
ret = append(ret, jCopy)
}
return ret
}
// Subscribe subscribes to changes to jobs in the manager queue.
func (m *Manager) Subscribe(ctx context.Context) *ManagerSubscription {
m.mutex.Lock()
defer m.mutex.Unlock()
ret := newSubscription()
m.subscriptions = append(m.subscriptions, ret)
go func() {
<-ctx.Done()
m.mutex.Lock()
defer m.mutex.Unlock()
ret.close()
// remove from the list
for i, s := range m.subscriptions {
if s == ret {
m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...)
break
}
}
}()
return ret
}
func (m *Manager) notifyJobUpdate(j *Job) {
// don't update if job is finished or cancelled - these are handled
// by removeJob
if j.Status == StatusCancelled || j.Status == StatusFinished {
return
}
// assumes lock held
for _, s := range m.subscriptions {
// don't block if channel is full
select {
case s.updatedJob <- *j:
default:
}
}
}
type updater struct {
m *Manager
job *Job
lastUpdate time.Time
updateTimer *time.Timer
}
func (u *updater) notifyUpdate() {
// assumes lock held
u.m.notifyJobUpdate(u.job)
u.lastUpdate = time.Now()
u.updateTimer = nil
}
func (u *updater) updateProgress(progress float64, details []string) {
u.m.mutex.Lock()
defer u.m.mutex.Unlock()
u.job.Progress = progress
u.job.Details = details
if time.Since(u.lastUpdate) < u.m.updateThrottleLimit {
if u.updateTimer == nil {
u.updateTimer = time.AfterFunc(u.m.updateThrottleLimit-time.Since(u.lastUpdate), func() {
u.m.mutex.Lock()
defer u.m.mutex.Unlock()
u.notifyUpdate()
})
}
} else {
u.notifyUpdate()
}
}

343
pkg/job/manager_test.go Normal file
View File

@@ -0,0 +1,343 @@
package job
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const sleepTime time.Duration = 10 * time.Millisecond
type testExec struct {
started chan struct{}
finish chan struct{}
cancelled bool
progress *Progress
}
func newTestExec(finish chan struct{}) *testExec {
return &testExec{
started: make(chan struct{}),
finish: finish,
}
}
func (e *testExec) Execute(ctx context.Context, p *Progress) {
e.progress = p
close(e.started)
if e.finish != nil {
<-e.finish
select {
case <-ctx.Done():
e.cancelled = true
default:
// fall through
}
}
}
func TestAdd(t *testing.T) {
m := NewManager()
const jobName = "test job"
exec1 := newTestExec(make(chan struct{}))
jobID := m.Add(jobName, exec1)
// expect jobID to be the first ID
assert := assert.New(t)
assert.Equal(1, jobID)
// wait a tiny bit
time.Sleep(sleepTime)
// expect job to have started
select {
case <-exec1.started:
// ok
default:
t.Error("exec was not started")
}
// expect status to be running
j := m.GetJob(jobID)
assert.Equal(StatusRunning, j.Status)
// expect description to be set
assert.Equal(jobName, j.Description)
// expect startTime and addTime to be set
assert.NotNil(j.StartTime)
assert.NotNil(j.AddTime)
// expect endTime to not be set
assert.Nil(j.EndTime)
// add another job to the queue
const otherJobName = "other job name"
exec2 := newTestExec(make(chan struct{}))
job2ID := m.Add(otherJobName, exec2)
// expect status to be ready
j2 := m.GetJob(job2ID)
assert.Equal(StatusReady, j2.Status)
// expect addTime to be set
assert.NotNil(j2.AddTime)
// expect startTime and endTime to not be set
assert.Nil(j2.StartTime)
assert.Nil(j2.EndTime)
// allow first job to finish
close(exec1.finish)
// wait a tiny bit
time.Sleep(sleepTime)
// expect first job to be finished
j = m.GetJob(jobID)
assert.Equal(StatusFinished, j.Status)
// expect end time to be set
assert.NotNil(j.EndTime)
// expect second job to have started
select {
case <-exec2.started:
// ok
default:
t.Error("exec was not started")
}
// expect status to be running
j2 = m.GetJob(job2ID)
assert.Equal(StatusRunning, j2.Status)
// expect startTime to be set
assert.NotNil(j2.StartTime)
}
func TestCancel(t *testing.T) {
m := NewManager()
// add two jobs
const jobName = "test job"
exec1 := newTestExec(make(chan struct{}))
jobID := m.Add(jobName, exec1)
const otherJobName = "other job"
exec2 := newTestExec(make(chan struct{}))
job2ID := m.Add(otherJobName, exec2)
// wait a tiny bit
time.Sleep(sleepTime)
m.CancelJob(job2ID)
// expect job to be cancelled
assert := assert.New(t)
j := m.GetJob(job2ID)
assert.Equal(StatusCancelled, j.Status)
// expect end time not to be set
assert.Nil(j.EndTime)
// expect job to be removed from the queue
assert.Len(m.GetQueue(), 1)
// expect job to have not have been started
select {
case <-exec2.started:
t.Error("cancelled exec was started")
default:
}
// cancel running job
m.CancelJob(jobID)
// wait a tiny bit
time.Sleep(sleepTime)
// expect status to be stopping
j = m.GetJob(jobID)
assert.Equal(StatusStopping, j.Status)
// expect job to still be in the queue
assert.Len(m.GetQueue(), 1)
// allow first job to finish
close(exec1.finish)
// wait a tiny bit
time.Sleep(sleepTime)
// expect job to be removed from the queue
assert.Len(m.GetQueue(), 0)
// expect job to be cancelled
j = m.GetJob(jobID)
assert.Equal(StatusCancelled, j.Status)
// expect endtime to be set
assert.NotNil(j.EndTime)
// expect job to have been cancelled via context
assert.True(exec1.cancelled)
}
func TestCancelAll(t *testing.T) {
m := NewManager()
// add two jobs
const jobName = "test job"
exec1 := newTestExec(make(chan struct{}))
jobID := m.Add(jobName, exec1)
const otherJobName = "other job"
exec2 := newTestExec(make(chan struct{}))
job2ID := m.Add(otherJobName, exec2)
// wait a tiny bit
time.Sleep(sleepTime)
m.CancelAll()
// allow first job to finish
close(exec1.finish)
// wait a tiny bit
time.Sleep(sleepTime)
// expect all jobs to be cancelled
assert := assert.New(t)
j := m.GetJob(job2ID)
assert.Equal(StatusCancelled, j.Status)
j = m.GetJob(jobID)
assert.Equal(StatusCancelled, j.Status)
// expect all jobs to be removed from the queue
assert.Len(m.GetQueue(), 0)
// expect job to have not have been started
select {
case <-exec2.started:
t.Error("cancelled exec was started")
default:
}
}
func TestSubscribe(t *testing.T) {
m := NewManager()
m.updateThrottleLimit = time.Millisecond * 100
ctx, cancel := context.WithCancel(context.Background())
s := m.Subscribe(ctx)
// add a job
const jobName = "test job"
exec1 := newTestExec(make(chan struct{}))
jobID := m.Add(jobName, exec1)
assert := assert.New(t)
select {
case newJob := <-s.NewJob:
assert.Equal(jobID, newJob.ID)
assert.Equal(jobName, newJob.Description)
assert.Equal(StatusReady, newJob.Status)
case <-time.After(time.Second):
t.Error("new job was not received")
}
// should receive an update when the job begins to run
select {
case updatedJob := <-s.UpdatedJob:
assert.Equal(jobID, updatedJob.ID)
assert.Equal(jobName, updatedJob.Description)
assert.Equal(StatusRunning, updatedJob.Status)
case <-time.After(time.Second):
t.Error("updated job was not received")
}
// wait for it to start
select {
case <-exec1.started:
// ok
case <-time.After(time.Second):
t.Error("exec was not started")
}
// test update throttling
exec1.progress.SetPercent(0.1)
// first update should be immediate
select {
case updatedJob := <-s.UpdatedJob:
assert.Equal(0.1, updatedJob.Progress)
case <-time.After(m.updateThrottleLimit):
t.Error("updated job was not received")
}
exec1.progress.SetPercent(0.2)
exec1.progress.SetPercent(0.3)
// should only receive a single update with the second status
select {
case updatedJob := <-s.UpdatedJob:
assert.Equal(0.3, updatedJob.Progress)
case <-time.After(time.Second):
t.Error("updated job was not received")
}
select {
case <-s.UpdatedJob:
t.Error("received an additional updatedJob")
default:
}
// allow job to finish
close(exec1.finish)
select {
case removedJob := <-s.RemovedJob:
assert.Equal(jobID, removedJob.ID)
assert.Equal(jobName, removedJob.Description)
assert.Equal(StatusFinished, removedJob.Status)
case <-time.After(time.Second):
t.Error("removed job was not received")
}
// should not receive another update
select {
case <-s.UpdatedJob:
t.Error("updated job was received after update")
case <-time.After(m.updateThrottleLimit):
}
// add another job and cancel it
exec2 := newTestExec(make(chan struct{}))
jobID = m.Add(jobName, exec2)
m.CancelJob(jobID)
select {
case removedJob := <-s.RemovedJob:
assert.Equal(jobID, removedJob.ID)
assert.Equal(jobName, removedJob.Description)
assert.Equal(StatusCancelled, removedJob.Status)
case <-time.After(time.Second):
t.Error("cancelled job was not received")
}
cancel()
}

138
pkg/job/progress.go Normal file
View File

@@ -0,0 +1,138 @@
package job
import "sync"
// ProgressIndefinite is the special percent value to indicate that the
// percent progress is not known.
const ProgressIndefinite float64 = -1
// Progress is used by JobExec to communicate updates to the job's progress to
// the JobManager.
type Progress struct {
processed int
total int
percent float64
currentTasks []*task
mutex sync.Mutex
updater *updater
}
type task struct {
description string
}
func (p *Progress) updated() {
var details []string
for _, t := range p.currentTasks {
details = append(details, t.description)
}
p.updater.updateProgress(p.percent, details)
}
// Indefinite sets the progress to an indefinite amount.
func (p *Progress) Indefinite() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.total = 0
p.calculatePercent()
}
// SetTotal sets the total number of work units. This is used to calculate the
// progress percentage.
func (p *Progress) SetTotal(total int) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.total = total
p.calculatePercent()
}
// SetProcessed sets the number of work units completed. This is used to
// calculate the progress percentage.
func (p *Progress) SetProcessed(processed int) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.processed = processed
p.calculatePercent()
}
func (p *Progress) calculatePercent() {
if p.total <= 0 {
p.percent = ProgressIndefinite
} else if p.processed < 0 {
p.percent = 0
} else {
p.percent = float64(p.processed) / float64(p.total)
if p.percent > 1 {
p.percent = 1
}
}
p.updated()
}
// SetPercent sets the progress percent directly. This value will be
// overwritten if Indefinite, SetTotal, Increment or SetProcessed is called.
// Constrains the percent value between 0 and 1, inclusive.
func (p *Progress) SetPercent(percent float64) {
p.mutex.Lock()
defer p.mutex.Unlock()
if percent < 0 {
percent = 0
} else if percent > 1 {
percent = 1
}
p.percent = percent
p.updated()
}
// Increment increments the number of processed work units, if this does not
// exceed the total units. This is used to calculate the percentage.
func (p *Progress) Increment() {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.processed < p.total {
p.processed += 1
p.calculatePercent()
}
}
func (p *Progress) addTask(t *task) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.currentTasks = append(p.currentTasks, t)
p.updated()
}
func (p *Progress) removeTask(t *task) {
p.mutex.Lock()
defer p.mutex.Unlock()
for i, tt := range p.currentTasks {
if tt == t {
p.currentTasks = append(p.currentTasks[:i], p.currentTasks[i+1:]...)
p.updated()
return
}
}
}
// ExecuteTask executes a task as part of a job. The description is used to
// populate the Details slice in the parent Job.
func (p *Progress) ExecuteTask(description string, fn func()) {
t := &task{
description: description,
}
p.addTask(t)
defer p.removeTask(t)
fn()
}

145
pkg/job/progress_test.go Normal file
View File

@@ -0,0 +1,145 @@
package job
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func createProgress(m *Manager, j *Job) Progress {
return Progress{
updater: &updater{
m: m,
job: j,
},
total: 100,
processed: 10,
percent: 10,
}
}
func TestProgressIndefinite(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
p.Indefinite()
assert := assert.New(t)
// ensure job progress was updated
assert.Equal(ProgressIndefinite, j.Progress)
}
func TestProgressSetTotal(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
p.SetTotal(50)
assert := assert.New(t)
// ensure job progress was updated
assert.Equal(0.2, j.Progress)
p.SetTotal(0)
assert.Equal(ProgressIndefinite, j.Progress)
p.SetTotal(-10)
assert.Equal(ProgressIndefinite, j.Progress)
p.SetTotal(9)
assert.Equal(float64(1), j.Progress)
}
func TestProgressSetProcessed(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
p.SetProcessed(30)
assert := assert.New(t)
// ensure job progress was updated
assert.Equal(0.3, j.Progress)
p.SetProcessed(-10)
assert.Equal(float64(0), j.Progress)
p.SetProcessed(200)
assert.Equal(float64(1), j.Progress)
}
func TestProgressSetPercent(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
p.SetPercent(0.3)
assert := assert.New(t)
// ensure job progress was updated
assert.Equal(0.3, j.Progress)
p.SetPercent(-10)
assert.Equal(float64(0), j.Progress)
p.SetPercent(200)
assert.Equal(float64(1), j.Progress)
}
func TestProgressIncrement(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
p.SetProcessed(49)
p.Increment()
assert := assert.New(t)
// ensure job progress was updated
assert.Equal(0.5, j.Progress)
p.SetProcessed(100)
p.Increment()
assert.Equal(float64(1), j.Progress)
}
func TestExecuteTask(t *testing.T) {
m := NewManager()
j := &Job{}
p := createProgress(m, j)
c := make(chan struct{}, 1)
const taskDesciption = "taskDescription"
go p.ExecuteTask(taskDesciption, func() {
<-c
})
time.Sleep(sleepTime)
assert := assert.New(t)
// ensure task is added to the job details
assert.Equal(taskDesciption, j.Details[0])
// allow task to finish
close(c)
time.Sleep(sleepTime)
// ensure task is removed from the job details
assert.Len(j.Details, 0)
}

36
pkg/job/subscribe.go Normal file
View File

@@ -0,0 +1,36 @@
package job
// ManagerSubscription is a collection of channels that will receive updates
// from the job manager.
type ManagerSubscription struct {
// new jobs are sent to this channel
NewJob <-chan Job
// removed jobs are sent to this channel
RemovedJob <-chan Job
// updated jobs are sent to this channel
UpdatedJob <-chan Job
newJob chan Job
removedJob chan Job
updatedJob chan Job
}
func newSubscription() *ManagerSubscription {
ret := &ManagerSubscription{
newJob: make(chan Job, 100),
removedJob: make(chan Job, 100),
updatedJob: make(chan Job, 100),
}
ret.NewJob = ret.newJob
ret.RemovedJob = ret.removedJob
ret.UpdatedJob = ret.updatedJob
return ret
}
func (s *ManagerSubscription) close() {
close(s.newJob)
close(s.removedJob)
close(s.updatedJob)
}

View File

@@ -1,46 +0,0 @@
package manager
type JobStatus int
const (
Idle JobStatus = 0
Import JobStatus = 1
Export JobStatus = 2
Scan JobStatus = 3
Generate JobStatus = 4
Clean JobStatus = 5
Scrape JobStatus = 6
AutoTag JobStatus = 7
Migrate JobStatus = 8
PluginOperation JobStatus = 9
StashBoxBatchPerformer JobStatus = 10
)
func (s JobStatus) String() string {
statusMessage := ""
switch s {
case Idle:
statusMessage = "Idle"
case Import:
statusMessage = "Import"
case Export:
statusMessage = "Export"
case Scan:
statusMessage = "Scan"
case Generate:
statusMessage = "Generate"
case AutoTag:
statusMessage = "Auto Tag"
case Migrate:
statusMessage = "Migrate"
case Clean:
statusMessage = "Clean"
case PluginOperation:
statusMessage = "Plugin Operation"
case StashBoxBatchPerformer:
statusMessage = "Stash-Box Performer Batch Operation"
}
return statusMessage
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/stashapp/stash/pkg/database"
"github.com/stashapp/stash/pkg/dlna"
"github.com/stashapp/stash/pkg/ffmpeg"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/manager/config"
"github.com/stashapp/stash/pkg/manager/paths"
@@ -25,12 +26,13 @@ import (
type singleton struct {
Config *config.Instance
Status TaskStatus
Paths *paths.Paths
Paths *paths.Paths
FFMPEGPath string
FFProbePath string
JobManager *job.Manager
PluginCache *plugin.Cache
ScraperCache *scraper.Cache
@@ -39,6 +41,8 @@ type singleton struct {
DLNAService *dlna.Service
TxnManager models.TransactionManager
scanSubs *subscriptionManager
}
var instance *singleton
@@ -63,10 +67,12 @@ func Initialize() *singleton {
instance = &singleton{
Config: cfg,
Status: TaskStatus{Status: Idle, Progress: -1},
JobManager: job.NewManager(),
DownloadStore: NewDownloadStore(),
TxnManager: sqlite.NewTransactionManager(),
scanSubs: &subscriptionManager{},
}
sceneServer := SceneServer{

File diff suppressed because it is too large Load Diff

44
pkg/manager/subscribe.go Normal file
View File

@@ -0,0 +1,44 @@
package manager
import (
"context"
"sync"
)
type subscriptionManager struct {
subscriptions []chan bool
mutex sync.Mutex
}
func (m *subscriptionManager) subscribe(ctx context.Context) <-chan bool {
m.mutex.Lock()
defer m.mutex.Unlock()
c := make(chan bool, 10)
m.subscriptions = append(m.subscriptions, c)
go func() {
<-ctx.Done()
m.mutex.Lock()
defer m.mutex.Unlock()
close(c)
for i, s := range m.subscriptions {
if s == c {
m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...)
break
}
}
}()
return c
}
func (m *subscriptionManager) notify() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, s := range m.subscriptions {
s <- true
}
}

View File

@@ -4,5 +4,5 @@ import "sync"
type Task interface {
Start(wg *sync.WaitGroup)
GetStatus() JobStatus
GetDescription() string
}

View File

@@ -2,23 +2,315 @@ package manager
import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/stashapp/stash/pkg/autotag"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
)
type autoTagJob struct {
txnManager models.TransactionManager
input models.AutoTagMetadataInput
}
func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) {
input := j.input
if j.isFileBasedAutoTag(input) {
// doing file-based auto-tag
j.autoTagFiles(ctx, progress, input.Paths, len(input.Performers) > 0, len(input.Studios) > 0, len(input.Tags) > 0)
} else {
// doing specific performer/studio/tag auto-tag
j.autoTagSpecific(ctx, progress)
}
}
func (j *autoTagJob) isFileBasedAutoTag(input models.AutoTagMetadataInput) bool {
const wildcard = "*"
performerIds := input.Performers
studioIds := input.Studios
tagIds := input.Tags
return (len(performerIds) == 0 || performerIds[0] == wildcard) && (len(studioIds) == 0 || studioIds[0] == wildcard) && (len(tagIds) == 0 || tagIds[0] == wildcard)
}
func (j *autoTagJob) autoTagFiles(ctx context.Context, progress *job.Progress, paths []string, performers, studios, tags bool) {
t := autoTagFilesTask{
paths: paths,
performers: performers,
studios: studios,
tags: tags,
ctx: ctx,
progress: progress,
txnManager: j.txnManager,
}
t.process()
}
func (j *autoTagJob) autoTagSpecific(ctx context.Context, progress *job.Progress) {
input := j.input
performerIds := input.Performers
studioIds := input.Studios
tagIds := input.Tags
performerCount := len(performerIds)
studioCount := len(studioIds)
tagCount := len(tagIds)
if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error {
performerQuery := r.Performer()
studioQuery := r.Studio()
tagQuery := r.Tag()
const wildcard = "*"
var err error
if performerCount == 1 && performerIds[0] == wildcard {
performerCount, err = performerQuery.Count()
if err != nil {
return fmt.Errorf("error getting performer count: %s", err.Error())
}
}
if studioCount == 1 && studioIds[0] == wildcard {
studioCount, err = studioQuery.Count()
if err != nil {
return fmt.Errorf("error getting studio count: %s", err.Error())
}
}
if tagCount == 1 && tagIds[0] == wildcard {
tagCount, err = tagQuery.Count()
if err != nil {
return fmt.Errorf("error getting tag count: %s", err.Error())
}
}
return nil
}); err != nil {
logger.Error(err.Error())
return
}
total := performerCount + studioCount + tagCount
progress.SetTotal(total)
logger.Infof("Starting autotag of %d performers, %d studios, %d tags", performerCount, studioCount, tagCount)
j.autoTagPerformers(ctx, progress, input.Paths, performerIds)
j.autoTagStudios(ctx, progress, input.Paths, studioIds)
j.autoTagTags(ctx, progress, input.Paths, tagIds)
logger.Info("Finished autotag")
}
func (j *autoTagJob) autoTagPerformers(ctx context.Context, progress *job.Progress, paths []string, performerIds []string) {
if job.IsCancelled(ctx) {
return
}
for _, performerId := range performerIds {
var performers []*models.Performer
if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error {
performerQuery := r.Performer()
if performerId == "*" {
var err error
performers, err = performerQuery.All()
if err != nil {
return fmt.Errorf("error querying performers: %s", err.Error())
}
} else {
performerIdInt, err := strconv.Atoi(performerId)
if err != nil {
return fmt.Errorf("error parsing performer id %s: %s", performerId, err.Error())
}
performer, err := performerQuery.Find(performerIdInt)
if err != nil {
return fmt.Errorf("error finding performer id %s: %s", performerId, err.Error())
}
if performer == nil {
return fmt.Errorf("performer with id %s not found", performerId)
}
performers = append(performers, performer)
}
for _, performer := range performers {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return nil
}
if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error {
if err := autotag.PerformerScenes(performer, paths, r.Scene()); err != nil {
return err
}
if err := autotag.PerformerImages(performer, paths, r.Image()); err != nil {
return err
}
if err := autotag.PerformerGalleries(performer, paths, r.Gallery()); err != nil {
return err
}
return nil
}); err != nil {
return fmt.Errorf("error auto-tagging performer '%s': %s", performer.Name.String, err.Error())
}
progress.Increment()
}
return nil
}); err != nil {
logger.Error(err.Error())
continue
}
}
}
func (j *autoTagJob) autoTagStudios(ctx context.Context, progress *job.Progress, paths []string, studioIds []string) {
if job.IsCancelled(ctx) {
return
}
for _, studioId := range studioIds {
var studios []*models.Studio
if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error {
studioQuery := r.Studio()
if studioId == "*" {
var err error
studios, err = studioQuery.All()
if err != nil {
return fmt.Errorf("error querying studios: %s", err.Error())
}
} else {
studioIdInt, err := strconv.Atoi(studioId)
if err != nil {
return fmt.Errorf("error parsing studio id %s: %s", studioId, err.Error())
}
studio, err := studioQuery.Find(studioIdInt)
if err != nil {
return fmt.Errorf("error finding studio id %s: %s", studioId, err.Error())
}
if studio == nil {
return fmt.Errorf("studio with id %s not found", studioId)
}
studios = append(studios, studio)
}
for _, studio := range studios {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return nil
}
if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error {
if err := autotag.StudioScenes(studio, paths, r.Scene()); err != nil {
return err
}
if err := autotag.StudioImages(studio, paths, r.Image()); err != nil {
return err
}
if err := autotag.StudioGalleries(studio, paths, r.Gallery()); err != nil {
return err
}
return nil
}); err != nil {
return fmt.Errorf("error auto-tagging studio '%s': %s", studio.Name.String, err.Error())
}
progress.Increment()
}
return nil
}); err != nil {
logger.Error(err.Error())
continue
}
}
}
func (j *autoTagJob) autoTagTags(ctx context.Context, progress *job.Progress, paths []string, tagIds []string) {
if job.IsCancelled(ctx) {
return
}
for _, tagId := range tagIds {
var tags []*models.Tag
if err := j.txnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error {
tagQuery := r.Tag()
if tagId == "*" {
var err error
tags, err = tagQuery.All()
if err != nil {
return fmt.Errorf("error querying tags: %s", err.Error())
}
} else {
tagIdInt, err := strconv.Atoi(tagId)
if err != nil {
return fmt.Errorf("error parsing tag id %s: %s", tagId, err.Error())
}
tag, err := tagQuery.Find(tagIdInt)
if err != nil {
return fmt.Errorf("error finding tag id %s: %s", tagId, err.Error())
}
tags = append(tags, tag)
}
for _, tag := range tags {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return nil
}
if err := j.txnManager.WithTxn(context.TODO(), func(r models.Repository) error {
if err := autotag.TagScenes(tag, paths, r.Scene()); err != nil {
return err
}
if err := autotag.TagImages(tag, paths, r.Image()); err != nil {
return err
}
if err := autotag.TagGalleries(tag, paths, r.Gallery()); err != nil {
return err
}
return nil
}); err != nil {
return fmt.Errorf("error auto-tagging tag '%s': %s", tag.Name, err.Error())
}
progress.Increment()
}
return nil
}); err != nil {
logger.Error(err.Error())
continue
}
}
}
type autoTagFilesTask struct {
paths []string
performers bool
studios bool
tags bool
ctx context.Context
progress *job.Progress
txnManager models.TransactionManager
status *TaskStatus
}
func (t *autoTagFilesTask) makeSceneFilter() *models.SceneFilterType {
@@ -144,7 +436,7 @@ func (t *autoTagFilesTask) batchFindFilter(batchSize int) *models.FindFilterType
}
func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -161,7 +453,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error {
}
for _, ss := range scenes {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -178,7 +470,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error {
go tt.Start(&wg)
wg.Wait()
t.status.incrementProgress()
t.progress.Increment()
}
if len(scenes) != batchSize {
@@ -192,7 +484,7 @@ func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error {
}
func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -209,7 +501,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error {
}
for _, ss := range images {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -226,7 +518,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error {
go tt.Start(&wg)
wg.Wait()
t.status.incrementProgress()
t.progress.Increment()
}
if len(images) != batchSize {
@@ -240,7 +532,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error {
}
func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -257,7 +549,7 @@ func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error {
}
for _, ss := range galleries {
if t.status.stopping {
if job.IsCancelled(t.ctx) {
return nil
}
@@ -274,7 +566,7 @@ func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error {
go tt.Start(&wg)
wg.Wait()
t.status.incrementProgress()
t.progress.Increment()
}
if len(galleries) != batchSize {
@@ -294,7 +586,7 @@ func (t *autoTagFilesTask) process() {
return err
}
t.status.total = total
t.progress.SetTotal(total)
logger.Infof("Starting autotag of %d files", total)
@@ -310,7 +602,7 @@ func (t *autoTagFilesTask) process() {
return err
}
if t.status.stopping {
if job.IsCancelled(t.ctx) {
logger.Info("Stopping due to user request")
}

View File

@@ -93,10 +93,6 @@ func CreateExportTask(a models.HashAlgorithm, input models.ExportObjectsInput) *
}
}
func (t *ExportTask) GetStatus() JobStatus {
return Export
}
func (t *ExportTask) Start(wg *sync.WaitGroup) {
defer wg.Done()
// @manager.total = Scene.count + Gallery.count + Performer.count + Studio.count + Movie.count

View File

@@ -75,8 +75,8 @@ func CreateImportTask(a models.HashAlgorithm, input models.ImportObjectsInput) (
}, nil
}
func (t *ImportTask) GetStatus() JobStatus {
return Import
func (t *ImportTask) GetDescription() string {
return "Importing..."
}
func (t *ImportTask) Start(wg *sync.WaitGroup) {

View File

@@ -1,25 +1,19 @@
package manager
import (
"time"
"context"
"fmt"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/plugin/common"
)
func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*models.PluginArgInput, serverConnection common.StashServerConnection) {
if s.Status.Status != Idle {
return
}
s.Status.SetStatus(PluginOperation)
s.Status.indefiniteProgress()
go func() {
defer s.returnToIdleState()
progress := make(chan float64)
task, err := s.PluginCache.CreateTask(pluginID, taskName, serverConnection, args, progress)
func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*models.PluginArgInput, serverConnection common.StashServerConnection) int {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
pluginProgress := make(chan float64)
task, err := s.PluginCache.CreateTask(pluginID, taskName, serverConnection, args, pluginProgress)
if err != nil {
logger.Errorf("Error creating plugin task: %s", err.Error())
return
@@ -48,24 +42,20 @@ func (s *singleton) RunPluginTask(pluginID string, taskName string, args []*mode
}
}()
// TODO - refactor stop to use channels
// check for stop every five seconds
pollingTime := time.Second * 5
stopPoller := time.Tick(pollingTime)
for {
select {
case <-done:
return
case p := <-progress:
s.Status.setProgressPercent(p)
case <-stopPoller:
if s.Status.stopping {
if err := task.Stop(); err != nil {
logger.Errorf("Error stopping plugin operation: %s", err.Error())
}
return
case p := <-pluginProgress:
progress.SetPercent(p)
case <-ctx.Done():
if err := task.Stop(); err != nil {
logger.Errorf("Error stopping plugin operation: %s", err.Error())
}
return
}
}
}()
})
return s.JobManager.Add(fmt.Sprintf("Running plugin task: %s", taskName), j)
}

View File

@@ -4,6 +4,7 @@ import (
"archive/zip"
"context"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
@@ -16,6 +17,7 @@ import (
"github.com/stashapp/stash/pkg/ffmpeg"
"github.com/stashapp/stash/pkg/gallery"
"github.com/stashapp/stash/pkg/image"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/manager/config"
"github.com/stashapp/stash/pkg/models"
@@ -23,6 +25,175 @@ import (
"github.com/stashapp/stash/pkg/utils"
)
type ScanJob struct {
txnManager models.TransactionManager
input models.ScanMetadataInput
subscriptions *subscriptionManager
}
func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) {
input := j.input
paths := getScanPaths(input.Paths)
var total *int
var newFiles *int
progress.ExecuteTask("Counting files to scan...", func() {
total, newFiles = j.neededScan(ctx, paths)
})
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
}
if total == nil || newFiles == nil {
logger.Infof("Taking too long to count content. Skipping...")
logger.Infof("Starting scan")
} else {
logger.Infof("Starting scan of %d files. %d New files found", *total, *newFiles)
}
start := time.Now()
config := config.GetInstance()
parallelTasks := config.GetParallelTasksWithAutoDetection()
logger.Infof("Scan started with %d parallel tasks", parallelTasks)
wg := sizedwaitgroup.New(parallelTasks)
if total != nil {
progress.SetTotal(*total)
}
fileNamingAlgo := config.GetVideoFileNamingAlgorithm()
calculateMD5 := config.IsCalculateMD5()
stoppingErr := errors.New("stopping")
var err error
var galleries []string
for _, sp := range paths {
err = walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error {
if job.IsCancelled(ctx) {
return stoppingErr
}
if isGallery(path) {
galleries = append(galleries, path)
}
instance.Paths.Generated.EnsureTmpDir()
wg.Add()
task := ScanTask{
TxnManager: j.txnManager,
FilePath: path,
UseFileMetadata: utils.IsTrue(input.UseFileMetadata),
StripFileExtension: utils.IsTrue(input.StripFileExtension),
fileNamingAlgorithm: fileNamingAlgo,
calculateMD5: calculateMD5,
GeneratePreview: utils.IsTrue(input.ScanGeneratePreviews),
GenerateImagePreview: utils.IsTrue(input.ScanGenerateImagePreviews),
GenerateSprite: utils.IsTrue(input.ScanGenerateSprites),
GeneratePhash: utils.IsTrue(input.ScanGeneratePhashes),
progress: progress,
}
go func() {
task.Start(&wg)
progress.Increment()
}()
return nil
})
if err == stoppingErr {
logger.Info("Stopping due to user request")
break
}
if err != nil {
logger.Errorf("Error encountered scanning files: %s", err.Error())
break
}
}
wg.Wait()
instance.Paths.Generated.EmptyTmpDir()
elapsed := time.Since(start)
logger.Info(fmt.Sprintf("Scan finished (%s)", elapsed))
if job.IsCancelled(ctx) || err != nil {
return
}
progress.ExecuteTask("Associating galleries", func() {
for _, path := range galleries {
wg.Add()
task := ScanTask{
TxnManager: j.txnManager,
FilePath: path,
UseFileMetadata: false,
}
go task.associateGallery(&wg)
wg.Wait()
}
logger.Info("Finished gallery association")
})
j.subscriptions.notify()
}
func (j *ScanJob) neededScan(ctx context.Context, paths []*models.StashConfig) (total *int, newFiles *int) {
const timeout = 90 * time.Second
// create a control channel through which to signal the counting loop when the timeout is reached
chTimeout := time.After(timeout)
logger.Infof("Counting files to scan...")
t := 0
n := 0
timeoutErr := errors.New("timed out")
for _, sp := range paths {
err := walkFilesToScan(sp, func(path string, info os.FileInfo, err error) error {
t++
task := ScanTask{FilePath: path, TxnManager: j.txnManager}
if !task.doesPathExist() {
n++
}
//check for timeout
select {
case <-chTimeout:
return timeoutErr
default:
}
// check stop
if job.IsCancelled(ctx) {
return timeoutErr
}
return nil
})
if err == timeoutErr {
// timeout should return nil counts
return nil, nil
}
if err != nil {
logger.Errorf("Error encountered counting files to scan: %s", err.Error())
return nil, nil
}
}
return &t, &n
}
type ScanTask struct {
TxnManager models.TransactionManager
FilePath string
@@ -35,40 +206,57 @@ type ScanTask struct {
GeneratePreview bool
GenerateImagePreview bool
zipGallery *models.Gallery
progress *job.Progress
}
func (t *ScanTask) Start(wg *sizedwaitgroup.SizedWaitGroup) {
if isGallery(t.FilePath) {
t.scanGallery()
} else if isVideo(t.FilePath) {
s := t.scanScene()
defer wg.Done()
if s != nil {
iwg := sizedwaitgroup.New(2)
var s *models.Scene
if t.GenerateSprite {
iwg.Add()
t.progress.ExecuteTask("Scanning "+t.FilePath, func() {
if isGallery(t.FilePath) {
t.scanGallery()
} else if isVideo(t.FilePath) {
s = t.scanScene()
} else if isImage(t.FilePath) {
t.scanImage()
}
})
if s != nil {
iwg := sizedwaitgroup.New(2)
if t.GenerateSprite {
iwg.Add()
go t.progress.ExecuteTask(fmt.Sprintf("Generating sprites for %s", t.FilePath), func() {
taskSprite := GenerateSpriteTask{
Scene: *s,
Overwrite: false,
fileNamingAlgorithm: t.fileNamingAlgorithm,
}
go taskSprite.Start(&iwg)
}
taskSprite.Start(&iwg)
})
}
if t.GeneratePhash {
iwg.Add()
if t.GeneratePhash {
iwg.Add()
go t.progress.ExecuteTask(fmt.Sprintf("Generating phash for %s", t.FilePath), func() {
taskPhash := GeneratePhashTask{
Scene: *s,
fileNamingAlgorithm: t.fileNamingAlgorithm,
txnManager: t.TxnManager,
}
go taskPhash.Start(&iwg)
}
taskPhash.Start(&iwg)
})
}
if t.GeneratePreview {
iwg.Add()
if t.GeneratePreview {
iwg.Add()
go t.progress.ExecuteTask(fmt.Sprintf("Generating preview for %s", t.FilePath), func() {
config := config.GetInstance()
var previewSegmentDuration = config.GetPreviewSegmentDuration()
var previewSegments = config.GetPreviewSegments()
@@ -92,16 +280,12 @@ func (t *ScanTask) Start(wg *sizedwaitgroup.SizedWaitGroup) {
Overwrite: false,
fileNamingAlgorithm: t.fileNamingAlgorithm,
}
go taskPreview.Start(&iwg)
}
iwg.Wait()
taskPreview.Start(wg)
})
}
} else if isImage(t.FilePath) {
t.scanImage()
}
wg.Done()
iwg.Wait()
}
}
func (t *ScanTask) scanGallery() {

View File

@@ -3,6 +3,7 @@ package manager
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
@@ -27,6 +28,17 @@ func (t *StashBoxPerformerTagTask) Start(wg *sync.WaitGroup) {
t.stashBoxPerformerTag()
}
func (t *StashBoxPerformerTagTask) Description() string {
var name string
if t.name != nil {
name = *t.name
} else if t.performer != nil {
name = t.performer.Name.String
}
return fmt.Sprintf("Tagging performer %s from stash-box", name)
}
func (t *StashBoxPerformerTagTask) stashBoxPerformerTag() {
var performer *models.ScrapedScenePerformer
var err error

View File

@@ -1,4 +1,5 @@
### ✨ New Features
* Revamped job management: tasks can now be queued. ([#1379](https://github.com/stashapp/stash/pull/1379))
* Added Handy/Funscript support. ([#1377](https://github.com/stashapp/stash/pull/1377))
* Added Performers tab to Studio page. ([#1405](https://github.com/stashapp/stash/pull/1405))
* Added [DLNA server](/settings?tab=dlna). ([#1364](https://github.com/stashapp/stash/pull/1364))

View File

@@ -22,7 +22,7 @@ export const GenerateButton: React.FC = () => {
markers,
transcodes,
});
Toast.success({ content: "Started generating" });
Toast.success({ content: "Added generation job to queue" });
} catch (e) {
Toast.error(e);
}

View File

@@ -0,0 +1,211 @@
import React, { useState, useEffect } from "react";
import { Button, ProgressBar } from "react-bootstrap";
import {
mutateStopJob,
useJobQueue,
useJobsSubscribe,
} from "src/core/StashService";
import * as GQL from "src/core/generated-graphql";
import { Icon } from "src/components/Shared";
import { IconProp } from "@fortawesome/fontawesome-svg-core";
type JobFragment = Pick<
GQL.Job,
"id" | "status" | "subTasks" | "description" | "progress"
>;
interface IJob {
job: JobFragment;
}
const Task: React.FC<IJob> = ({ job }) => {
const [stopping, setStopping] = useState(false);
const [className, setClassName] = useState("");
useEffect(() => {
setTimeout(() => setClassName("fade-in"));
}, []);
useEffect(() => {
if (
job.status === GQL.JobStatus.Cancelled ||
job.status === GQL.JobStatus.Finished
) {
// fade out around 10 seconds
setTimeout(() => {
setClassName("fade-out");
}, 9800);
}
}, [job]);
async function stopJob() {
setStopping(true);
await mutateStopJob(job.id);
}
function canStop() {
return (
!stopping &&
(job.status === GQL.JobStatus.Ready ||
job.status === GQL.JobStatus.Running)
);
}
function getStatusClass() {
switch (job.status) {
case GQL.JobStatus.Ready:
return "ready";
case GQL.JobStatus.Running:
return "running";
case GQL.JobStatus.Stopping:
return "stopping";
case GQL.JobStatus.Finished:
return "finished";
case GQL.JobStatus.Cancelled:
return "cancelled";
}
}
function getStatusIcon() {
let icon: IconProp = "circle";
let iconClass = "";
switch (job.status) {
case GQL.JobStatus.Ready:
icon = "hourglass-start";
break;
case GQL.JobStatus.Running:
icon = "cog";
iconClass = "fa-spin";
break;
case GQL.JobStatus.Stopping:
icon = "cog";
iconClass = "fa-spin";
break;
case GQL.JobStatus.Finished:
icon = "check";
break;
case GQL.JobStatus.Cancelled:
icon = "ban";
break;
}
return <Icon icon={icon} className={`fa-fw ${iconClass}`} />;
}
function maybeRenderProgress() {
if (
job.status === GQL.JobStatus.Running &&
job.progress !== undefined &&
job.progress !== null
) {
const progress = job.progress * 100;
return (
<ProgressBar
animated
now={progress}
label={`${progress.toFixed(0)}%`}
/>
);
}
}
function maybeRenderSubTasks() {
if (
job.status === GQL.JobStatus.Running ||
job.status === GQL.JobStatus.Stopping
) {
return (
<div>
{/* eslint-disable react/no-array-index-key */}
{(job.subTasks ?? []).map((t, i) => (
<div className="job-subtask" key={i}>
{t}
</div>
))}
{/* eslint-enable react/no-array-index-key */}
</div>
);
}
}
return (
<li className={`job ${className}`}>
<div>
<Button
className="minimal stop"
size="sm"
onClick={() => stopJob()}
disabled={!canStop()}
>
<Icon icon="times" />
</Button>
<div className={`job-status ${getStatusClass()}`}>
<div>
{getStatusIcon()}
<span>{job.description}</span>
</div>
<div>{maybeRenderProgress()}</div>
{maybeRenderSubTasks()}
</div>
</div>
</li>
);
};
export const JobTable: React.FC = () => {
const jobStatus = useJobQueue();
const jobsSubscribe = useJobsSubscribe();
const [queue, setQueue] = useState<JobFragment[]>([]);
useEffect(() => {
setQueue(jobStatus.data?.jobQueue ?? []);
}, [jobStatus]);
useEffect(() => {
if (!jobsSubscribe.data) {
return;
}
const event = jobsSubscribe.data.jobsSubscribe;
function updateJob() {
setQueue((q) =>
q.map((j) => {
if (j.id === event.job.id) {
return event.job;
}
return j;
})
);
}
switch (event.type) {
case GQL.JobStatusUpdateType.Add:
// add to the end of the queue
setQueue((q) => q.concat([event.job]));
break;
case GQL.JobStatusUpdateType.Remove:
// update the job then remove after a timeout
updateJob();
setTimeout(() => {
setQueue((q) => q.filter((j) => j.id !== event.job.id));
}, 10000);
break;
case GQL.JobStatusUpdateType.Update:
updateJob();
break;
}
}, [jobsSubscribe.data]);
return (
<div className="job-table">
<ul>
{(queue ?? []).map((j) => (
<Task job={j} key={j.id} />
))}
</ul>
</div>
);
};

View File

@@ -1,15 +1,12 @@
import React, { useState, useEffect } from "react";
import { Button, Form, ProgressBar } from "react-bootstrap";
import React, { useState } from "react";
import { Button, Form } from "react-bootstrap";
import {
useJobStatus,
useMetadataUpdate,
mutateMetadataImport,
mutateMetadataClean,
mutateMetadataScan,
mutateMetadataAutoTag,
mutateMetadataExport,
mutateMigrateHashNaming,
mutateStopJob,
usePlugins,
mutateRunPluginTask,
mutateBackupDatabase,
@@ -21,6 +18,7 @@ import { downloadFile } from "src/utils";
import { GenerateButton } from "./GenerateButton";
import { ImportDialog } from "./ImportDialog";
import { DirectorySelectionDialog } from "./DirectorySelectionDialog";
import { JobTable } from "./JobTable";
type Plugin = Pick<GQL.Plugin, "id">;
type PluginTask = Pick<GQL.PluginTask, "name" | "description">;
@@ -52,78 +50,20 @@ export const SettingsTasksPanel: React.FC = () => {
setScanGenerateImagePreviews,
] = useState<boolean>(false);
const [status, setStatus] = useState<string>("");
const [progress, setProgress] = useState<number>(0);
const [autoTagPerformers, setAutoTagPerformers] = useState<boolean>(true);
const [autoTagStudios, setAutoTagStudios] = useState<boolean>(true);
const [autoTagTags, setAutoTagTags] = useState<boolean>(true);
const jobStatus = useJobStatus();
const metadataUpdate = useMetadataUpdate();
const plugins = usePlugins();
function statusToText(s: string) {
switch (s) {
case "Idle":
return "Idle";
case "Scan":
return "Scanning for new content";
case "Generate":
return "Generating supporting files";
case "Clean":
return "Cleaning the database";
case "Export":
return "Exporting to JSON";
case "Import":
return "Importing from JSON";
case "Auto Tag":
return "Auto tagging scenes";
case "Plugin Operation":
return "Running Plugin Operation";
case "Migrate":
return "Migrating";
case "Stash-Box Performer Batch Operation":
return "Tagging performers from Stash-Box instance";
default:
return "Idle";
}
}
useEffect(() => {
if (jobStatus?.data?.jobStatus) {
setStatus(statusToText(jobStatus.data.jobStatus.status));
const newProgress = jobStatus.data.jobStatus.progress;
if (newProgress < 0) {
setProgress(-1);
} else {
setProgress(newProgress * 100);
}
}
}, [jobStatus]);
useEffect(() => {
if (metadataUpdate?.data?.metadataUpdate) {
setStatus(statusToText(metadataUpdate.data.metadataUpdate.status));
const newProgress = metadataUpdate.data.metadataUpdate.progress;
if (newProgress < 0) {
setProgress(-1);
} else {
setProgress(newProgress * 100);
}
}
}, [metadataUpdate]);
function onImport() {
async function onImport() {
setIsImportAlertOpen(false);
mutateMetadataImport()
.then(() => {
jobStatus.refetch();
})
.catch((e) => {
Toast.error(e);
});
try {
await mutateMetadataImport();
Toast.success({ content: "Added import task to queue" });
} catch (e) {
Toast.error(e);
}
}
function renderImportAlert() {
@@ -146,8 +86,6 @@ export const SettingsTasksPanel: React.FC = () => {
setIsCleanAlertOpen(false);
mutateMetadataClean({
dryRun: cleanDryRun,
}).then(() => {
jobStatus.refetch();
});
}
@@ -216,8 +154,7 @@ export const SettingsTasksPanel: React.FC = () => {
scanGenerateSprites,
scanGeneratePhashes,
});
Toast.success({ content: "Started scan" });
jobStatus.refetch();
Toast.success({ content: "Added scan to job queue" });
} catch (e) {
Toast.error(e);
}
@@ -252,53 +189,15 @@ export const SettingsTasksPanel: React.FC = () => {
async function onAutoTag(paths?: string[]) {
try {
await mutateMetadataAutoTag(getAutoTagInput(paths));
Toast.success({ content: "Started auto tagging" });
jobStatus.refetch();
Toast.success({ content: "Added Auto tagging job to queue" });
} catch (e) {
Toast.error(e);
}
}
function maybeRenderStop() {
if (!status || status === "Idle") {
return undefined;
}
return (
<Form.Group>
<Button
id="stop"
variant="danger"
onClick={() => mutateStopJob().then(() => jobStatus.refetch())}
>
Stop
</Button>
</Form.Group>
);
}
function renderJobStatus() {
return (
<>
<Form.Group>
<h5>Status: {status}</h5>
{!!status && status !== "Idle" ? (
<ProgressBar
animated
now={progress > -1 ? progress : 100}
label={progress > -1 ? `${progress.toFixed(0)}%` : ""}
/>
) : (
""
)}
</Form.Group>
{maybeRenderStop()}
</>
);
}
async function onPluginTaskClicked(plugin: Plugin, operation: PluginTask) {
await mutateRunPluginTask(plugin.id, operation.name);
Toast.success({ content: `Added ${operation.name} job to queue` });
}
function renderPluginTasks(plugin: Plugin, pluginTasks: PluginTask[]) {
@@ -366,6 +265,24 @@ export const SettingsTasksPanel: React.FC = () => {
);
}
async function onMigrateHashNaming() {
try {
await mutateMigrateHashNaming();
Toast.success({ content: "Added hash migration task to queue" });
} catch (err) {
Toast.error(err);
}
}
async function onExport() {
try {
await mutateMetadataExport();
Toast.success({ content: "Added export task to queue" });
} catch (err) {
Toast.error(err);
}
}
if (isBackupRunning) {
return <LoadingIndicator message="Backup up database" />;
}
@@ -378,9 +295,9 @@ export const SettingsTasksPanel: React.FC = () => {
{renderScanDialog()}
{renderAutoTagDialog()}
<h4>Running Jobs</h4>
<h4>Job Queue</h4>
{renderJobStatus()}
<JobTable />
<hr />
@@ -533,13 +450,7 @@ export const SettingsTasksPanel: React.FC = () => {
id="export"
variant="secondary"
type="submit"
onClick={() =>
mutateMetadataExport()
.then(() => {
jobStatus.refetch();
})
.catch((e) => Toast.error(e))
}
onClick={() => onExport()}
>
Full Export
</Button>
@@ -619,11 +530,7 @@ export const SettingsTasksPanel: React.FC = () => {
<Button
id="migrateHashNaming"
variant="danger"
onClick={() =>
mutateMigrateHashNaming().then(() => {
jobStatus.refetch();
})
}
onClick={() => onMigrateHashNaming()}
>
Rename generated files
</Button>

View File

@@ -71,6 +71,54 @@
}
}
.job-table {
height: 10em;
overflow-y: auto;
ul {
list-style: none;
padding-inline-start: 0;
}
li {
opacity: 0;
transition: opacity 0.25s;
&.fade-in {
opacity: 1;
}
> div {
align-items: flex-start;
display: flex;
}
}
.job-status {
width: 100%;
}
.stop:not(:disabled),
.stopping .fa-icon,
.cancelled .fa-icon {
color: $danger;
}
.running .fa-icon,
.finished .fa-icon {
color: $success;
}
.ready .fa-icon {
color: $warning;
}
.cancelled,
.finished {
color: $text-muted;
}
}
#temp-enable-duration .duration-control:disabled {
opacity: 0.5;
}

View File

@@ -1,4 +1,4 @@
import React, { useRef, useState } from "react";
import React, { useEffect, useRef, useState } from "react";
import { Button, Card, Form, InputGroup, ProgressBar } from "react-bootstrap";
import { Link } from "react-router-dom";
import { HashLink } from "react-router-hash-link";
@@ -9,7 +9,8 @@ import { LoadingIndicator, Modal } from "src/components/Shared";
import {
stashBoxPerformerQuery,
useConfiguration,
useMetadataUpdate,
useJobsSubscribe,
mutateStashBoxBatchPerformerTag,
} from "src/core/StashService";
import { Manual } from "src/components/Help/Manual";
@@ -24,6 +25,11 @@ import {
import PerformerModal from "../PerformerModal";
import { useUpdatePerformer } from "../queries";
type JobFragment = Pick<
GQL.Job,
"id" | "status" | "subTasks" | "description" | "progress"
>;
const CLASSNAME = "PerformerTagger";
interface IPerformerTaggerListProps {
@@ -32,6 +38,8 @@ interface IPerformerTaggerListProps {
isIdle: boolean;
config: ITaggerConfig;
stashBoxes?: GQL.StashBox[];
onBatchAdd: (performerInput: string) => void;
onBatchUpdate: (ids: string[] | undefined, refresh: boolean) => void;
}
const PerformerTaggerList: React.FC<IPerformerTaggerListProps> = ({
@@ -40,6 +48,8 @@ const PerformerTaggerList: React.FC<IPerformerTaggerListProps> = ({
isIdle,
config,
stashBoxes,
onBatchAdd,
onBatchUpdate,
}) => {
const [loading, setLoading] = useState(false);
const [searchResults, setSearchResults] = useState<
@@ -73,7 +83,6 @@ const PerformerTaggerList: React.FC<IPerformerTaggerListProps> = ({
const [showBatchAdd, setShowBatchAdd] = useState(false);
const [showBatchUpdate, setShowBatchUpdate] = useState(false);
const performerInput = useRef<HTMLTextAreaElement | null>(null);
const [doBatchQuery] = GQL.useStashBoxBatchPerformerTagMutation();
const [error, setError] = useState<
Record<string, { message?: string; details?: string } | undefined>
@@ -138,40 +147,15 @@ const PerformerTaggerList: React.FC<IPerformerTaggerListProps> = ({
.finally(() => setLoadingUpdate(undefined));
};
const handleBatchAdd = () => {
async function handleBatchAdd() {
if (performerInput.current) {
const names = performerInput.current.value
.split(",")
.map((n) => n.trim())
.filter((n) => n.length > 0);
if (names.length > 0) {
doBatchQuery({
variables: {
input: {
performer_names: names,
endpoint: selectedEndpoint.index,
refresh: false,
},
},
});
}
onBatchAdd(performerInput.current.value);
}
setShowBatchAdd(false);
};
}
const handleBatchUpdate = () => {
const ids = !queryAll ? performers.map((p) => p.id) : undefined;
doBatchQuery({
variables: {
input: {
performer_ids: ids,
endpoint: selectedEndpoint.index,
refresh,
exclude_fields: config.excludedPerformerFields ?? [],
},
},
});
onBatchUpdate(!queryAll ? performers.map((p) => p.id) : undefined, refresh);
setShowBatchUpdate(false);
};
@@ -506,7 +490,7 @@ interface ITaggerProps {
}
export const PerformerTagger: React.FC<ITaggerProps> = ({ performers }) => {
const jobStatus = useMetadataUpdate();
const jobsSubscribe = useJobsSubscribe();
const stashConfig = useConfiguration();
const [{ data: config }, setConfig] = useLocalForage<ITaggerConfig>(
LOCAL_FORAGE_KEY,
@@ -515,6 +499,28 @@ export const PerformerTagger: React.FC<ITaggerProps> = ({ performers }) => {
const [showConfig, setShowConfig] = useState(false);
const [showManual, setShowManual] = useState(false);
const [batchJobID, setBatchJobID] = useState<string | undefined | null>();
const [batchJob, setBatchJob] = useState<JobFragment | undefined>();
// monitor batch operation
useEffect(() => {
if (!jobsSubscribe.data) {
return;
}
const event = jobsSubscribe.data.jobsSubscribe;
if (event.job.id !== batchJobID) {
return;
}
if (event.type !== GQL.JobStatusUpdateType.Remove) {
setBatchJob(event.job);
} else {
setBatchJob(undefined);
setBatchJobID(undefined);
}
}, [jobsSubscribe, batchJobID]);
if (!config) return <LoadingIndicator />;
const savedEndpointIndex =
@@ -529,12 +535,73 @@ export const PerformerTagger: React.FC<ITaggerProps> = ({ performers }) => {
const selectedEndpoint =
stashConfig.data?.configuration.general.stashBoxes[selectedEndpointIndex];
const progress =
jobStatus.data?.metadataUpdate.status ===
"Stash-Box Performer Batch Operation" &&
jobStatus.data.metadataUpdate.progress >= 0
? jobStatus.data.metadataUpdate.progress * 100
: null;
async function batchAdd(performerInput: string) {
if (performerInput && selectedEndpoint) {
const names = performerInput
.split(",")
.map((n) => n.trim())
.filter((n) => n.length > 0);
if (names.length > 0) {
const ret = await mutateStashBoxBatchPerformerTag({
performer_names: names,
endpoint: selectedEndpointIndex,
refresh: false,
});
setBatchJobID(ret.data?.stashBoxBatchPerformerTag);
}
}
}
async function batchUpdate(ids: string[] | undefined, refresh: boolean) {
if (config && selectedEndpoint) {
const ret = await mutateStashBoxBatchPerformerTag({
performer_ids: ids,
endpoint: selectedEndpointIndex,
refresh,
exclude_fields: config.excludedPerformerFields ?? [],
});
setBatchJobID(ret.data?.stashBoxBatchPerformerTag);
}
}
// const progress =
// jobStatus.data?.metadataUpdate.status ===
// "Stash-Box Performer Batch Operation" &&
// jobStatus.data.metadataUpdate.progress >= 0
// ? jobStatus.data.metadataUpdate.progress * 100
// : null;
function renderStatus() {
if (batchJob) {
const progress =
batchJob.progress !== undefined && batchJob.progress !== null
? batchJob.progress * 100
: undefined;
return (
<Form.Group className="px-4">
<h5>Status: Tagging performers</h5>
{progress !== undefined && (
<ProgressBar
animated
now={progress}
label={`${progress.toFixed(0)}%`}
/>
)}
</Form.Group>
);
}
if (batchJobID !== undefined) {
return (
<Form.Group className="px-4">
<h5>Status: Tagging job queued</h5>
</Form.Group>
);
}
}
return (
<>
@@ -543,16 +610,7 @@ export const PerformerTagger: React.FC<ITaggerProps> = ({ performers }) => {
onClose={() => setShowManual(false)}
defaultActiveTab="Tagger.md"
/>
{progress !== null && (
<Form.Group className="px-4">
<h5>Status: Tagging performers</h5>
<ProgressBar
animated
now={progress}
label={`${progress.toFixed(0)}%`}
/>
</Form.Group>
)}
{renderStatus()}
<div className="tagger-container mx-md-auto">
{selectedEndpointIndex !== -1 && selectedEndpoint ? (
<>
@@ -581,9 +639,11 @@ export const PerformerTagger: React.FC<ITaggerProps> = ({ performers }) => {
endpoint: selectedEndpoint.endpoint,
index: selectedEndpointIndex,
}}
isIdle={progress === null}
isIdle={batchJobID === undefined}
config={config}
stashBoxes={stashConfig.data?.configuration.general.stashBoxes}
onBatchAdd={batchAdd}
onBatchUpdate={batchUpdate}
/>
</>
) : (

View File

@@ -701,6 +701,8 @@ export const useGenerateAPIKey = () =>
update: deleteCache([GQL.ConfigurationDocument]),
});
export const useJobsSubscribe = () => GQL.useJobsSubscribeSubscription();
export const useConfigureDLNA = () =>
GQL.useConfigureDlnaMutation({
refetchQueries: getQueryNames([GQL.ConfigurationDocument]),
@@ -715,8 +717,6 @@ export const useAddTempDLNAIP = () => GQL.useAddTempDlnaipMutation();
export const useRemoveTempDLNAIP = () => GQL.useRemoveTempDlnaipMutation();
export const useMetadataUpdate = () => GQL.useMetadataUpdateSubscription();
export const useLoggingSubscribe = () => GQL.useLoggingSubscribeSubscription();
export const querySystemStatus = () =>
@@ -735,14 +735,17 @@ export const useLogs = () =>
fetchPolicy: "no-cache",
});
export const useJobStatus = () =>
GQL.useJobStatusQuery({
export const useJobQueue = () =>
GQL.useJobQueueQuery({
fetchPolicy: "no-cache",
});
export const mutateStopJob = () =>
export const mutateStopJob = (jobID: string) =>
client.mutate<GQL.StopJobMutation>({
mutation: GQL.StopJobDocument,
variables: {
job_id: jobID,
},
});
export const useDLNAStatus = () =>
@@ -942,6 +945,14 @@ export const mutateBackupDatabase = (input: GQL.BackupDatabaseInput) =>
variables: { input },
});
export const mutateStashBoxBatchPerformerTag = (
input: GQL.StashBoxBatchPerformerTagInput
) =>
client.mutate<GQL.StashBoxBatchPerformerTagMutation>({
mutation: GQL.StashBoxBatchPerformerTagDocument,
variables: { input },
});
export const querySceneByPathRegex = (filter: GQL.FindFilterType) =>
client.query<GQL.FindScenesByPathRegexQuery>({
query: GQL.FindScenesByPathRegexDocument,

View File

@@ -142,23 +142,13 @@ export const createClient = () => {
});
// Watch for scan/clean tasks and reset cache when they complete
let prevStatus = "Idle";
client
.subscribe<GQL.MetadataUpdateSubscription>({
query: GQL.MetadataUpdateDocument,
.subscribe<GQL.ScanCompleteSubscribeSubscription>({
query: GQL.ScanCompleteSubscribeDocument,
})
.subscribe({
next: (res) => {
const currentStatus = res.data?.metadataUpdate.status;
if (currentStatus) {
if (
currentStatus === "Idle" &&
(prevStatus === "Scan" || prevStatus === "Clean")
) {
client.resetStore();
}
prevStatus = currentStatus;
}
next: () => {
client.resetStore();
},
});

View File

@@ -47,12 +47,17 @@ button.minimal {
color: $text-color;
transition: none;
&:hover {
&:disabled {
background: none;
opacity: 0.5;
}
&:hover:not(:disabled) {
background: rgba(138, 155, 168, 0.15);
color: $text-color;
}
&:active {
&:active:not(:disabled) {
background: rgba(138, 155, 168, 0.3);
color: $text-color;
}