feat(federation): add dolt sql-server mode for daemon (bd-wkumz.2)
Add --federation flag to bd daemon start that runs dolt sql-server instead of the embedded driver. Enables multi-writer support and exposes remotesapi on port 8080 for peer-to-peer push/pull. Changes: - Add --federation flag to daemon start command - Create dolt server manager (internal/storage/dolt/server.go) - Update DoltStore to support server mode via MySQL protocol - Integrate server lifecycle into daemon (auto-start/stop) - Add tests for server management and server mode connections Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Steve Yegge
parent
458fb7197a
commit
da4584ae57
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/steveyegge/beads/internal/configfile"
|
||||
"github.com/steveyegge/beads/internal/daemon"
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
"github.com/steveyegge/beads/internal/storage/dolt"
|
||||
"github.com/steveyegge/beads/internal/storage/factory"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
"github.com/steveyegge/beads/internal/syncbranch"
|
||||
@@ -238,7 +239,8 @@ Run 'bd daemon --help' to see all subcommands.`,
|
||||
fmt.Printf("Logging to: %s\n", logFile)
|
||||
}
|
||||
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON)
|
||||
federation, _ := cmd.Flags().GetBool("federation")
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation)
|
||||
},
|
||||
}
|
||||
|
||||
@@ -264,6 +266,7 @@ func init() {
|
||||
daemonCmd.Flags().Bool("foreground", false, "Run in foreground (don't daemonize)")
|
||||
daemonCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)")
|
||||
daemonCmd.Flags().Bool("log-json", false, "Output logs in JSON format (structured logging)")
|
||||
daemonCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi)")
|
||||
daemonCmd.Flags().BoolVar(&jsonOutput, "json", false, "Output JSON format")
|
||||
rootCmd.AddCommand(daemonCmd)
|
||||
}
|
||||
@@ -280,7 +283,7 @@ func computeDaemonParentPID() int {
|
||||
}
|
||||
return os.Getppid()
|
||||
}
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON bool) {
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON, federation bool) {
|
||||
level := parseLogLevel(logLevel)
|
||||
logF, log := setupDaemonLogger(logPath, logJSON, level)
|
||||
defer func() { _ = logF.Close() }()
|
||||
@@ -414,7 +417,49 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, local
|
||||
log.Warn("could not remove daemon-error file", "error", err)
|
||||
}
|
||||
|
||||
store, err := factory.NewFromConfig(ctx, beadsDir)
|
||||
// Start dolt sql-server if federation mode is enabled and backend is dolt
|
||||
var doltServer *dolt.Server
|
||||
factoryOpts := factory.Options{}
|
||||
if federation && backend != configfile.BackendDolt {
|
||||
log.Warn("federation mode requires dolt backend, ignoring --federation flag")
|
||||
federation = false
|
||||
}
|
||||
if federation && backend == configfile.BackendDolt {
|
||||
log.Info("starting dolt sql-server for federation mode")
|
||||
|
||||
doltPath := filepath.Join(beadsDir, "dolt")
|
||||
serverLogFile := filepath.Join(beadsDir, "dolt-server.log")
|
||||
|
||||
doltServer = dolt.NewServer(dolt.ServerConfig{
|
||||
DataDir: doltPath,
|
||||
SQLPort: dolt.DefaultSQLPort,
|
||||
RemotesAPIPort: dolt.DefaultRemotesAPIPort,
|
||||
Host: "127.0.0.1",
|
||||
LogFile: serverLogFile,
|
||||
})
|
||||
|
||||
if err := doltServer.Start(ctx); err != nil {
|
||||
log.Error("failed to start dolt sql-server", "error", err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
log.Info("stopping dolt sql-server")
|
||||
if err := doltServer.Stop(); err != nil {
|
||||
log.Warn("error stopping dolt sql-server", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info("dolt sql-server started",
|
||||
"sql_port", doltServer.SQLPort(),
|
||||
"remotesapi_port", doltServer.RemotesAPIPort())
|
||||
|
||||
// Configure factory to use server mode
|
||||
factoryOpts.ServerMode = true
|
||||
factoryOpts.ServerHost = doltServer.Host()
|
||||
factoryOpts.ServerPort = doltServer.SQLPort()
|
||||
}
|
||||
|
||||
store, err := factory.NewFromConfigWithOptions(ctx, beadsDir, factoryOpts)
|
||||
if err != nil {
|
||||
log.Error("cannot open database", "error", err)
|
||||
return // Use return instead of os.Exit to allow defers to run
|
||||
@@ -427,8 +472,10 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, local
|
||||
if sqliteStore, ok := store.(*sqlite.SQLiteStorage); ok {
|
||||
sqliteStore.EnableFreshnessChecking()
|
||||
log.Info("database opened", "path", store.Path(), "backend", "sqlite", "freshness_checking", true)
|
||||
} else if federation {
|
||||
log.Info("database opened", "path", store.Path(), "backend", "dolt", "mode", "federation/server")
|
||||
} else {
|
||||
log.Info("database opened", "path", store.Path(), "backend", "dolt")
|
||||
log.Info("database opened", "path", store.Path(), "backend", "dolt", "mode", "embedded")
|
||||
}
|
||||
|
||||
// Auto-upgrade .beads/.gitignore if outdated
|
||||
|
||||
@@ -369,7 +369,7 @@ func stopAllDaemons() {
|
||||
}
|
||||
|
||||
// startDaemon starts the daemon (in foreground if requested, otherwise background)
|
||||
func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON bool) {
|
||||
func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON, federation bool) {
|
||||
logPath, err := getLogFilePath(logFile)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
@@ -385,7 +385,7 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo
|
||||
|
||||
// Run in foreground if --foreground flag set or if we're the forked child process
|
||||
if foreground || os.Getenv("BD_DAEMON_FOREGROUND") == "1" {
|
||||
runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON)
|
||||
runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON, federation)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -419,6 +419,9 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo
|
||||
if logJSON {
|
||||
args = append(args, "--log-json")
|
||||
}
|
||||
if federation {
|
||||
args = append(args, "--federation")
|
||||
}
|
||||
|
||||
cmd := exec.Command(exe, args...) // #nosec G204 - bd daemon command from trusted binary
|
||||
cmd.Env = append(os.Environ(), "BD_DAEMON_FOREGROUND=1")
|
||||
|
||||
@@ -23,12 +23,18 @@ The daemon will:
|
||||
- Pull remote changes periodically
|
||||
- Auto-import when remote changes detected
|
||||
|
||||
Federation mode (--federation):
|
||||
- Starts dolt sql-server for multi-writer support
|
||||
- Exposes remotesapi on port 8080 for peer-to-peer push/pull
|
||||
- Enables real-time sync between Gas Towns
|
||||
|
||||
Examples:
|
||||
bd daemon start # Start with defaults
|
||||
bd daemon start --auto-commit # Enable auto-commit
|
||||
bd daemon start --auto-push # Enable auto-push (implies --auto-commit)
|
||||
bd daemon start --foreground # Run in foreground (for systemd/supervisord)
|
||||
bd daemon start --local # Local-only mode (no git sync)`,
|
||||
bd daemon start --local # Local-only mode (no git sync)
|
||||
bd daemon start --federation # Enable federation mode (dolt sql-server)`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
interval, _ := cmd.Flags().GetDuration("interval")
|
||||
autoCommit, _ := cmd.Flags().GetBool("auto-commit")
|
||||
@@ -39,6 +45,7 @@ Examples:
|
||||
foreground, _ := cmd.Flags().GetBool("foreground")
|
||||
logLevel, _ := cmd.Flags().GetString("log-level")
|
||||
logJSON, _ := cmd.Flags().GetBool("log-json")
|
||||
federation, _ := cmd.Flags().GetBool("federation")
|
||||
|
||||
// NOTE: Only load daemon auto-settings from the database in foreground mode.
|
||||
//
|
||||
@@ -136,6 +143,8 @@ Examples:
|
||||
// Start daemon
|
||||
if localMode {
|
||||
fmt.Printf("Starting bd daemon in LOCAL mode (interval: %v, no git sync)\n", interval)
|
||||
} else if federation {
|
||||
fmt.Printf("Starting bd daemon in FEDERATION mode (interval: %v, dolt sql-server with remotesapi)\n", interval)
|
||||
} else {
|
||||
fmt.Printf("Starting bd daemon (interval: %v, auto-commit: %v, auto-push: %v, auto-pull: %v)\n",
|
||||
interval, autoCommit, autoPush, autoPull)
|
||||
@@ -144,7 +153,7 @@ Examples:
|
||||
fmt.Printf("Logging to: %s\n", logFile)
|
||||
}
|
||||
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON)
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation)
|
||||
},
|
||||
}
|
||||
|
||||
@@ -158,4 +167,5 @@ func init() {
|
||||
daemonStartCmd.Flags().Bool("foreground", false, "Run in foreground (don't daemonize)")
|
||||
daemonStartCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)")
|
||||
daemonStartCmd.Flags().Bool("log-json", false, "Output logs in JSON format")
|
||||
daemonStartCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi on port 8080)")
|
||||
}
|
||||
|
||||
320
internal/storage/dolt/server.go
Normal file
320
internal/storage/dolt/server.go
Normal file
@@ -0,0 +1,320 @@
|
||||
// Package dolt implements the storage interface using Dolt (versioned MySQL-compatible database).
|
||||
//
|
||||
// This file implements the dolt sql-server management for federation mode.
|
||||
// When federation is enabled, we run dolt sql-server instead of the embedded driver
|
||||
// to enable multi-writer support and expose the remotesapi for peer-to-peer sync.
|
||||
package dolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSQLPort is the default port for dolt sql-server MySQL protocol
|
||||
DefaultSQLPort = 3306
|
||||
// DefaultRemotesAPIPort is the default port for dolt remotesapi (peer-to-peer sync)
|
||||
DefaultRemotesAPIPort = 8080
|
||||
// ServerStartTimeout is how long to wait for server to start
|
||||
ServerStartTimeout = 30 * time.Second
|
||||
// ServerStopTimeout is how long to wait for graceful shutdown
|
||||
ServerStopTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// ServerConfig holds configuration for the dolt sql-server
|
||||
type ServerConfig struct {
|
||||
DataDir string // Path to Dolt database directory
|
||||
SQLPort int // MySQL protocol port (default: 3306)
|
||||
RemotesAPIPort int // remotesapi port for peer sync (default: 8080)
|
||||
Host string // Host to bind to (default: 127.0.0.1)
|
||||
LogFile string // Log file for server output (optional)
|
||||
User string // MySQL user (default: root)
|
||||
ReadOnly bool // Start in read-only mode
|
||||
}
|
||||
|
||||
// Server manages a dolt sql-server process
|
||||
type Server struct {
|
||||
cfg ServerConfig
|
||||
cmd *exec.Cmd
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
pidFile string
|
||||
}
|
||||
|
||||
// NewServer creates a new dolt sql-server manager
|
||||
func NewServer(cfg ServerConfig) *Server {
|
||||
if cfg.SQLPort == 0 {
|
||||
cfg.SQLPort = DefaultSQLPort
|
||||
}
|
||||
if cfg.RemotesAPIPort == 0 {
|
||||
cfg.RemotesAPIPort = DefaultRemotesAPIPort
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "127.0.0.1"
|
||||
}
|
||||
if cfg.User == "" {
|
||||
cfg.User = "root"
|
||||
}
|
||||
return &Server{
|
||||
cfg: cfg,
|
||||
pidFile: filepath.Join(cfg.DataDir, "dolt-server.pid"),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the dolt sql-server process
|
||||
func (s *Server) Start(ctx context.Context) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.running {
|
||||
return fmt.Errorf("server already running")
|
||||
}
|
||||
|
||||
// Check if ports are available
|
||||
if err := s.checkPortAvailable(s.cfg.SQLPort); err != nil {
|
||||
return fmt.Errorf("SQL port %d not available: %w", s.cfg.SQLPort, err)
|
||||
}
|
||||
if err := s.checkPortAvailable(s.cfg.RemotesAPIPort); err != nil {
|
||||
return fmt.Errorf("remotesapi port %d not available: %w", s.cfg.RemotesAPIPort, err)
|
||||
}
|
||||
|
||||
// Build command args
|
||||
// Note: --user was removed in recent dolt versions, users are created with CREATE USER
|
||||
args := []string{
|
||||
"sql-server",
|
||||
"--host", s.cfg.Host,
|
||||
"--port", strconv.Itoa(s.cfg.SQLPort),
|
||||
"--remotesapi-port", strconv.Itoa(s.cfg.RemotesAPIPort),
|
||||
"--no-auto-commit", // Let the application manage commits
|
||||
}
|
||||
|
||||
if s.cfg.ReadOnly {
|
||||
args = append(args, "--readonly")
|
||||
}
|
||||
|
||||
// Create command
|
||||
s.cmd = exec.CommandContext(ctx, "dolt", args...)
|
||||
s.cmd.Dir = s.cfg.DataDir
|
||||
|
||||
// Set up process group for clean shutdown
|
||||
s.cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||
Setpgid: true,
|
||||
}
|
||||
|
||||
// Set up logging
|
||||
if s.cfg.LogFile != "" {
|
||||
logFile, err := os.OpenFile(s.cfg.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open log file: %w", err)
|
||||
}
|
||||
s.cmd.Stdout = logFile
|
||||
s.cmd.Stderr = logFile
|
||||
} else {
|
||||
// Discard output if no log file specified
|
||||
s.cmd.Stdout = nil
|
||||
s.cmd.Stderr = nil
|
||||
}
|
||||
|
||||
// Start the server
|
||||
if err := s.cmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start dolt sql-server: %w", err)
|
||||
}
|
||||
|
||||
// Write PID file
|
||||
if err := os.WriteFile(s.pidFile, []byte(strconv.Itoa(s.cmd.Process.Pid)), 0600); err != nil {
|
||||
// Non-fatal, just log
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to write dolt server PID file: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for server to be ready
|
||||
if err := s.waitForReady(ctx); err != nil {
|
||||
// Server failed to start, clean up
|
||||
_ = s.cmd.Process.Kill()
|
||||
_ = os.Remove(s.pidFile)
|
||||
return fmt.Errorf("server failed to become ready: %w", err)
|
||||
}
|
||||
|
||||
s.running = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the dolt sql-server process gracefully
|
||||
func (s *Server) Stop() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running || s.cmd == nil || s.cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Try graceful shutdown first (SIGTERM)
|
||||
if err := s.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
// Process may already be dead
|
||||
if !strings.Contains(err.Error(), "process already finished") {
|
||||
return fmt.Errorf("failed to send SIGTERM: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for graceful shutdown with timeout
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := s.cmd.Process.Wait()
|
||||
done <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Process exited
|
||||
case <-time.After(ServerStopTimeout):
|
||||
// Force kill
|
||||
_ = s.cmd.Process.Kill()
|
||||
<-done // Wait for process to be reaped
|
||||
}
|
||||
|
||||
// Clean up PID file
|
||||
_ = os.Remove(s.pidFile)
|
||||
s.running = false
|
||||
s.cmd = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning returns true if the server is running
|
||||
func (s *Server) IsRunning() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.running
|
||||
}
|
||||
|
||||
// SQLPort returns the SQL port
|
||||
func (s *Server) SQLPort() int {
|
||||
return s.cfg.SQLPort
|
||||
}
|
||||
|
||||
// RemotesAPIPort returns the remotesapi port
|
||||
func (s *Server) RemotesAPIPort() int {
|
||||
return s.cfg.RemotesAPIPort
|
||||
}
|
||||
|
||||
// Host returns the host
|
||||
func (s *Server) Host() string {
|
||||
return s.cfg.Host
|
||||
}
|
||||
|
||||
// DSN returns the MySQL DSN for connecting to the server
|
||||
func (s *Server) DSN(database string) string {
|
||||
return fmt.Sprintf("%s@tcp(%s:%d)/%s",
|
||||
s.cfg.User, s.cfg.Host, s.cfg.SQLPort, database)
|
||||
}
|
||||
|
||||
// checkPortAvailable checks if a TCP port is available
|
||||
func (s *Server) checkPortAvailable(port int) error {
|
||||
addr := fmt.Sprintf("%s:%d", s.cfg.Host, port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = listener.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForReady waits for the server to accept connections
|
||||
func (s *Server) waitForReady(ctx context.Context) error {
|
||||
deadline := time.Now().Add(ServerStartTimeout)
|
||||
addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.SQLPort)
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Check if process is still alive using signal 0
|
||||
if s.cmd.Process != nil {
|
||||
if err := s.cmd.Process.Signal(syscall.Signal(0)); err != nil {
|
||||
return fmt.Errorf("server process exited unexpectedly")
|
||||
}
|
||||
}
|
||||
|
||||
// Try to connect
|
||||
conn, err := net.DialTimeout("tcp", addr, 1*time.Second)
|
||||
if err == nil {
|
||||
_ = conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
return fmt.Errorf("timeout waiting for server to start on %s", addr)
|
||||
}
|
||||
|
||||
// GetRunningServerPID returns the PID of a running server from the PID file, or 0 if not running
|
||||
func GetRunningServerPID(dataDir string) int {
|
||||
pidFile := filepath.Join(dataDir, "dolt-server.pid")
|
||||
data, err := os.ReadFile(pidFile)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Check if process is actually running
|
||||
process, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
// On Unix, FindProcess always succeeds, so we need to check if it's alive
|
||||
if err := process.Signal(syscall.Signal(0)); err != nil {
|
||||
// Process is not running
|
||||
_ = os.Remove(pidFile)
|
||||
return 0
|
||||
}
|
||||
|
||||
return pid
|
||||
}
|
||||
|
||||
// StopServerByPID stops a dolt sql-server by its PID
|
||||
func StopServerByPID(pid int) error {
|
||||
process, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Try graceful shutdown first
|
||||
if err := process.Signal(syscall.SIGTERM); err != nil {
|
||||
if !strings.Contains(err.Error(), "process already finished") {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait for graceful shutdown
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
_, _ = process.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-time.After(ServerStopTimeout):
|
||||
// Force kill
|
||||
return process.Kill()
|
||||
}
|
||||
}
|
||||
122
internal/storage/dolt/server_integration_test.go
Normal file
122
internal/storage/dolt/server_integration_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package dolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// TestServerModeConnection tests connecting to DoltStore via server mode
|
||||
func TestServerModeConnection(t *testing.T) {
|
||||
// Skip if dolt is not available
|
||||
if _, err := exec.LookPath("dolt"); err != nil {
|
||||
t.Skip("dolt not installed, skipping server mode test")
|
||||
}
|
||||
|
||||
// Create temp directory for test
|
||||
tmpDir, err := os.MkdirTemp("", "dolt-server-mode-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Initialize dolt repo
|
||||
cmd := exec.Command("dolt", "init")
|
||||
cmd.Dir = tmpDir
|
||||
if err := cmd.Run(); err != nil {
|
||||
t.Fatalf("failed to init dolt repo: %v", err)
|
||||
}
|
||||
|
||||
// Start server on non-standard ports
|
||||
server := NewServer(ServerConfig{
|
||||
DataDir: tmpDir,
|
||||
SQLPort: 13307,
|
||||
RemotesAPIPort: 18081,
|
||||
Host: "127.0.0.1",
|
||||
LogFile: filepath.Join(tmpDir, "server.log"),
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := server.Start(ctx); err != nil {
|
||||
t.Fatalf("failed to start server: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Logf("warning: failed to stop server: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Connect using server mode
|
||||
store, err := New(ctx, &Config{
|
||||
Path: tmpDir,
|
||||
Database: "beads",
|
||||
ServerMode: true,
|
||||
ServerHost: "127.0.0.1",
|
||||
ServerPort: 13307,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create server mode store: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Set issue prefix (required for creating issues)
|
||||
if err := store.SetConfig(ctx, "issue_prefix", "test"); err != nil {
|
||||
t.Fatalf("failed to set issue_prefix: %v", err)
|
||||
}
|
||||
|
||||
// Verify we can perform basic operations
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
Title: "Test issue in server mode",
|
||||
Description: "Original description",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue, "test"); err != nil {
|
||||
t.Fatalf("failed to create issue: %v", err)
|
||||
}
|
||||
if issue.ID == "" {
|
||||
t.Fatal("expected issue ID to be generated")
|
||||
}
|
||||
t.Logf("Created issue: %s", issue.ID)
|
||||
|
||||
// Read it back
|
||||
readIssue, err := store.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get issue: %v", err)
|
||||
}
|
||||
if readIssue.Title != issue.Title {
|
||||
t.Errorf("title mismatch: expected %q, got %q", issue.Title, readIssue.Title)
|
||||
}
|
||||
|
||||
// Update it
|
||||
updates := map[string]interface{}{
|
||||
"description": "Updated description",
|
||||
"priority": 1,
|
||||
}
|
||||
if err := store.UpdateIssue(ctx, issue.ID, updates, "test"); err != nil {
|
||||
t.Fatalf("failed to update issue: %v", err)
|
||||
}
|
||||
|
||||
// Verify update
|
||||
updatedIssue, err := store.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get updated issue: %v", err)
|
||||
}
|
||||
if updatedIssue.Description != "Updated description" {
|
||||
t.Errorf("expected description 'Updated description', got %q", updatedIssue.Description)
|
||||
}
|
||||
if updatedIssue.Priority != 1 {
|
||||
t.Errorf("expected priority 1, got %d", updatedIssue.Priority)
|
||||
}
|
||||
|
||||
t.Logf("Server mode connection test passed: created and updated issue %s", issue.ID)
|
||||
}
|
||||
132
internal/storage/dolt/server_test.go
Normal file
132
internal/storage/dolt/server_test.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package dolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestServerStartStop tests basic server lifecycle
|
||||
func TestServerStartStop(t *testing.T) {
|
||||
// Skip if dolt is not available
|
||||
if _, err := exec.LookPath("dolt"); err != nil {
|
||||
t.Skip("dolt not installed, skipping server test")
|
||||
}
|
||||
|
||||
// Create temp directory for test
|
||||
tmpDir, err := os.MkdirTemp("", "dolt-server-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Initialize dolt repo
|
||||
cmd := exec.Command("dolt", "init")
|
||||
cmd.Dir = tmpDir
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to init dolt repo: %v, output: %s", err, output)
|
||||
}
|
||||
t.Logf("dolt init output: %s", output)
|
||||
|
||||
// Use non-standard ports to avoid conflicts
|
||||
logFile := filepath.Join(tmpDir, "server.log")
|
||||
server := NewServer(ServerConfig{
|
||||
DataDir: tmpDir,
|
||||
SQLPort: 13306, // Non-standard port
|
||||
RemotesAPIPort: 18080, // Non-standard port
|
||||
Host: "127.0.0.1",
|
||||
LogFile: logFile,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Start server
|
||||
if err := server.Start(ctx); err != nil {
|
||||
// Read log file for debugging
|
||||
if logContent, readErr := os.ReadFile(logFile); readErr == nil {
|
||||
t.Logf("Server log:\n%s", logContent)
|
||||
}
|
||||
t.Fatalf("failed to start server: %v", err)
|
||||
}
|
||||
|
||||
// Verify server is running
|
||||
if !server.IsRunning() {
|
||||
t.Error("server should be running")
|
||||
}
|
||||
|
||||
// Verify ports
|
||||
if server.SQLPort() != 13306 {
|
||||
t.Errorf("expected SQL port 13306, got %d", server.SQLPort())
|
||||
}
|
||||
if server.RemotesAPIPort() != 18080 {
|
||||
t.Errorf("expected remotesapi port 18080, got %d", server.RemotesAPIPort())
|
||||
}
|
||||
|
||||
// Verify DSN format
|
||||
dsn := server.DSN("testdb")
|
||||
expected := "root@tcp(127.0.0.1:13306)/testdb"
|
||||
if dsn != expected {
|
||||
t.Errorf("expected DSN %q, got %q", expected, dsn)
|
||||
}
|
||||
|
||||
// Stop server
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Fatalf("failed to stop server: %v", err)
|
||||
}
|
||||
|
||||
// Verify server is not running
|
||||
if server.IsRunning() {
|
||||
t.Error("server should not be running after stop")
|
||||
}
|
||||
}
|
||||
|
||||
// TestServerConfigDefaults tests that config defaults are applied correctly
|
||||
func TestServerConfigDefaults(t *testing.T) {
|
||||
server := NewServer(ServerConfig{
|
||||
DataDir: "/tmp/test",
|
||||
})
|
||||
|
||||
if server.cfg.SQLPort != DefaultSQLPort {
|
||||
t.Errorf("expected default SQL port %d, got %d", DefaultSQLPort, server.cfg.SQLPort)
|
||||
}
|
||||
if server.cfg.RemotesAPIPort != DefaultRemotesAPIPort {
|
||||
t.Errorf("expected default remotesapi port %d, got %d", DefaultRemotesAPIPort, server.cfg.RemotesAPIPort)
|
||||
}
|
||||
if server.cfg.Host != "127.0.0.1" {
|
||||
t.Errorf("expected default host 127.0.0.1, got %s", server.cfg.Host)
|
||||
}
|
||||
if server.cfg.User != "root" {
|
||||
t.Errorf("expected default user root, got %s", server.cfg.User)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetRunningServerPID tests the PID file detection
|
||||
func TestGetRunningServerPID(t *testing.T) {
|
||||
// Create temp directory
|
||||
tmpDir, err := os.MkdirTemp("", "dolt-pid-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// No PID file should return 0
|
||||
if pid := GetRunningServerPID(tmpDir); pid != 0 {
|
||||
t.Errorf("expected 0 for non-existent PID file, got %d", pid)
|
||||
}
|
||||
|
||||
// Create fake PID file with non-existent PID
|
||||
pidFile := filepath.Join(tmpDir, "dolt-server.pid")
|
||||
if err := os.WriteFile(pidFile, []byte("999999"), 0600); err != nil {
|
||||
t.Fatalf("failed to write PID file: %v", err)
|
||||
}
|
||||
|
||||
// Should return 0 for non-running process
|
||||
if pid := GetRunningServerPID(tmpDir); pid != 0 {
|
||||
t.Errorf("expected 0 for non-running process, got %d", pid)
|
||||
}
|
||||
}
|
||||
@@ -26,8 +26,10 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
// Import Dolt driver
|
||||
// Import Dolt embedded driver
|
||||
_ "github.com/dolthub/driver"
|
||||
// Import MySQL driver for server mode connections
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
)
|
||||
@@ -56,6 +58,12 @@ type Config struct {
|
||||
Remote string // Default remote name (e.g., "origin")
|
||||
Database string // Database name within Dolt (default: "beads")
|
||||
ReadOnly bool // Open in read-only mode (skip schema init)
|
||||
|
||||
// Server mode options (federation)
|
||||
ServerMode bool // Connect to dolt sql-server instead of embedded
|
||||
ServerHost string // Server host (default: 127.0.0.1)
|
||||
ServerPort int // Server port (default: 3306)
|
||||
ServerUser string // MySQL user (default: root)
|
||||
}
|
||||
|
||||
// New creates a new Dolt storage backend
|
||||
@@ -84,44 +92,39 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
||||
cfg.Remote = "origin"
|
||||
}
|
||||
|
||||
// Server mode defaults
|
||||
if cfg.ServerMode {
|
||||
if cfg.ServerHost == "" {
|
||||
cfg.ServerHost = "127.0.0.1"
|
||||
}
|
||||
if cfg.ServerPort == 0 {
|
||||
cfg.ServerPort = DefaultSQLPort
|
||||
}
|
||||
if cfg.ServerUser == "" {
|
||||
cfg.ServerUser = "root"
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure directory exists
|
||||
if err := os.MkdirAll(cfg.Path, 0o750); err != nil {
|
||||
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
||||
}
|
||||
|
||||
// First, connect without specifying a database to create it if needed
|
||||
initConnStr := fmt.Sprintf(
|
||||
"file://%s?commitname=%s&commitemail=%s",
|
||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail)
|
||||
var db *sql.DB
|
||||
var connStr string
|
||||
var err error
|
||||
|
||||
initDB, err := sql.Open("dolt", initConnStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open Dolt for initialization: %w", err)
|
||||
if cfg.ServerMode {
|
||||
// Server mode: connect via MySQL protocol to dolt sql-server
|
||||
db, connStr, err = openServerConnection(ctx, cfg)
|
||||
} else {
|
||||
// Embedded mode: use Dolt driver directly
|
||||
db, connStr, err = openEmbeddedConnection(ctx, cfg)
|
||||
}
|
||||
|
||||
// Create the database if it doesn't exist
|
||||
_, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database))
|
||||
if err != nil {
|
||||
_ = initDB.Close() // nolint:gosec // G104: error ignored on early return
|
||||
return nil, fmt.Errorf("failed to create database: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
_ = initDB.Close() // nolint:gosec // G104: connection no longer needed
|
||||
|
||||
// Now connect with the database specified
|
||||
connStr := fmt.Sprintf(
|
||||
"file://%s?commitname=%s&commitemail=%s&database=%s",
|
||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail, cfg.Database)
|
||||
|
||||
db, err := sql.Open("dolt", connStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open Dolt database: %w", err)
|
||||
}
|
||||
|
||||
// Configure connection pool
|
||||
// Dolt embedded mode is single-writer like SQLite
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
db.SetConnMaxLifetime(0)
|
||||
|
||||
// Test connection
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
@@ -155,6 +158,82 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// openEmbeddedConnection opens a connection using the embedded Dolt driver
|
||||
func openEmbeddedConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) {
|
||||
// First, connect without specifying a database to create it if needed
|
||||
initConnStr := fmt.Sprintf(
|
||||
"file://%s?commitname=%s&commitemail=%s",
|
||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail)
|
||||
|
||||
initDB, err := sql.Open("dolt", initConnStr)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to open Dolt for initialization: %w", err)
|
||||
}
|
||||
|
||||
// Create the database if it doesn't exist
|
||||
_, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database))
|
||||
if err != nil {
|
||||
_ = initDB.Close()
|
||||
return nil, "", fmt.Errorf("failed to create database: %w", err)
|
||||
}
|
||||
_ = initDB.Close()
|
||||
|
||||
// Now connect with the database specified
|
||||
connStr := fmt.Sprintf(
|
||||
"file://%s?commitname=%s&commitemail=%s&database=%s",
|
||||
cfg.Path, cfg.CommitterName, cfg.CommitterEmail, cfg.Database)
|
||||
|
||||
db, err := sql.Open("dolt", connStr)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to open Dolt database: %w", err)
|
||||
}
|
||||
|
||||
// Configure connection pool
|
||||
// Dolt embedded mode is single-writer like SQLite
|
||||
db.SetMaxOpenConns(1)
|
||||
db.SetMaxIdleConns(1)
|
||||
db.SetConnMaxLifetime(0)
|
||||
|
||||
return db, connStr, nil
|
||||
}
|
||||
|
||||
// openServerConnection opens a connection to a dolt sql-server via MySQL protocol
|
||||
func openServerConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) {
|
||||
// DSN format: user@tcp(host:port)/database?parseTime=true
|
||||
// parseTime=true tells the MySQL driver to parse DATETIME/TIMESTAMP to time.Time
|
||||
connStr := fmt.Sprintf("%s@tcp(%s:%d)/%s?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort, cfg.Database)
|
||||
|
||||
db, err := sql.Open("mysql", connStr)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to open Dolt server connection: %w", err)
|
||||
}
|
||||
|
||||
// Server mode supports multi-writer, configure reasonable pool size
|
||||
db.SetMaxOpenConns(10)
|
||||
db.SetMaxIdleConns(5)
|
||||
db.SetConnMaxLifetime(5 * time.Minute)
|
||||
|
||||
// Ensure database exists (may need to create it)
|
||||
// First connect without database to create it
|
||||
initConnStr := fmt.Sprintf("%s@tcp(%s:%d)/?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort)
|
||||
initDB, err := sql.Open("mysql", initConnStr)
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, "", fmt.Errorf("failed to open init connection: %w", err)
|
||||
}
|
||||
defer func() { _ = initDB.Close() }()
|
||||
|
||||
_, err = initDB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database))
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, "", fmt.Errorf("failed to create database: %w", err)
|
||||
}
|
||||
|
||||
return db, connStr, nil
|
||||
}
|
||||
|
||||
// initSchema creates all tables if they don't exist
|
||||
func (s *DoltStore) initSchema(ctx context.Context) error {
|
||||
// Execute schema creation - split into individual statements
|
||||
|
||||
@@ -26,6 +26,11 @@ func RegisterBackend(name string, factory BackendFactory) {
|
||||
type Options struct {
|
||||
ReadOnly bool
|
||||
LockTimeout time.Duration
|
||||
|
||||
// Dolt server mode options (federation)
|
||||
ServerMode bool // Connect to dolt sql-server instead of embedded
|
||||
ServerHost string // Server host (default: 127.0.0.1)
|
||||
ServerPort int // Server port (default: 3306)
|
||||
}
|
||||
|
||||
// New creates a storage backend based on the backend type.
|
||||
|
||||
@@ -41,6 +41,12 @@ func init() {
|
||||
fmt.Fprintf(os.Stderr, "\n Dolt database ready\n")
|
||||
}
|
||||
|
||||
return dolt.New(ctx, &dolt.Config{Path: path, ReadOnly: opts.ReadOnly})
|
||||
return dolt.New(ctx, &dolt.Config{
|
||||
Path: path,
|
||||
ReadOnly: opts.ReadOnly,
|
||||
ServerMode: opts.ServerMode,
|
||||
ServerHost: opts.ServerHost,
|
||||
ServerPort: opts.ServerPort,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user