Add native Windows support (#91)

- Native Windows daemon using TCP loopback endpoints
- Direct-mode fallback for CLI/daemon compatibility
- Comment operations over RPC
- PowerShell installer script
- Go 1.24 requirement
- Cross-OS testing documented

Co-authored-by: danshapiro <danshapiro@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-c6230265-055f-4af1-9712-4481061886db
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-10-20 21:08:49 -07:00
parent 94a23cae39
commit a86f3e139e
58 changed files with 1707 additions and 729 deletions

View File

@@ -14,7 +14,6 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/steveyegge/beads/internal/storage"
@@ -62,16 +61,16 @@ type Server struct {
shutdownChan chan struct{}
stopOnce sync.Once
// Per-request storage routing with eviction support
storageCache map[string]*StorageCacheEntry // repoRoot -> entry
cacheMu sync.RWMutex
maxCacheSize int
cacheTTL time.Duration
cleanupTicker *time.Ticker
storageCache map[string]*StorageCacheEntry // repoRoot -> entry
cacheMu sync.RWMutex
maxCacheSize int
cacheTTL time.Duration
cleanupTicker *time.Ticker
// Health and metrics
startTime time.Time
cacheHits int64
cacheMisses int64
metrics *Metrics
startTime time.Time
cacheHits int64
cacheMisses int64
metrics *Metrics
// Connection limiting
maxConns int
activeConns int32 // atomic counter
@@ -79,7 +78,7 @@ type Server struct {
// Request timeout
requestTimeout time.Duration
// Ready channel signals when server is listening
readyChan chan struct{}
readyChan chan struct{}
}
// NewServer creates a new RPC server
@@ -93,7 +92,7 @@ func NewServer(socketPath string, store storage.Storage) *Server {
maxCacheSize = size
}
}
cacheTTL := 30 * time.Minute // default
if env := os.Getenv("BEADS_DAEMON_CACHE_TTL"); env != "" {
if ttl, err := time.ParseDuration(env); err == nil && ttl > 0 {
@@ -142,15 +141,18 @@ func (s *Server) Start(ctx context.Context) error {
return fmt.Errorf("failed to remove old socket: %w", err)
}
listener, err := net.Listen("unix", s.socketPath)
listener, err := listenRPC(s.socketPath)
if err != nil {
return fmt.Errorf("failed to listen on socket: %w", err)
return fmt.Errorf("failed to initialize RPC listener: %w", err)
}
s.listener = listener
// Set socket permissions to 0600 for security (owner only)
if err := os.Chmod(s.socketPath, 0600); err != nil {
listener.Close()
return fmt.Errorf("failed to set socket permissions: %w", err)
if runtime.GOOS != "windows" {
if err := os.Chmod(s.socketPath, 0600); err != nil {
listener.Close()
return fmt.Errorf("failed to set socket permissions: %w", err)
}
}
// Store listener under lock
@@ -170,7 +172,7 @@ func (s *Server) Start(ctx context.Context) error {
s.mu.RLock()
listener := s.listener
s.mu.RUnlock()
conn, err := listener.Accept()
if err != nil {
s.mu.Lock()
@@ -238,7 +240,7 @@ func (s *Server) Stop() error {
listener := s.listener
s.listener = nil
s.mu.Unlock()
if listener != nil {
if closeErr := listener.Close(); closeErr != nil {
err = fmt.Errorf("failed to close listener: %w", closeErr)
@@ -267,13 +269,13 @@ func (s *Server) removeOldSocket() error {
if _, err := os.Stat(s.socketPath); err == nil {
// Socket exists - check if it's stale before removing
// Try to connect to see if a daemon is actually using it
conn, err := net.DialTimeout("unix", s.socketPath, 500*time.Millisecond)
conn, err := dialRPC(s.socketPath, 500*time.Millisecond)
if err == nil {
// Socket is active - another daemon is running
conn.Close()
return fmt.Errorf("socket %s is in use by another daemon", s.socketPath)
}
// Socket is stale - safe to remove
if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
return err
@@ -284,7 +286,7 @@ func (s *Server) removeOldSocket() error {
func (s *Server) handleSignals() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(sigChan, serverSignals...)
<-sigChan
s.Stop()
}
@@ -333,7 +335,7 @@ func (s *Server) aggressiveEviction() {
toClose := []storage.Storage{}
s.cacheMu.Lock()
if len(s.storageCache) == 0 {
s.cacheMu.Unlock()
return
@@ -374,7 +376,7 @@ func (s *Server) aggressiveEviction() {
func (s *Server) evictStaleStorage() {
now := time.Now()
toClose := []storage.Storage{}
s.cacheMu.Lock()
// First pass: evict TTL-expired entries
@@ -466,7 +468,7 @@ func (s *Server) checkVersionCompatibility(clientVersion string) error {
if clientVersion == "" {
return nil
}
// Normalize versions to semver format (add 'v' prefix if missing)
serverVer := ServerVersion
if !strings.HasPrefix(serverVer, "v") {
@@ -476,38 +478,38 @@ func (s *Server) checkVersionCompatibility(clientVersion string) error {
if !strings.HasPrefix(clientVer, "v") {
clientVer = "v" + clientVer
}
// Validate versions are valid semver
if !semver.IsValid(serverVer) || !semver.IsValid(clientVer) {
// If either version is invalid, allow connection (dev builds, etc)
return nil
}
// Extract major versions
serverMajor := semver.Major(serverVer)
clientMajor := semver.Major(clientVer)
// Major version must match
if serverMajor != clientMajor {
cmp := semver.Compare(serverVer, clientVer)
if cmp < 0 {
// Daemon is older - needs upgrade
return fmt.Errorf("incompatible major versions: client %s, daemon %s. Daemon is older; upgrade and restart daemon: 'bd daemon --stop && bd daemon'",
return fmt.Errorf("incompatible major versions: client %s, daemon %s. Daemon is older; upgrade and restart daemon: 'bd daemon --stop && bd daemon'",
clientVersion, ServerVersion)
}
// Daemon is newer - client needs upgrade
return fmt.Errorf("incompatible major versions: client %s, daemon %s. Client is older; upgrade the bd CLI to match the daemon's major version",
return fmt.Errorf("incompatible major versions: client %s, daemon %s. Client is older; upgrade the bd CLI to match the daemon's major version",
clientVersion, ServerVersion)
}
// Compare full versions - daemon should be >= client for backward compatibility
cmp := semver.Compare(serverVer, clientVer)
if cmp < 0 {
// Server is older than client within same major version - may be missing features
return fmt.Errorf("version mismatch: daemon %s is older than client %s. Upgrade and restart daemon: 'bd daemon --stop && bd daemon'",
return fmt.Errorf("version mismatch: daemon %s is older than client %s. Upgrade and restart daemon: 'bd daemon --stop && bd daemon'",
ServerVersion, clientVersion)
}
// Client is same version or older - OK (daemon supports backward compat within major version)
return nil
}
@@ -515,13 +517,13 @@ func (s *Server) checkVersionCompatibility(clientVersion string) error {
func (s *Server) handleRequest(req *Request) Response {
// Track request timing
start := time.Now()
// Defer metrics recording to ensure it always happens
defer func() {
latency := time.Since(start)
s.metrics.RecordRequest(req.Operation, latency)
}()
// Check version compatibility (skip for ping/health to allow version checks)
if req.Operation != OpPing && req.Operation != OpHealth {
if err := s.checkVersionCompatibility(req.ClientVersion); err != nil {
@@ -532,7 +534,7 @@ func (s *Server) handleRequest(req *Request) Response {
}
}
}
var resp Response
switch req.Operation {
case OpPing:
@@ -563,6 +565,10 @@ func (s *Server) handleRequest(req *Request) Response {
resp = s.handleLabelAdd(req)
case OpLabelRemove:
resp = s.handleLabelRemove(req)
case OpCommentList:
resp = s.handleCommentList(req)
case OpCommentAdd:
resp = s.handleCommentAdd(req)
case OpBatch:
resp = s.handleBatch(req)
case OpReposList:
@@ -580,12 +586,12 @@ func (s *Server) handleRequest(req *Request) Response {
Error: fmt.Sprintf("unknown operation: %s", req.Operation),
}
}
// Record error if request failed
if !resp.Success {
s.metrics.RecordError(req.Operation)
}
return resp
}
@@ -656,11 +662,11 @@ func (s *Server) handlePing(_ *Request) Response {
func (s *Server) handleHealth(req *Request) Response {
start := time.Now()
// Get memory stats for health response
var m runtime.MemStats
runtime.ReadMemStats(&m)
store, err := s.getStorageForRequest(req)
if err != nil {
data, _ := json.Marshal(HealthResponse{
@@ -681,10 +687,10 @@ func (s *Server) handleHealth(req *Request) Response {
status := "healthy"
dbError := ""
_, pingErr := store.GetStatistics(healthCtx)
dbResponseMs := time.Since(start).Seconds() * 1000
if pingErr != nil {
status = "unhealthy"
dbError = pingErr.Error()
@@ -718,7 +724,7 @@ func (s *Server) handleHealth(req *Request) Response {
MaxConns: s.maxConns,
MemoryAllocMB: m.Alloc / 1024 / 1024,
}
if dbError != "" {
health.Error = dbError
}
@@ -735,14 +741,14 @@ func (s *Server) handleMetrics(_ *Request) Response {
s.cacheMu.RLock()
cacheSize := len(s.storageCache)
s.cacheMu.RUnlock()
snapshot := s.metrics.Snapshot(
atomic.LoadInt64(&s.cacheHits),
atomic.LoadInt64(&s.cacheMisses),
cacheSize,
int(atomic.LoadInt32(&s.activeConns)),
)
data, _ := json.Marshal(snapshot)
return Response{
Success: true,
@@ -982,7 +988,7 @@ func (s *Server) handleShow(req *Request) Response {
labels, _ := store.GetLabels(ctx, issue.ID)
deps, _ := store.GetDependencies(ctx, issue.ID)
dependents, _ := store.GetDependents(ctx, issue.ID)
// Create detailed response with related data
type IssueDetails struct {
*types.Issue
@@ -990,7 +996,7 @@ func (s *Server) handleShow(req *Request) Response {
Dependencies []*types.Issue `json:"dependencies,omitempty"`
Dependents []*types.Issue `json:"dependents,omitempty"`
}
details := &IssueDetails{
Issue: issue,
Labels: labels,
@@ -1190,6 +1196,72 @@ func (s *Server) handleLabelRemove(req *Request) Response {
return Response{Success: true}
}
func (s *Server) handleCommentList(req *Request) Response {
var commentArgs CommentListArgs
if err := json.Unmarshal(req.Args, &commentArgs); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid comment list args: %v", err),
}
}
store, err := s.getStorageForRequest(req)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("storage error: %v", err),
}
}
ctx := s.reqCtx(req)
comments, err := store.GetIssueComments(ctx, commentArgs.ID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to list comments: %v", err),
}
}
data, _ := json.Marshal(comments)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleCommentAdd(req *Request) Response {
var commentArgs CommentAddArgs
if err := json.Unmarshal(req.Args, &commentArgs); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid comment add args: %v", err),
}
}
store, err := s.getStorageForRequest(req)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("storage error: %v", err),
}
}
ctx := s.reqCtx(req)
comment, err := store.AddIssueComment(ctx, commentArgs.ID, commentArgs.Author, commentArgs.Text)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to add comment: %v", err),
}
}
data, _ := json.Marshal(comment)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleBatch(req *Request) Response {
var batchArgs BatchArgs
if err := json.Unmarshal(req.Args, &batchArgs); err != nil {
@@ -1255,14 +1327,14 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
// Check cache first with write lock (to avoid race on lastAccess update)
s.cacheMu.Lock()
defer s.cacheMu.Unlock()
if entry, ok := s.storageCache[repoRoot]; ok {
// Update last access time (safe under Lock)
entry.lastAccess = time.Now()
atomic.AddInt64(&s.cacheHits, 1)
return entry.store, nil
}
atomic.AddInt64(&s.cacheMisses, 1)
// Open storage
@@ -1280,7 +1352,7 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
// Enforce LRU immediately to prevent FD spikes between cleanup ticks
needEvict := len(s.storageCache) > s.maxCacheSize
s.cacheMu.Unlock()
if needEvict {
s.evictStaleStorage()
}