Skip to content

Commit

Permalink
test(11344): create bounded stream for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jul 9, 2024
1 parent d48fb36 commit 0f43c33
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,39 @@ pub fn register_unbounded_file_with_ordering(
ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
Ok(())
}

struct BoundedStream {
limit: usize,
count: usize,
batch: RecordBatch,
}

impl Stream for BoundedStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.count >= self.limit {
return Poll::Ready(None);
}
self.count += 1;
Poll::Ready(Some(Ok(self.batch.clone())))
}
}

impl RecordBatchStream for BoundedStream {
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
}

/// Creates an bounded stream for testing purposes.
pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchStream {
Box::pin(BoundedStream {
count: 0,
limit,
batch,
})
}

0 comments on commit 0f43c33

Please sign in to comment.