Skip to content

Commit

Permalink
WIP: Log more details about time taken in copying
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Apr 2, 2021
1 parent 436da11 commit c951875
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use diesel::{
use graph::{
components::store::EntityType,
constraint_violation,
prelude::{info, o, BlockNumber, EthereumBlockPointer, Logger, StoreError},
prelude::{debug, info, o, BlockNumber, EthereumBlockPointer, Logger, StoreError},
};

use crate::{
Expand Down Expand Up @@ -433,7 +433,9 @@ impl TableState {
Ok(())
}

fn copy_batch(&mut self, conn: &PgConnection) -> Result<Status, StoreError> {
fn copy_batch(&mut self, conn: &PgConnection) -> Result<(Status, Duration), StoreError> {
let batch_start = Instant::now();

fn is_cancelled(dst: &Site, conn: &PgConnection) -> Result<bool, StoreError> {
use active_copies as ac;

Expand All @@ -447,7 +449,7 @@ impl TableState {
let start = Instant::now();

if is_cancelled(self.dst_site.as_ref(), conn)? {
return Ok(Status::Cancelled);
return Ok((Status::Cancelled, batch_start.elapsed()));
}

// Copy all versions with next_vid <= vid <= next_vid + batch_size - 1,
Expand Down Expand Up @@ -476,10 +478,10 @@ impl TableState {
}

if is_cancelled(self.dst_site.as_ref(), conn)? {
return Ok(Status::Cancelled);
return Ok((Status::Cancelled, batch_start.elapsed()));
}

Ok(Status::Finished)
Ok((Status::Finished, batch_start.elapsed()))
}
}

Expand Down Expand Up @@ -608,13 +610,18 @@ impl Connection {
let mut progress = CopyProgress::new(&self.logger, &state);
progress.start();

let start = Instant::now();
for table in state.tables.iter_mut().filter(|table| !table.finished()) {
while !table.finished() {
let status = self.transaction(|conn| table.copy_batch(conn))?;
let (status, elapsed) = self.transaction(|conn| table.copy_batch(conn))?;
if status == Status::Cancelled {
return Ok(status);
}
progress.update(table);
debug!(&self.logger, "Finished batch";
"table" => table.dst.object.as_str(),
"batch_s" => elapsed.as_secs(),
"total_s" => start.elapsed().as_secs());
}
progress.table_finished(table);
}
Expand Down

0 comments on commit c951875

Please sign in to comment.