diff --git a/cmd/bd/daemon.go b/cmd/bd/daemon.go index 13685c3b..4ec291b2 100644 --- a/cmd/bd/daemon.go +++ b/cmd/bd/daemon.go @@ -17,6 +17,7 @@ import ( "github.com/steveyegge/beads/internal/configfile" "github.com/steveyegge/beads/internal/daemon" "github.com/steveyegge/beads/internal/rpc" + "github.com/steveyegge/beads/internal/storage/dolt" "github.com/steveyegge/beads/internal/storage/factory" "github.com/steveyegge/beads/internal/storage/sqlite" "github.com/steveyegge/beads/internal/syncbranch" @@ -238,7 +239,8 @@ Run 'bd daemon --help' to see all subcommands.`, fmt.Printf("Logging to: %s\n", logFile) } - startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON) + federation, _ := cmd.Flags().GetBool("federation") + startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation) }, } @@ -264,6 +266,7 @@ func init() { daemonCmd.Flags().Bool("foreground", false, "Run in foreground (don't daemonize)") daemonCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)") daemonCmd.Flags().Bool("log-json", false, "Output logs in JSON format (structured logging)") + daemonCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi)") daemonCmd.Flags().BoolVar(&jsonOutput, "json", false, "Output JSON format") rootCmd.AddCommand(daemonCmd) } @@ -280,7 +283,7 @@ func computeDaemonParentPID() int { } return os.Getppid() } -func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON bool) { +func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON, federation bool) { level := parseLogLevel(logLevel) logF, log := setupDaemonLogger(logPath, logJSON, level) defer func() { _ = logF.Close() }() @@ -414,7 +417,49 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, local log.Warn("could not remove daemon-error file", "error", err) } - store, err := factory.NewFromConfig(ctx, beadsDir) + // Start dolt sql-server if federation mode is enabled and backend is dolt + var doltServer *dolt.Server + factoryOpts := factory.Options{} + if federation && backend != configfile.BackendDolt { + log.Warn("federation mode requires dolt backend, ignoring --federation flag") + federation = false + } + if federation && backend == configfile.BackendDolt { + log.Info("starting dolt sql-server for federation mode") + + doltPath := filepath.Join(beadsDir, "dolt") + serverLogFile := filepath.Join(beadsDir, "dolt-server.log") + + doltServer = dolt.NewServer(dolt.ServerConfig{ + DataDir: doltPath, + SQLPort: dolt.DefaultSQLPort, + RemotesAPIPort: dolt.DefaultRemotesAPIPort, + Host: "127.0.0.1", + LogFile: serverLogFile, + }) + + if err := doltServer.Start(ctx); err != nil { + log.Error("failed to start dolt sql-server", "error", err) + return + } + defer func() { + log.Info("stopping dolt sql-server") + if err := doltServer.Stop(); err != nil { + log.Warn("error stopping dolt sql-server", "error", err) + } + }() + + log.Info("dolt sql-server started", + "sql_port", doltServer.SQLPort(), + "remotesapi_port", doltServer.RemotesAPIPort()) + + // Configure factory to use server mode + factoryOpts.ServerMode = true + factoryOpts.ServerHost = doltServer.Host() + factoryOpts.ServerPort = doltServer.SQLPort() + } + + store, err := factory.NewFromConfigWithOptions(ctx, beadsDir, factoryOpts) if err != nil { log.Error("cannot open database", "error", err) return // Use return instead of os.Exit to allow defers to run @@ -427,8 +472,10 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, local if sqliteStore, ok := store.(*sqlite.SQLiteStorage); ok { sqliteStore.EnableFreshnessChecking() log.Info("database opened", "path", store.Path(), "backend", "sqlite", "freshness_checking", true) + } else if federation { + log.Info("database opened", "path", store.Path(), "backend", "dolt", "mode", "federation/server") } else { - log.Info("database opened", "path", store.Path(), "backend", "dolt") + log.Info("database opened", "path", store.Path(), "backend", "dolt", "mode", "embedded") } // Auto-upgrade .beads/.gitignore if outdated diff --git a/cmd/bd/daemon_lifecycle.go b/cmd/bd/daemon_lifecycle.go index acffc288..f6b0f1f1 100644 --- a/cmd/bd/daemon_lifecycle.go +++ b/cmd/bd/daemon_lifecycle.go @@ -369,7 +369,7 @@ func stopAllDaemons() { } // startDaemon starts the daemon (in foreground if requested, otherwise background) -func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON bool) { +func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON, federation bool) { logPath, err := getLogFilePath(logFile) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) @@ -385,7 +385,7 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo // Run in foreground if --foreground flag set or if we're the forked child process if foreground || os.Getenv("BD_DAEMON_FOREGROUND") == "1" { - runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON) + runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON, federation) return } @@ -419,6 +419,9 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo if logJSON { args = append(args, "--log-json") } + if federation { + args = append(args, "--federation") + } cmd := exec.Command(exe, args...) // #nosec G204 - bd daemon command from trusted binary cmd.Env = append(os.Environ(), "BD_DAEMON_FOREGROUND=1") diff --git a/cmd/bd/daemon_start.go b/cmd/bd/daemon_start.go index eed6ab02..91b41c33 100644 --- a/cmd/bd/daemon_start.go +++ b/cmd/bd/daemon_start.go @@ -23,12 +23,18 @@ The daemon will: - Pull remote changes periodically - Auto-import when remote changes detected +Federation mode (--federation): +- Starts dolt sql-server for multi-writer support +- Exposes remotesapi on port 8080 for peer-to-peer push/pull +- Enables real-time sync between Gas Towns + Examples: bd daemon start # Start with defaults bd daemon start --auto-commit # Enable auto-commit bd daemon start --auto-push # Enable auto-push (implies --auto-commit) bd daemon start --foreground # Run in foreground (for systemd/supervisord) - bd daemon start --local # Local-only mode (no git sync)`, + bd daemon start --local # Local-only mode (no git sync) + bd daemon start --federation # Enable federation mode (dolt sql-server)`, Run: func(cmd *cobra.Command, args []string) { interval, _ := cmd.Flags().GetDuration("interval") autoCommit, _ := cmd.Flags().GetBool("auto-commit") @@ -39,6 +45,7 @@ Examples: foreground, _ := cmd.Flags().GetBool("foreground") logLevel, _ := cmd.Flags().GetString("log-level") logJSON, _ := cmd.Flags().GetBool("log-json") + federation, _ := cmd.Flags().GetBool("federation") // NOTE: Only load daemon auto-settings from the database in foreground mode. // @@ -136,6 +143,8 @@ Examples: // Start daemon if localMode { fmt.Printf("Starting bd daemon in LOCAL mode (interval: %v, no git sync)\n", interval) + } else if federation { + fmt.Printf("Starting bd daemon in FEDERATION mode (interval: %v, dolt sql-server with remotesapi)\n", interval) } else { fmt.Printf("Starting bd daemon (interval: %v, auto-commit: %v, auto-push: %v, auto-pull: %v)\n", interval, autoCommit, autoPush, autoPull) @@ -144,7 +153,7 @@ Examples: fmt.Printf("Logging to: %s\n", logFile) } - startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON) + startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation) }, } @@ -158,4 +167,5 @@ func init() { daemonStartCmd.Flags().Bool("foreground", false, "Run in foreground (don't daemonize)") daemonStartCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)") daemonStartCmd.Flags().Bool("log-json", false, "Output logs in JSON format") + daemonStartCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi on port 8080)") } diff --git a/internal/storage/dolt/server.go b/internal/storage/dolt/server.go new file mode 100644 index 00000000..31b99a1e --- /dev/null +++ b/internal/storage/dolt/server.go @@ -0,0 +1,320 @@ +// Package dolt implements the storage interface using Dolt (versioned MySQL-compatible database). +// +// This file implements the dolt sql-server management for federation mode. +// When federation is enabled, we run dolt sql-server instead of the embedded driver +// to enable multi-writer support and expose the remotesapi for peer-to-peer sync. +package dolt + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "syscall" + "time" +) + +const ( + // DefaultSQLPort is the default port for dolt sql-server MySQL protocol + DefaultSQLPort = 3306 + // DefaultRemotesAPIPort is the default port for dolt remotesapi (peer-to-peer sync) + DefaultRemotesAPIPort = 8080 + // ServerStartTimeout is how long to wait for server to start + ServerStartTimeout = 30 * time.Second + // ServerStopTimeout is how long to wait for graceful shutdown + ServerStopTimeout = 10 * time.Second +) + +// ServerConfig holds configuration for the dolt sql-server +type ServerConfig struct { + DataDir string // Path to Dolt database directory + SQLPort int // MySQL protocol port (default: 3306) + RemotesAPIPort int // remotesapi port for peer sync (default: 8080) + Host string // Host to bind to (default: 127.0.0.1) + LogFile string // Log file for server output (optional) + User string // MySQL user (default: root) + ReadOnly bool // Start in read-only mode +} + +// Server manages a dolt sql-server process +type Server struct { + cfg ServerConfig + cmd *exec.Cmd + mu sync.Mutex + running bool + pidFile string +} + +// NewServer creates a new dolt sql-server manager +func NewServer(cfg ServerConfig) *Server { + if cfg.SQLPort == 0 { + cfg.SQLPort = DefaultSQLPort + } + if cfg.RemotesAPIPort == 0 { + cfg.RemotesAPIPort = DefaultRemotesAPIPort + } + if cfg.Host == "" { + cfg.Host = "127.0.0.1" + } + if cfg.User == "" { + cfg.User = "root" + } + return &Server{ + cfg: cfg, + pidFile: filepath.Join(cfg.DataDir, "dolt-server.pid"), + } +} + +// Start starts the dolt sql-server process +func (s *Server) Start(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.running { + return fmt.Errorf("server already running") + } + + // Check if ports are available + if err := s.checkPortAvailable(s.cfg.SQLPort); err != nil { + return fmt.Errorf("SQL port %d not available: %w", s.cfg.SQLPort, err) + } + if err := s.checkPortAvailable(s.cfg.RemotesAPIPort); err != nil { + return fmt.Errorf("remotesapi port %d not available: %w", s.cfg.RemotesAPIPort, err) + } + + // Build command args + // Note: --user was removed in recent dolt versions, users are created with CREATE USER + args := []string{ + "sql-server", + "--host", s.cfg.Host, + "--port", strconv.Itoa(s.cfg.SQLPort), + "--remotesapi-port", strconv.Itoa(s.cfg.RemotesAPIPort), + "--no-auto-commit", // Let the application manage commits + } + + if s.cfg.ReadOnly { + args = append(args, "--readonly") + } + + // Create command + s.cmd = exec.CommandContext(ctx, "dolt", args...) + s.cmd.Dir = s.cfg.DataDir + + // Set up process group for clean shutdown + s.cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + // Set up logging + if s.cfg.LogFile != "" { + logFile, err := os.OpenFile(s.cfg.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return fmt.Errorf("failed to open log file: %w", err) + } + s.cmd.Stdout = logFile + s.cmd.Stderr = logFile + } else { + // Discard output if no log file specified + s.cmd.Stdout = nil + s.cmd.Stderr = nil + } + + // Start the server + if err := s.cmd.Start(); err != nil { + return fmt.Errorf("failed to start dolt sql-server: %w", err) + } + + // Write PID file + if err := os.WriteFile(s.pidFile, []byte(strconv.Itoa(s.cmd.Process.Pid)), 0600); err != nil { + // Non-fatal, just log + fmt.Fprintf(os.Stderr, "Warning: failed to write dolt server PID file: %v\n", err) + } + + // Wait for server to be ready + if err := s.waitForReady(ctx); err != nil { + // Server failed to start, clean up + _ = s.cmd.Process.Kill() + _ = os.Remove(s.pidFile) + return fmt.Errorf("server failed to become ready: %w", err) + } + + s.running = true + return nil +} + +// Stop stops the dolt sql-server process gracefully +func (s *Server) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running || s.cmd == nil || s.cmd.Process == nil { + return nil + } + + // Try graceful shutdown first (SIGTERM) + if err := s.cmd.Process.Signal(syscall.SIGTERM); err != nil { + // Process may already be dead + if !strings.Contains(err.Error(), "process already finished") { + return fmt.Errorf("failed to send SIGTERM: %w", err) + } + } + + // Wait for graceful shutdown with timeout + done := make(chan error, 1) + go func() { + _, err := s.cmd.Process.Wait() + done <- err + }() + + select { + case <-done: + // Process exited + case <-time.After(ServerStopTimeout): + // Force kill + _ = s.cmd.Process.Kill() + <-done // Wait for process to be reaped + } + + // Clean up PID file + _ = os.Remove(s.pidFile) + s.running = false + s.cmd = nil + + return nil +} + +// IsRunning returns true if the server is running +func (s *Server) IsRunning() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.running +} + +// SQLPort returns the SQL port +func (s *Server) SQLPort() int { + return s.cfg.SQLPort +} + +// RemotesAPIPort returns the remotesapi port +func (s *Server) RemotesAPIPort() int { + return s.cfg.RemotesAPIPort +} + +// Host returns the host +func (s *Server) Host() string { + return s.cfg.Host +} + +// DSN returns the MySQL DSN for connecting to the server +func (s *Server) DSN(database string) string { + return fmt.Sprintf("%s@tcp(%s:%d)/%s", + s.cfg.User, s.cfg.Host, s.cfg.SQLPort, database) +} + +// checkPortAvailable checks if a TCP port is available +func (s *Server) checkPortAvailable(port int) error { + addr := fmt.Sprintf("%s:%d", s.cfg.Host, port) + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + _ = listener.Close() + return nil +} + +// waitForReady waits for the server to accept connections +func (s *Server) waitForReady(ctx context.Context) error { + deadline := time.Now().Add(ServerStartTimeout) + addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.SQLPort) + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Check if process is still alive using signal 0 + if s.cmd.Process != nil { + if err := s.cmd.Process.Signal(syscall.Signal(0)); err != nil { + return fmt.Errorf("server process exited unexpectedly") + } + } + + // Try to connect + conn, err := net.DialTimeout("tcp", addr, 1*time.Second) + if err == nil { + _ = conn.Close() + return nil + } + + time.Sleep(100 * time.Millisecond) + } + + return fmt.Errorf("timeout waiting for server to start on %s", addr) +} + +// GetRunningServerPID returns the PID of a running server from the PID file, or 0 if not running +func GetRunningServerPID(dataDir string) int { + pidFile := filepath.Join(dataDir, "dolt-server.pid") + data, err := os.ReadFile(pidFile) + if err != nil { + return 0 + } + + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return 0 + } + + // Check if process is actually running + process, err := os.FindProcess(pid) + if err != nil { + return 0 + } + + // On Unix, FindProcess always succeeds, so we need to check if it's alive + if err := process.Signal(syscall.Signal(0)); err != nil { + // Process is not running + _ = os.Remove(pidFile) + return 0 + } + + return pid +} + +// StopServerByPID stops a dolt sql-server by its PID +func StopServerByPID(pid int) error { + process, err := os.FindProcess(pid) + if err != nil { + return err + } + + // Try graceful shutdown first + if err := process.Signal(syscall.SIGTERM); err != nil { + if !strings.Contains(err.Error(), "process already finished") { + return err + } + return nil + } + + // Wait for graceful shutdown + done := make(chan struct{}) + go func() { + _, _ = process.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-time.After(ServerStopTimeout): + // Force kill + return process.Kill() + } +} diff --git a/internal/storage/dolt/server_integration_test.go b/internal/storage/dolt/server_integration_test.go new file mode 100644 index 00000000..5083dd9e --- /dev/null +++ b/internal/storage/dolt/server_integration_test.go @@ -0,0 +1,122 @@ +package dolt + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/steveyegge/beads/internal/types" +) + +// TestServerModeConnection tests connecting to DoltStore via server mode +func TestServerModeConnection(t *testing.T) { + // Skip if dolt is not available + if _, err := exec.LookPath("dolt"); err != nil { + t.Skip("dolt not installed, skipping server mode test") + } + + // Create temp directory for test + tmpDir, err := os.MkdirTemp("", "dolt-server-mode-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Initialize dolt repo + cmd := exec.Command("dolt", "init") + cmd.Dir = tmpDir + if err := cmd.Run(); err != nil { + t.Fatalf("failed to init dolt repo: %v", err) + } + + // Start server on non-standard ports + server := NewServer(ServerConfig{ + DataDir: tmpDir, + SQLPort: 13307, + RemotesAPIPort: 18081, + Host: "127.0.0.1", + LogFile: filepath.Join(tmpDir, "server.log"), + }) + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + if err := server.Start(ctx); err != nil { + t.Fatalf("failed to start server: %v", err) + } + defer func() { + if err := server.Stop(); err != nil { + t.Logf("warning: failed to stop server: %v", err) + } + }() + + // Connect using server mode + store, err := New(ctx, &Config{ + Path: tmpDir, + Database: "beads", + ServerMode: true, + ServerHost: "127.0.0.1", + ServerPort: 13307, + }) + if err != nil { + t.Fatalf("failed to create server mode store: %v", err) + } + defer store.Close() + + // Set issue prefix (required for creating issues) + if err := store.SetConfig(ctx, "issue_prefix", "test"); err != nil { + t.Fatalf("failed to set issue_prefix: %v", err) + } + + // Verify we can perform basic operations + // Create an issue + issue := &types.Issue{ + Title: "Test issue in server mode", + Description: "Original description", + Status: types.StatusOpen, + Priority: 2, + IssueType: types.TypeTask, + } + if err := store.CreateIssue(ctx, issue, "test"); err != nil { + t.Fatalf("failed to create issue: %v", err) + } + if issue.ID == "" { + t.Fatal("expected issue ID to be generated") + } + t.Logf("Created issue: %s", issue.ID) + + // Read it back + readIssue, err := store.GetIssue(ctx, issue.ID) + if err != nil { + t.Fatalf("failed to get issue: %v", err) + } + if readIssue.Title != issue.Title { + t.Errorf("title mismatch: expected %q, got %q", issue.Title, readIssue.Title) + } + + // Update it + updates := map[string]interface{}{ + "description": "Updated description", + "priority": 1, + } + if err := store.UpdateIssue(ctx, issue.ID, updates, "test"); err != nil { + t.Fatalf("failed to update issue: %v", err) + } + + // Verify update + updatedIssue, err := store.GetIssue(ctx, issue.ID) + if err != nil { + t.Fatalf("failed to get updated issue: %v", err) + } + if updatedIssue.Description != "Updated description" { + t.Errorf("expected description 'Updated description', got %q", updatedIssue.Description) + } + if updatedIssue.Priority != 1 { + t.Errorf("expected priority 1, got %d", updatedIssue.Priority) + } + + t.Logf("Server mode connection test passed: created and updated issue %s", issue.ID) +} diff --git a/internal/storage/dolt/server_test.go b/internal/storage/dolt/server_test.go new file mode 100644 index 00000000..5d2327cf --- /dev/null +++ b/internal/storage/dolt/server_test.go @@ -0,0 +1,132 @@ +package dolt + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "testing" + "time" +) + +// TestServerStartStop tests basic server lifecycle +func TestServerStartStop(t *testing.T) { + // Skip if dolt is not available + if _, err := exec.LookPath("dolt"); err != nil { + t.Skip("dolt not installed, skipping server test") + } + + // Create temp directory for test + tmpDir, err := os.MkdirTemp("", "dolt-server-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Initialize dolt repo + cmd := exec.Command("dolt", "init") + cmd.Dir = tmpDir + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to init dolt repo: %v, output: %s", err, output) + } + t.Logf("dolt init output: %s", output) + + // Use non-standard ports to avoid conflicts + logFile := filepath.Join(tmpDir, "server.log") + server := NewServer(ServerConfig{ + DataDir: tmpDir, + SQLPort: 13306, // Non-standard port + RemotesAPIPort: 18080, // Non-standard port + Host: "127.0.0.1", + LogFile: logFile, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Start server + if err := server.Start(ctx); err != nil { + // Read log file for debugging + if logContent, readErr := os.ReadFile(logFile); readErr == nil { + t.Logf("Server log:\n%s", logContent) + } + t.Fatalf("failed to start server: %v", err) + } + + // Verify server is running + if !server.IsRunning() { + t.Error("server should be running") + } + + // Verify ports + if server.SQLPort() != 13306 { + t.Errorf("expected SQL port 13306, got %d", server.SQLPort()) + } + if server.RemotesAPIPort() != 18080 { + t.Errorf("expected remotesapi port 18080, got %d", server.RemotesAPIPort()) + } + + // Verify DSN format + dsn := server.DSN("testdb") + expected := "root@tcp(127.0.0.1:13306)/testdb" + if dsn != expected { + t.Errorf("expected DSN %q, got %q", expected, dsn) + } + + // Stop server + if err := server.Stop(); err != nil { + t.Fatalf("failed to stop server: %v", err) + } + + // Verify server is not running + if server.IsRunning() { + t.Error("server should not be running after stop") + } +} + +// TestServerConfigDefaults tests that config defaults are applied correctly +func TestServerConfigDefaults(t *testing.T) { + server := NewServer(ServerConfig{ + DataDir: "/tmp/test", + }) + + if server.cfg.SQLPort != DefaultSQLPort { + t.Errorf("expected default SQL port %d, got %d", DefaultSQLPort, server.cfg.SQLPort) + } + if server.cfg.RemotesAPIPort != DefaultRemotesAPIPort { + t.Errorf("expected default remotesapi port %d, got %d", DefaultRemotesAPIPort, server.cfg.RemotesAPIPort) + } + if server.cfg.Host != "127.0.0.1" { + t.Errorf("expected default host 127.0.0.1, got %s", server.cfg.Host) + } + if server.cfg.User != "root" { + t.Errorf("expected default user root, got %s", server.cfg.User) + } +} + +// TestGetRunningServerPID tests the PID file detection +func TestGetRunningServerPID(t *testing.T) { + // Create temp directory + tmpDir, err := os.MkdirTemp("", "dolt-pid-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // No PID file should return 0 + if pid := GetRunningServerPID(tmpDir); pid != 0 { + t.Errorf("expected 0 for non-existent PID file, got %d", pid) + } + + // Create fake PID file with non-existent PID + pidFile := filepath.Join(tmpDir, "dolt-server.pid") + if err := os.WriteFile(pidFile, []byte("999999"), 0600); err != nil { + t.Fatalf("failed to write PID file: %v", err) + } + + // Should return 0 for non-running process + if pid := GetRunningServerPID(tmpDir); pid != 0 { + t.Errorf("expected 0 for non-running process, got %d", pid) + } +} diff --git a/internal/storage/dolt/store.go b/internal/storage/dolt/store.go index a8c7a763..ebbfd2d0 100644 --- a/internal/storage/dolt/store.go +++ b/internal/storage/dolt/store.go @@ -26,8 +26,10 @@ import ( "sync/atomic" "time" - // Import Dolt driver + // Import Dolt embedded driver _ "github.com/dolthub/driver" + // Import MySQL driver for server mode connections + _ "github.com/go-sql-driver/mysql" "github.com/steveyegge/beads/internal/storage" ) @@ -56,6 +58,12 @@ type Config struct { Remote string // Default remote name (e.g., "origin") Database string // Database name within Dolt (default: "beads") ReadOnly bool // Open in read-only mode (skip schema init) + + // Server mode options (federation) + ServerMode bool // Connect to dolt sql-server instead of embedded + ServerHost string // Server host (default: 127.0.0.1) + ServerPort int // Server port (default: 3306) + ServerUser string // MySQL user (default: root) } // New creates a new Dolt storage backend @@ -84,44 +92,39 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) { cfg.Remote = "origin" } + // Server mode defaults + if cfg.ServerMode { + if cfg.ServerHost == "" { + cfg.ServerHost = "127.0.0.1" + } + if cfg.ServerPort == 0 { + cfg.ServerPort = DefaultSQLPort + } + if cfg.ServerUser == "" { + cfg.ServerUser = "root" + } + } + // Ensure directory exists if err := os.MkdirAll(cfg.Path, 0o750); err != nil { return nil, fmt.Errorf("failed to create database directory: %w", err) } - // First, connect without specifying a database to create it if needed - initConnStr := fmt.Sprintf( - "file://%s?commitname=%s&commitemail=%s", - cfg.Path, cfg.CommitterName, cfg.CommitterEmail) + var db *sql.DB + var connStr string + var err error - initDB, err := sql.Open("dolt", initConnStr) - if err != nil { - return nil, fmt.Errorf("failed to open Dolt for initialization: %w", err) + if cfg.ServerMode { + // Server mode: connect via MySQL protocol to dolt sql-server + db, connStr, err = openServerConnection(ctx, cfg) + } else { + // Embedded mode: use Dolt driver directly + db, connStr, err = openEmbeddedConnection(ctx, cfg) } - // Create the database if it doesn't exist - _, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)) if err != nil { - _ = initDB.Close() // nolint:gosec // G104: error ignored on early return - return nil, fmt.Errorf("failed to create database: %w", err) + return nil, err } - _ = initDB.Close() // nolint:gosec // G104: connection no longer needed - - // Now connect with the database specified - connStr := fmt.Sprintf( - "file://%s?commitname=%s&commitemail=%s&database=%s", - cfg.Path, cfg.CommitterName, cfg.CommitterEmail, cfg.Database) - - db, err := sql.Open("dolt", connStr) - if err != nil { - return nil, fmt.Errorf("failed to open Dolt database: %w", err) - } - - // Configure connection pool - // Dolt embedded mode is single-writer like SQLite - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - db.SetConnMaxLifetime(0) // Test connection if err := db.PingContext(ctx); err != nil { @@ -155,6 +158,82 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) { return store, nil } +// openEmbeddedConnection opens a connection using the embedded Dolt driver +func openEmbeddedConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) { + // First, connect without specifying a database to create it if needed + initConnStr := fmt.Sprintf( + "file://%s?commitname=%s&commitemail=%s", + cfg.Path, cfg.CommitterName, cfg.CommitterEmail) + + initDB, err := sql.Open("dolt", initConnStr) + if err != nil { + return nil, "", fmt.Errorf("failed to open Dolt for initialization: %w", err) + } + + // Create the database if it doesn't exist + _, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)) + if err != nil { + _ = initDB.Close() + return nil, "", fmt.Errorf("failed to create database: %w", err) + } + _ = initDB.Close() + + // Now connect with the database specified + connStr := fmt.Sprintf( + "file://%s?commitname=%s&commitemail=%s&database=%s", + cfg.Path, cfg.CommitterName, cfg.CommitterEmail, cfg.Database) + + db, err := sql.Open("dolt", connStr) + if err != nil { + return nil, "", fmt.Errorf("failed to open Dolt database: %w", err) + } + + // Configure connection pool + // Dolt embedded mode is single-writer like SQLite + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) + + return db, connStr, nil +} + +// openServerConnection opens a connection to a dolt sql-server via MySQL protocol +func openServerConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) { + // DSN format: user@tcp(host:port)/database?parseTime=true + // parseTime=true tells the MySQL driver to parse DATETIME/TIMESTAMP to time.Time + connStr := fmt.Sprintf("%s@tcp(%s:%d)/%s?parseTime=true", + cfg.ServerUser, cfg.ServerHost, cfg.ServerPort, cfg.Database) + + db, err := sql.Open("mysql", connStr) + if err != nil { + return nil, "", fmt.Errorf("failed to open Dolt server connection: %w", err) + } + + // Server mode supports multi-writer, configure reasonable pool size + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) + + // Ensure database exists (may need to create it) + // First connect without database to create it + initConnStr := fmt.Sprintf("%s@tcp(%s:%d)/?parseTime=true", + cfg.ServerUser, cfg.ServerHost, cfg.ServerPort) + initDB, err := sql.Open("mysql", initConnStr) + if err != nil { + _ = db.Close() + return nil, "", fmt.Errorf("failed to open init connection: %w", err) + } + defer func() { _ = initDB.Close() }() + + _, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)) + if err != nil { + _ = db.Close() + return nil, "", fmt.Errorf("failed to create database: %w", err) + } + + return db, connStr, nil +} + // initSchema creates all tables if they don't exist func (s *DoltStore) initSchema(ctx context.Context) error { // Execute schema creation - split into individual statements diff --git a/internal/storage/factory/factory.go b/internal/storage/factory/factory.go index d40b8d2f..c33c9a2e 100644 --- a/internal/storage/factory/factory.go +++ b/internal/storage/factory/factory.go @@ -26,6 +26,11 @@ func RegisterBackend(name string, factory BackendFactory) { type Options struct { ReadOnly bool LockTimeout time.Duration + + // Dolt server mode options (federation) + ServerMode bool // Connect to dolt sql-server instead of embedded + ServerHost string // Server host (default: 127.0.0.1) + ServerPort int // Server port (default: 3306) } // New creates a storage backend based on the backend type. diff --git a/internal/storage/factory/factory_dolt.go b/internal/storage/factory/factory_dolt.go index 3f03730e..4d38be13 100644 --- a/internal/storage/factory/factory_dolt.go +++ b/internal/storage/factory/factory_dolt.go @@ -41,6 +41,12 @@ func init() { fmt.Fprintf(os.Stderr, "\n Dolt database ready\n") } - return dolt.New(ctx, &dolt.Config{Path: path, ReadOnly: opts.ReadOnly}) + return dolt.New(ctx, &dolt.Config{ + Path: path, + ReadOnly: opts.ReadOnly, + ServerMode: opts.ServerMode, + ServerHost: opts.ServerHost, + ServerPort: opts.ServerPort, + }) }) }