Commit
Add TrackedMemoryPool with better error messages on exhaustion (#11665)

* feat(11523): TrackConsumersPool impl which includes error messages with the top K consumers

* test(11523): unit tests for TrackConsumersPool

* test(11523): integration test for tracked consumers oom message

* chore(11523): use nonzero usize

* chore(11523): document what the memory insufficient_capacity_err is actually returning

* chore(11523): improve test failure coverage for TrackConsumersPool

* fix(11523): handle additive tracking of the same hashed consumer across different reservations

* refactor(11523): update error message to delineate multiple consumers with the same name but different hashes

* test(11523): demonstrate the underlying pool behavior on deregister

* chore: make explicit what the insufficient_capacity_err() logs

* fix(11523): remove to_root() for the error, since the immediate inner child should be returning an OOM

* chore(11523): add result to logging of failed CI tests

* fix(11523): splice error message to get consumers prior to error message

* Revert "fix(11523): splice error message to get consumers prior to error message"

This reverts commit 09b20d2.

* fix(11523): fix without splicing error messages, and instead handle proper error bubbling (message wrapping)

* chore: update docs to explain purpose of TrackConsumersPool

Co-authored-by: Andrew Lamb <[email protected]>

* refactor(11523): enable TrackConsumersPool to be used in runtime metrics

---------

Co-authored-by: Andrew Lamb <[email protected]>
wiedld and alamb authored Aug 1, 2024
1 parent 3fe1860 commit 921c3b6
Showing 3 changed files with 431 additions and 3 deletions.
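Taken together, the changes below let a caller wrap an existing memory pool so that exhaustion errors name the top memory consumers. A minimal usage sketch, assembled from the imports and calls that appear in the test diff (the runtime_env import path is assumed from the datafusion_execution crate layout):

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn runtime_with_tracked_pool() -> RuntimeEnv {
    // Wrap a 200_000-byte GreedyMemoryPool; on exhaustion the error reports
    // the top K consumers (K = 1 here) across all of their reservations.
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(200_000),
        NonZeroUsize::new(1).unwrap(),
    ));

    let rt_config = RuntimeConfig::new().with_memory_pool(pool);
    RuntimeEnv::new(rt_config).expect("valid runtime config")
}
```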
55 changes: 54 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
@@ -26,10 +26,14 @@ use datafusion::assert_batches_eq;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::{Arc, OnceLock};
use tokio::fs::File;

@@ -371,13 +375,47 @@ async fn oom_parquet_sink() {
.await
}

#[tokio::test]
async fn oom_with_tracked_consumer_pool() {
let dir = tempfile::tempdir().unwrap();
let path = dir.into_path().join("test.parquet");
let _ = File::create(path.clone()).await.unwrap();

TestCase::new()
.with_config(
SessionConfig::new()
)
.with_query(format!(
"
COPY (select * from t)
TO '{}'
STORED AS PARQUET OPTIONS (compression 'uncompressed');
",
path.to_string_lossy()
))
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
GreedyMemoryPool::new(200_000),
NonZeroUsize::new(1).unwrap()
)
))
.run()
.await
}

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
struct TestCase {
query: Option<String>,
expected_errors: Vec<String>,
memory_limit: usize,
memory_pool: Option<Arc<dyn MemoryPool>>,
config: SessionConfig,
scenario: Scenario,
/// How should the disk manager (that allows spilling) be
@@ -396,6 +434,7 @@ impl TestCase {
expected_errors: vec![],
memory_limit: 0,
config: SessionConfig::new(),
memory_pool: None,
scenario: Scenario::AccessLog,
disk_manager_config: DiskManagerConfig::Disabled,
expected_plan: vec![],
@@ -425,6 +464,15 @@
self
}

/// Set the memory pool to be used
///
/// This will override the memory_limit requested,
/// as the memory pool includes the limit.
fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
self.memory_pool = Some(memory_pool);
self
}

/// Specify the configuration to use
pub fn with_config(mut self, config: SessionConfig) -> Self {
self.config = config;
@@ -465,6 +513,7 @@ impl TestCase {
query,
expected_errors,
memory_limit,
memory_pool,
config,
scenario,
disk_manager_config,
@@ -474,11 +523,15 @@

let table = scenario.table();

let rt_config = RuntimeConfig::new()
let mut rt_config = RuntimeConfig::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

if let Some(pool) = memory_pool {
rt_config = rt_config.with_memory_pool(pool);
};

let runtime = RuntimeEnv::new(rt_config).unwrap();

// Configure execution
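Illustrative fragment, not part of the commit: per the doc comment on with_memory_pool above, supplying a pool supersedes any memory_limit the builder would otherwise request, since the pool carries its own limit. A sketch using the test module's own helpers and placeholder values:

```rust
// Hypothetical helper inside the test module above; the query text, limit,
// and expected error fragment are placeholders for illustration only.
fn tracked_pool_case() -> TestCase {
    TestCase::new()
        .with_config(SessionConfig::new())
        .with_query("select * from t".to_string())
        .with_expected_errors(vec![
            "Resources exhausted with top memory consumers (across reservations) are:",
        ])
        // The pool carries the limit, so no with_memory_limit call is needed.
        .with_memory_pool(Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(200_000),
            NonZeroUsize::new(1).unwrap(),
        )))
}
```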
2 changes: 1 addition & 1 deletion datafusion/execution/src/memory_pool/mod.rs
@@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// For help with allocation accounting, see the [proxy] module.
///
/// [proxy]: crate::memory_pool::proxy
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MemoryConsumer {
name: String,
can_spill: bool,
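The new derives line up with the commit note about additive tracking of the same hashed consumer across different reservations: a consumer that is Hash + Eq + Clone can key a map in which totals from separate reservations accumulate. The sketch below only illustrates that idea with hypothetical names and is not the actual TrackConsumersPool implementation:

```rust
use std::collections::HashMap;

// Hypothetical stand-in mirroring MemoryConsumer's fields; illustration only.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct Consumer {
    name: String,
    can_spill: bool,
}

// Sum bytes per consumer (additive across reservations) and return the top k.
fn top_consumers(allocations: &[(Consumer, usize)], k: usize) -> Vec<(Consumer, usize)> {
    let mut totals: HashMap<Consumer, usize> = HashMap::new();
    for (consumer, bytes) in allocations {
        // The same consumer seen through different reservations
        // accumulates into a single entry.
        *totals.entry(consumer.clone()).or_insert(0) += bytes;
    }
    let mut ranked: Vec<(Consumer, usize)> = totals.into_iter().collect();
    ranked.sort_by(|a, b| b.1.cmp(&a.1));
    ranked.truncate(k);
    ranked
}
```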
