Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3319,7 +3319,7 @@ impl ChainStoreTrait for ChainStore {
let mut last_log = Instant::now();

loop {
let current_size = batch_size.size;
let current_size = batch_size.size.max(1);
let start = Instant::now();
let deleted = self
.storage
Expand Down
28 changes: 21 additions & 7 deletions store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const INITIAL_BATCH_SIZE: i64 = 10_000;
/// arrays can be large and large arrays will slow down copying a lot. We
/// therefore tread lightly in that case
const INITIAL_BATCH_SIZE_LIST: i64 = 100;
const MIN_BATCH_SIZE: i64 = 1;

/// Track the desired size of a batch in such a way that doing the next
/// batch gets close to TARGET_DURATION for the time it takes to copy one
Expand All @@ -35,9 +36,13 @@ pub(crate) struct AdaptiveBatchSize {
}

impl AdaptiveBatchSize {
fn clamp_size(size: i64) -> i64 {
size.max(MIN_BATCH_SIZE)
}

pub fn with_size(size: i64) -> Self {
Self {
size,
size: Self::clamp_size(size),
target: ENV_VARS.store.batch_target_duration,
}
}
Expand All @@ -49,10 +54,7 @@ impl AdaptiveBatchSize {
INITIAL_BATCH_SIZE
};

Self {
size,
target: ENV_VARS.store.batch_target_duration,
}
Self::with_size(size)
}

// adjust batch size by trying to extrapolate in such a way that we
Expand All @@ -62,7 +64,7 @@ impl AdaptiveBatchSize {
// Avoid division by zero
let duration = duration.as_millis().max(1);
let new_batch_size = self.size as f64 * self.target.as_millis() as f64 / duration as f64;
self.size = (2 * self.size).min(new_batch_size.round() as i64);
self.size = Self::clamp_size((2 * self.size).min(new_batch_size.round() as i64));
self.size
}
}
Expand Down Expand Up @@ -186,7 +188,7 @@ impl VidBatcher {

/// Explicitly set the batch size
pub fn with_batch_size(mut self: VidBatcher, size: usize) -> Self {
self.batch_size.size = size as i64;
self.batch_size.size = AdaptiveBatchSize::clamp_size(size as i64);
self
}

Expand Down Expand Up @@ -244,6 +246,7 @@ impl VidBatcher {
}

pub(crate) fn set_batch_size(&mut self, size: usize) {
let size = AdaptiveBatchSize::clamp_size(size as i64) as usize;
self.batch_size.size = size as i64;
self.end = match &self.ogive {
Some(ogive) => ogive.next_point(self.start, size).unwrap(),
Expand Down Expand Up @@ -464,6 +467,17 @@ mod tests {
batcher.step(360, 359, S010).await;
}

#[test]
fn adaptive_batch_size_never_shrinks_to_zero() {
let mut batch_size = AdaptiveBatchSize {
size: 100,
target: S100,
};

assert_eq!(batch_size.adapt(Duration::from_secs(20_001)), 1);
assert_eq!(batch_size.size, 1);
}

#[test]
fn vid_batcher_adjusts_bounds() {
// The first and last entry in `bounds` are estimats of the min and
Expand Down