diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index 333f5f8d..309137e0 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -1145,52 +1145,88 @@ impl DataStore {
             let namespace = namespace.context("iterating namespaces failed")?;
             for group in arc_self.iter_backup_groups(namespace)? {
                 let group = group.context("iterating backup groups failed")?;
-                let mut snapshots = group.list_backups().context("listing snapshots failed")?;
-                // Sort by snapshot timestamp to iterate over consecutive snapshots for each image.
-                BackupInfo::sort_list(&mut snapshots, true);
-                for snapshot in snapshots {
-                    for file in snapshot.files {
-                        worker.check_abort()?;
-                        worker.fail_on_shutdown()?;
 
-                        match ArchiveType::from_path(&file) {
-                            Ok(ArchiveType::FixedIndex) | Ok(ArchiveType::DynamicIndex) => (),
-                            Ok(ArchiveType::Blob) | Err(_) => continue,
-                        }
+                // Avoid race between listing/marking of snapshots by GC and pruning the last
+                // snapshot in the group, following a new snapshot creation. Otherwise known chunks
+                // might only be referenced by the new snapshot, so it must be read as well.
+                let mut retry_counter = 0;
+                'retry: loop {
+                    let _lock = match retry_counter {
+                        0..=9 => None,
+                        10 => Some(
+                            group
+                                .lock()
+                                .context("exhausted retries and failed to lock group")?,
+                        ),
+                        _ => bail!("exhausted retries and unexpected counter overrun"),
+                    };
 
-                        let mut path = snapshot.backup_dir.full_path();
-                        path.push(file);
-
-                        let index = match self.open_index_reader(&path)? {
-                            Some(index) => index,
-                            None => {
-                                unprocessed_index_list.remove(&path);
-                                continue;
+                    let mut snapshots = match group.list_backups() {
+                        Ok(snapshots) => snapshots,
+                        Err(err) => {
+                            if group.exists() {
+                                return Err(err).context("listing snapshots failed")?;
                             }
-                        };
-                        self.index_mark_used_chunks(
-                            index,
-                            &path,
-                            &mut chunk_lru_cache,
-                            status,
-                            worker,
-                        )?;
-
-                        if !unprocessed_index_list.remove(&path) {
-                            info!("Encountered new index file '{path:?}', increment total index file count");
-                            index_count += 1;
+                            break 'retry;
                         }
+                    };
 
-                        let percentage = (processed_index_files + 1) * 100 / index_count;
-                        if percentage > last_percentage {
-                            info!(
-                                "marked {percentage}% ({} of {index_count} index files)",
-                                processed_index_files + 1,
-                            );
-                            last_percentage = percentage;
+                    // Always start iteration with the last snapshot of the group to reduce race
+                    // window with concurrent backup+prune previous last snapshot. Allows to retry
+                    // without the need to keep track of already processed index files for the
+                    // current group.
+                    BackupInfo::sort_list(&mut snapshots, true);
+                    for (count, snapshot) in snapshots.into_iter().rev().enumerate() {
+                        for file in snapshot.files {
+                            worker.check_abort()?;
+                            worker.fail_on_shutdown()?;
+
+                            match ArchiveType::from_path(&file) {
+                                Ok(ArchiveType::FixedIndex) | Ok(ArchiveType::DynamicIndex) => (),
+                                Ok(ArchiveType::Blob) | Err(_) => continue,
+                            }
+
+                            let mut path = snapshot.backup_dir.full_path();
+                            path.push(file);
+
+                            let index = match self.open_index_reader(&path)? {
+                                Some(index) => index,
+                                None => {
+                                    unprocessed_index_list.remove(&path);
+                                    if count == 0 {
+                                        retry_counter += 1;
+                                        continue 'retry;
+                                    }
+                                    continue;
+                                }
+                            };
+
+                            self.index_mark_used_chunks(
+                                index,
+                                &path,
+                                &mut chunk_lru_cache,
+                                status,
+                                worker,
+                            )?;
+
+                            if !unprocessed_index_list.remove(&path) {
+                                info!("Encountered new index file '{path:?}', increment total index file count");
+                                index_count += 1;
+                            }
+
+                            let percentage = (processed_index_files + 1) * 100 / index_count;
+                            if percentage > last_percentage {
+                                info!(
+                                    "marked {percentage}% ({} of {index_count} index files)",
+                                    processed_index_files + 1,
+                                );
+                                last_percentage = percentage;
+                            }
+                            processed_index_files += 1;
                         }
-                        processed_index_files += 1;
                     }
+
+                    break;
                 }
             }
         }
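For reviewers who want to poke at the control flow in isolation, below is a minimal, self-contained sketch of the bounded retry-then-lock pattern this hunk introduces: up to ten lock-free passes over the group, an eleventh pass under the group lock, and a restart whenever the newest snapshot's index disappears mid-iteration. GroupHandle, list_snapshots, open_index and the chunk counts are hypothetical stand-ins (plus the anyhow crate for error handling), not PBS APIs; only the loop structure mirrors the patch.

// Sketch of the bounded "retry, then lock" pattern introduced above. All names
// (GroupHandle, list_snapshots, open_index, chunk_count) are hypothetical
// stand-ins, not PBS APIs; only the control flow mirrors the patch.
use anyhow::{anyhow, bail, Result};
use std::sync::Mutex;

struct Index {
    chunk_count: usize,
}

struct GroupHandle {
    lock: Mutex<()>,        // stands in for the per-group lock taken by `group.lock()`
    snapshots: Vec<String>, // stands in for the snapshot directories on disk
}

impl GroupHandle {
    // Lock-free listing; in the real datastore this can race with a prune of the
    // group's last snapshot right after a new backup was created.
    fn list_snapshots(&self) -> Result<Vec<String>> {
        Ok(self.snapshots.clone())
    }

    // Returns None when the snapshot's index file vanished in the meantime.
    fn open_index(&self, _snapshot: &str) -> Option<Index> {
        Some(Index { chunk_count: 42 })
    }
}

fn mark_group(group: &GroupHandle) -> Result<()> {
    let mut retry_counter = 0;
    'retry: loop {
        // Attempts 0..=9 run without the lock so marking does not block backups or
        // prunes; the final attempt serializes against them via the group lock.
        let _lock = match retry_counter {
            0..=9 => None,
            10 => Some(
                group
                    .lock
                    .lock()
                    .map_err(|_| anyhow!("exhausted retries and failed to lock group"))?,
            ),
            _ => bail!("exhausted retries and unexpected counter overrun"),
        };

        // Newest snapshot first: if it vanished while iterating, chunks it shares
        // with an even newer backup might not be marked yet, so restart the group.
        for (count, snapshot) in group.list_snapshots()?.into_iter().rev().enumerate() {
            match group.open_index(&snapshot) {
                Some(index) => println!("marking {} chunks of {snapshot}", index.chunk_count),
                None if count == 0 => {
                    retry_counter += 1;
                    continue 'retry;
                }
                None => continue,
            }
        }

        break Ok(());
    }
}

fn main() -> Result<()> {
    let group = GroupHandle {
        lock: Mutex::new(()),
        snapshots: vec!["2024-01-01T00:00:00Z".into(), "2024-01-02T00:00:00Z".into()],
    };
    mark_group(&group)
}

As in the patch, the group lock is only taken on the final attempt, so the common case stays lock-free and GC does not serialize against backup or prune unless the race actually keeps recurring.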