Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SessionContext::enable_url_table consume self #12573

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn main() -> Result<()> {
df.show().await?;

// dynamic query by the file path
ctx.enable_url_table();
let ctx = ctx.enable_url_table();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this example actually has the exact same bug I hit in #12551 -- namely the returned ctx has the table enabled, not the current one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding this bug!

let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;
Expand Down
74 changes: 63 additions & 11 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,17 @@ impl SessionContext {
}
}

/// Enable dynamic file querying for the current session.
/// Enable querying local files as tables.
///
/// This allows queries to directly access arbitrary file names via SQL like
/// `SELECT * from 'my_file.parquet'`
/// so it should only be enabled for systems that such access is not a security risk
/// This feature is security sensitive and should only be enabled for
/// systems that wish to permit direct access to the file system from SQL.
///
/// When enabled, this feature permits direct access to arbitrary files via
/// SQL like
///
/// ```sql
/// SELECT * from 'my_file.parquet'
/// ```
///
/// See [DynamicFileCatalog] for more details
///
Expand All @@ -356,7 +362,8 @@ impl SessionContext {
/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new().enable_url_table();
/// let ctx = SessionContext::new()
/// .enable_url_table(); // permit local file access
/// let results = ctx
/// .sql("SELECT a, MIN(b) FROM 'tests/data/example.csv' as example GROUP BY a LIMIT 100")
/// .await?
Expand All @@ -375,21 +382,60 @@ impl SessionContext {
/// # Ok(())
/// # }
/// ```
pub fn enable_url_table(&self) -> Self {
let state_ref = self.state();
pub fn enable_url_table(self) -> Self {
let current_catalog_list = Arc::clone(self.state.read().catalog_list());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this avoids copying the entire state just to copy the catalog list

let factory = Arc::new(DynamicListTableFactory::new(SessionStore::new()));
let catalog_list = Arc::new(DynamicFileCatalog::new(
Arc::clone(state_ref.catalog_list()),
current_catalog_list,
Arc::clone(&factory) as Arc<dyn UrlTableFactory>,
));
let new_state = SessionStateBuilder::new_from_existing(self.state())
let ctx: SessionContext = self
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is likely just my obsession with trying to have clear Builder style APIs, but I think it reads more clearly here to to back and forth between the types

.into_state_builder()
.with_catalog_list(catalog_list)
.build();
let ctx = SessionContext::new_with_state(new_state);
.build()
.into();
// register new state with the factory
factory.session_store().with_state(ctx.state_weak_ref());
ctx
}

/// Convert the current `SessionContext` into a [`SessionStateBuilder`]
///
/// This is useful to switch back to `SessionState` with custom settings such as
/// [`Self::enable_url_table`].
///
/// Avoids cloning the SessionState if possible.
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::execution::SessionStateBuilder;
/// # use datafusion_optimizer::push_down_filter::PushDownFilter;
/// let my_rule = PushDownFilter{}; // pretend it is a new rule
/// // Create a new builder with a custom optimizer rule
/// let context: SessionContext = SessionStateBuilder::new()
/// .with_optimizer_rule(Arc::new(my_rule))
/// .build()
/// .into();
/// // Enable local file access and convert context back to a builder
/// let builder = context
/// .enable_url_table()
/// .into_state_builder();
/// ```
pub fn into_state_builder(self) -> SessionStateBuilder {
let SessionContext {
session_id: _,
session_start_time: _,
state,
} = self;
let state = match Arc::try_unwrap(state) {
Ok(rwlock) => rwlock.into_inner(),
Err(state) => state.read().clone(),
};
SessionStateBuilder::from(state)
}

/// Returns the time this `SessionContext` was created
pub fn session_start_time(&self) -> DateTime<Utc> {
self.session_start_time
Expand Down Expand Up @@ -1496,6 +1542,12 @@ impl From<SessionState> for SessionContext {
}
}

impl From<SessionContext> for SessionStateBuilder {
fn from(session: SessionContext) -> Self {
session.into_state_builder()
}
}

/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
Expand Down