Skip to content

Commit

Permalink
Use exists(select)) solution
Browse files Browse the repository at this point in the history
Still need to pass  somehow
  • Loading branch information
kamilkisiela committed May 23, 2022
1 parent 53af1d4 commit 2c30a78
Showing 1 changed file with 54 additions and 103 deletions.
157 changes: 54 additions & 103 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,82 +922,6 @@ impl<'a> QueryFilter<'a> {
})
}

fn visit_child_filters(
&self,
block: BlockNumber,
filter: &'a EntityFilter,
mut out: AstPass<Pg>,
) -> Result<(), StoreError> {
use EntityFilter::*;
Ok(match filter {
And(filters) | Or(filters) => {
for filter in filters {
self.visit_child_filters(block, filter, out.reborrow())?;
}
}
Child(field_name, entity, child_filter) => {
// the root table is always the parent table (because we support only one level of nesting)
let parent_column = self.table.column_for_field(field_name)?;
let parent_prefix = "c".to_string();
let child_table = self.layout.table_for_entity(entity)?;
let child_column = child_table.primary_key();
let child_prefix = format!("c_{}", field_name);

fn push_column_with_prefix(
mut out: AstPass<Pg>,
column: &str,
prefix: &String,
) -> QueryResult<()> {
out.push_sql(format!("{}.", prefix).as_str());
out.push_identifier(column)?;

Ok(())
}

out.push_sql("\n");

out.push_sql("inner join ");
out.push_sql(child_table.qualified_name.as_str());
out.push_sql(format!(" {}", child_prefix).as_str());

out.push_sql(" on (");

// Join by ID
push_column_with_prefix(out.reborrow(), child_column.name.as_str(), &child_prefix)?;
out.push_sql(" = ");
push_column_with_prefix(
out.reborrow(),
parent_column.name.as_str(),
&parent_prefix,
)?;

out.push_sql(" AND ");

// Match by block
push_column_with_prefix(out.reborrow(), BLOCK_RANGE_COLUMN, &child_prefix)?;
out.push_sql(" @> ");
out.push_bind_param::<Integer, _>(&block)?;

out.push_sql(" )");

self.visit_child_filters(block, child_filter, out)?;
}
_ => {}
})
}

/// Creates a SQL fragment representing the table joins.
///
/// inner join child1 c1 ON ( c1.id = parent.child_id AND c1.block_range @> $block )
/// inner join child2 c2 ON ( c2.id = parent.child_id AND c2.block_range @> $block )
///
pub fn walk_joins_ast(&self, block: BlockNumber, mut out: AstPass<Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();
self.visit_child_filters(block, self.filter, out)
.map_err(|e| DieselError::QueryBuilderError(Box::new(e)))?;
Ok(())
}

fn valid_attributes(
filter: &'a EntityFilter,
table: &'a Table,
Expand All @@ -1017,10 +941,10 @@ impl<'a> QueryFilter<'a> {
"Only a single level sub filter is allowed".to_string(),
));
}

// Make sure that the attribute name is valid for the given table
table.column_for_field(attr)?;

