mirror of
https://github.com/stashapp/stash.git
synced 2025-12-18 04:44:37 +03:00
Fix database locked errors (#3153)
* Make read-only operations use WithReadTxn * Allow one database write thread * Add unit test for concurrent transactions * Perform some actions after commit to release txn * Suppress some errors from cancelled context
This commit is contained in:
@@ -109,7 +109,7 @@ func (j *cleanJob) execute(ctx context.Context) error {
|
||||
folderCount int
|
||||
)
|
||||
|
||||
if err := txn.WithTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
var err error
|
||||
fileCount, err = j.Repository.CountAllInPaths(ctx, j.options.Paths)
|
||||
if err != nil {
|
||||
@@ -169,7 +169,7 @@ func (j *cleanJob) assessFiles(ctx context.Context, toDelete *deleteSet) error {
|
||||
progress := j.progress
|
||||
|
||||
more := true
|
||||
if err := txn.WithTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
for more {
|
||||
if job.IsCancelled(ctx) {
|
||||
return nil
|
||||
@@ -253,7 +253,7 @@ func (j *cleanJob) assessFolders(ctx context.Context, toDelete *deleteSet) error
|
||||
progress := j.progress
|
||||
|
||||
more := true
|
||||
if err := txn.WithTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, j.Repository, func(ctx context.Context) error {
|
||||
for more {
|
||||
if job.IsCancelled(ctx) {
|
||||
return nil
|
||||
|
||||
@@ -85,7 +85,6 @@ type scanJob struct {
|
||||
|
||||
startTime time.Time
|
||||
fileQueue chan scanFile
|
||||
dbQueue chan func(ctx context.Context) error
|
||||
retryList []scanFile
|
||||
retrying bool
|
||||
folderPathToID sync.Map
|
||||
@@ -148,9 +147,11 @@ func (s *scanJob) execute(ctx context.Context) {
|
||||
s.startTime = time.Now()
|
||||
|
||||
s.fileQueue = make(chan scanFile, scanQueueSize)
|
||||
s.dbQueue = make(chan func(ctx context.Context) error, scanQueueSize)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := s.queueFiles(ctx, paths); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
@@ -163,6 +164,8 @@ func (s *scanJob) execute(ctx context.Context) {
|
||||
logger.Infof("Finished adding files to queue. %d files queued", s.count)
|
||||
}()
|
||||
|
||||
defer wg.Wait()
|
||||
|
||||
if err := s.processQueue(ctx); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
@@ -329,38 +332,50 @@ func (s *scanJob) processQueue(ctx context.Context) error {
|
||||
|
||||
wg := sizedwaitgroup.New(parallelTasks)
|
||||
|
||||
for f := range s.fileQueue {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
if err := func() error {
|
||||
defer wg.Wait()
|
||||
|
||||
for f := range s.fileQueue {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
s.retrying = true
|
||||
for _, f := range s.retryList {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
|
||||
if err := func() error {
|
||||
defer wg.Wait()
|
||||
|
||||
for _, f := range s.retryList {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
close(s.dbQueue)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -73,26 +73,6 @@ func (h *ScanHandler) validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ScanHandler) logInfo(ctx context.Context, format string, args ...interface{}) {
|
||||
// log at the end so that if anything fails above due to a locked database
|
||||
// error and the transaction must be retried, then we shouldn't get multiple
|
||||
// logs of the same thing.
|
||||
txn.AddPostCompleteHook(ctx, func(ctx context.Context) error {
|
||||
logger.Infof(format, args...)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (h *ScanHandler) logError(ctx context.Context, format string, args ...interface{}) {
|
||||
// log at the end so that if anything fails above due to a locked database
|
||||
// error and the transaction must be retried, then we shouldn't get multiple
|
||||
// logs of the same thing.
|
||||
txn.AddPostCompleteHook(ctx, func(ctx context.Context) error {
|
||||
logger.Errorf(format, args...)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (h *ScanHandler) Handle(ctx context.Context, f file.File, oldFile file.File) error {
|
||||
if err := h.validate(); err != nil {
|
||||
return err
|
||||
@@ -132,7 +112,7 @@ func (h *ScanHandler) Handle(ctx context.Context, f file.File, oldFile file.File
|
||||
GalleryIDs: models.NewRelatedIDs([]int{}),
|
||||
}
|
||||
|
||||
h.logInfo(ctx, "%s doesn't exist. Creating new image...", f.Base().Path)
|
||||
logger.Infof("%s doesn't exist. Creating new image...", f.Base().Path)
|
||||
|
||||
if _, err := h.associateGallery(ctx, newImage, imageFile); err != nil {
|
||||
return err
|
||||
@@ -162,12 +142,17 @@ func (h *ScanHandler) Handle(ctx context.Context, f file.File, oldFile file.File
|
||||
}
|
||||
|
||||
if h.ScanConfig.IsGenerateThumbnails() {
|
||||
for _, s := range existing {
|
||||
if err := h.ThumbnailGenerator.GenerateThumbnail(ctx, s, imageFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
h.logError(ctx, "Error generating thumbnail for %s: %v", imageFile.Path, err)
|
||||
// do this after the commit so that the transaction isn't held up
|
||||
txn.AddPostCommitHook(ctx, func(ctx context.Context) error {
|
||||
for _, s := range existing {
|
||||
if err := h.ThumbnailGenerator.GenerateThumbnail(ctx, s, imageFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
logger.Errorf("Error generating thumbnail for %s: %v", imageFile.Path, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -202,7 +187,7 @@ func (h *ScanHandler) associateExisting(ctx context.Context, existing []*models.
|
||||
}
|
||||
|
||||
if !found {
|
||||
h.logInfo(ctx, "Adding %s to image %s", f.Path, i.DisplayName())
|
||||
logger.Infof("Adding %s to image %s", f.Path, i.DisplayName())
|
||||
|
||||
if err := h.CreatorUpdater.AddFileID(ctx, i.ID, f.ID); err != nil {
|
||||
return fmt.Errorf("adding file to image: %w", err)
|
||||
@@ -249,7 +234,7 @@ func (h *ScanHandler) getOrCreateFolderBasedGallery(ctx context.Context, f file.
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
h.logInfo(ctx, "Creating folder-based gallery for %s", filepath.Dir(f.Base().Path))
|
||||
logger.Infof("Creating folder-based gallery for %s", filepath.Dir(f.Base().Path))
|
||||
|
||||
if err := h.GalleryFinder.Create(ctx, newGallery, nil); err != nil {
|
||||
return nil, fmt.Errorf("creating folder based gallery: %w", err)
|
||||
@@ -273,7 +258,7 @@ func (h *ScanHandler) associateFolderImages(ctx context.Context, g *models.Galle
|
||||
}
|
||||
|
||||
for _, ii := range i {
|
||||
h.logInfo(ctx, "Adding %s to gallery %s", ii.Path, g.Path)
|
||||
logger.Infof("Adding %s to gallery %s", ii.Path, g.Path)
|
||||
|
||||
if _, err := h.CreatorUpdater.UpdatePartial(ctx, ii.ID, models.ImagePartial{
|
||||
GalleryIDs: &models.UpdateIDs{
|
||||
@@ -307,7 +292,7 @@ func (h *ScanHandler) getOrCreateZipBasedGallery(ctx context.Context, zipFile fi
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
h.logInfo(ctx, "%s doesn't exist. Creating new gallery...", zipFile.Base().Path)
|
||||
logger.Infof("%s doesn't exist. Creating new gallery...", zipFile.Base().Path)
|
||||
|
||||
if err := h.GalleryFinder.Create(ctx, newGallery, []file.ID{zipFile.Base().ID}); err != nil {
|
||||
return nil, fmt.Errorf("creating zip-based gallery: %w", err)
|
||||
@@ -345,7 +330,7 @@ func (h *ScanHandler) associateGallery(ctx context.Context, newImage *models.Ima
|
||||
if g != nil && !intslice.IntInclude(newImage.GalleryIDs.List(), g.ID) {
|
||||
ret = true
|
||||
newImage.GalleryIDs.Add(g.ID)
|
||||
h.logInfo(ctx, "Adding %s to gallery %s", f.Base().Path, g.Path)
|
||||
logger.Infof("Adding %s to gallery %s", f.Base().Path, g.Path)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
type TxnManager struct{}
|
||||
|
||||
func (*TxnManager) Begin(ctx context.Context) (context.Context, error) {
|
||||
func (*TxnManager) Begin(ctx context.Context, exclusive bool) (context.Context, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
@@ -25,6 +25,9 @@ func (*TxnManager) Rollback(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*TxnManager) Complete(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (*TxnManager) AddPostCommitHook(ctx context.Context, hook txn.TxnFunc) {
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/stashapp/stash/pkg/file"
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
)
|
||||
@@ -29,7 +27,3 @@ type Repository struct {
|
||||
Tag TagReaderWriter
|
||||
SavedFilter SavedFilterReaderWriter
|
||||
}
|
||||
|
||||
func (r *Repository) WithTxn(ctx context.Context, fn txn.TxnFunc) error {
|
||||
return txn.WithTxn(ctx, r, fn)
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/stashapp/stash/pkg/models"
|
||||
"github.com/stashapp/stash/pkg/models/paths"
|
||||
"github.com/stashapp/stash/pkg/plugin"
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -119,17 +120,22 @@ func (h *ScanHandler) Handle(ctx context.Context, f file.File, oldFile file.File
|
||||
}
|
||||
}
|
||||
|
||||
for _, s := range existing {
|
||||
if err := h.CoverGenerator.GenerateCover(ctx, s, videoFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
logger.Errorf("Error generating cover for %s: %v", videoFile.Path, err)
|
||||
// do this after the commit so that cover generation doesn't hold up the transaction
|
||||
txn.AddPostCommitHook(ctx, func(ctx context.Context) error {
|
||||
for _, s := range existing {
|
||||
if err := h.CoverGenerator.GenerateCover(ctx, s, videoFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
logger.Errorf("Error generating cover for %s: %v", videoFile.Path, err)
|
||||
}
|
||||
|
||||
if err := h.ScanGenerator.Generate(ctx, s, videoFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
logger.Errorf("Error generating content for %s: %v", videoFile.Path, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.ScanGenerator.Generate(ctx, s, videoFile); err != nil {
|
||||
// just log if cover generation fails. We can try again on rescan
|
||||
logger.Errorf("Error generating content for %s: %v", videoFile.Path, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (s autotagScraper) viaScene(ctx context.Context, _client *http.Client, scen
|
||||
const trimExt = false
|
||||
|
||||
// populate performers, studio and tags based on scene path
|
||||
if err := txn.WithTxn(ctx, s.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, s.txnManager, func(ctx context.Context) error {
|
||||
path := scene.Path
|
||||
if path == "" {
|
||||
return nil
|
||||
@@ -144,7 +144,7 @@ func (s autotagScraper) viaGallery(ctx context.Context, _client *http.Client, ga
|
||||
var ret *ScrapedGallery
|
||||
|
||||
// populate performers, studio and tags based on scene path
|
||||
if err := txn.WithTxn(ctx, s.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, s.txnManager, func(ctx context.Context) error {
|
||||
path := gallery.Path
|
||||
performers, err := autotagMatchPerformers(ctx, path, s.performerReader, trimExt)
|
||||
if err != nil {
|
||||
|
||||
@@ -350,7 +350,7 @@ func (c Cache) ScrapeID(ctx context.Context, scraperID string, id int, ty Scrape
|
||||
|
||||
func (c Cache) getScene(ctx context.Context, sceneID int) (*models.Scene, error) {
|
||||
var ret *models.Scene
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
var err error
|
||||
ret, err = c.repository.SceneFinder.Find(ctx, sceneID)
|
||||
return err
|
||||
@@ -362,7 +362,7 @@ func (c Cache) getScene(ctx context.Context, sceneID int) (*models.Scene, error)
|
||||
|
||||
func (c Cache) getGallery(ctx context.Context, galleryID int) (*models.Gallery, error) {
|
||||
var ret *models.Gallery
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
var err error
|
||||
ret, err = c.repository.GalleryFinder.Find(ctx, galleryID)
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ func (c Cache) postScrape(ctx context.Context, content ScrapedContent) (ScrapedC
|
||||
}
|
||||
|
||||
func (c Cache) postScrapePerformer(ctx context.Context, p models.ScrapedPerformer) (ScrapedContent, error) {
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
tqb := c.repository.TagFinder
|
||||
|
||||
tags, err := postProcessTags(ctx, tqb, p.Tags)
|
||||
@@ -73,7 +73,7 @@ func (c Cache) postScrapePerformer(ctx context.Context, p models.ScrapedPerforme
|
||||
|
||||
func (c Cache) postScrapeMovie(ctx context.Context, m models.ScrapedMovie) (ScrapedContent, error) {
|
||||
if m.Studio != nil {
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
return match.ScrapedStudio(ctx, c.repository.StudioFinder, m.Studio, nil)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
@@ -106,7 +106,7 @@ func (c Cache) postScrapeScenePerformer(ctx context.Context, p models.ScrapedPer
|
||||
}
|
||||
|
||||
func (c Cache) postScrapeScene(ctx context.Context, scene ScrapedScene) (ScrapedContent, error) {
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
pqb := c.repository.PerformerFinder
|
||||
mqb := c.repository.MovieFinder
|
||||
tqb := c.repository.TagFinder
|
||||
@@ -160,7 +160,7 @@ func (c Cache) postScrapeScene(ctx context.Context, scene ScrapedScene) (Scraped
|
||||
}
|
||||
|
||||
func (c Cache) postScrapeGallery(ctx context.Context, g ScrapedGallery) (ScrapedContent, error) {
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
pqb := c.repository.PerformerFinder
|
||||
tqb := c.repository.TagFinder
|
||||
sqb := c.repository.StudioFinder
|
||||
|
||||
@@ -131,7 +131,7 @@ func (c Client) FindStashBoxSceneByFingerprints(ctx context.Context, sceneID int
|
||||
func (c Client) FindStashBoxScenesByFingerprints(ctx context.Context, ids []int) ([][]*scraper.ScrapedScene, error) {
|
||||
var fingerprints [][]*graphql.FingerprintQueryInput
|
||||
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
qb := c.repository.Scene
|
||||
|
||||
for _, sceneID := range ids {
|
||||
@@ -245,7 +245,7 @@ func (c Client) SubmitStashBoxFingerprints(ctx context.Context, sceneIDs []strin
|
||||
|
||||
var fingerprints []graphql.FingerprintSubmission
|
||||
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
qb := c.repository.Scene
|
||||
|
||||
for _, sceneID := range ids {
|
||||
@@ -386,7 +386,7 @@ func (c Client) FindStashBoxPerformersByNames(ctx context.Context, performerIDs
|
||||
|
||||
var performers []*models.Performer
|
||||
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
qb := c.repository.Performer
|
||||
|
||||
for _, performerID := range ids {
|
||||
@@ -420,7 +420,7 @@ func (c Client) FindStashBoxPerformersByPerformerNames(ctx context.Context, perf
|
||||
|
||||
var performers []*models.Performer
|
||||
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
qb := c.repository.Performer
|
||||
|
||||
for _, performerID := range ids {
|
||||
@@ -705,7 +705,7 @@ func (c Client) sceneFragmentToScrapedScene(ctx context.Context, s *graphql.Scen
|
||||
ss.Image = getFirstImage(ctx, c.getHTTPClient(), s.Images)
|
||||
}
|
||||
|
||||
if err := txn.WithTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
if err := txn.WithReadTxn(ctx, c.txnManager, func(ctx context.Context) error {
|
||||
pqb := c.repository.Performer
|
||||
tqb := c.repository.Tag
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fvbommel/sortorder"
|
||||
@@ -73,7 +72,7 @@ type Database struct {
|
||||
|
||||
schemaVersion uint
|
||||
|
||||
writeMu sync.Mutex
|
||||
lockChan chan struct{}
|
||||
}
|
||||
|
||||
func NewDatabase() *Database {
|
||||
@@ -87,6 +86,7 @@ func NewDatabase() *Database {
|
||||
Image: NewImageStore(fileStore),
|
||||
Gallery: NewGalleryStore(fileStore, folderStore),
|
||||
Performer: NewPerformerStore(),
|
||||
lockChan: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
return ret
|
||||
@@ -106,8 +106,8 @@ func (db *Database) Ready() error {
|
||||
// necessary migrations must be run separately using RunMigrations.
|
||||
// Returns true if the database is new.
|
||||
func (db *Database) Open(dbPath string) error {
|
||||
db.writeMu.Lock()
|
||||
defer db.writeMu.Unlock()
|
||||
db.lockNoCtx()
|
||||
defer db.unlock()
|
||||
|
||||
db.dbPath = dbPath
|
||||
|
||||
@@ -152,9 +152,36 @@ func (db *Database) Open(dbPath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// lock locks the database for writing.
|
||||
// This method will block until the lock is acquired of the context is cancelled.
|
||||
func (db *Database) lock(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case db.lockChan <- struct{}{}:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// lock locks the database for writing. This method will block until the lock is acquired.
|
||||
func (db *Database) lockNoCtx() {
|
||||
db.lockChan <- struct{}{}
|
||||
}
|
||||
|
||||
// unlock unlocks the database
|
||||
func (db *Database) unlock() {
|
||||
// will block the caller if the lock is not held, so check first
|
||||
select {
|
||||
case <-db.lockChan:
|
||||
return
|
||||
default:
|
||||
panic("database is not locked")
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Database) Close() error {
|
||||
db.writeMu.Lock()
|
||||
defer db.writeMu.Unlock()
|
||||
db.lockNoCtx()
|
||||
defer db.unlock()
|
||||
|
||||
if db.db != nil {
|
||||
if err := db.db.Close(); err != nil {
|
||||
|
||||
@@ -929,18 +929,15 @@ func Test_imageQueryBuilder_Destroy(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
assert := assert.New(t)
|
||||
withRollbackTxn(func(ctx context.Context) error {
|
||||
if err := qb.Destroy(ctx, tt.id); (err != nil) != tt.wantErr {
|
||||
t.Errorf("imageQueryBuilder.Destroy() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if err := qb.Destroy(ctx, tt.id); (err != nil) != tt.wantErr {
|
||||
t.Errorf("imageQueryBuilder.Destroy() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
|
||||
// ensure cannot be found
|
||||
i, err := qb.Find(ctx, tt.id)
|
||||
// ensure cannot be found
|
||||
i, err := qb.Find(ctx, tt.id)
|
||||
|
||||
assert.NotNil(err)
|
||||
assert.Nil(i)
|
||||
return nil
|
||||
})
|
||||
assert.NotNil(err)
|
||||
assert.Nil(i)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1398,18 +1398,15 @@ func Test_sceneQueryBuilder_Destroy(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
assert := assert.New(t)
|
||||
withRollbackTxn(func(ctx context.Context) error {
|
||||
if err := qb.Destroy(ctx, tt.id); (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.Destroy() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if err := qb.Destroy(ctx, tt.id); (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.Destroy() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
|
||||
// ensure cannot be found
|
||||
i, err := qb.Find(ctx, tt.id)
|
||||
// ensure cannot be found
|
||||
i, err := qb.Find(ctx, tt.id)
|
||||
|
||||
assert.NotNil(err)
|
||||
assert.Nil(i)
|
||||
return nil
|
||||
})
|
||||
assert.NotNil(err)
|
||||
assert.Nil(i)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1477,26 +1474,23 @@ func Test_sceneQueryBuilder_Find(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
assert := assert.New(t)
|
||||
withTxn(func(ctx context.Context) error {
|
||||
got, err := qb.Find(ctx, tt.id)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.Find() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return nil
|
||||
got, err := qb.Find(ctx, tt.id)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.Find() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
if got != nil {
|
||||
// load relationships
|
||||
if err := loadSceneRelationships(ctx, *tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if got != nil {
|
||||
// load relationships
|
||||
if err := loadSceneRelationships(ctx, *tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return nil
|
||||
}
|
||||
clearSceneFileIDs(got)
|
||||
}
|
||||
|
||||
clearSceneFileIDs(got)
|
||||
}
|
||||
|
||||
assert.Equal(tt.want, got)
|
||||
return nil
|
||||
})
|
||||
assert.Equal(tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1620,23 +1614,19 @@ func Test_sceneQueryBuilder_FindByChecksum(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
withTxn(func(ctx context.Context) error {
|
||||
assert := assert.New(t)
|
||||
got, err := qb.FindByChecksum(ctx, tt.checksum)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByChecksum() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return nil
|
||||
}
|
||||
assert := assert.New(t)
|
||||
got, err := qb.FindByChecksum(ctx, tt.checksum)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByChecksum() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(tt.want, got)
|
||||
|
||||
return nil
|
||||
})
|
||||
assert.Equal(tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1694,23 +1684,20 @@ func Test_sceneQueryBuilder_FindByOSHash(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
withTxn(func(ctx context.Context) error {
|
||||
got, err := qb.FindByOSHash(ctx, tt.oshash)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByOSHash() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return nil
|
||||
}
|
||||
got, err := qb.FindByOSHash(ctx, tt.oshash)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByOSHash() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("sceneQueryBuilder.FindByOSHash() = %v, want %v", got, tt.want)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("sceneQueryBuilder.FindByOSHash() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1768,23 +1755,19 @@ func Test_sceneQueryBuilder_FindByPath(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
runWithRollbackTxn(t, tt.name, func(t *testing.T, ctx context.Context) {
|
||||
withTxn(func(ctx context.Context) error {
|
||||
assert := assert.New(t)
|
||||
got, err := qb.FindByPath(ctx, tt.path)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByPath() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return nil
|
||||
}
|
||||
assert := assert.New(t)
|
||||
got, err := qb.FindByPath(ctx, tt.path)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("sceneQueryBuilder.FindByPath() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := postFindScenes(ctx, tt.want, got); err != nil {
|
||||
t.Errorf("loadSceneRelationships() error = %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(tt.want, got)
|
||||
|
||||
return nil
|
||||
})
|
||||
assert.Equal(tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ type key int
|
||||
const (
|
||||
txnKey key = iota + 1
|
||||
dbKey
|
||||
exclusiveKey
|
||||
)
|
||||
|
||||
func (db *Database) WithDatabase(ctx context.Context) (context.Context, error) {
|
||||
@@ -28,7 +29,7 @@ func (db *Database) WithDatabase(ctx context.Context) (context.Context, error) {
|
||||
return context.WithValue(ctx, dbKey, db.db), nil
|
||||
}
|
||||
|
||||
func (db *Database) Begin(ctx context.Context) (context.Context, error) {
|
||||
func (db *Database) Begin(ctx context.Context, exclusive bool) (context.Context, error) {
|
||||
if tx, _ := getTx(ctx); tx != nil {
|
||||
// log the stack trace so we can see
|
||||
logger.Error(string(debug.Stack()))
|
||||
@@ -36,11 +37,23 @@ func (db *Database) Begin(ctx context.Context) (context.Context, error) {
|
||||
return nil, fmt.Errorf("already in transaction")
|
||||
}
|
||||
|
||||
if exclusive {
|
||||
if err := db.lock(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
tx, err := db.db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
// begin failed, unlock
|
||||
if exclusive {
|
||||
db.unlock()
|
||||
}
|
||||
return nil, fmt.Errorf("beginning transaction: %w", err)
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, exclusiveKey, exclusive)
|
||||
|
||||
return context.WithValue(ctx, txnKey, tx), nil
|
||||
}
|
||||
|
||||
@@ -50,6 +63,8 @@ func (db *Database) Commit(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.txnComplete(ctx)
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -63,6 +78,8 @@ func (db *Database) Rollback(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.txnComplete(ctx)
|
||||
|
||||
if err := tx.Rollback(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -70,6 +87,12 @@ func (db *Database) Rollback(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) txnComplete(ctx context.Context) {
|
||||
if exclusive := ctx.Value(exclusiveKey).(bool); exclusive {
|
||||
db.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func getTx(ctx context.Context) (*sqlx.Tx, error) {
|
||||
tx, ok := ctx.Value(txnKey).(*sqlx.Tx)
|
||||
if !ok || tx == nil {
|
||||
|
||||
243
pkg/sqlite/transaction_test.go
Normal file
243
pkg/sqlite/transaction_test.go
Normal file
@@ -0,0 +1,243 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package sqlite_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stashapp/stash/pkg/models"
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
)
|
||||
|
||||
// this test is left commented out as it is not deterministic.
|
||||
// func TestConcurrentExclusiveTxn(t *testing.T) {
|
||||
// const (
|
||||
// workers = 8
|
||||
// loops = 100
|
||||
// innerLoops = 10
|
||||
// sleepTime = 2 * time.Millisecond
|
||||
// )
|
||||
// ctx := context.Background()
|
||||
|
||||
// var wg sync.WaitGroup
|
||||
// for k := 0; k < workers; k++ {
|
||||
// wg.Add(1)
|
||||
// go func(wk int) {
|
||||
// for l := 0; l < loops; l++ {
|
||||
// // change this to WithReadTxn to see locked database error
|
||||
// if err := txn.WithTxn(ctx, db, func(ctx context.Context) error {
|
||||
// for ll := 0; ll < innerLoops; ll++ {
|
||||
// scene := &models.Scene{
|
||||
// Title: "test",
|
||||
// }
|
||||
|
||||
// if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
// if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// time.Sleep(sleepTime)
|
||||
|
||||
// return nil
|
||||
// }); err != nil {
|
||||
// t.Errorf("worker %d loop %d: %v", wk, l, err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// wg.Done()
|
||||
// }(k)
|
||||
// }
|
||||
|
||||
// wg.Wait()
|
||||
// }
|
||||
|
||||
func TestConcurrentReadTxn(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
ctx := context.Background()
|
||||
c := make(chan struct{}, 1)
|
||||
|
||||
// first thread
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||
scene := &models.Scene{
|
||||
Title: "test",
|
||||
}
|
||||
|
||||
if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for other thread to start
|
||||
c <- struct{}{}
|
||||
<-c
|
||||
|
||||
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error in first thread: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// second thread
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||
// wait for first thread
|
||||
<-c
|
||||
defer func() {
|
||||
c <- struct{}{}
|
||||
}()
|
||||
|
||||
scene := &models.Scene{
|
||||
Title: "test",
|
||||
}
|
||||
|
||||
// expect error when we try to do this, as the other thread has already
|
||||
// modified this table
|
||||
if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||
if !db.IsLocked(err) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
t.Errorf("expected locked error in second thread")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestConcurrentExclusiveAndReadTxn(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
ctx := context.Background()
|
||||
c := make(chan struct{}, 1)
|
||||
|
||||
// first thread
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := txn.WithTxn(ctx, db, func(ctx context.Context) error {
|
||||
scene := &models.Scene{
|
||||
Title: "test",
|
||||
}
|
||||
|
||||
if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait for other thread to start
|
||||
c <- struct{}{}
|
||||
<-c
|
||||
|
||||
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error in first thread: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// second thread
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||
// wait for first thread
|
||||
<-c
|
||||
defer func() {
|
||||
c <- struct{}{}
|
||||
}()
|
||||
|
||||
if _, err := db.Scene.Find(ctx, sceneIDs[sceneIdx1WithPerformer]); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// this test is left commented out as it is not deterministic.
|
||||
// func TestConcurrentExclusiveAndReadTxns(t *testing.T) {
|
||||
// const (
|
||||
// writeWorkers = 4
|
||||
// readWorkers = 4
|
||||
// loops = 200
|
||||
// innerLoops = 10
|
||||
// sleepTime = 1 * time.Millisecond
|
||||
// )
|
||||
// ctx := context.Background()
|
||||
|
||||
// var wg sync.WaitGroup
|
||||
// for k := 0; k < writeWorkers; k++ {
|
||||
// wg.Add(1)
|
||||
// go func(wk int) {
|
||||
// for l := 0; l < loops; l++ {
|
||||
// if err := txn.WithTxn(ctx, db, func(ctx context.Context) error {
|
||||
// for ll := 0; ll < innerLoops; ll++ {
|
||||
// scene := &models.Scene{
|
||||
// Title: "test",
|
||||
// }
|
||||
|
||||
// if err := db.Scene.Create(ctx, scene, nil); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
// if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// time.Sleep(sleepTime)
|
||||
|
||||
// return nil
|
||||
// }); err != nil {
|
||||
// t.Errorf("write worker %d loop %d: %v", wk, l, err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// wg.Done()
|
||||
// }(k)
|
||||
// }
|
||||
|
||||
// for k := 0; k < readWorkers; k++ {
|
||||
// wg.Add(1)
|
||||
// go func(wk int) {
|
||||
// for l := 0; l < loops; l++ {
|
||||
// if err := txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
|
||||
// for ll := 0; ll < innerLoops; ll++ {
|
||||
// if _, err := db.Scene.Find(ctx, sceneIDs[ll%totalScenes]); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// time.Sleep(sleepTime)
|
||||
|
||||
// return nil
|
||||
// }); err != nil {
|
||||
// t.Errorf("read worker %d loop %d: %v", wk, l, err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// wg.Done()
|
||||
// }(k)
|
||||
// }
|
||||
|
||||
// wg.Wait()
|
||||
// }
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
Begin(ctx context.Context) (context.Context, error)
|
||||
Begin(ctx context.Context, exclusive bool) (context.Context, error)
|
||||
Commit(ctx context.Context) error
|
||||
Rollback(ctx context.Context) error
|
||||
|
||||
@@ -17,18 +17,43 @@ type DatabaseProvider interface {
|
||||
WithDatabase(ctx context.Context) (context.Context, error)
|
||||
}
|
||||
|
||||
type DatabaseProviderManager interface {
|
||||
DatabaseProvider
|
||||
Manager
|
||||
}
|
||||
|
||||
type TxnFunc func(ctx context.Context) error
|
||||
|
||||
// WithTxn executes fn in a transaction. If fn returns an error then
|
||||
// the transaction is rolled back. Otherwise it is committed.
|
||||
// Transaction is exclusive. Only one thread may run a transaction
|
||||
// using this function at a time. This function will wait until the
|
||||
// lock is available before executing.
|
||||
// This function should be used for making changes to the database.
|
||||
func WithTxn(ctx context.Context, m Manager, fn TxnFunc) error {
|
||||
const execComplete = true
|
||||
return withTxn(ctx, m, fn, execComplete)
|
||||
const (
|
||||
execComplete = true
|
||||
exclusive = true
|
||||
)
|
||||
return withTxn(ctx, m, fn, exclusive, execComplete)
|
||||
}
|
||||
|
||||
func withTxn(ctx context.Context, m Manager, fn TxnFunc, execCompleteOnLocked bool) error {
|
||||
// WithReadTxn executes fn in a transaction. If fn returns an error then
|
||||
// the transaction is rolled back. Otherwise it is committed.
|
||||
// Transaction is not exclusive and does not enforce read-only restrictions.
|
||||
// Multiple threads can run transactions using this function concurrently,
|
||||
// but concurrent writes may result in locked database error.
|
||||
func WithReadTxn(ctx context.Context, m Manager, fn TxnFunc) error {
|
||||
const (
|
||||
execComplete = true
|
||||
exclusive = false
|
||||
)
|
||||
return withTxn(ctx, m, fn, exclusive, execComplete)
|
||||
}
|
||||
|
||||
func withTxn(ctx context.Context, m Manager, fn TxnFunc, exclusive bool, execCompleteOnLocked bool) error {
|
||||
var err error
|
||||
ctx, err = begin(ctx, m)
|
||||
ctx, err = begin(ctx, m, exclusive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -59,9 +84,9 @@ func withTxn(ctx context.Context, m Manager, fn TxnFunc, execCompleteOnLocked bo
|
||||
return err
|
||||
}
|
||||
|
||||
func begin(ctx context.Context, m Manager) (context.Context, error) {
|
||||
func begin(ctx context.Context, m Manager, exclusive bool) (context.Context, error) {
|
||||
var err error
|
||||
ctx, err = m.Begin(ctx)
|
||||
ctx, err = m.Begin(ctx, exclusive)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -102,6 +127,9 @@ func WithDatabase(ctx context.Context, p DatabaseProvider, fn TxnFunc) error {
|
||||
return fn(ctx)
|
||||
}
|
||||
|
||||
// Retryer is a provides WithTxn function that retries the transaction
|
||||
// if it fails with a locked database error.
|
||||
// Transactions are run in exclusive mode.
|
||||
type Retryer struct {
|
||||
Manager Manager
|
||||
// use value < 0 to retry forever
|
||||
@@ -113,8 +141,11 @@ func (r Retryer) WithTxn(ctx context.Context, fn TxnFunc) error {
|
||||
var attempt int
|
||||
var err error
|
||||
for attempt = 1; attempt <= r.Retries || r.Retries < 0; attempt++ {
|
||||
const execComplete = false
|
||||
err = withTxn(ctx, r.Manager, fn, execComplete)
|
||||
const (
|
||||
execComplete = false
|
||||
exclusive = true
|
||||
)
|
||||
err = withTxn(ctx, r.Manager, fn, exclusive, execComplete)
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user