Skip to content

Commit

Permalink
Make SessionContext::enable_url_table consume self (#12573)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Sep 22, 2024
1 parent 72a1053 commit f6a0ed0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn main() -> Result<()> {
df.show().await?;

// dynamic query by the file path
ctx.enable_url_table();
let ctx = ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;
Expand Down
74 changes: 63 additions & 11 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,17 @@ impl SessionContext {
}
}

/// Enable dynamic file querying for the current session.
/// Enable querying local files as tables.
///
/// This allows queries to directly access arbitrary file names via SQL like
/// `SELECT * from 'my_file.parquet'`
/// so it should only be enabled for systems that such access is not a security risk
/// This feature is security sensitive and should only be enabled for
/// systems that wish to permit direct access to the file system from SQL.
///
/// When enabled, this feature permits direct access to arbitrary files via
/// SQL like
///
/// ```sql
/// SELECT * from 'my_file.parquet'
/// ```
///
/// See [DynamicFileCatalog] for more details
///
Expand All @@ -356,7 +362,8 @@ impl SessionContext {
/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new().enable_url_table();
/// let ctx = SessionContext::new()
/// .enable_url_table(); // permit local file access
/// let results = ctx
/// .sql("SELECT a, MIN(b) FROM 'tests/data/example.csv' as example GROUP BY a LIMIT 100")
/// .await?
Expand All @@ -375,21 +382,60 @@ impl SessionContext {
/// # Ok(())
/// # }
/// ```
pub fn enable_url_table(&self) -> Self {
let state_ref = self.state();
pub fn enable_url_table(self) -> Self {
let current_catalog_list = Arc::clone(self.state.read().catalog_list());
let factory = Arc::new(DynamicListTableFactory::new(SessionStore::new()));
let catalog_list = Arc::new(DynamicFileCatalog::new(
Arc::clone(state_ref.catalog_list()),
current_catalog_list,
Arc::clone(&factory) as Arc<dyn UrlTableFactory>,
));
let new_state = SessionStateBuilder::new_from_existing(self.state())
let ctx: SessionContext = self
.into_state_builder()
.with_catalog_list(catalog_list)
.build();
let ctx = SessionContext::new_with_state(new_state);
.build()
.into();
// register new state with the factory
factory.session_store().with_state(ctx.state_weak_ref());
ctx
}

/// Convert the current `SessionContext` into a [`SessionStateBuilder`]
///
/// This is useful to switch back to `SessionState` with custom settings such as
/// [`Self::enable_url_table`].
///
/// Avoids cloning the SessionState if possible.
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::execution::SessionStateBuilder;
/// # use datafusion_optimizer::push_down_filter::PushDownFilter;
/// let my_rule = PushDownFilter{}; // pretend it is a new rule
/// // Create a new builder with a custom optimizer rule
/// let context: SessionContext = SessionStateBuilder::new()
/// .with_optimizer_rule(Arc::new(my_rule))
/// .build()
/// .into();
/// // Enable local file access and convert context back to a builder
/// let builder = context
/// .enable_url_table()
/// .into_state_builder();
/// ```
pub fn into_state_builder(self) -> SessionStateBuilder {
let SessionContext {
session_id: _,
session_start_time: _,
state,
} = self;
let state = match Arc::try_unwrap(state) {
Ok(rwlock) => rwlock.into_inner(),
Err(state) => state.read().clone(),
};
SessionStateBuilder::from(state)
}

/// Returns the time this `SessionContext` was created
pub fn session_start_time(&self) -> DateTime<Utc> {
self.session_start_time
Expand Down Expand Up @@ -1496,6 +1542,12 @@ impl From<SessionState> for SessionContext {
}
}

impl From<SessionContext> for SessionStateBuilder {
fn from(session: SessionContext) -> Self {
session.into_state_builder()
}
}

/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
Expand Down

0 comments on commit f6a0ed0

Please sign in to comment.