Data layer restructuring (#997)

* Move query builders to sqlite package
* Add transaction system
* Wrap model resolvers in transaction
* Add error return value for StringSliceToIntSlice
* Update/refactor mutation resolvers
* Convert query builders
* Remove unused join types
* Add stash id unit tests
* Use WAL journal mode
This commit is contained in:
WithoutPants
2021-01-18 12:23:20 +11:00
committed by GitHub
parent 7bae990c67
commit 1e04deb3d4
168 changed files with 12683 additions and 10863 deletions

View File

@@ -11,7 +11,6 @@ import (
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/stashapp/stash/pkg/database"
"github.com/stashapp/stash/pkg/gallery"
"github.com/stashapp/stash/pkg/image"
@@ -29,7 +28,8 @@ import (
)
type ImportTask struct {
json jsonUtils
txnManager models.TransactionManager
json jsonUtils
BaseDir string
ZipFile io.Reader
@@ -44,6 +44,7 @@ type ImportTask struct {
func CreateImportTask(a models.HashAlgorithm, input models.ImportObjectsInput) *ImportTask {
return &ImportTask{
txnManager: GetInstance().TxnManager,
ZipFile: input.File.File,
Reset: false,
DuplicateBehaviour: input.DuplicateBehaviour,
@@ -200,22 +201,16 @@ func (t *ImportTask) ImportPerformers(ctx context.Context) {
logger.Progressf("[performers] %d of %d", index, len(t.mappings.Performers))
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewPerformerReaderWriter(tx)
importer := &performer.Importer{
ReaderWriter: readerWriter,
Input: *performerJSON,
}
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Performer()
importer := &performer.Importer{
ReaderWriter: readerWriter,
Input: *performerJSON,
}
if err := performImport(importer, t.DuplicateBehaviour); err != nil {
tx.Rollback()
logger.Errorf("[performers] <%s> failed to import: %s", mappingJSON.Checksum, err.Error())
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
logger.Errorf("[performers] <%s> import failed to commit: %s", mappingJSON.Checksum, err.Error())
return performImport(importer, t.DuplicateBehaviour)
}); err != nil {
logger.Errorf("[performers] <%s> import failed: %s", mappingJSON.Checksum, err.Error())
}
}
@@ -237,12 +232,9 @@ func (t *ImportTask) ImportStudios(ctx context.Context) {
logger.Progressf("[studios] %d of %d", index, len(t.mappings.Studios))
tx := database.DB.MustBeginTx(ctx, nil)
// fail on missing parent studio to begin with
if err := t.ImportStudio(studioJSON, pendingParent, tx); err != nil {
tx.Rollback()
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
return t.ImportStudio(studioJSON, pendingParent, r.Studio())
}); err != nil {
if err == studio.ErrParentStudioNotExist {
// add to the pending parent list so that it is created after the parent
s := pendingParent[studioJSON.ParentStudio]
@@ -254,11 +246,6 @@ func (t *ImportTask) ImportStudios(ctx context.Context) {
logger.Errorf("[studios] <%s> failed to create: %s", mappingJSON.Checksum, err.Error())
continue
}
if err := tx.Commit(); err != nil {
logger.Errorf("[studios] import failed to commit: %s", err.Error())
continue
}
}
// create the leftover studios, warning for missing parents
@@ -267,18 +254,12 @@ func (t *ImportTask) ImportStudios(ctx context.Context) {
for _, s := range pendingParent {
for _, orphanStudioJSON := range s {
tx := database.DB.MustBeginTx(ctx, nil)
if err := t.ImportStudio(orphanStudioJSON, nil, tx); err != nil {
tx.Rollback()
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
return t.ImportStudio(orphanStudioJSON, nil, r.Studio())
}); err != nil {
logger.Errorf("[studios] <%s> failed to create: %s", orphanStudioJSON.Name, err.Error())
continue
}
if err := tx.Commit(); err != nil {
logger.Errorf("[studios] import failed to commit: %s", err.Error())
continue
}
}
}
}
@@ -286,8 +267,7 @@ func (t *ImportTask) ImportStudios(ctx context.Context) {
logger.Info("[studios] import complete")
}
func (t *ImportTask) ImportStudio(studioJSON *jsonschema.Studio, pendingParent map[string][]*jsonschema.Studio, tx *sqlx.Tx) error {
readerWriter := models.NewStudioReaderWriter(tx)
func (t *ImportTask) ImportStudio(studioJSON *jsonschema.Studio, pendingParent map[string][]*jsonschema.Studio, readerWriter models.StudioReaderWriter) error {
importer := &studio.Importer{
ReaderWriter: readerWriter,
Input: *studioJSON,
@@ -307,7 +287,7 @@ func (t *ImportTask) ImportStudio(studioJSON *jsonschema.Studio, pendingParent m
s := pendingParent[studioJSON.Name]
for _, childStudioJSON := range s {
// map is nil since we're not checking parent studios at this point
if err := t.ImportStudio(childStudioJSON, nil, tx); err != nil {
if err := t.ImportStudio(childStudioJSON, nil, readerWriter); err != nil {
return fmt.Errorf("failed to create child studio <%s>: %s", childStudioJSON.Name, err.Error())
}
}
@@ -331,26 +311,20 @@ func (t *ImportTask) ImportMovies(ctx context.Context) {
logger.Progressf("[movies] %d of %d", index, len(t.mappings.Movies))
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewMovieReaderWriter(tx)
studioReaderWriter := models.NewStudioReaderWriter(tx)
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Movie()
studioReaderWriter := r.Studio()
movieImporter := &movie.Importer{
ReaderWriter: readerWriter,
StudioWriter: studioReaderWriter,
Input: *movieJSON,
MissingRefBehaviour: t.MissingRefBehaviour,
}
movieImporter := &movie.Importer{
ReaderWriter: readerWriter,
StudioWriter: studioReaderWriter,
Input: *movieJSON,
MissingRefBehaviour: t.MissingRefBehaviour,
}
if err := performImport(movieImporter, t.DuplicateBehaviour); err != nil {
tx.Rollback()
logger.Errorf("[movies] <%s> failed to import: %s", mappingJSON.Checksum, err.Error())
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
logger.Errorf("[movies] <%s> import failed to commit: %s", mappingJSON.Checksum, err.Error())
return performImport(movieImporter, t.DuplicateBehaviour)
}); err != nil {
logger.Errorf("[movies] <%s> import failed: %s", mappingJSON.Checksum, err.Error())
continue
}
}
@@ -371,31 +345,23 @@ func (t *ImportTask) ImportGalleries(ctx context.Context) {
logger.Progressf("[galleries] %d of %d", index, len(t.mappings.Galleries))
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewGalleryReaderWriter(tx)
tagWriter := models.NewTagReaderWriter(tx)
joinWriter := models.NewJoinReaderWriter(tx)
performerWriter := models.NewPerformerReaderWriter(tx)
studioWriter := models.NewStudioReaderWriter(tx)
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Gallery()
tagWriter := r.Tag()
performerWriter := r.Performer()
studioWriter := r.Studio()
galleryImporter := &gallery.Importer{
ReaderWriter: readerWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
JoinWriter: joinWriter,
Input: *galleryJSON,
MissingRefBehaviour: t.MissingRefBehaviour,
}
galleryImporter := &gallery.Importer{
ReaderWriter: readerWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
Input: *galleryJSON,
MissingRefBehaviour: t.MissingRefBehaviour,
}
if err := performImport(galleryImporter, t.DuplicateBehaviour); err != nil {
tx.Rollback()
logger.Errorf("[galleries] <%s> failed to import: %s", mappingJSON.Checksum, err.Error())
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
return performImport(galleryImporter, t.DuplicateBehaviour)
}); err != nil {
logger.Errorf("[galleries] <%s> import failed to commit: %s", mappingJSON.Checksum, err.Error())
continue
}
@@ -417,74 +383,71 @@ func (t *ImportTask) ImportTags(ctx context.Context) {
logger.Progressf("[tags] %d of %d", index, len(t.mappings.Tags))
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewTagReaderWriter(tx)
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Tag()
tagImporter := &tag.Importer{
ReaderWriter: readerWriter,
Input: *tagJSON,
}
tagImporter := &tag.Importer{
ReaderWriter: readerWriter,
Input: *tagJSON,
}
if err := performImport(tagImporter, t.DuplicateBehaviour); err != nil {
tx.Rollback()
return performImport(tagImporter, t.DuplicateBehaviour)
}); err != nil {
logger.Errorf("[tags] <%s> failed to import: %s", mappingJSON.Checksum, err.Error())
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
logger.Errorf("[tags] <%s> import failed to commit: %s", mappingJSON.Checksum, err.Error())
}
}
logger.Info("[tags] import complete")
}
func (t *ImportTask) ImportScrapedItems(ctx context.Context) {
tx := database.DB.MustBeginTx(ctx, nil)
qb := models.NewScrapedItemQueryBuilder()
sqb := models.NewStudioQueryBuilder()
currentTime := time.Now()
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
logger.Info("[scraped sites] importing")
qb := r.ScrapedItem()
sqb := r.Studio()
currentTime := time.Now()
for i, mappingJSON := range t.scraped {
index := i + 1
logger.Progressf("[scraped sites] %d of %d", index, len(t.mappings.Scenes))
for i, mappingJSON := range t.scraped {
index := i + 1
logger.Progressf("[scraped sites] %d of %d", index, len(t.mappings.Scenes))
newScrapedItem := models.ScrapedItem{
Title: sql.NullString{String: mappingJSON.Title, Valid: true},
Description: sql.NullString{String: mappingJSON.Description, Valid: true},
URL: sql.NullString{String: mappingJSON.URL, Valid: true},
Date: models.SQLiteDate{String: mappingJSON.Date, Valid: true},
Rating: sql.NullString{String: mappingJSON.Rating, Valid: true},
Tags: sql.NullString{String: mappingJSON.Tags, Valid: true},
Models: sql.NullString{String: mappingJSON.Models, Valid: true},
Episode: sql.NullInt64{Int64: int64(mappingJSON.Episode), Valid: true},
GalleryFilename: sql.NullString{String: mappingJSON.GalleryFilename, Valid: true},
GalleryURL: sql.NullString{String: mappingJSON.GalleryURL, Valid: true},
VideoFilename: sql.NullString{String: mappingJSON.VideoFilename, Valid: true},
VideoURL: sql.NullString{String: mappingJSON.VideoURL, Valid: true},
CreatedAt: models.SQLiteTimestamp{Timestamp: currentTime},
UpdatedAt: models.SQLiteTimestamp{Timestamp: t.getTimeFromJSONTime(mappingJSON.UpdatedAt)},
newScrapedItem := models.ScrapedItem{
Title: sql.NullString{String: mappingJSON.Title, Valid: true},
Description: sql.NullString{String: mappingJSON.Description, Valid: true},
URL: sql.NullString{String: mappingJSON.URL, Valid: true},
Date: models.SQLiteDate{String: mappingJSON.Date, Valid: true},
Rating: sql.NullString{String: mappingJSON.Rating, Valid: true},
Tags: sql.NullString{String: mappingJSON.Tags, Valid: true},
Models: sql.NullString{String: mappingJSON.Models, Valid: true},
Episode: sql.NullInt64{Int64: int64(mappingJSON.Episode), Valid: true},
GalleryFilename: sql.NullString{String: mappingJSON.GalleryFilename, Valid: true},
GalleryURL: sql.NullString{String: mappingJSON.GalleryURL, Valid: true},
VideoFilename: sql.NullString{String: mappingJSON.VideoFilename, Valid: true},
VideoURL: sql.NullString{String: mappingJSON.VideoURL, Valid: true},
CreatedAt: models.SQLiteTimestamp{Timestamp: currentTime},
UpdatedAt: models.SQLiteTimestamp{Timestamp: t.getTimeFromJSONTime(mappingJSON.UpdatedAt)},
}
studio, err := sqb.FindByName(mappingJSON.Studio, false)
if err != nil {
logger.Errorf("[scraped sites] failed to fetch studio: %s", err.Error())
}
if studio != nil {
newScrapedItem.StudioID = sql.NullInt64{Int64: int64(studio.ID), Valid: true}
}
_, err = qb.Create(newScrapedItem)
if err != nil {
logger.Errorf("[scraped sites] <%s> failed to create: %s", newScrapedItem.Title.String, err.Error())
}
}
studio, err := sqb.FindByName(mappingJSON.Studio, tx, false)
if err != nil {
logger.Errorf("[scraped sites] failed to fetch studio: %s", err.Error())
}
if studio != nil {
newScrapedItem.StudioID = sql.NullInt64{Int64: int64(studio.ID), Valid: true}
}
_, err = qb.Create(newScrapedItem, tx)
if err != nil {
logger.Errorf("[scraped sites] <%s> failed to create: %s", newScrapedItem.Title.String, err.Error())
}
}
logger.Info("[scraped sites] importing")
if err := tx.Commit(); err != nil {
return nil
}); err != nil {
logger.Errorf("[scraped sites] import failed to commit: %s", err.Error())
}
logger.Info("[scraped sites] import complete")
}
@@ -504,65 +467,52 @@ func (t *ImportTask) ImportScenes(ctx context.Context) {
sceneHash := mappingJSON.Checksum
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewSceneReaderWriter(tx)
tagWriter := models.NewTagReaderWriter(tx)
galleryWriter := models.NewGalleryReaderWriter(tx)
joinWriter := models.NewJoinReaderWriter(tx)
movieWriter := models.NewMovieReaderWriter(tx)
performerWriter := models.NewPerformerReaderWriter(tx)
studioWriter := models.NewStudioReaderWriter(tx)
markerWriter := models.NewSceneMarkerReaderWriter(tx)
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Scene()
tagWriter := r.Tag()
galleryWriter := r.Gallery()
movieWriter := r.Movie()
performerWriter := r.Performer()
studioWriter := r.Studio()
markerWriter := r.SceneMarker()
sceneImporter := &scene.Importer{
ReaderWriter: readerWriter,
Input: *sceneJSON,
Path: mappingJSON.Path,
sceneImporter := &scene.Importer{
ReaderWriter: readerWriter,
Input: *sceneJSON,
Path: mappingJSON.Path,
FileNamingAlgorithm: t.fileNamingAlgorithm,
MissingRefBehaviour: t.MissingRefBehaviour,
GalleryWriter: galleryWriter,
JoinWriter: joinWriter,
MovieWriter: movieWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
}
if err := performImport(sceneImporter, t.DuplicateBehaviour); err != nil {
tx.Rollback()
logger.Errorf("[scenes] <%s> failed to import: %s", sceneHash, err.Error())
continue
}
// import the scene markers
failedMarkers := false
for _, m := range sceneJSON.Markers {
markerImporter := &scene.MarkerImporter{
SceneID: sceneImporter.ID,
Input: m,
FileNamingAlgorithm: t.fileNamingAlgorithm,
MissingRefBehaviour: t.MissingRefBehaviour,
ReaderWriter: markerWriter,
JoinWriter: joinWriter,
TagWriter: tagWriter,
GalleryWriter: galleryWriter,
MovieWriter: movieWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
}
if err := performImport(markerImporter, t.DuplicateBehaviour); err != nil {
failedMarkers = true
logger.Errorf("[scenes] <%s> failed to import markers: %s", sceneHash, err.Error())
break
if err := performImport(sceneImporter, t.DuplicateBehaviour); err != nil {
return err
}
}
if failedMarkers {
tx.Rollback()
continue
}
// import the scene markers
for _, m := range sceneJSON.Markers {
markerImporter := &scene.MarkerImporter{
SceneID: sceneImporter.ID,
Input: m,
MissingRefBehaviour: t.MissingRefBehaviour,
ReaderWriter: markerWriter,
TagWriter: tagWriter,
}
if err := tx.Commit(); err != nil {
tx.Rollback()
logger.Errorf("[scenes] <%s> import failed to commit: %s", sceneHash, err.Error())
if err := performImport(markerImporter, t.DuplicateBehaviour); err != nil {
return err
}
}
return nil
}); err != nil {
logger.Errorf("[scenes] <%s> import failed: %s", sceneHash, err.Error())
}
}
@@ -585,46 +535,37 @@ func (t *ImportTask) ImportImages(ctx context.Context) {
imageHash := mappingJSON.Checksum
tx := database.DB.MustBeginTx(ctx, nil)
readerWriter := models.NewImageReaderWriter(tx)
tagWriter := models.NewTagReaderWriter(tx)
galleryWriter := models.NewGalleryReaderWriter(tx)
joinWriter := models.NewJoinReaderWriter(tx)
performerWriter := models.NewPerformerReaderWriter(tx)
studioWriter := models.NewStudioReaderWriter(tx)
if err := t.txnManager.WithTxn(ctx, func(r models.Repository) error {
readerWriter := r.Image()
tagWriter := r.Tag()
galleryWriter := r.Gallery()
performerWriter := r.Performer()
studioWriter := r.Studio()
imageImporter := &image.Importer{
ReaderWriter: readerWriter,
Input: *imageJSON,
Path: mappingJSON.Path,
imageImporter := &image.Importer{
ReaderWriter: readerWriter,
Input: *imageJSON,
Path: mappingJSON.Path,
MissingRefBehaviour: t.MissingRefBehaviour,
MissingRefBehaviour: t.MissingRefBehaviour,
GalleryWriter: galleryWriter,
JoinWriter: joinWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
}
GalleryWriter: galleryWriter,
PerformerWriter: performerWriter,
StudioWriter: studioWriter,
TagWriter: tagWriter,
}
if err := performImport(imageImporter, t.DuplicateBehaviour); err != nil {
tx.Rollback()
logger.Errorf("[images] <%s> failed to import: %s", imageHash, err.Error())
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
logger.Errorf("[images] <%s> import failed to commit: %s", imageHash, err.Error())
return performImport(imageImporter, t.DuplicateBehaviour)
}); err != nil {
logger.Errorf("[images] <%s> import failed: %s", imageHash, err.Error())
}
}
logger.Info("[images] import complete")
}
func (t *ImportTask) getPerformers(names []string, tx *sqlx.Tx) ([]*models.Performer, error) {
pqb := models.NewPerformerQueryBuilder()
performers, err := pqb.FindByNames(names, tx, false)
func (t *ImportTask) getPerformers(names []string, qb models.PerformerReader) ([]*models.Performer, error) {
performers, err := qb.FindByNames(names, false)
if err != nil {
return nil, err
}
@@ -648,12 +589,10 @@ func (t *ImportTask) getPerformers(names []string, tx *sqlx.Tx) ([]*models.Perfo
return performers, nil
}
func (t *ImportTask) getMoviesScenes(input []jsonschema.SceneMovie, sceneID int, tx *sqlx.Tx) ([]models.MoviesScenes, error) {
mqb := models.NewMovieQueryBuilder()
func (t *ImportTask) getMoviesScenes(input []jsonschema.SceneMovie, sceneID int, mqb models.MovieReader) ([]models.MoviesScenes, error) {
var movies []models.MoviesScenes
for _, inputMovie := range input {
movie, err := mqb.FindByName(inputMovie.MovieName, tx, false)
movie, err := mqb.FindByName(inputMovie.MovieName, false)
if err != nil {
return nil, err
}
@@ -680,9 +619,8 @@ func (t *ImportTask) getMoviesScenes(input []jsonschema.SceneMovie, sceneID int,
return movies, nil
}
func (t *ImportTask) getTags(sceneChecksum string, names []string, tx *sqlx.Tx) ([]*models.Tag, error) {
tqb := models.NewTagQueryBuilder()
tags, err := tqb.FindByNames(names, tx, false)
func (t *ImportTask) getTags(sceneChecksum string, names []string, tqb models.TagReader) ([]*models.Tag, error) {
tags, err := tqb.FindByNames(names, false)
if err != nil {
return nil, err
}