Export performance optimization (#475)

* recreate metadata path if needed, before exporting data
This commit is contained in:
bnkai
2020-04-24 05:52:21 +03:00
committed by GitHub
parent ba09bfa64a
commit 9b1518beae
147 changed files with 11923 additions and 9958 deletions

View File

@@ -3,15 +3,18 @@ package manager
import (
"context"
"fmt"
"math"
"strconv"
"sync"
"github.com/jmoiron/sqlx"
"github.com/stashapp/stash/pkg/database"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/manager/jsonschema"
"github.com/stashapp/stash/pkg/manager/paths"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/utils"
"math"
"runtime"
"strconv"
"sync"
"time"
)
type ExportTask struct {
@@ -22,29 +25,67 @@ type ExportTask struct {
// Start runs a full metadata export and marks wg as done when it returns.
//
// It resets t.Mappings and t.Scraped, exports scenes, galleries, performers,
// studios and movies, saves the collected mappings, exports the scraped
// items, and finally logs the total elapsed time.
//
// NOTE(review): this listing contains both the single-argument export calls
// and the calls that take workerCount; presumably the former are the
// pre-change lines of the diff being displayed. Confirm before treating them
// as live code.
func (t *ExportTask) Start(wg *sync.WaitGroup) {
defer wg.Done()
// @manager.total = Scene.count + Gallery.count + Performer.count + Studio.count + Movie.count
workerCount := runtime.GOMAXPROCS(0) // GOMAXPROCS(0) only queries the current limit; use one worker per CPU the runtime may run on
// Start from empty mappings and scraped items for this run.
t.Mappings = &jsonschema.Mappings{}
t.Scraped = []jsonschema.ScrapedItem{}
ctx := context.TODO()
startTime := time.Now() // used for the elapsed-time log at the end
t.ExportScenes(ctx)
// Called before any data is exported; per the commit message this recreates
// the metadata path if needed (helper not visible here, confirm).
paths.EnsureJSONDirs()
t.ExportScenes(ctx, workerCount)
t.ExportGalleries(ctx)
t.ExportPerformers(ctx)
t.ExportStudios(ctx)
t.ExportMovies(ctx)
t.ExportPerformers(ctx, workerCount)
t.ExportStudios(ctx, workerCount)
t.ExportMovies(ctx, workerCount)
// A failure to save the mappings is logged and does not stop the export.
if err := instance.JSON.saveMappings(t.Mappings); err != nil {
logger.Errorf("[mappings] failed to save json: %s", err.Error())
}
t.ExportScrapedItems(ctx)
logger.Infof("Export complete in %s.", time.Since(startTime))
}
func (t *ExportTask) ExportScenes(ctx context.Context) {
tx := database.DB.MustBeginTx(ctx, nil)
defer tx.Commit()
// ExportScenes exports every scene using a pool of worker goroutines.
//
// It loads all scenes through the scene query builder, appends a
// path/checksum mapping for each one to t.Mappings.Scenes, and hands the
// scene to the exportScene workers over a buffered channel. It returns only
// after the channel has been closed and every worker has finished.
//
// workers is the number of goroutines to start. Values below 1 are treated as
// 1: with no worker draining the channel the feed loop would block forever,
// and a negative value would make the channel allocation panic.
// ctx is currently not used by this function.
func (t *ExportTask) ExportScenes(ctx context.Context, workers int) {
	if workers < 1 {
		workers = 1
	}

	var scenesWg sync.WaitGroup

	qb := models.NewSceneQueryBuilder()
	scenes, err := qb.All()
	if err != nil {
		// The failure is only logged; the loop below still runs over
		// whatever qb.All returned.
		logger.Errorf("[scenes] failed to fetch all scenes: %s", err.Error())
	}

	jobCh := make(chan *models.Scene, workers*2) // make a buffered channel to feed workers

	logger.Info("[scenes] exporting")
	startTime := time.Now()

	for w := 0; w < workers; w++ { // create export Scene workers
		scenesWg.Add(1)
		go exportScene(&scenesWg, jobCh, t, nil) // no db data is changed so tx is set to nil
	}

	for i, scene := range scenes {
		index := i + 1

		if i%100 == 0 { // make progress easier to read
			logger.Progressf("[scenes] %d of %d", index, len(scenes))
		}
		t.Mappings.Scenes = append(t.Mappings.Scenes, jsonschema.PathMapping{Path: scene.Path, Checksum: scene.Checksum})
		jobCh <- scene // feed workers
	}

	close(jobCh) // close channel so that workers will know no more jobs are available
	scenesWg.Wait()

	logger.Infof("[scenes] export complete in %s. %d workers used.", time.Since(startTime), workers)
}
func exportScene(wg *sync.WaitGroup, jobChan <-chan *models.Scene, t *ExportTask, tx *sqlx.Tx) {
defer wg.Done()
studioQB := models.NewStudioQueryBuilder()
movieQB := models.NewMovieQueryBuilder()
galleryQB := models.NewGalleryQueryBuilder()
@@ -52,18 +93,8 @@ func (t *ExportTask) ExportScenes(ctx context.Context) {
tagQB := models.NewTagQueryBuilder()
sceneMarkerQB := models.NewSceneMarkerQueryBuilder()
joinQB := models.NewJoinsQueryBuilder()
scenes, err := qb.All()
if err != nil {
logger.Errorf("[scenes] failed to fetch all scenes: %s", err.Error())
}
logger.Info("[scenes] exporting")
for i, scene := range scenes {
index := i + 1
logger.Progressf("[scenes] %d of %d", index, len(scenes))
t.Mappings.Scenes = append(t.Mappings.Scenes, jsonschema.PathMapping{Path: scene.Path, Checksum: scene.Checksum})
for scene := range jobChan {
newSceneJSON := jsonschema.Scene{
CreatedAt: models.JSONTime{Time: scene.CreatedAt.Timestamp},
UpdatedAt: models.JSONTime{Time: scene.UpdatedAt.Timestamp},
@@ -83,7 +114,7 @@ func (t *ExportTask) ExportScenes(ctx context.Context) {
galleryChecksum = gallery.Checksum
}
performers, _ := performerQB.FindBySceneID(scene.ID, tx)
performers, _ := performerQB.FindNameBySceneID(scene.ID, tx)
sceneMovies, _ := joinQB.GetSceneMovies(scene.ID, tx)
tags, _ := tagQB.FindBySceneID(scene.ID, tx)
sceneMarkers, _ := sceneMarkerQB.FindBySceneID(scene.ID, tx)
@@ -200,7 +231,6 @@ func (t *ExportTask) ExportScenes(ctx context.Context) {
}
}
logger.Infof("[scenes] export complete")
}
func (t *ExportTask) ExportGalleries(ctx context.Context) {
@@ -221,21 +251,42 @@ func (t *ExportTask) ExportGalleries(ctx context.Context) {
logger.Infof("[galleries] export complete")
}
func (t *ExportTask) ExportPerformers(ctx context.Context) {
// ExportPerformers exports every performer using a pool of worker goroutines.
//
// It loads all performers through the performer query builder, appends a
// name/checksum mapping for each one to t.Mappings.Performers, and hands the
// performer to the exportPerformer workers over a buffered channel. It
// returns only after the channel has been closed and every worker has
// finished.
//
// workers is the number of goroutines to start. Values below 1 are treated as
// 1: with no worker draining the channel the feed loop would block forever,
// and a negative value would make the channel allocation panic.
// ctx is currently not used by this function.
func (t *ExportTask) ExportPerformers(ctx context.Context, workers int) {
	if workers < 1 {
		workers = 1
	}

	var performersWg sync.WaitGroup

	qb := models.NewPerformerQueryBuilder()
	performers, err := qb.All()
	if err != nil {
		// The failure is only logged; the loop below still runs over
		// whatever qb.All returned.
		logger.Errorf("[performers] failed to fetch all performers: %s", err.Error())
	}

	jobCh := make(chan *models.Performer, workers*2) // make a buffered channel to feed workers

	logger.Info("[performers] exporting")
	startTime := time.Now()

	for w := 0; w < workers; w++ { // create export Performer workers
		performersWg.Add(1)
		go exportPerformer(&performersWg, jobCh)
	}

	for i, performer := range performers {
		index := i + 1
		logger.Progressf("[performers] %d of %d", index, len(performers))

		t.Mappings.Performers = append(t.Mappings.Performers, jsonschema.NameMapping{Name: performer.Name.String, Checksum: performer.Checksum})
		jobCh <- performer // feed workers
	}

	close(jobCh) // close channel so workers will know that no more jobs are available
	performersWg.Wait()

	logger.Infof("[performers] export complete in %s. %d workers used.", time.Since(startTime), workers)
}
func exportPerformer(wg *sync.WaitGroup, jobChan <-chan *models.Performer) {
defer wg.Done()
for performer := range jobChan {
newPerformerJSON := jsonschema.Performer{
CreatedAt: models.JSONTime{Time: performer.CreatedAt.Timestamp},
UpdatedAt: models.JSONTime{Time: performer.UpdatedAt.Timestamp},
@@ -306,11 +357,11 @@ func (t *ExportTask) ExportPerformers(ctx context.Context) {
logger.Errorf("[performers] <%s> failed to save json: %s", performer.Checksum, err.Error())
}
}
logger.Infof("[performers] export complete")
}
func (t *ExportTask) ExportStudios(ctx context.Context) {
func (t *ExportTask) ExportStudios(ctx context.Context, workers int) {
var studiosWg sync.WaitGroup
qb := models.NewStudioQueryBuilder()
studios, err := qb.All()
if err != nil {
@@ -318,12 +369,33 @@ func (t *ExportTask) ExportStudios(ctx context.Context) {
}
logger.Info("[studios] exporting")
startTime := time.Now()
jobCh := make(chan *models.Studio, workers*2) // make a buffered channel to feed workers
for w := 0; w < workers; w++ { // create export Studio workers
studiosWg.Add(1)
go exportStudio(&studiosWg, jobCh)
}
for i, studio := range studios {
index := i + 1
logger.Progressf("[studios] %d of %d", index, len(studios))
t.Mappings.Studios = append(t.Mappings.Studios, jsonschema.NameMapping{Name: studio.Name.String, Checksum: studio.Checksum})
jobCh <- studio // feed workers
}
close(jobCh)
studiosWg.Wait()
logger.Infof("[studios] export complete in %s. %d workers used.", time.Since(startTime), workers)
}
func exportStudio(wg *sync.WaitGroup, jobChan <-chan *models.Studio) {
defer wg.Done()
for studio := range jobChan {
newStudioJSON := jsonschema.Studio{
CreatedAt: models.JSONTime{Time: studio.CreatedAt.Timestamp},
@@ -350,11 +422,11 @@ func (t *ExportTask) ExportStudios(ctx context.Context) {
logger.Errorf("[studios] <%s> failed to save json: %s", studio.Checksum, err.Error())
}
}
logger.Infof("[studios] export complete")
}
func (t *ExportTask) ExportMovies(ctx context.Context) {
func (t *ExportTask) ExportMovies(ctx context.Context, workers int) {
var moviesWg sync.WaitGroup
qb := models.NewMovieQueryBuilder()
movies, err := qb.All()
if err != nil {
@@ -362,13 +434,33 @@ func (t *ExportTask) ExportMovies(ctx context.Context) {
}
logger.Info("[movies] exporting")
startTime := time.Now()
jobCh := make(chan *models.Movie, workers*2) // make a buffered channel to feed workers
for w := 0; w < workers; w++ { // create export Movie workers
moviesWg.Add(1)
go exportMovie(&moviesWg, jobCh)
}
for i, movie := range movies {
index := i + 1
logger.Progressf("[movies] %d of %d", index, len(movies))
t.Mappings.Movies = append(t.Mappings.Movies, jsonschema.NameMapping{Name: movie.Name.String, Checksum: movie.Checksum})
jobCh <- movie // feed workers
}
close(jobCh)
moviesWg.Wait()
logger.Infof("[movies] export complete in %s. %d workers used.", time.Since(startTime), workers)
}
func exportMovie(wg *sync.WaitGroup, jobChan <-chan *models.Movie) {
defer wg.Done()
for movie := range jobChan {
newMovieJSON := jsonschema.Movie{
CreatedAt: models.JSONTime{Time: movie.CreatedAt.Timestamp},
UpdatedAt: models.JSONTime{Time: movie.UpdatedAt.Timestamp},
@@ -415,8 +507,6 @@ func (t *ExportTask) ExportMovies(ctx context.Context) {
logger.Errorf("[movies] <%s> failed to save json: %s", movie.Checksum, err.Error())
}
}
logger.Infof("[movies] export complete")
}
func (t *ExportTask) ExportScrapedItems(ctx context.Context) {