Hotfix issues with 1.0.3
This commit is contained in:
@@ -75,26 +75,6 @@ type Job struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (j *Job) getUnprocessedBrokenItems() map[string][]arr.ContentFile {
|
||||
items := make(map[string][]arr.ContentFile)
|
||||
|
||||
for arrName, files := range j.BrokenItems {
|
||||
if len(files) == 0 {
|
||||
continue // Skip empty file lists
|
||||
}
|
||||
items[arrName] = make([]arr.ContentFile, 0, len(files))
|
||||
for _, file := range files {
|
||||
if file.Path != "" && file.TargetPath != "" && !file.Processed {
|
||||
items[arrName] = append(items[arrName], file)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return nil // Return nil if no unprocessed items found
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
func New(arrs *arr.Storage, engine *debrid.Storage) *Repair {
|
||||
cfg := config.Get()
|
||||
workers := runtime.NumCPU() * 20
|
||||
@@ -765,7 +745,7 @@ func (r *Repair) ProcessJob(id string) error {
|
||||
return fmt.Errorf("job %s already failed", id)
|
||||
}
|
||||
|
||||
brokenItems := job.getUnprocessedBrokenItems()
|
||||
brokenItems := job.BrokenItems
|
||||
if len(brokenItems) == 0 {
|
||||
r.logger.Info().Msgf("No broken items found for job %s", id)
|
||||
job.CompletedAt = time.Now()
|
||||
@@ -773,144 +753,63 @@ func (r *Repair) ProcessJob(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
r.logger.Info().Msgf("Processing job %s with %d broken items", id, len(brokenItems))
|
||||
go r.processJob(job, brokenItems)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repair) processJob(job *Job, brokenItems map[string][]arr.ContentFile) {
|
||||
if job.ctx == nil || job.ctx.Err() != nil {
|
||||
job.ctx, job.cancelFunc = context.WithCancel(r.ctx)
|
||||
}
|
||||
|
||||
errs := make([]error, 0)
|
||||
processedCount := 0
|
||||
g, ctx := errgroup.WithContext(job.ctx)
|
||||
g.SetLimit(r.workers)
|
||||
|
||||
for arrName, items := range brokenItems {
|
||||
select {
|
||||
case <-job.ctx.Done():
|
||||
r.logger.Info().Msgf("Job %s cancelled", job.ID)
|
||||
job.Status = JobCancelled
|
||||
job.CompletedAt = time.Now()
|
||||
job.Error = "Job was cancelled"
|
||||
return
|
||||
default:
|
||||
// Continue processing
|
||||
}
|
||||
items := items
|
||||
arrName := arrName
|
||||
g.Go(func() error {
|
||||
|
||||
a := r.arrs.Get(arrName)
|
||||
if a == nil {
|
||||
errs = append(errs, fmt.Errorf("arr %s not found", arrName))
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := a.DeleteFiles(items); err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to delete broken items for %s: %w", arrName, err))
|
||||
continue
|
||||
}
|
||||
// Search for missing items
|
||||
if err := a.SearchMissing(items); err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to search missing items for %s: %w", arrName, err))
|
||||
continue
|
||||
}
|
||||
processedCount += len(items)
|
||||
// Mark this item as processed
|
||||
for i := range items {
|
||||
items[i].Processed = true
|
||||
}
|
||||
job.BrokenItems[arrName] = items
|
||||
a := r.arrs.Get(arrName)
|
||||
if a == nil {
|
||||
r.logger.Error().Msgf("Arr %s not found", arrName)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := a.DeleteFiles(items); err != nil {
|
||||
r.logger.Error().Err(err).Msgf("Failed to delete broken items for %s", arrName)
|
||||
return nil
|
||||
}
|
||||
// Search for missing items
|
||||
if err := a.SearchMissing(items); err != nil {
|
||||
r.logger.Error().Err(err).Msgf("Failed to search missing items for %s", arrName)
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Update job status to in-progress
|
||||
job.Status = JobProcessing
|
||||
r.saveToFile()
|
||||
|
||||
if len(errs) > 0 {
|
||||
errMsg := fmt.Sprintf("Job %s encountered errors: %v", job.ID, errs)
|
||||
job.Error = errMsg
|
||||
job.FailedAt = time.Now()
|
||||
job.Status = JobFailed
|
||||
r.logger.Error().Msg(errMsg)
|
||||
go func() {
|
||||
if err := request.SendDiscordMessage("repair_failed", "error", job.discordContext()); err != nil {
|
||||
r.logger.Error().Msgf("Error sending discord message: %v", err)
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
remainingItems := job.getUnprocessedBrokenItems()
|
||||
if len(remainingItems) == 0 {
|
||||
// All items processed, mark job as completed
|
||||
job.CompletedAt = time.Now()
|
||||
job.Status = JobCompleted
|
||||
r.logger.Info().Msgf("Job %s completed successfully (all items processed)", job.ID)
|
||||
go func() {
|
||||
if err := request.SendDiscordMessage("repair_complete", "success", job.discordContext()); err != nil {
|
||||
r.logger.Error().Msgf("Error sending discord message: %v", err)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
// Some items still remain, keep job as pending
|
||||
job.Status = JobPending
|
||||
r.logger.Info().Msgf("Job %s: processed %d selected items successfully, %d items remaining", job.ID, processedCount, len(remainingItems))
|
||||
go func() {
|
||||
if err := request.SendDiscordMessage("repair_partial_complete", "info", job.discordContext()); err != nil {
|
||||
r.logger.Error().Msgf("Error sending discord message: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
r.saveToFile()
|
||||
}
|
||||
|
||||
// ProcessJobItems processes the selected items for a job
|
||||
// selectedItems is the map of arr names to the list of file IDs to process
|
||||
func (r *Repair) ProcessJobItems(id string, selectedItems map[string][]int) error {
|
||||
job := r.GetJob(id)
|
||||
if job == nil {
|
||||
return fmt.Errorf("job %s not found", id)
|
||||
}
|
||||
if job.Status != JobPending {
|
||||
return fmt.Errorf("job %s not pending", id)
|
||||
}
|
||||
if job.StartedAt.IsZero() {
|
||||
return fmt.Errorf("job %s not started", id)
|
||||
}
|
||||
if !job.CompletedAt.IsZero() {
|
||||
return fmt.Errorf("job %s already completed", id)
|
||||
}
|
||||
if !job.FailedAt.IsZero() {
|
||||
return fmt.Errorf("job %s already failed", id)
|
||||
}
|
||||
|
||||
brokenItems := job.getUnprocessedBrokenItems()
|
||||
validatedItems := make(map[string][]arr.ContentFile)
|
||||
|
||||
for arrName, selectedItemsList := range selectedItems {
|
||||
if jobItems, exists := brokenItems[arrName]; exists {
|
||||
validItems := make([]arr.ContentFile, 0, len(selectedItemsList))
|
||||
for _, item := range selectedItemsList {
|
||||
// Find the item in the job items
|
||||
for _, jobItem := range jobItems {
|
||||
if jobItem.FileId == item {
|
||||
validItems = append(validItems, jobItem)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(validItems) > 0 {
|
||||
validatedItems[arrName] = validItems
|
||||
}
|
||||
// Launch a goroutine to wait for completion and update the job
|
||||
go func() {
|
||||
if err := g.Wait(); err != nil {
|
||||
job.FailedAt = time.Now()
|
||||
job.Error = err.Error()
|
||||
job.CompletedAt = time.Now()
|
||||
job.Status = JobFailed
|
||||
r.logger.Error().Err(err).Msgf("Job %s failed", id)
|
||||
} else {
|
||||
job.CompletedAt = time.Now()
|
||||
job.Status = JobCompleted
|
||||
r.logger.Info().Msgf("Job %s completed successfully", id)
|
||||
}
|
||||
}
|
||||
if len(validatedItems) == 0 {
|
||||
return fmt.Errorf("no valid items found for job %s", id)
|
||||
}
|
||||
|
||||
job.Status = JobProcessing
|
||||
r.saveToFile()
|
||||
|
||||
go r.processJob(job, validatedItems)
|
||||
r.saveToFile()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user