Skip to content

Commit

Permalink
Use tracked-consumers memory pool be the default. (apache#11949)
Browse files Browse the repository at this point in the history
* feat(11523): set the default memory pool to the tracked-consumer pool

* test(11523): update tests for the OOM message including the top consumers

* chore(11523): remove duplicate wording from OOM messages
  • Loading branch information
wiedld committed Aug 15, 2024
1 parent 6c579cc commit dec2324
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 46 deletions.
35 changes: 12 additions & 23 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ async fn group_by_none() {
TestCase::new()
.with_query("select median(request_bytes) from t")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"AggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: AggregateStream"
])
.with_memory_limit(2_000)
.run()
Expand All @@ -89,8 +88,7 @@ async fn group_by_row_hash() {
TestCase::new()
.with_query("select count(*) from t GROUP BY response_bytes")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"GroupedHashAggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
])
.with_memory_limit(2_000)
.run()
Expand All @@ -103,8 +101,7 @@ async fn group_by_hash() {
// group by dict column
.with_query("select count(*) from t GROUP BY service, host, pod, container")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"GroupedHashAggregateStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
])
.with_memory_limit(1_000)
.run()
Expand All @@ -117,8 +114,7 @@ async fn join_by_key_multiple_partitions() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"HashJoinInput[0]",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[0]",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -132,8 +128,7 @@ async fn join_by_key_single_partition() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"HashJoinInput",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -146,8 +141,7 @@ async fn join_by_expression() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"NestedLoopJoinLoad[0]",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]",
])
.with_memory_limit(1_000)
.run()
Expand All @@ -159,8 +153,7 @@ async fn cross_join() {
TestCase::new()
.with_query("select t1.* from t t1 CROSS JOIN t t2")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"CrossJoinExec",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec",
])
.with_memory_limit(1_000)
.run()
Expand Down Expand Up @@ -216,8 +209,7 @@ async fn symmetric_hash_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"SymmetricHashJoinStream",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SymmetricHashJoinStream",
])
.with_memory_limit(1_000)
.with_scenario(Scenario::AccessLogStreaming)
Expand All @@ -235,8 +227,7 @@ async fn sort_preserving_merge() {
// so only a merge is needed
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"SortPreservingMergeExec",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SortPreservingMergeExec",
])
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
Expand Down Expand Up @@ -313,8 +304,7 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"ExternalSorterMerge", // merging in sort fails
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
])
.with_config(config)
.run()
Expand Down Expand Up @@ -343,8 +333,7 @@ async fn oom_recursive_cte() {
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"RecursiveQuery",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: RecursiveQuery",
])
.with_memory_limit(2_000)
.run()
Expand Down Expand Up @@ -396,7 +385,7 @@ async fn oom_with_tracked_consumer_pool() {
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
"Additional allocation failed with top memory consumers (across reservations) as: ParquetSink(ArrowColumnWriter)"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
Expand Down
16 changes: 8 additions & 8 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn provide_top_memory_consumers_to_error_msg(
error_msg: String,
top_consumers: String,
) -> String {
format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg)
format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg)
}

#[cfg(test)]
Expand Down Expand Up @@ -501,7 +501,7 @@ mod tests {
// Test: reports if new reservation causes error
// using the previously set sizes for other consumers
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
let res = r5.try_grow(150);
assert!(
matches!(
Expand All @@ -524,7 +524,7 @@ mod tests {

// Test: see error message when no consumers recorded yet
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -543,7 +543,7 @@ mod tests {
let mut r1 = new_consumer_same_name.clone().register(&pool);
// TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
// a followup PR will clarify this message "0 bytes already allocated for this reservation"
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -555,7 +555,7 @@ mod tests {

// Test: will accumulate size changes per consumer, not per reservation
r1.grow(20);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -570,7 +570,7 @@ mod tests {
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r2.try_grow(150);
assert!(
matches!(
Expand All @@ -590,7 +590,7 @@ mod tests {
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.clone().register(&pool);
r1.grow(20);
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -604,7 +604,7 @@ mod tests {
// Test: unregister one
// only the remaining one should be listed
pool.unregister(&r1_consumer);
let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes";
let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes";
let res = r0.try_grow(150);
assert!(
matches!(
Expand Down
14 changes: 11 additions & 3 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@
use crate::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
},
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};

use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::{DataFusionError, Result};
use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
use std::{
fmt::{Debug, Formatter},
num::NonZeroUsize,
};
use url::Url;

#[derive(Clone)]
Expand Down Expand Up @@ -213,7 +218,10 @@ impl RuntimeConfig {
/// Note DataFusion does not yet respect this limit in all cases.
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size),
NonZeroUsize::new(5).unwrap(),
)))
}

/// Use the specified path to create any needed temporary files
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,8 @@ mod tests {

assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
);
assert_contains!(err.to_string(), "CrossJoinExec");

Ok(())
}
Expand Down
13 changes: 5 additions & 8 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3821,13 +3821,11 @@ mod tests {
let stream = join.execute(0, task_ctx)?;
let err = common::collect(stream).await.unwrap_err();

// Asserting that operator-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
);

// Asserting that operator-level reservation attempting to overallocate
assert_contains!(err.to_string(), "HashJoinInput");
}

Ok(())
Expand Down Expand Up @@ -3902,13 +3900,12 @@ mod tests {
let stream = join.execute(1, task_ctx)?;
let err = common::collect(stream).await.unwrap_err();

// Asserting that stream-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
);
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"

// Asserting that stream-level reservation attempting to overallocate
assert_contains!(err.to_string(), "HashJoinInput[1]");
);
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,8 @@ mod tests {

assert_contains!(
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
);
assert_contains!(err.to_string(), "NestedLoopJoinLoad[0]");
}

Ok(())
Expand Down

0 comments on commit dec2324

Please sign in to comment.