Self::valid_attributes(
child_filter,
layout.table_for_entity(entity)?,
Expand Down Expand Up @@ -1071,22 +995,61 @@ impl<'a> QueryFilter<'a> {
attribute: &Attribute,
entity_type: &'a EntityType,
filter: &'a EntityFilter,
out: AstPass<Pg>,
mut out: AstPass<Pg>,
) -> QueryResult<()> {
let table = self
let inner_table = self
.layout
.table_for_entity(entity_type)
.expect("Table for child entity not found");

let table_prefix = format!("c_{}.", attribute); // align it with a table prefix creation in visit_child_filters()
let inner_prefix = "inner.";

out.push_sql("exists (select 1");

out.push_sql(" from ");
out.push_sql(inner_table.qualified_name.as_str());
out.push_sql(" inner");

out.push_sql(" where ");

// Join parent and inner table
let parent_column = self
.table
.column_for_field(attribute)
.expect("Column for an attribute not found");
let parent_prefix = "c.";
let inner_column = inner_table.primary_key();

out.push_sql(inner_prefix);
out.push_sql(inner_column.name.as_str());
out.push_sql(" = ");
out.push_sql(parent_prefix);
out.push_sql(parent_column.name.as_str());

out.push_sql(" and ");

// TODO: we need a $block here
// // Match by block
// out.push_sql(inner_prefix);
// out.push_identifier(BLOCK_RANGE_COLUMN)?;
// out.push_sql(" @> ");
// out.push_bind_param::<Integer, _>(&block_number)?;

// out.push_sql(" and ");

// Inner filters
let query_filter = QueryFilter {
filter,
table,
table: inner_table,
layout: self.layout,
table_prefix: table_prefix.as_str(),
table_prefix: inner_prefix,
};

query_filter.walk_ast(out)
query_filter.walk_ast(out.reborrow())?;

out.push_sql(")");

Ok(())
}

fn column(&self, attribute: &Attribute) -> &'a Column {
Expand Down Expand Up @@ -1943,7 +1906,7 @@ impl<'a> ParentLimit<'a> {
fn restrict(&self, out: &mut AstPass<Pg>) -> QueryResult<()> {
if let ParentLimit::Ranked(sort_key, range) = self {
out.push_sql(" ");
sort_key.order_by(out, None)?;
sort_key.order_by(out)?;
range.walk_ast(out.reborrow())?;
}
Ok(())
Expand Down Expand Up @@ -2650,30 +2613,24 @@ impl<'a> SortKey<'a> {

/// Generate
/// order by [name direction], id
fn order_by(&self, out: &mut AstPass<Pg>, prefix: Option<&str>) -> QueryResult<()> {
let prefix = prefix.unwrap_or("");

fn order_by(&self, out: &mut AstPass<Pg>) -> QueryResult<()> {
match self {
SortKey::None => Ok(()),
SortKey::IdAsc(br_column) => {
out.push_sql("order by ");
out.push_sql(prefix);
out.push_identifier(PRIMARY_KEY_COLUMN)?;
if let Some(br_column) = br_column {
out.push_sql(", ");
out.push_sql(prefix);
br_column.bare_name(out);
}
Ok(())
}
SortKey::IdDesc(br_column) => {
out.push_sql("order by ");
out.push_sql(prefix);
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" desc");
if let Some(br_column) = br_column {
out.push_sql(", ");
out.push_sql(prefix);
br_column.bare_name(out);
out.push_sql(" desc");
}
Expand Down Expand Up @@ -2868,10 +2825,6 @@ impl<'a> FilterQuery<'a> {
out.push_sql(table.qualified_name.as_str());
out.push_sql(" c");

if let Some(filter) = table_filter {
filter.walk_joins_ast(self.block, out.reborrow())?;
}

out.push_sql("\n where ");
BlockRangeColumn::new(&table, "c.", self.block).contains(&mut out)?;
if let Some(filter) = table_filter {
Expand Down Expand Up @@ -2912,8 +2865,7 @@ impl<'a> FilterQuery<'a> {
write_column_names(column_names, table, &mut out)?;
self.filtered_rows(table, filter, out.reborrow())?;
out.push_sql("\n ");
out.push_sql(" GROUP BY c.id, c.vid "); // TODO: ask if it's fine to group by id and vid, always
self.sort_key.order_by(&mut out, Some("c."))?;
self.sort_key.order_by(&mut out)?;
self.range.walk_ast(out.reborrow())?;
out.push_sql(") c");
Ok(())
Expand Down Expand Up @@ -2992,8 +2944,7 @@ impl<'a> FilterQuery<'a> {
self.filtered_rows(table, filter, out.reborrow())?;
}
out.push_sql("\n ");
out.push_sql(" GROUP BY c.id, c.vid "); // TODO: ask if it's fine to group by id and vid, always
self.sort_key.order_by(&mut out, None)?;
self.sort_key.order_by(&mut out)?;
self.range.walk_ast(out.reborrow())?;

out.push_sql(")\n");
Expand All @@ -3015,7 +2966,7 @@ impl<'a> FilterQuery<'a> {
out.push_bind_param::<Text, _>(&table.object.as_str())?;
}
out.push_sql("\n ");
self.sort_key.order_by(&mut out, None)?;
self.sort_key.order_by(&mut out)?;
Ok(())
}

Expand Down Expand Up @@ -3066,7 +3017,7 @@ impl<'a> FilterQuery<'a> {
window.children_uniform(&self.sort_key, self.block, out.reborrow())?;
}
out.push_sql("\n");
self.sort_key.order_by(&mut out, None)?;
self.sort_key.order_by(&mut out)?;
self.range.walk_ast(out.reborrow())?;
out.push_sql(") c)\n");

Expand Down Expand Up @@ -3538,7 +3489,7 @@ fn write_column_names(
out: &mut AstPass<Pg>,
) -> QueryResult<()> {
match column_names {
AttributeNames::All => out.push_sql(" c.* "),
AttributeNames::All => out.push_sql(" * "),
AttributeNames::Select(column_names) => {
let mut iterator = iter_column_names(column_names, table, true).peekable();
while let Some(column_name) = iterator.next() {
Expand Down

0 comments on commit 2c30a78

Please sign in to comment.