fix(dolt): add lock retry and stale lock cleanup for operational reliability (#1260)
Adds operational improvements to the Dolt storage backend for increased reliability in production environments with concurrent clients: 1. Lock retry with exponential backoff: - Add LockRetries and LockRetryDelay config options - Automatic retry on lock contention (default: 30 retries, ~6s window) - Exponential backoff starting at 100ms - Handles transient format version errors during manifest updates 2. Stale lock file cleanup: - Detect and clean orphaned .dolt/noms/LOCK files on startup - Prevents "database is read only" errors after crashes - Only removes empty locks older than 5 seconds 3. Transient error detection: - isTransientDoltError() detects retryable conditions - isLockError() identifies lock contention scenarios - cleanupStaleDoltLock() safely removes orphaned locks These improvements address common issues in multi-process environments where the Dolt embedded driver creates exclusive locks that persist after unexpected termination. Co-authored-by: upstream_syncer <matthew.baker@pihealth.ai> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -52,12 +52,14 @@ type DoltStore struct {
|
|||||||
|
|
||||||
// Config holds Dolt database configuration
|
// Config holds Dolt database configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Path string // Path to Dolt database directory
|
Path string // Path to Dolt database directory
|
||||||
CommitterName string // Git-style committer name
|
CommitterName string // Git-style committer name
|
||||||
CommitterEmail string // Git-style committer email
|
CommitterEmail string // Git-style committer email
|
||||||
Remote string // Default remote name (e.g., "origin")
|
Remote string // Default remote name (e.g., "origin")
|
||||||
Database string // Database name within Dolt (default: "beads")
|
Database string // Database name within Dolt (default: "beads")
|
||||||
ReadOnly bool // Open in read-only mode (skip schema init)
|
ReadOnly bool // Open in read-only mode (skip schema init)
|
||||||
|
LockRetries int // Number of retries on lock contention (default: 30)
|
||||||
|
LockRetryDelay time.Duration // Initial retry delay (default: 100ms, doubles each retry)
|
||||||
|
|
||||||
// Server mode options (federation)
|
// Server mode options (federation)
|
||||||
ServerMode bool // Connect to dolt sql-server instead of embedded
|
ServerMode bool // Connect to dolt sql-server instead of embedded
|
||||||
@@ -92,6 +94,13 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
|||||||
if cfg.Remote == "" {
|
if cfg.Remote == "" {
|
||||||
cfg.Remote = "origin"
|
cfg.Remote = "origin"
|
||||||
}
|
}
|
||||||
|
// Lock retry defaults
|
||||||
|
if cfg.LockRetries == 0 {
|
||||||
|
cfg.LockRetries = 30 // ~6 seconds with exponential backoff
|
||||||
|
}
|
||||||
|
if cfg.LockRetryDelay == 0 {
|
||||||
|
cfg.LockRetryDelay = 100 * time.Millisecond
|
||||||
|
}
|
||||||
|
|
||||||
// Server mode defaults
|
// Server mode defaults
|
||||||
if cfg.ServerMode {
|
if cfg.ServerMode {
|
||||||
@@ -115,6 +124,16 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
|||||||
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up stale LOCK file if present (for embedded mode only)
|
||||||
|
// The Dolt embedded driver creates a LOCK file in .dolt/noms/ that may persist
|
||||||
|
// after crashes or unexpected termination. This causes "database is read only" errors.
|
||||||
|
if !cfg.ServerMode {
|
||||||
|
if err := cleanupStaleDoltLock(cfg.Path, cfg.Database); err != nil {
|
||||||
|
// Log but don't fail - the lock may be legitimately held
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: could not check/clean Dolt lock: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var db *sql.DB
|
var db *sql.DB
|
||||||
var connStr string
|
var connStr string
|
||||||
var err error
|
var err error
|
||||||
@@ -164,40 +183,84 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// openEmbeddedConnection opens a connection using the embedded Dolt driver
|
// openEmbeddedConnection opens a connection using the embedded Dolt driver
|
||||||
|
// with retry logic for transient errors (lock contention, format version issues)
|
||||||
func openEmbeddedConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) {
|
func openEmbeddedConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) {
|
||||||
// First, connect without specifying a database to create it if needed
|
// Build connection string - we use a single connection and switch databases using USE.
|
||||||
initConnStr := fmt.Sprintf(
|
// The Dolt embedded driver shares internal state between connections to the same path.
|
||||||
|
connStr := fmt.Sprintf(
|
||||||
"file://%s?commitname=%s&commitemail=%s",
|
"file://%s?commitname=%s&commitemail=%s",
|
||||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail)
|
cfg.Path, cfg.CommitterName, cfg.CommitterEmail)
|
||||||
|
|
||||||
initDB, err := sql.Open("dolt", initConnStr)
|
// Retry logic for transient Dolt errors (lock contention, format version issues)
|
||||||
if err != nil {
|
var db *sql.DB
|
||||||
return nil, "", fmt.Errorf("failed to open Dolt for initialization: %w", err)
|
var lastErr error
|
||||||
|
retryDelay := cfg.LockRetryDelay
|
||||||
|
|
||||||
|
for attempt := 0; attempt <= cfg.LockRetries; attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
// Log transient error for debugging
|
||||||
|
fmt.Fprintf(os.Stderr, "Dolt transient error detected (attempt %d/%d), retrying in %v...\n",
|
||||||
|
attempt, cfg.LockRetries, retryDelay)
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
// Exponential backoff
|
||||||
|
retryDelay *= 2
|
||||||
|
}
|
||||||
|
|
||||||
|
db, lastErr = sql.Open("dolt", connStr)
|
||||||
|
if lastErr != nil {
|
||||||
|
if isTransientDoltError(lastErr) {
|
||||||
|
continue // Retry
|
||||||
|
}
|
||||||
|
return nil, "", fmt.Errorf("failed to open Dolt database: %w", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the database if it doesn't exist
|
||||||
|
_, lastErr = db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database))
|
||||||
|
if lastErr != nil {
|
||||||
|
if isTransientDoltError(lastErr) {
|
||||||
|
_ = db.Close()
|
||||||
|
continue // Retry
|
||||||
|
}
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, "", fmt.Errorf("failed to create database: %w", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch to the target database using USE
|
||||||
|
_, lastErr = db.ExecContext(ctx, fmt.Sprintf("USE %s", cfg.Database))
|
||||||
|
if lastErr != nil {
|
||||||
|
if isTransientDoltError(lastErr) {
|
||||||
|
_ = db.Close()
|
||||||
|
continue // Retry
|
||||||
|
}
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, "", fmt.Errorf("failed to switch to database %s: %w", cfg.Database, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure connection pool
|
||||||
|
// Dolt embedded mode is single-writer like SQLite
|
||||||
|
db.SetMaxOpenConns(1)
|
||||||
|
db.SetMaxIdleConns(1)
|
||||||
|
db.SetConnMaxLifetime(0)
|
||||||
|
|
||||||
|
// Test connection
|
||||||
|
lastErr = db.PingContext(ctx)
|
||||||
|
if lastErr != nil {
|
||||||
|
if isTransientDoltError(lastErr) {
|
||||||
|
_ = db.Close()
|
||||||
|
continue // Retry
|
||||||
|
}
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, "", fmt.Errorf("failed to ping Dolt database: %w", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Success! Break out of retry loop
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the database if it doesn't exist
|
// Check if all retries exhausted
|
||||||
_, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database))
|
if lastErr != nil {
|
||||||
if err != nil {
|
return nil, "", fmt.Errorf("failed to connect to Dolt database after %d retries: %w", cfg.LockRetries, lastErr)
|
||||||
_ = initDB.Close()
|
|
||||||
return nil, "", fmt.Errorf("failed to create database: %w", err)
|
|
||||||
}
|
}
|
||||||
_ = initDB.Close()
|
|
||||||
|
|
||||||
// Now connect with the database specified
|
|
||||||
connStr := fmt.Sprintf(
|
|
||||||
"file://%s?commitname=%s&commitemail=%s&database=%s",
|
|
||||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail, cfg.Database)
|
|
||||||
|
|
||||||
db, err := sql.Open("dolt", connStr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "", fmt.Errorf("failed to open Dolt database: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Configure connection pool
|
|
||||||
// Dolt embedded mode is single-writer like SQLite
|
|
||||||
db.SetMaxOpenConns(1)
|
|
||||||
db.SetMaxIdleConns(1)
|
|
||||||
db.SetConnMaxLifetime(0)
|
|
||||||
|
|
||||||
return db, connStr, nil
|
return db, connStr, nil
|
||||||
}
|
}
|
||||||
@@ -581,3 +644,72 @@ type StatusEntry struct {
|
|||||||
Table string
|
Table string
|
||||||
Status string // "new", "modified", "deleted"
|
Status string // "new", "modified", "deleted"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isTransientDoltError detects if an error is transient and should be retried.
|
||||||
|
// This includes lock errors and format version errors which can occur during
|
||||||
|
// concurrent access when the manifest is being updated.
|
||||||
|
func isTransientDoltError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Check lock errors first
|
||||||
|
if isLockError(err) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// Check for format version errors - these can occur transiently during
|
||||||
|
// concurrent manifest updates (e.g., during push/pull operations)
|
||||||
|
errStr := strings.ToLower(err.Error())
|
||||||
|
return strings.Contains(errStr, "invalid format version") ||
|
||||||
|
strings.Contains(errStr, "failed to load database") ||
|
||||||
|
strings.Contains(errStr, "manifest") && strings.Contains(errStr, "invalid")
|
||||||
|
}
|
||||||
|
|
||||||
|
// isLockError checks if an error is related to lock contention
|
||||||
|
func isLockError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
errStr := strings.ToLower(err.Error())
|
||||||
|
return strings.Contains(errStr, "lock") ||
|
||||||
|
strings.Contains(errStr, "database is read only") ||
|
||||||
|
strings.Contains(errStr, "resource temporarily unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupStaleDoltLock removes stale LOCK files from the Dolt noms directory.
|
||||||
|
// The embedded Dolt driver creates a LOCK file that persists after crashes,
|
||||||
|
// causing subsequent opens to fail with "database is read only" errors.
|
||||||
|
func cleanupStaleDoltLock(dbPath string, database string) error {
|
||||||
|
// The LOCK file is in the noms directory under .dolt
|
||||||
|
// For a database at /path/to/dolt with database name "beads",
|
||||||
|
// the lock is at /path/to/dolt/beads/.dolt/noms/LOCK
|
||||||
|
lockPath := filepath.Join(dbPath, database, ".dolt", "noms", "LOCK")
|
||||||
|
|
||||||
|
info, err := os.Stat(lockPath)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// No lock file, nothing to do
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stat lock file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if lock file is empty (Dolt creates empty LOCK files)
|
||||||
|
// An empty LOCK file is likely stale - the driver should have released it
|
||||||
|
if info.Size() == 0 {
|
||||||
|
// Check how old the lock is - if it's been more than a few seconds,
|
||||||
|
// it's likely stale from a crashed process
|
||||||
|
age := time.Since(info.ModTime())
|
||||||
|
if age > 5*time.Second {
|
||||||
|
fmt.Fprintf(os.Stderr, "Removing stale Dolt LOCK file (age: %v)\n", age.Round(time.Second))
|
||||||
|
if err := os.Remove(lockPath); err != nil {
|
||||||
|
return fmt.Errorf("remove stale lock: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Lock is recent, might be held by another process
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-empty lock file - might contain PID info, don't touch it
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user