Allow PTransforms to be applied directly to dataframes. #25919

Merged: 3 commits, Mar 23, 2023
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -64,6 +64,9 @@
## New Features / Improvements

* The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
* Schema'd PTransforms can now be directly applied to Beam dataframes just like PCollections.
(Note that when doing multiple operations, it may be more efficient to explicitly chain the operations
like `df | (Transform1 | Transform2 | ...)` to avoid excessive conversions.)

## Breaking Changes

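For reference, a minimal sketch of the new usage, modeled on the test added in this PR (the transform name and data are illustrative):

```python
import apache_beam as beam
from apache_beam.dataframe import convert

class MySchemaTransform(beam.PTransform):
  # A schema'd transform: Rows in, Rows out.
  def expand(self, pcoll):
    return pcoll | beam.Map(
        lambda x: beam.Row(
            a=x.n**2 - x.m**2, b=2 * x.m * x.n, c=x.n**2 + x.m**2))

with beam.Pipeline() as p:
  pc = p | beam.Create([(1, 2), (2, 3)]) | beam.MapTuple(
      lambda m, n: beam.Row(m=m, n=n))
  df = convert.to_dataframe(pc)

  # New in this PR: apply a PTransform directly to a deferred dataframe.
  df_abc = df | MySchemaTransform()

  # Per the note above, several transforms are best chained first so the
  # dataframe <-> PCollection round trip happens only once:
  #   df | (Transform1 | Transform2 | ...)
```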
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/dataframe/convert_test.py
@@ -184,6 +184,26 @@ def test_convert_memoization_clears_cache(self):
gc.enable()
logging.disable(logging.NOTSET)

def test_auto_convert(self):
class MySchemaTransform(beam.PTransform):
def expand(self, pcoll):
return pcoll | beam.Map(
lambda x: beam.Row(
a=x.n**2 - x.m**2, b=2 * x.m * x.n, c=x.n**2 + x.m**2))

with beam.Pipeline() as p:
pc_mn = p | beam.Create([
(1, 2), (2, 3), (3, 10)
]) | beam.MapTuple(lambda m, n: beam.Row(m=m, n=n))

df_mn = convert.to_dataframe(pc_mn)

# Apply a transform directly to a dataframe to get another dataframe.
df_abc = df_mn | MySchemaTransform()

pc_abc = convert.to_pcollection(df_abc) | beam.Map(tuple)
assert_that(pc_abc, equal_to([(3, 4, 5), (5, 12, 13), (91, 60, 109)]))


if __name__ == '__main__':
unittest.main()
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/dataframe/frames.py
@@ -49,10 +49,12 @@
from pandas.api.types import is_list_like
from pandas.core.groupby.generic import DataFrameGroupBy

from apache_beam.dataframe import convert
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import io
from apache_beam.dataframe import partitionings
from apache_beam.transforms import PTransform

__all__ = [
'DeferredSeries',
@@ -5394,6 +5396,24 @@ def func(df, *args, **kwargs):
frame_base._elementwise_method(inplace_name, inplace=True,
base=pd.DataFrame))

# Allow dataframe | SchemaTransform
def _create_maybe_elementwise_or(base):
elementwise = frame_base._elementwise_method(
'__or__', restrictions={'level': None}, base=base)

def _maybe_elementwise_or(self, right):
if isinstance(right, PTransform):
return convert.to_dataframe(convert.to_pcollection(self) | right)
Contributor:
I think this is fine, but it is worth calling out that we're opening users up to doing an inefficient thing where they go:

    df = convert.to_dataframe(pc)
    df2 = df | MyDfOperation()
    df3 = df2 | MySchemaTransform() | MySchemaTransform2() | MySchemaTransform3()
    result = convert.to_pcollection(df3)

which would be (much?) more efficient written as:

    df = convert.to_dataframe(pc)
    df2 = df | MyDfOperation()
    result = convert.to_pcollection(df2) | MySchemaTransform() | MySchemaTransform2() | MySchemaTransform3()

because this avoids the repeated to_dataframe/to_pcollection transitions. The former is probably more natural though, especially if you have real df operations mixed in there, even if it's less efficient. I think the user experience still trumps the efficiency loss, but it might be something we want to document.

Contributor Author:
Added a note. Once we push the batching stuff through, there could be little to no overhead here, but that's future work.

else:
return elementwise(self, right)

return _maybe_elementwise_or


DeferredSeries.__or__ = _create_maybe_elementwise_or(pd.Series) # type: ignore
DeferredDataFrame.__or__ = _create_maybe_elementwise_or(pd.DataFrame) # type: ignore


for name in ['lt', 'le', 'gt', 'ge', 'eq', 'ne']:
for p in '%s', '__%s__':
# Note that non-underscore name is used for both as the __xxx__ methods are
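To make the dispatch in `_maybe_elementwise_or` concrete, a short sketch of both branches (`Swap` is a hypothetical transform introduced here for illustration; the fallback branch is the ordinary elementwise pandas `|` on deferred objects):

```python
import apache_beam as beam
from apache_beam.dataframe import convert

class Swap(beam.PTransform):
  # Hypothetical schema'd transform, used only to demonstrate dispatch.
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda r: beam.Row(x=r.y, y=r.x))

with beam.Pipeline() as p:
  pc = p | beam.Create([beam.Row(x=True, y=False)])
  df = convert.to_dataframe(pc)

  # Right operand is a PTransform: _maybe_elementwise_or converts the
  # dataframe to a PCollection, applies Swap, and converts back.
  swapped = df | Swap()

  # Right operand is anything else (here, another deferred Series):
  # falls back to the elementwise pandas `|` (boolean or).
  either = df.x | df.y
```

Because the PTransform branch is taken only when the right operand is a `PTransform`, existing elementwise `|` behavior on deferred frames and series is unchanged.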