Files
beads/internal/rpc/server_lifecycle_conn.go
Steve Yegge 481649a605 Fix widespread double JSON encoding bug in daemon RPC calls (bd-1048, bd-4ec8)
Multiple CLI commands had a systematic bug where ResolveID responses were
incorrectly converted using string(resp.Data) instead of json.Unmarshal.
Since resp.Data is json.RawMessage (already JSON-encoded), this preserved
the JSON quotes, causing IDs to become "bd-1048" instead of bd-1048.
When re-marshaled for subsequent RPC calls, these became double-quoted
("\"bd-1048\""), causing database lookups to fail.

Bugs fixed:
1. Nil pointer dereference in handleShow - added nil check after GetIssue
2. Double JSON encoding in 12 locations across 4 commands:
   - bd show (3 instances in show.go)
   - bd dep add/remove/tree (5 instances in dep.go)
   - bd label add/remove/list (3 instances in label.go)
   - bd reopen (1 instance in reopen.go)

All instances replaced string(resp.Data) with proper json.Unmarshal.
Removed debug logging added during investigation.

Tested: All affected commands now work correctly with daemon mode.
2025-11-02 22:34:24 -08:00

248 lines
5.7 KiB
Go

package rpc
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"sync/atomic"
"time"
)
// Start starts the RPC server and listens for connections.
//
// Setup: it ensures the socket directory exists, clears a stale socket via
// removeOldSocket, creates the listener with listenRPC and, on non-Windows
// platforms, restricts the socket file to mode 0600. The listener is only
// stored on the server (under s.mu) after all of that has succeeded, so a
// failed setup never leaves a closed listener behind in s.listener.
//
// Once set up it closes readyChan, launches handleSignals, and accepts
// connections in a loop. Each accepted connection is served on its own
// goroutine when a slot in connSemaphore is free; otherwise it is counted as
// rejected and closed immediately.
//
// The context argument is currently unused. Start returns nil when the loop
// ends because the server is shutting down, and a wrapped error for setup or
// accept failures. doneChan is closed when the accept loop exits; setup
// failures return before that point and do not close it.
func (s *Server) Start(_ context.Context) error {
	if err := s.ensureSocketDir(); err != nil {
		return fmt.Errorf("failed to ensure socket directory: %w", err)
	}
	if err := s.removeOldSocket(); err != nil {
		return fmt.Errorf("failed to remove old socket: %w", err)
	}

	listener, err := listenRPC(s.socketPath)
	if err != nil {
		return fmt.Errorf("failed to initialize RPC listener: %w", err)
	}

	// Set socket permissions to 0600 for security (owner only)
	if runtime.GOOS != "windows" {
		if err := os.Chmod(s.socketPath, 0600); err != nil {
			_ = listener.Close()
			return fmt.Errorf("failed to set socket permissions: %w", err)
		}
	}

	// Publish the listener under lock. This is the only write to s.listener
	// in Start, so Stop never observes it without synchronization.
	s.mu.Lock()
	s.listener = listener
	s.mu.Unlock()

	// Signal that server is ready to accept connections
	close(s.readyChan)

	go s.handleSignals()

	// Ensure cleanup is signaled when this function returns
	defer close(s.doneChan)

	// Accept connections using listener
	for {
		// Re-read the listener under lock on every iteration: Stop sets it
		// to nil before closing it.
		s.mu.RLock()
		ln := s.listener
		s.mu.RUnlock()
		if ln == nil {
			// Stop cleared the listener between two Accept calls. Calling
			// Accept on nil would panic, so treat this as a clean shutdown.
			return nil
		}

		conn, acceptErr := ln.Accept()
		if acceptErr != nil {
			s.mu.RLock()
			shutdown := s.shutdown
			s.mu.RUnlock()
			if shutdown {
				// The listener was closed by Stop; not an error.
				return nil
			}
			return fmt.Errorf("failed to accept connection: %w", acceptErr)
		}

		// Try to acquire connection slot (non-blocking)
		select {
		case s.connSemaphore <- struct{}{}:
			// Acquired slot, handle connection
			s.metrics.RecordConnection()
			go func(c net.Conn) {
				defer func() { <-s.connSemaphore }() // Release slot
				atomic.AddInt32(&s.activeConns, 1)
				defer atomic.AddInt32(&s.activeConns, -1)
				s.handleConnection(c)
			}(conn)
		default:
			// Max connections reached, reject immediately
			s.metrics.RecordRejectedConnection()
			_ = conn.Close()
		}
	}
}
// WaitReady returns a receive-only view of the server's readiness channel.
// Start closes that channel once the listener has been set up, so a receive
// on the returned channel unblocks from that point on.
func (s *Server) WaitReady() <-chan struct{} {
	var ready <-chan struct{} = s.readyChan
	return ready
}
// Stop shuts the RPC server down and releases its resources.
//
// The teardown runs at most once (guarded by stopOnce) and proceeds in this
// order: mark the server as shutting down, close shutdownChan, close the
// storage (a failure is only reported as a warning on stderr), detach and
// close the listener, then remove the socket file via removeOldSocket.
//
// Every call, including repeated ones, then waits up to five seconds for
// doneChan to be closed. Only the call that performs the teardown can return
// a non-nil error (listener close or socket removal failure); later calls
// return nil after the wait.
func (s *Server) Stop() error {
	var stopErr error

	s.stopOnce.Do(func() {
		// Raise the shutdown flag first; the accept loop in Start checks it
		// to tell a deliberate listener close from a real Accept failure.
		s.mu.Lock()
		s.shutdown = true
		s.mu.Unlock()

		// Signal cleanup goroutine to stop
		close(s.shutdownChan)

		// Storage close errors are logged but do not fail the shutdown.
		if s.storage != nil {
			if storageErr := s.storage.Close(); storageErr != nil {
				fmt.Fprintf(os.Stderr, "Warning: failed to close default storage: %v\n", storageErr)
			}
		}

		// Detach the listener while holding the lock; close it afterwards so
		// the lock is not held across the Close call.
		s.mu.Lock()
		ln := s.listener
		s.listener = nil
		s.mu.Unlock()

		if ln != nil {
			if lnErr := ln.Close(); lnErr != nil {
				// A failed close skips the socket removal below.
				stopErr = fmt.Errorf("failed to close listener: %w", lnErr)
				return
			}
		}

		if rmErr := s.removeOldSocket(); rmErr != nil {
			stopErr = fmt.Errorf("failed to remove socket: %w", rmErr)
		}
	})

	// Give the Start() accept loop a bounded amount of time to finish.
	deadline := time.After(5 * time.Second)
	select {
	case <-s.doneChan:
		// Accept loop has exited.
	case <-deadline:
		// Timed out waiting; return anyway.
	}

	return stopErr
}
// ensureSocketDir creates the directory that will hold the socket file
// (including missing parents) with mode 0700. When creation succeeds it also
// makes a best-effort attempt to chmod the directory to 0700, which covers
// the case where the directory already existed; a chmod failure is ignored.
// It returns the MkdirAll error, if any.
func (s *Server) ensureSocketDir() error {
	socketDir := filepath.Dir(s.socketPath)

	mkErr := os.MkdirAll(socketDir, 0700)
	if mkErr == nil {
		// Best-effort tighten permissions if directory already existed
		_ = os.Chmod(socketDir, 0700) // #nosec G302 - 0700 is secure (user-only access)
	}
	return mkErr
}
// removeOldSocket deletes a leftover socket file at s.socketPath.
//
// If os.Stat on the path fails for any reason there is nothing to do and nil
// is returned. Otherwise the socket is probed with dialRPC using a 500ms
// timeout: a successful dial means something is still serving on it, so the
// file is left alone and an error is returned. A failed dial is taken to
// mean the socket is stale and the file is removed; a "does not exist" error
// from the removal is treated as success.
func (s *Server) removeOldSocket() error {
	if _, statErr := os.Stat(s.socketPath); statErr != nil {
		return nil
	}

	// Socket exists - try to connect to see if a daemon is actually using it.
	probe, dialErr := dialRPC(s.socketPath, 500*time.Millisecond)
	if dialErr == nil {
		// Socket is active - another daemon is running
		_ = probe.Close()
		return fmt.Errorf("socket %s is in use by another daemon", s.socketPath)
	}

	// Socket is stale - safe to remove
	rmErr := os.Remove(s.socketPath)
	if rmErr == nil || os.IsNotExist(rmErr) {
		return nil
	}
	return rmErr
}
// handleSignals stops the server when one of serverSignals (declared
// elsewhere in the package) is delivered to the process.
//
// It also returns when shutdownChan is closed, which Stop does on its first
// call. Without that, a server stopped by any other path (for example the
// shutdown RPC) would leave this goroutine blocked forever. The signal
// registration is removed on exit so the channel no longer receives signals
// after the server is gone.
func (s *Server) handleSignals() {
	// Buffer of one so a signal arriving before the select is not dropped.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, serverSignals...)
	defer signal.Stop(sigChan)

	select {
	case <-sigChan:
		// Stop's error is intentionally ignored: there is no caller to
		// report it to from the signal path.
		_ = s.Stop()
	case <-s.shutdownChan:
		// Server is already being stopped elsewhere; nothing left to do.
	}
}
// handleConnection serves requests on a single client connection until a
// read fails, a deadline cannot be set, or the handler panics.
//
// The protocol visible here is one JSON-encoded Request per line; each line
// is answered with one Response written through writeResponse. A line that
// does not decode as a Request gets a failure Response and the loop
// continues with the next line. The connection is always closed on return.
func (s *Server) handleConnection(conn net.Conn) {
	defer func() {
		_ = conn.Close()
	}()

	// Recover from panics to prevent daemon crash (bd-1048)
	defer func() {
		if r := recover(); r != nil {
			fmt.Fprintf(os.Stderr, "PANIC in handleConnection: %v\n", r)
			fmt.Fprintf(os.Stderr, "Stack trace:\n%s\n", debug.Stack())
		}
	}()

	reader := bufio.NewReader(conn)
	writer := bufio.NewWriter(conn)

	for {
		// Set read deadline for the next request
		if err := conn.SetReadDeadline(time.Now().Add(s.requestTimeout)); err != nil {
			return
		}

		line, err := reader.ReadBytes('\n')
		if err != nil {
			// Covers EOF, timeouts and any other read failure.
			return
		}

		// Refresh the write deadline before anything is written for this
		// request. This has to happen ahead of decoding so the
		// "invalid request" reply is covered too, instead of being sent
		// under a stale deadline from the previous request or none at all.
		if err := conn.SetWriteDeadline(time.Now().Add(s.requestTimeout)); err != nil {
			return
		}

		var req Request
		if err := json.Unmarshal(line, &req); err != nil {
			resp := Response{
				Success: false,
				Error:   fmt.Sprintf("invalid request: %v", err),
			}
			s.writeResponse(writer, resp)
			continue
		}

		resp := s.handleRequest(&req)
		s.writeResponse(writer, resp)
	}
}
// writeResponse encodes resp as JSON and writes it to writer as a single
// newline-terminated line, then flushes.
//
// If resp cannot be encoded (json.Marshal rejects, for example, a
// json.RawMessage that is not valid JSON), a failure Response describing the
// encoding error is sent instead, so the client receives a parseable reply
// rather than an empty line.
//
// Write and flush errors are deliberately not reported: bufio.Writer keeps
// the first error and returns it from every later call, and the caller
// notices a broken connection on its next read.
func (s *Server) writeResponse(writer *bufio.Writer, resp Response) {
	data, err := json.Marshal(resp)
	if err != nil {
		fallback := Response{
			Success: false,
			Error:   fmt.Sprintf("failed to encode response: %v", err),
		}
		// The fallback sets only a bool and a string. Should it still fail,
		// data stays nil and a bare newline is sent, as before this change.
		data, _ = json.Marshal(fallback)
	}

	_, _ = writer.Write(data)
	_ = writer.WriteByte('\n')
	_ = writer.Flush()
}
// handleShutdown acknowledges a shutdown request and stops the server
// shortly afterwards.
//
// The Stop call is delayed by 100ms and runs on a separate goroutine so the
// success Response returned here can be sent before the server stops. An
// error from Stop is printed to stderr. The request argument is not used.
func (s *Server) handleShutdown(_ *Request) Response {
	// Give time for response to be sent before stopping the server.
	time.AfterFunc(100*time.Millisecond, func() {
		if stopErr := s.Stop(); stopErr != nil {
			fmt.Fprintf(os.Stderr, "Error during shutdown: %v\n", stopErr)
		}
	})

	ack := Response{
		Success: true,
		Data:    json.RawMessage(`{"message":"Daemon shutting down"}`),
	}
	return ack
}