fix(federation): add configurable ports, password support, fix log leak
- Add --federation-port and --remotesapi-port flags (default 3306/8080) - Fix log file leak in server.go - track and close on Stop() - Add BEADS_DOLT_PASSWORD env var for server mode authentication - Update DSN to include password when set Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Steve Yegge
parent
83e3c75635
commit
ea51c4b0bd
@@ -240,7 +240,9 @@ Run 'bd daemon --help' to see all subcommands.`,
|
||||
}
|
||||
|
||||
federation, _ := cmd.Flags().GetBool("federation")
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation)
|
||||
federationPort, _ := cmd.Flags().GetInt("federation-port")
|
||||
remotesapiPort, _ := cmd.Flags().GetInt("remotesapi-port")
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation, federationPort, remotesapiPort)
|
||||
},
|
||||
}
|
||||
|
||||
@@ -267,6 +269,8 @@ func init() {
|
||||
daemonCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)")
|
||||
daemonCmd.Flags().Bool("log-json", false, "Output logs in JSON format (structured logging)")
|
||||
daemonCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi)")
|
||||
daemonCmd.Flags().Int("federation-port", 3306, "MySQL port for federation mode dolt sql-server")
|
||||
daemonCmd.Flags().Int("remotesapi-port", 8080, "remotesapi port for peer-to-peer sync in federation mode")
|
||||
daemonCmd.Flags().BoolVar(&jsonOutput, "json", false, "Output JSON format")
|
||||
rootCmd.AddCommand(daemonCmd)
|
||||
}
|
||||
@@ -283,7 +287,7 @@ func computeDaemonParentPID() int {
|
||||
}
|
||||
return os.Getppid()
|
||||
}
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON, federation bool) {
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, localMode bool, logPath, pidFile, logLevel string, logJSON, federation bool, federationPort, remotesapiPort int) {
|
||||
level := parseLogLevel(logLevel)
|
||||
logF, log := setupDaemonLogger(logPath, logJSON, level)
|
||||
defer func() { _ = logF.Close() }()
|
||||
@@ -430,10 +434,20 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush, autoPull, local
|
||||
doltPath := filepath.Join(beadsDir, "dolt")
|
||||
serverLogFile := filepath.Join(beadsDir, "dolt-server.log")
|
||||
|
||||
// Use provided ports or defaults
|
||||
sqlPort := federationPort
|
||||
if sqlPort == 0 {
|
||||
sqlPort = dolt.DefaultSQLPort
|
||||
}
|
||||
remotePort := remotesapiPort
|
||||
if remotePort == 0 {
|
||||
remotePort = dolt.DefaultRemotesAPIPort
|
||||
}
|
||||
|
||||
doltServer = dolt.NewServer(dolt.ServerConfig{
|
||||
DataDir: doltPath,
|
||||
SQLPort: dolt.DefaultSQLPort,
|
||||
RemotesAPIPort: dolt.DefaultRemotesAPIPort,
|
||||
SQLPort: sqlPort,
|
||||
RemotesAPIPort: remotePort,
|
||||
Host: "127.0.0.1",
|
||||
LogFile: serverLogFile,
|
||||
})
|
||||
|
||||
@@ -369,7 +369,7 @@ func stopAllDaemons() {
|
||||
}
|
||||
|
||||
// startDaemon starts the daemon (in foreground if requested, otherwise background)
|
||||
func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON, federation bool) {
|
||||
func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMode, foreground bool, logFile, pidFile, logLevel string, logJSON, federation bool, federationPort, remotesapiPort int) {
|
||||
logPath, err := getLogFilePath(logFile)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
@@ -386,7 +386,7 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo
|
||||
|
||||
// Run in foreground if --foreground flag set or if we're the forked child process
|
||||
if foreground || os.Getenv("BD_DAEMON_FOREGROUND") == "1" {
|
||||
runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON, federation)
|
||||
runDaemonLoop(interval, autoCommit, autoPush, autoPull, localMode, logPath, pidFile, logLevel, logJSON, federation, federationPort, remotesapiPort)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -422,6 +422,12 @@ func startDaemon(interval time.Duration, autoCommit, autoPush, autoPull, localMo
|
||||
}
|
||||
if federation {
|
||||
args = append(args, "--federation")
|
||||
if federationPort != 0 && federationPort != 3306 {
|
||||
args = append(args, "--federation-port", strconv.Itoa(federationPort))
|
||||
}
|
||||
if remotesapiPort != 0 && remotesapiPort != 8080 {
|
||||
args = append(args, "--remotesapi-port", strconv.Itoa(remotesapiPort))
|
||||
}
|
||||
}
|
||||
|
||||
cmd := exec.Command(exe, args...) // #nosec G204 - bd daemon command from trusted binary
|
||||
|
||||
@@ -46,6 +46,8 @@ Examples:
|
||||
logLevel, _ := cmd.Flags().GetString("log-level")
|
||||
logJSON, _ := cmd.Flags().GetBool("log-json")
|
||||
federation, _ := cmd.Flags().GetBool("federation")
|
||||
federationPort, _ := cmd.Flags().GetInt("federation-port")
|
||||
remotesapiPort, _ := cmd.Flags().GetInt("remotesapi-port")
|
||||
|
||||
// NOTE: Only load daemon auto-settings from the database in foreground mode.
|
||||
//
|
||||
@@ -153,7 +155,7 @@ Examples:
|
||||
fmt.Printf("Logging to: %s\n", logFile)
|
||||
}
|
||||
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation)
|
||||
startDaemon(interval, autoCommit, autoPush, autoPull, localMode, foreground, logFile, pidFile, logLevel, logJSON, federation, federationPort, remotesapiPort)
|
||||
},
|
||||
}
|
||||
|
||||
@@ -167,5 +169,7 @@ func init() {
|
||||
daemonStartCmd.Flags().Bool("foreground", false, "Run in foreground (don't daemonize)")
|
||||
daemonStartCmd.Flags().String("log-level", "info", "Log level (debug, info, warn, error)")
|
||||
daemonStartCmd.Flags().Bool("log-json", false, "Output logs in JSON format")
|
||||
daemonStartCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server with remotesapi on port 8080)")
|
||||
daemonStartCmd.Flags().Bool("federation", false, "Enable federation mode (runs dolt sql-server)")
|
||||
daemonStartCmd.Flags().Int("federation-port", 3306, "MySQL port for federation mode dolt sql-server")
|
||||
daemonStartCmd.Flags().Int("remotesapi-port", 8080, "remotesapi port for peer-to-peer sync in federation mode")
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ func TestDoltSingleProcess_StartDaemonGuardrailExitsNonZero(t *testing.T) {
|
||||
dbPath = ""
|
||||
|
||||
pidFile := filepath.Join(ws, ".beads", "daemon.pid")
|
||||
startDaemon(5*time.Second, false, false, false, false, false, "", pidFile, "info", false, false)
|
||||
startDaemon(5*time.Second, false, false, false, false, false, "", pidFile, "info", false, false, 0, 0)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ type Server struct {
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
pidFile string
|
||||
logFile *os.File // Track log file for cleanup
|
||||
}
|
||||
|
||||
// NewServer creates a new dolt sql-server manager
|
||||
@@ -116,6 +117,7 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open log file: %w", err)
|
||||
}
|
||||
s.logFile = logFile // Track for cleanup on Stop()
|
||||
s.cmd.Stdout = logFile
|
||||
s.cmd.Stderr = logFile
|
||||
} else {
|
||||
@@ -140,6 +142,10 @@ func (s *Server) Start(ctx context.Context) error {
|
||||
// Server failed to start, clean up
|
||||
_ = s.cmd.Process.Kill()
|
||||
_ = os.Remove(s.pidFile)
|
||||
if s.logFile != nil {
|
||||
_ = s.logFile.Close()
|
||||
s.logFile = nil
|
||||
}
|
||||
return fmt.Errorf("server failed to become ready: %w", err)
|
||||
}
|
||||
|
||||
@@ -180,8 +186,12 @@ func (s *Server) Stop() error {
|
||||
<-done // Wait for process to be reaped
|
||||
}
|
||||
|
||||
// Clean up PID file
|
||||
// Clean up PID file and log file
|
||||
_ = os.Remove(s.pidFile)
|
||||
if s.logFile != nil {
|
||||
_ = s.logFile.Close()
|
||||
s.logFile = nil
|
||||
}
|
||||
s.running = false
|
||||
s.cmd = nil
|
||||
|
||||
|
||||
@@ -60,10 +60,11 @@ type Config struct {
|
||||
ReadOnly bool // Open in read-only mode (skip schema init)
|
||||
|
||||
// Server mode options (federation)
|
||||
ServerMode bool // Connect to dolt sql-server instead of embedded
|
||||
ServerHost string // Server host (default: 127.0.0.1)
|
||||
ServerPort int // Server port (default: 3306)
|
||||
ServerUser string // MySQL user (default: root)
|
||||
ServerMode bool // Connect to dolt sql-server instead of embedded
|
||||
ServerHost string // Server host (default: 127.0.0.1)
|
||||
ServerPort int // Server port (default: 3306)
|
||||
ServerUser string // MySQL user (default: root)
|
||||
ServerPassword string // MySQL password (default: empty, can be set via BEADS_DOLT_PASSWORD)
|
||||
}
|
||||
|
||||
// New creates a new Dolt storage backend
|
||||
@@ -103,6 +104,10 @@ func New(ctx context.Context, cfg *Config) (*DoltStore, error) {
|
||||
if cfg.ServerUser == "" {
|
||||
cfg.ServerUser = "root"
|
||||
}
|
||||
// Check environment variable for password (more secure than command-line)
|
||||
if cfg.ServerPassword == "" {
|
||||
cfg.ServerPassword = os.Getenv("BEADS_DOLT_PASSWORD")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure directory exists
|
||||
@@ -199,10 +204,16 @@ func openEmbeddedConnection(ctx context.Context, cfg *Config) (*sql.DB, string,
|
||||
|
||||
// openServerConnection opens a connection to a dolt sql-server via MySQL protocol
|
||||
func openServerConnection(ctx context.Context, cfg *Config) (*sql.DB, string, error) {
|
||||
// DSN format: user@tcp(host:port)/database?parseTime=true
|
||||
// DSN format: user:password@tcp(host:port)/database?parseTime=true
|
||||
// parseTime=true tells the MySQL driver to parse DATETIME/TIMESTAMP to time.Time
|
||||
connStr := fmt.Sprintf("%s@tcp(%s:%d)/%s?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort, cfg.Database)
|
||||
var connStr string
|
||||
if cfg.ServerPassword != "" {
|
||||
connStr = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerPassword, cfg.ServerHost, cfg.ServerPort, cfg.Database)
|
||||
} else {
|
||||
connStr = fmt.Sprintf("%s@tcp(%s:%d)/%s?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort, cfg.Database)
|
||||
}
|
||||
|
||||
db, err := sql.Open("mysql", connStr)
|
||||
if err != nil {
|
||||
@@ -216,8 +227,14 @@ func openServerConnection(ctx context.Context, cfg *Config) (*sql.DB, string, er
|
||||
|
||||
// Ensure database exists (may need to create it)
|
||||
// First connect without database to create it
|
||||
initConnStr := fmt.Sprintf("%s@tcp(%s:%d)/?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort)
|
||||
var initConnStr string
|
||||
if cfg.ServerPassword != "" {
|
||||
initConnStr = fmt.Sprintf("%s:%s@tcp(%s:%d)/?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerPassword, cfg.ServerHost, cfg.ServerPort)
|
||||
} else {
|
||||
initConnStr = fmt.Sprintf("%s@tcp(%s:%d)/?parseTime=true",
|
||||
cfg.ServerUser, cfg.ServerHost, cfg.ServerPort)
|
||||
}
|
||||
initDB, err := sql.Open("mysql", initConnStr)
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
|
||||
Reference in New Issue
Block a user