Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Zhao committed Mar 16, 2023
1 parent e5fac47 commit 9be9775
Showing 1 changed file with 50 additions and 24 deletions.
74 changes: 50 additions & 24 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl ConcatSstableIterator {
.sstable(table_info, &mut self.stats)
.await?;
let block_metas = &table.value().meta.block_metas;
let start_index = match seek_key {
let mut start_index = match seek_key {
None => 0,
Some(seek_key) => {
// start_index points to the greatest block whose smallest_key <= seek_key.
Expand All @@ -286,33 +286,59 @@ impl ConcatSstableIterator {
) != Ordering::Greater
})
};
if end_index <= start_index {
return Ok(());
}

let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
while start_index < end_index {
let start_block_table_id = FullKey::decode(&block_metas[start_index].smallest_key)
.user_key
.table_id;
if table_info
.get_table_ids()
.binary_search(&start_block_table_id.table_id)
.is_ok()
{
break;
} else {
start_index +=
&block_metas[(start_index + 1)..].partition_point(|block_meta| {
FullKey::decode(&block_meta.smallest_key).user_key.table_id
== start_block_table_id
}) + 1;
}
}

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;
let found = if end_index <= start_index {
false
} else {
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;

if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
true
} else {
false
}
};
self.cur_idx = idx;

if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
if found {
return Ok(());
} else {
idx += 1;
Expand Down

0 comments on commit 9be9775

Please sign in to comment.