Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TrackedMemoryPool with better error messages on exhaustion #11665

Merged
merged 17 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
cf9bcca
feat(11523): TrackConsumersPool impl which includes errors messages w…
wiedld Jul 26, 2024
d4e4da2
test(11523): unit tests for TrackConsumersPool
wiedld Jul 26, 2024
92541c0
test(11523): integration test for tracked consumers oom message
wiedld Jul 26, 2024
8941fa3
chore(11523): use nonzero usize
wiedld Jul 26, 2024
7ff1534
chore(11523): document the what the memory insufficient_capacity_err …
wiedld Jul 26, 2024
f3905de
chore(11523): improve test failure coverage for TrackConsumersPool
wiedld Jul 26, 2024
ddc7700
fix(11523): handle additive tracking of same hashed consumer, across …
wiedld Jul 26, 2024
e71a710
refactor(11523): update error message to delineate the multiple consu…
wiedld Jul 26, 2024
9447368
test(11523): demonstrate the underlying pool behavior on deregister
wiedld Jul 26, 2024
a8383fa
chore: make explicit what the insufficient_capacity_err() logs
wiedld Jul 26, 2024
1b1223f
fix(11523): remove to_root() for the error, since the immediate inner…
wiedld Jul 26, 2024
9a77f90
chore(11523): add result to logging of failed CI tests
wiedld Jul 29, 2024
09b20d2
fix(11523): splice error message to get consumers prior to error message
wiedld Jul 29, 2024
f405795
Revert "fix(11523): splice error message to get consumers prior to er…
wiedld Jul 29, 2024
f75764e
fix(11523): fix without splicing error messages, and instead handle t…
wiedld Jul 29, 2024
c3ce60f
chore: update docs to explain purpose of TrackConsumersPool
wiedld Jul 30, 2024
c8c0196
refactor(11523): enable TrackConsumersPool to be used in runtime metrics
wiedld Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ use datafusion::assert_batches_eq;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::{Arc, OnceLock};
use tokio::fs::File;

Expand Down Expand Up @@ -370,13 +374,47 @@ async fn oom_parquet_sink() {
.await
}

#[tokio::test]
async fn oom_with_tracked_consumer_pool() {
let dir = tempfile::tempdir().unwrap();
let path = dir.into_path().join("test.parquet");
let _ = File::create(path.clone()).await.unwrap();

TestCase::new()
.with_config(
SessionConfig::new()
)
.with_query(format!(
"
COPY (select * from t)
TO '{}'
STORED AS PARQUET OPTIONS (compression 'uncompressed');
",
path.to_string_lossy()
))
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
GreedyMemoryPool::new(200_000),
NonZeroUsize::new(1).unwrap()
)
))
.run()
.await
}

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
struct TestCase {
query: Option<String>,
expected_errors: Vec<String>,
memory_limit: usize,
memory_pool: Option<Arc<dyn MemoryPool>>,
config: SessionConfig,
scenario: Scenario,
/// How should the disk manager (that allows spilling) be
Expand All @@ -395,6 +433,7 @@ impl TestCase {
expected_errors: vec![],
memory_limit: 0,
config: SessionConfig::new(),
memory_pool: None,
scenario: Scenario::AccessLog,
disk_manager_config: DiskManagerConfig::Disabled,
expected_plan: vec![],
Expand Down Expand Up @@ -424,6 +463,15 @@ impl TestCase {
self
}

/// Set the memory pool to be used
///
/// This will override the memory_limit requested,
/// as the memory pool includes the limit.
fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
self.memory_pool = Some(memory_pool);
self
}

/// Specify the configuration to use
pub fn with_config(mut self, config: SessionConfig) -> Self {
self.config = config;
Expand Down Expand Up @@ -464,6 +512,7 @@ impl TestCase {
query,
expected_errors,
memory_limit,
memory_pool,
config,
scenario,
disk_manager_config,
Expand All @@ -473,11 +522,15 @@ impl TestCase {

let table = scenario.table();

let rt_config = RuntimeConfig::new()
let mut rt_config = RuntimeConfig::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

if let Some(pool) = memory_pool {
rt_config = rt_config.with_memory_pool(pool);
};

let runtime = RuntimeEnv::new(rt_config).unwrap();

// Configure execution
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// For help with allocation accounting, see the [proxy] module.
///
/// [proxy]: crate::memory_pool::proxy
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MemoryConsumer {
name: String,
can_spill: bool,
Expand Down
Loading