Skip to content

Commit

Permalink
store: Improve predictability of the time for each batch during pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Nov 10, 2022
1 parent 95c316a commit 2da697b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 65 deletions.
2 changes: 1 addition & 1 deletion store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl CopyState {
/// batch, but don't step up the size by more than 2x at once
#[derive(Debug, Queryable)]
pub(crate) struct AdaptiveBatchSize {
size: i64,
pub size: i64,
}

impl AdaptiveBatchSize {
Expand Down
155 changes: 91 additions & 64 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ impl TablePair {
}

/// Copy all entity versions visible between `earliest_block` and
/// `final_block` in batches, where each batch is a separate transaction
/// `final_block` in batches, where each batch is a separate
/// transaction. Write activity for nonfinal blocks can happen
/// concurrently to this copy
fn copy_final_entities(
&self,
conn: &PgConnection,
Expand All @@ -73,62 +75,74 @@ impl TablePair {
final_block: BlockNumber,
cancel: &CancelHandle,
) -> Result<usize, CancelableError<StoreError>> {
#[derive(QueryableByName)]
struct LastVid {
#[sql_type = "BigInt"]
rows: i64,
#[sql_type = "BigInt"]
last_vid: i64,
}

let column_list = self.column_list();

// Determine the last vid that we need to copy
let VidRange { min_vid, max_vid } = sql_query(&format!(
"select coalesce(min(vid), 0) as min_vid, \
coalesce(max(vid), -1) as max_vid from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]')",
src = self.src.qualified_name,
))
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.get_result::<VidRange>(conn)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy. When we start, we start with
// 0 so that we don't constrain the set of rows based on their vid
let mut next_vid = 0;
// The first vid we still need to copy
let mut next_vid = min_vid;
let mut total_rows: usize = 0;
loop {
while next_vid <= max_vid {
let start = Instant::now();
let LastVid { last_vid, rows } = conn.transaction(|| {
let rows = conn.transaction(|| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(&format!(
"with cp as (insert into {dst}({column_list}) \
select {column_list} from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]') \
and vid >= $3 \
order by vid \
limit $4 \
returning vid) \
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
"insert into {dst}({column_list}) \
select {column_list} from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]') \
and vid >= $3 and vid < $3 + $4 \
order by vid",
src = self.src.qualified_name,
dst = self.dst.qualified_name
))
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
.get_result::<LastVid>(conn)
.execute(conn)
})?;
cancel.check_cancel()?;

total_rows += rows as usize;
let done = rows == 0;
reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done);

if done {
return Ok(total_rows);
}
total_rows += rows;
next_vid += batch_size.size;

batch_size.adapt(start.elapsed());
next_vid = last_vid + 1;

reporter.copy_final_batch(
self.src.name.as_str(),
rows as usize,
total_rows,
next_vid > max_vid,
);
}
Ok(total_rows)
}

/// Copy all entity versions visible after `final_block` in batches,
/// where each batch is a separate transaction
/// where each batch is a separate transaction. This assumes that all
/// other write activity to the source table is blocked while we copy
fn copy_nonfinal_entities(
&self,
conn: &PgConnection,
Expand All @@ -137,54 +151,59 @@ impl TablePair {
) -> Result<usize, StoreError> {
let column_list = self.column_list();

#[derive(QueryableByName)]
struct LastVid {
#[sql_type = "BigInt"]
rows: i64,
#[sql_type = "BigInt"]
last_vid: i64,
}
// Determine the last vid that we need to copy
let VidRange { min_vid, max_vid } = sql_query(&format!(
"select coalesce(min(vid), 0) as min_vid, \
coalesce(max(vid), -1) as max_vid from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null)",
src = self.src.qualified_name,
))
.bind::<Integer, _>(final_block)
.get_result::<VidRange>(conn)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
// The first vid we still need to copy. When we start, we start with
// 0 so that we don't constrain the set of rows based on their vid
let mut next_vid = 0;
// The first vid we still need to copy
let mut next_vid = min_vid;
let mut total_rows = 0;
loop {
while next_vid <= max_vid {
let start = Instant::now();
let LastVid { rows, last_vid } = conn.transaction(|| {
let rows = conn.transaction(|| {
// Page through all the rows in `src` in batches of
// `batch_size` that are visible to queries at block heights
// starting right after `final_block`.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(&format!(
"with cp as (insert into {dst}({column_list}) \
select {column_list} from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null) \
and vid >= $2 \
order by vid \
limit $3
returning vid) \
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
"insert into {dst}({column_list}) \
select {column_list} from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null) \
and vid >= $2 and vid < $2 + $3 \
order by vid",
dst = self.dst.qualified_name,
src = self.src.qualified_name,
))
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
.get_result::<LastVid>(conn)
.execute(conn)
.map_err(StoreError::from)
})?;
total_rows += rows as usize;

total_rows += rows;
next_vid += batch_size.size;

batch_size.adapt(start.elapsed());

reporter.copy_nonfinal_batch(
self.src.name.as_str(),
rows as usize,
total_rows,
rows == 0,
next_vid > max_vid,
);
if rows == 0 {
return Ok(total_rows);
}
batch_size.adapt(start.elapsed());
next_vid = last_vid + 1;
}
Ok(total_rows)
}

/// Replace the `src` table with the `dst` table
Expand Down Expand Up @@ -368,3 +387,11 @@ impl Layout {
Ok(())
}
}

/// Result row for the vid-range queries run at the start of
/// `copy_final_entities` and `copy_nonfinal_entities`: the smallest and
/// largest `vid` in the source table that still needs to be copied.
#[derive(QueryableByName)]
struct VidRange {
    // Smallest matching vid; the queries use `coalesce(min(vid), 0)`,
    // so this is 0 when no rows match
    #[sql_type = "BigInt"]
    min_vid: i64,
    // Largest matching vid; the queries use `coalesce(max(vid), -1)`,
    // so for an empty match this is -1 and the copy loops
    // (`while next_vid <= max_vid`) run zero batches
    #[sql_type = "BigInt"]
    max_vid: i64,
}

0 comments on commit 2da697b

Please sign in to comment.