Merge remote changes
Amp-Thread-ID: https://ampcode.com/threads/T-540ebf64-e14f-4541-b098-586d2b07dc3e Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
+31
-38
File diff suppressed because one or more lines are too long
@@ -60,7 +60,12 @@ func runEventDrivenLoop(
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event := <-mutationChan:
|
||||
case event, ok := <-mutationChan:
|
||||
if !ok {
|
||||
// Channel closed (should never happen, but handle defensively)
|
||||
log.log("Mutation channel closed; exiting listener")
|
||||
return
|
||||
}
|
||||
log.log("Mutation detected: %s %s", event.Type, event.IssueID)
|
||||
exportDebouncer.Trigger()
|
||||
|
||||
@@ -70,23 +75,28 @@ func runEventDrivenLoop(
|
||||
}
|
||||
}()
|
||||
|
||||
// Optional: Periodic health check and dropped events safety net
|
||||
// Periodic health check
|
||||
healthTicker := time.NewTicker(60 * time.Second)
|
||||
defer healthTicker.Stop()
|
||||
|
||||
// Dropped events safety net (faster recovery than health check)
|
||||
droppedEventsTicker := time.NewTicker(1 * time.Second)
|
||||
defer droppedEventsTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-healthTicker.C:
|
||||
// Periodic health validation (not sync)
|
||||
checkDaemonHealth(ctx, store, log)
|
||||
|
||||
// Safety net: check for dropped mutation events
|
||||
case <-droppedEventsTicker.C:
|
||||
// Check for dropped mutation events every second
|
||||
dropped := server.ResetDroppedEventsCount()
|
||||
if dropped > 0 {
|
||||
log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
|
||||
exportDebouncer.Trigger()
|
||||
}
|
||||
|
||||
case <-healthTicker.C:
|
||||
// Periodic health validation (not sync)
|
||||
checkDaemonHealth(ctx, store, log)
|
||||
|
||||
case sig := <-sigChan:
|
||||
if isReloadSignal(sig) {
|
||||
log.log("Received reload signal, ignoring")
|
||||
|
||||
+165
-54
@@ -21,10 +21,11 @@ var validateCmd = &cobra.Command{
|
||||
- Git merge conflicts in JSONL
|
||||
|
||||
Example:
|
||||
bd validate # Run all checks
|
||||
bd validate --fix-all # Auto-fix all issues
|
||||
bd validate --checks=orphans,dupes # Run specific checks
|
||||
bd validate --json # Output in JSON format`,
|
||||
bd validate # Run all checks
|
||||
bd validate --fix-all # Auto-fix all issues
|
||||
bd validate --checks=orphans,dupes # Run specific checks
|
||||
bd validate --checks=conflicts # Check for git conflicts
|
||||
bd validate --json # Output in JSON format`,
|
||||
Run: func(cmd *cobra.Command, _ []string) {
|
||||
// Check daemon mode - not supported yet (uses direct storage access)
|
||||
if daemonClient != nil {
|
||||
@@ -35,49 +36,119 @@ Example:
|
||||
|
||||
fixAll, _ := cmd.Flags().GetBool("fix-all")
|
||||
checksFlag, _ := cmd.Flags().GetString("checks")
|
||||
jsonOut, _ := cmd.Flags().GetBool("json")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Determine which checks to run
|
||||
var checks []string
|
||||
if checksFlag == "" {
|
||||
checks = []string{"orphans", "duplicates", "pollution"}
|
||||
} else {
|
||||
checks = strings.Split(checksFlag, ",")
|
||||
// Parse and normalize checks
|
||||
checks, err := parseChecks(checksFlag)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
fmt.Fprintf(os.Stderr, "Valid checks: orphans, duplicates, pollution, conflicts\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
// Fetch all issues once for checks that need them
|
||||
var allIssues []*types.Issue
|
||||
needsIssues := false
|
||||
for _, check := range checks {
|
||||
if check == "orphans" || check == "duplicates" || check == "pollution" {
|
||||
needsIssues = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if needsIssues {
|
||||
allIssues, err = store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error fetching issues: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
results := validationResults{
|
||||
checks: make(map[string]checkResult),
|
||||
checks: make(map[string]checkResult),
|
||||
checkOrder: checks,
|
||||
}
|
||||
|
||||
// Run each check
|
||||
for _, check := range checks {
|
||||
switch check {
|
||||
case "orphans":
|
||||
results.checks["orphans"] = validateOrphanedDeps(ctx, fixAll)
|
||||
case "duplicates", "dupes":
|
||||
results.checks["duplicates"] = validateDuplicates(ctx, fixAll)
|
||||
results.checks["orphans"] = validateOrphanedDeps(ctx, allIssues, fixAll)
|
||||
case "duplicates":
|
||||
results.checks["duplicates"] = validateDuplicates(ctx, allIssues, fixAll)
|
||||
case "pollution":
|
||||
results.checks["pollution"] = validatePollution(ctx, fixAll)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "Unknown check: %s\n", check)
|
||||
results.checks["pollution"] = validatePollution(ctx, allIssues, fixAll)
|
||||
case "conflicts":
|
||||
results.checks["conflicts"] = validateGitConflicts(ctx, fixAll)
|
||||
}
|
||||
}
|
||||
|
||||
// Output results
|
||||
if jsonOutput {
|
||||
if jsonOut {
|
||||
outputJSON(results.toJSON())
|
||||
} else {
|
||||
results.print(fixAll)
|
||||
}
|
||||
|
||||
// Exit with error code if issues found
|
||||
if results.hasIssues() {
|
||||
// Exit with error code if issues found or errors occurred
|
||||
if results.hasFailures() {
|
||||
os.Exit(1)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// parseChecks normalizes and validates check names
|
||||
func parseChecks(checksFlag string) ([]string, error) {
|
||||
defaultChecks := []string{"orphans", "duplicates", "pollution", "conflicts"}
|
||||
|
||||
if checksFlag == "" {
|
||||
return defaultChecks, nil
|
||||
}
|
||||
|
||||
// Map of synonyms to canonical names
|
||||
synonyms := map[string]string{
|
||||
"dupes": "duplicates",
|
||||
"git-conflicts": "conflicts",
|
||||
}
|
||||
|
||||
var result []string
|
||||
seen := make(map[string]bool)
|
||||
|
||||
parts := strings.Split(checksFlag, ",")
|
||||
for _, part := range parts {
|
||||
check := strings.ToLower(strings.TrimSpace(part))
|
||||
if check == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Map synonyms
|
||||
if canonical, ok := synonyms[check]; ok {
|
||||
check = canonical
|
||||
}
|
||||
|
||||
// Validate
|
||||
valid := false
|
||||
for _, validCheck := range defaultChecks {
|
||||
if check == validCheck {
|
||||
valid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !valid {
|
||||
return nil, fmt.Errorf("unknown check: %s", part)
|
||||
}
|
||||
|
||||
// Deduplicate
|
||||
if !seen[check] {
|
||||
seen[check] = true
|
||||
result = append(result, check)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type checkResult struct {
|
||||
name string
|
||||
issueCount int
|
||||
@@ -87,11 +158,15 @@ type checkResult struct {
|
||||
}
|
||||
|
||||
type validationResults struct {
|
||||
checks map[string]checkResult
|
||||
checks map[string]checkResult
|
||||
checkOrder []string
|
||||
}
|
||||
|
||||
func (r *validationResults) hasIssues() bool {
|
||||
func (r *validationResults) hasFailures() bool {
|
||||
for _, result := range r.checks {
|
||||
if result.err != nil {
|
||||
return true
|
||||
}
|
||||
if result.issueCount > 0 && result.fixedCount < result.issueCount {
|
||||
return true
|
||||
}
|
||||
@@ -106,12 +181,20 @@ func (r *validationResults) toJSON() map[string]interface{} {
|
||||
|
||||
totalIssues := 0
|
||||
totalFixed := 0
|
||||
hasErrors := false
|
||||
|
||||
for name, result := range r.checks {
|
||||
var errorStr interface{}
|
||||
if result.err != nil {
|
||||
errorStr = result.err.Error()
|
||||
hasErrors = true
|
||||
}
|
||||
|
||||
output["checks"].(map[string]interface{})[name] = map[string]interface{}{
|
||||
"issue_count": result.issueCount,
|
||||
"fixed_count": result.fixedCount,
|
||||
"error": result.err,
|
||||
"error": errorStr,
|
||||
"failed": result.err != nil,
|
||||
"suggestions": result.suggestions,
|
||||
}
|
||||
totalIssues += result.issueCount
|
||||
@@ -120,7 +203,7 @@ func (r *validationResults) toJSON() map[string]interface{} {
|
||||
|
||||
output["total_issues"] = totalIssues
|
||||
output["total_fixed"] = totalFixed
|
||||
output["healthy"] = totalIssues == 0 || totalIssues == totalFixed
|
||||
output["healthy"] = !hasErrors && (totalIssues == 0 || totalIssues == totalFixed)
|
||||
|
||||
return output
|
||||
}
|
||||
@@ -136,24 +219,26 @@ func (r *validationResults) print(fixAll bool) {
|
||||
totalIssues := 0
|
||||
totalFixed := 0
|
||||
|
||||
for name, result := range r.checks {
|
||||
// Print in deterministic order
|
||||
for _, name := range r.checkOrder {
|
||||
result := r.checks[name]
|
||||
prefix := "✓"
|
||||
colorFunc := green
|
||||
|
||||
if result.err != nil {
|
||||
prefix = "✗"
|
||||
colorFunc = red
|
||||
fmt.Printf("%s %s: ERROR - %v\n", colorFunc(prefix), name, result.err)
|
||||
fmt.Printf("%s %s: ERROR - %v\n", colorFunc(prefix), result.name, result.err)
|
||||
} else if result.issueCount > 0 {
|
||||
prefix = "⚠"
|
||||
colorFunc = yellow
|
||||
if result.fixedCount > 0 {
|
||||
fmt.Printf("%s %s: %d found, %d fixed\n", colorFunc(prefix), name, result.issueCount, result.fixedCount)
|
||||
fmt.Printf("%s %s: %d found, %d fixed\n", colorFunc(prefix), result.name, result.issueCount, result.fixedCount)
|
||||
} else {
|
||||
fmt.Printf("%s %s: %d found\n", colorFunc(prefix), name, result.issueCount)
|
||||
fmt.Printf("%s %s: %d found\n", colorFunc(prefix), result.name, result.issueCount)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("%s %s: OK\n", colorFunc(prefix), name)
|
||||
fmt.Printf("%s %s: OK\n", colorFunc(prefix), result.name)
|
||||
}
|
||||
|
||||
totalIssues += result.issueCount
|
||||
@@ -184,16 +269,9 @@ func (r *validationResults) print(fixAll bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func validateOrphanedDeps(ctx context.Context, fix bool) checkResult {
|
||||
func validateOrphanedDeps(ctx context.Context, allIssues []*types.Issue, fix bool) checkResult {
|
||||
result := checkResult{name: "orphaned dependencies"}
|
||||
|
||||
// Get all issues
|
||||
allIssues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
result.err = err
|
||||
return result
|
||||
}
|
||||
|
||||
// Build ID existence map
|
||||
existingIDs := make(map[string]bool)
|
||||
for _, issue := range allIssues {
|
||||
@@ -248,16 +326,9 @@ func validateOrphanedDeps(ctx context.Context, fix bool) checkResult {
|
||||
return result
|
||||
}
|
||||
|
||||
func validateDuplicates(ctx context.Context, fix bool) checkResult {
|
||||
func validateDuplicates(ctx context.Context, allIssues []*types.Issue, fix bool) checkResult {
|
||||
result := checkResult{name: "duplicates"}
|
||||
|
||||
// Get all issues
|
||||
allIssues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
result.err = err
|
||||
return result
|
||||
}
|
||||
|
||||
// Find duplicates
|
||||
duplicateGroups := findDuplicateGroups(allIssues)
|
||||
|
||||
@@ -279,16 +350,9 @@ func validateDuplicates(ctx context.Context, fix bool) checkResult {
|
||||
return result
|
||||
}
|
||||
|
||||
func validatePollution(ctx context.Context, fix bool) checkResult {
|
||||
func validatePollution(ctx context.Context, allIssues []*types.Issue, fix bool) checkResult {
|
||||
result := checkResult{name: "test pollution"}
|
||||
|
||||
// Get all issues
|
||||
allIssues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
result.err = err
|
||||
return result
|
||||
}
|
||||
|
||||
// Detect pollution
|
||||
polluted := detectTestPollution(allIssues)
|
||||
result.issueCount = len(polluted)
|
||||
@@ -305,8 +369,55 @@ func validatePollution(ctx context.Context, fix bool) checkResult {
|
||||
return result
|
||||
}
|
||||
|
||||
func validateGitConflicts(ctx context.Context, fix bool) checkResult {
|
||||
result := checkResult{name: "git conflicts"}
|
||||
|
||||
// Check JSONL file for conflict markers
|
||||
jsonlPath := findJSONLPath()
|
||||
data, err := os.ReadFile(jsonlPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// No JSONL file = no conflicts
|
||||
return result
|
||||
}
|
||||
result.err = fmt.Errorf("failed to read JSONL: %w", err)
|
||||
return result
|
||||
}
|
||||
|
||||
// Look for git conflict markers
|
||||
lines := strings.Split(string(data), "\n")
|
||||
var conflictLines []int
|
||||
for i, line := range lines {
|
||||
trimmed := strings.TrimSpace(line)
|
||||
if strings.HasPrefix(trimmed, "<<<<<<< ") ||
|
||||
trimmed == "=======" ||
|
||||
strings.HasPrefix(trimmed, ">>>>>>> ") {
|
||||
conflictLines = append(conflictLines, i+1)
|
||||
}
|
||||
}
|
||||
|
||||
if len(conflictLines) > 0 {
|
||||
result.issueCount = 1 // One conflict situation
|
||||
result.suggestions = append(result.suggestions,
|
||||
fmt.Sprintf("Resolve git conflict in %s (markers at lines: %v)", jsonlPath, conflictLines))
|
||||
if !fix {
|
||||
result.suggestions = append(result.suggestions,
|
||||
fmt.Sprintf("Then run 'bd import -i %s' to reload issues", jsonlPath))
|
||||
}
|
||||
}
|
||||
|
||||
// Can't auto-fix git conflicts
|
||||
if fix && result.issueCount > 0 {
|
||||
result.suggestions = append(result.suggestions,
|
||||
"Git conflicts cannot be auto-fixed. Resolve manually in your editor or run 'bd export' to regenerate JSONL")
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func init() {
|
||||
validateCmd.Flags().Bool("fix-all", false, "Auto-fix all fixable issues")
|
||||
validateCmd.Flags().String("checks", "", "Comma-separated list of checks (orphans,duplicates,pollution)")
|
||||
validateCmd.Flags().String("checks", "", "Comma-separated list of checks (orphans,duplicates,pollution,conflicts)")
|
||||
validateCmd.Flags().Bool("json", false, "Output in JSON format")
|
||||
rootCmd.AddCommand(validateCmd)
|
||||
}
|
||||
|
||||
@@ -51,9 +51,17 @@ type Server struct {
|
||||
droppedEvents atomic.Int64 // Counter for dropped mutation events
|
||||
}
|
||||
|
||||
// Mutation event types
|
||||
const (
|
||||
MutationCreate = "create"
|
||||
MutationUpdate = "update"
|
||||
MutationDelete = "delete"
|
||||
MutationComment = "comment"
|
||||
)
|
||||
|
||||
// MutationEvent represents a database mutation for event-driven sync
|
||||
type MutationEvent struct {
|
||||
Type string // "create", "update", "delete", "comment"
|
||||
Type string // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment
|
||||
IssueID string // e.g., "bd-42"
|
||||
Timestamp time.Time
|
||||
}
|
||||
@@ -76,6 +84,14 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
||||
}
|
||||
}
|
||||
|
||||
mutationBufferSize := 512 // default (increased from 100 for better burst handling)
|
||||
if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
|
||||
var bufSize int
|
||||
if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
|
||||
mutationBufferSize = bufSize
|
||||
}
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
socketPath: socketPath,
|
||||
workspacePath: workspacePath,
|
||||
@@ -89,7 +105,7 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
||||
connSemaphore: make(chan struct{}, maxConns),
|
||||
requestTimeout: requestTimeout,
|
||||
readyChan: make(chan struct{}),
|
||||
mutationChan: make(chan MutationEvent, 100), // Buffered to avoid blocking
|
||||
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
|
||||
}
|
||||
s.lastActivityTime.Store(time.Now())
|
||||
return s
|
||||
|
||||
@@ -161,7 +161,7 @@ func (s *Server) handleCreate(req *Request) Response {
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("create", issue.ID)
|
||||
s.emitMutation(MutationCreate, issue.ID)
|
||||
|
||||
data, _ := json.Marshal(issue)
|
||||
return Response{
|
||||
@@ -195,7 +195,7 @@ func (s *Server) handleUpdate(req *Request) Response {
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("update", updateArgs.ID)
|
||||
s.emitMutation(MutationUpdate, updateArgs.ID)
|
||||
|
||||
issue, err := store.GetIssue(ctx, updateArgs.ID)
|
||||
if err != nil {
|
||||
@@ -232,7 +232,7 @@ func (s *Server) handleClose(req *Request) Response {
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("update", closeArgs.ID)
|
||||
s.emitMutation(MutationUpdate, closeArgs.ID)
|
||||
|
||||
issue, _ := store.GetIssue(ctx, closeArgs.ID)
|
||||
data, _ := json.Marshal(issue)
|
||||
|
||||
@@ -35,7 +35,7 @@ func (s *Server) handleDepAdd(req *Request) Response {
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("update", depArgs.FromID)
|
||||
s.emitMutation(MutationUpdate, depArgs.FromID)
|
||||
|
||||
return Response{Success: true}
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func (s *Server) handleSimpleStoreOp(req *Request, argsPtr interface{}, argDesc
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("update", issueID)
|
||||
s.emitMutation(MutationUpdate, issueID)
|
||||
|
||||
return Response{Success: true}
|
||||
}
|
||||
@@ -135,7 +135,7 @@ func (s *Server) handleCommentAdd(req *Request) Response {
|
||||
}
|
||||
|
||||
// Emit mutation event for event-driven daemon
|
||||
s.emitMutation("comment", commentArgs.ID)
|
||||
s.emitMutation(MutationComment, commentArgs.ID)
|
||||
|
||||
data, _ := json.Marshal(comment)
|
||||
return Response{
|
||||
|
||||
Reference in New Issue
Block a user