From 7efe546548c3c77472122622e6eb2ea1c56e69e5 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 8 Sep 2023 14:40:03 +0000 Subject: [PATCH 01/20] Remove level and errors arguments for Pandas 2 compatibility. Level was always NotImplemented anyway, and errors I conditionally disable based on the pandas version to keep functionality. --- sdks/python/apache_beam/dataframe/frames.py | 41 ++++++++----------- .../apache_beam/dataframe/frames_test.py | 3 ++ 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 3020eecbaeb5..5a24be2d9561 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -914,7 +914,7 @@ def sort_index(self, axis, **kwargs): @frame_base.args_to_kwargs(pd.DataFrame) @frame_base.populate_defaults(pd.DataFrame) @frame_base.maybe_inplace - def where(self, cond, other, errors, **kwargs): + def where(self, cond, other, **kwargs): """where is not parallelizable when ``errors="ignore"`` is specified.""" requires = partitionings.Arbitrary() deferred_args = {} @@ -934,16 +934,18 @@ def where(self, cond, other, errors, **kwargs): else: actual_args['other'] = other - if errors == "ignore": - # We need all data in order to ignore errors and propagate the original - # data. - requires = partitionings.Singleton( - reason=( - f"where(errors={errors!r}) is currently not parallelizable, " - "because all data must be collected on one node to determine if " - "the original data should be propagated instead.")) + # For Pandas 2.0, errors was removed as an argument. + if PD_VERSION < (2, 0): + if "errors" in kwargs and kwargs['errors'] == "ignore": + # We need all data in order to ignore errors and propagate the original + # data. + requires = partitionings.Singleton( + reason=( + f"where(errors={kwargs['errors']!r}) is currently not parallelizable, " + "because all data must be collected on one node to determine if " + "the original data should be propagated instead.")) - actual_args['errors'] = errors + actual_args['errors'] = kwargs['errors'] def where_execution(df, *args): runtime_values = { @@ -1602,12 +1604,7 @@ def mean(self, skipna, **kwargs): @frame_base.with_docs_from(pd.Series) @frame_base.args_to_kwargs(pd.Series) @frame_base.populate_defaults(pd.Series) - def var(self, axis, skipna, level, ddof, **kwargs): - """Per-level aggregation is not yet supported - (https://github.com/apache/beam/issues/21829). Only the default, - ``level=None``, is allowed.""" - if level is not None: - raise NotImplementedError("per-level aggregation") + def var(self, axis, skipna, ddof, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment @@ -1677,9 +1674,7 @@ def corr(self, other, method, min_periods): @frame_base.with_docs_from(pd.Series) @frame_base.args_to_kwargs(pd.Series) @frame_base.populate_defaults(pd.Series) - def skew(self, axis, skipna, level, numeric_only, **kwargs): - if level is not None: - raise NotImplementedError("per-level aggregation") + def skew(self, axis, skipna, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment # See the online, numerically stable formulae at @@ -1741,9 +1736,7 @@ def combine_moments(data): @frame_base.with_docs_from(pd.Series) @frame_base.args_to_kwargs(pd.Series) @frame_base.populate_defaults(pd.Series) - def kurtosis(self, axis, skipna, level, numeric_only, **kwargs): - if level is not None: - raise NotImplementedError("per-level aggregation") + def kurtosis(self, axis, skipna, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment @@ -2571,7 +2564,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs): if kwargs: raise NotImplementedError('align(%s)' % ', '.join(kwargs.keys())) - if level is not None: + # In Pandas 2.0, all aggregations lost the level keyword. + if PD_VERSION < (2, 0) and level is not None: # Could probably get by partitioning on the used levels. requires_partition_by = partitionings.Singleton(reason=( f"align(level={level}) is not currently parallelizable. Only " @@ -5376,6 +5370,7 @@ def func(df, *args, **kwargs): name, frame_base._elementwise_method(name, restrictions={'level': None}, base=pd.Series)) + if hasattr(pd.DataFrame, name): setattr( DeferredDataFrame, diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index a2a8ef75f885..1bf3a400d578 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -2036,6 +2036,7 @@ def test_dataframe_agg_modes(self): self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df) self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df) + @unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2") def test_series_agg_level(self): self._run_test( lambda df: df.set_index(['group', 'foo']).bar.count(level=0), @@ -2059,6 +2060,7 @@ def test_series_agg_level(self): lambda df: df.set_index(['group', 'foo']).bar.median(level=1), GROUPBY_DF) + @unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2") def test_dataframe_agg_level(self): self._run_test( lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF) @@ -2226,6 +2228,7 @@ def test_df_agg_method_invalid_kwarg_raises(self): self._run_error_test( lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF) + @unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2") def test_agg_min_count(self): df = pd.DataFrame({ 'good': [1, 2, 3, np.nan], From 64be5a8d29b5f3fe33d3a22829cb8a43c7725a1a Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 8 Sep 2023 14:51:45 +0000 Subject: [PATCH 02/20] Fix formatting --- sdks/python/apache_beam/dataframe/frames.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 5a24be2d9561..7c7a587c2116 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -941,9 +941,10 @@ def where(self, cond, other, **kwargs): # data. requires = partitionings.Singleton( reason=( - f"where(errors={kwargs['errors']!r}) is currently not parallelizable, " - "because all data must be collected on one node to determine if " - "the original data should be propagated instead.")) + f"where(errors={kwargs['errors']!r}) is currently not " + "parallelizable, because all data must be collected on one " + "node to determine if the original data should be propagated " + "instead.")) actual_args['errors'] = kwargs['errors'] From 006c55b8a3d84728dcb29085c960d5147d5f7633 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 8 Sep 2023 14:56:03 +0000 Subject: [PATCH 03/20] More formatting --- sdks/python/apache_beam/dataframe/frames.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 7c7a587c2116..3fdacdb414f7 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -942,9 +942,9 @@ def where(self, cond, other, **kwargs): requires = partitionings.Singleton( reason=( f"where(errors={kwargs['errors']!r}) is currently not " - "parallelizable, because all data must be collected on one " - "node to determine if the original data should be propagated " - "instead.")) + "parallelizable, because all data must be collected on one " + "node to determine if the original data should be propagated " + "instead.")) actual_args['errors'] = kwargs['errors'] From a9bcdfdc2e7288ab7f5c338347ddf5c2b9b01f15 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 8 Sep 2023 15:26:39 +0000 Subject: [PATCH 04/20] Fix empty error case --- sdks/python/apache_beam/dataframe/frames.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 3fdacdb414f7..56cafa52a05f 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -946,7 +946,7 @@ def where(self, cond, other, **kwargs): "node to determine if the original data should be propagated " "instead.")) - actual_args['errors'] = kwargs['errors'] + actual_args['errors'] = kwargs['errors'] if 'errors' in kwargs else None def where_execution(df, *args): runtime_values = { From 647b006d29d23009b53885de7a72950102314f30 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 16:00:22 +0000 Subject: [PATCH 05/20] Remove append method for Pandas >= 2.0. Pandas removed .append in 2.0 so this helps keep the API consistent. --- .../apache_beam/dataframe/frame_base.py | 31 ++++++++++++++++--- .../apache_beam/dataframe/frame_base_test.py | 9 ++++++ sdks/python/apache_beam/dataframe/frames.py | 16 ++++++---- .../apache_beam/dataframe/frames_test.py | 2 ++ 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 24497f1de069..09bb71875160 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -475,7 +475,7 @@ def wrapper(self, inplace=False, **kwargs): return wrapper -def args_to_kwargs(base_type): +def args_to_kwargs(base_type, ignore=False): """Convert all args to kwargs before calling the decorated function. When applied to a function, this decorator creates a new function @@ -484,8 +484,15 @@ def args_to_kwargs(base_type): determine the name to use for arguments that are converted to keyword arguments. - For internal use only. No backwards compatibility guarantees.""" + ignore used mainly in cases where a method has been removed in a later version + of Pandas. + + For internal use only. No backwards compatibility guarantees. + """ def wrap(func): + if ignore: + return func + arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args @functools.wraps(func) @@ -524,14 +531,21 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type, name=None): +def with_docs_from(base_type, name=None, ignore=False): """Decorator that updates the documentation from the wrapped function to + duplicate the documentation from the identically-named method in `base_type`. Any docstring on the original function will be included in the new function under a "Differences from pandas" heading. + + ignore used mainly in cases where a method has been removed in a later version + of Pandas. """ def wrap(func): + if ignore: + return func + fn_name = name or func.__name__ orig_doc = getattr(base_type, fn_name).__doc__ if orig_doc is None: @@ -588,15 +602,22 @@ def format_section(header): return wrap -def populate_defaults(base_type): +def populate_defaults(base_type, ignore=False): """Populate default values for keyword arguments in decorated function. When applied to a function, this decorator creates a new function with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. - For internal use only. No backwards compatibility guarantees.""" + ignore used mainly in cases where a method has been removed in a later version + of Pandas. + + For internal use only. No backwards compatibility guarantees. + """ def wrap(func): + if ignore: + return func + base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__))) if not base_argspec.defaults: return func diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index 82d5b65e1a49..e36662407c3f 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -99,6 +99,11 @@ class Proxy(object): def func(self, a, c=1000, **kwargs): return dict(kwargs, a=a, c=c) + @frame_base.args_to_kwargs(Base, ignore=True) + @frame_base.populate_defaults(Base, ignore=True) + def func_ignore(self, a, **kwargs): + return dict(kwargs, a=a) + proxy = Proxy() # pylint: disable=too-many-function-args self.assertEqual(proxy.func(), {'a': 1, 'c': 1000}) @@ -108,6 +113,10 @@ def func(self, a, c=1000, **kwargs): self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6}) self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6}) + with self.assertRaises(TypeError): # missing 1 required positional argument + proxy.func_ignore() + self.assertEqual(proxy.func_ignore(12, c=100), {'a': 12, 'c': 100}) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 0d9b22ae3f9e..65befe52977c 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -1336,12 +1336,14 @@ def keys(self): frame_base.wont_implement_method( pd.Series, 'shape', reason="non-deferred-result")) - @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series) - @frame_base.populate_defaults(pd.Series) + @frame_base.with_docs_from(pd.Series, ignore=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.Series, ignore=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.Series, ignore=PD_VERSION >= (2, 0)) def append(self, to_append, ignore_index, verify_integrity, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" + if PD_VERSION >= (2, 0): + raise frame_base.WontImplementError('append() was removed in Pandas 2.0.') if not isinstance(to_append, DeferredSeries): raise frame_base.WontImplementError( "append() only accepts DeferredSeries instances, received " + @@ -2593,12 +2595,14 @@ def align(self, other, join, axis, copy, level, method, **kwargs): requires_partition_by=requires_partition_by, preserves_partition_by=partitionings.Arbitrary())) - @frame_base.with_docs_from(pd.DataFrame) - @frame_base.args_to_kwargs(pd.DataFrame) - @frame_base.populate_defaults(pd.DataFrame) + @frame_base.with_docs_from(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) def append(self, other, ignore_index, verify_integrity, sort, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" + if PD_VERSION >= (2, 0): + raise frame_base.WontImplementError('append() was removed in Pandas 2.0.') if not isinstance(other, DeferredDataFrame): raise frame_base.WontImplementError( "append() only accepts DeferredDataFrame instances, received " + diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index fa121aa85c30..69e7f4b19d6f 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -797,6 +797,7 @@ def test_loc(self): self._run_test(lambda df: df.C.loc[df.A > 10], df) self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates)) + @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0') def test_append_sort(self): # yapf: disable df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']}, @@ -985,6 +986,7 @@ def test_series_fillna_series_as_value(self): self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2) + @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0') def test_append_verify_integrity(self): df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)) df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19)) From cba15dd62759536251070d8a8bdae9e073ab305b Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 19:07:46 +0000 Subject: [PATCH 06/20] Add support for ignored arguments and more tests. --- .../apache_beam/dataframe/frame_base.py | 60 ++++++++++++++----- .../apache_beam/dataframe/frame_base_test.py | 24 ++++++-- sdks/python/apache_beam/dataframe/frames.py | 12 ++-- 3 files changed, 70 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 09bb71875160..ba866e36262b 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -475,7 +475,10 @@ def wrapper(self, inplace=False, **kwargs): return wrapper -def args_to_kwargs(base_type, ignore=False): +def args_to_kwargs( + base_type: object, + removed_method: bool = False, + removed_args: list[str] | None = None): """Convert all args to kwargs before calling the decorated function. When applied to a function, this decorator creates a new function @@ -484,25 +487,43 @@ def args_to_kwargs(base_type, ignore=False): determine the name to use for arguments that are converted to keyword arguments. - ignore used mainly in cases where a method has been removed in a later version - of Pandas. + removed_method used in cases where a method has been removed in a later version + of Pandas. removed_args used in cases where a method has had arguments removed + in a later version of Pandas. For internal use only. No backwards compatibility guarantees. """ def wrap(func): - if ignore: + if removed_method: + # Do no processing, let Beam function itself raise the error if called. return func - arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args + removed_arg_names = removed_args if removed_args is not None else [] + + base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__))) + base_arg_names = base_arg_spec.args + # Some arguments are keyword only and we still want to check against those. + all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs + beam_arg_names = getfullargspec(func).args + + if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) - + set(removed_arg_names)): + raise TypeError( + f"Beam definition of {func.__name__} has arguments that are not found in the base version of the function: {not_found}" + ) @functools.wraps(func) def wrapper(*args, **kwargs): - for name, value in zip(arg_names, args): + for name, value in zip(base_arg_names, args): if name in kwargs: raise TypeError( "%s() got multiple values for argument '%s'" % (func.__name__, name)) kwargs[name] = value + # Still have to populate these for the Beam function signature. + if removed_args: + for name in removed_args: + kwargs[name] = None return func(**kwargs) return wrapper @@ -531,19 +552,23 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type, name=None, ignore=False): +def with_docs_from( + base_type: object, + name: str = None, + removed_method: bool = False, + ignored_args: list[str] | None = None): """Decorator that updates the documentation from the wrapped function to - duplicate the documentation from the identically-named method in `base_type`. Any docstring on the original function will be included in the new function under a "Differences from pandas" heading. - ignore used mainly in cases where a method has been removed in a later version + removed_method used in cases where a method has been removed in a later version of Pandas. """ def wrap(func): - if ignore: + if removed_method: + func.__doc__ = "This method has been removed in the current version of Pandas." return func fn_name = name or func.__name__ @@ -602,20 +627,25 @@ def format_section(header): return wrap -def populate_defaults(base_type, ignore=False): +def populate_defaults( + base_type: object, + removed_method: bool = False, + removed_args: list[str] | None = None, + ignore: bool = False): """Populate default values for keyword arguments in decorated function. When applied to a function, this decorator creates a new function with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. - ignore used mainly in cases where a method has been removed in a later version - of Pandas. + removed_method used in cases where a method has been removed in a later version + of Pandas. removed_args used in cases where a method has had arguments removed + in a later version of Pandas. For internal use only. No backwards compatibility guarantees. """ def wrap(func): - if ignore: + if removed_method: return func base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__))) @@ -634,6 +664,8 @@ def wrap(func): defaults_to_populate = set( func_argspec.args[:num_non_defaults]).intersection( arg_to_default.keys()) + if removed_args: + defaults_to_populate -= set(removed_args) @functools.wraps(func) def wrapper(**kwargs): diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index e36662407c3f..2d16d02ba1ea 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -93,19 +93,27 @@ class Base(object): def func(self, a=1, b=2, c=3): pass + def func_removed_args(self, a): + pass + class Proxy(object): @frame_base.args_to_kwargs(Base) @frame_base.populate_defaults(Base) def func(self, a, c=1000, **kwargs): return dict(kwargs, a=a, c=c) - @frame_base.args_to_kwargs(Base, ignore=True) - @frame_base.populate_defaults(Base, ignore=True) - def func_ignore(self, a, **kwargs): + @frame_base.args_to_kwargs(Base, removed_method=True) + @frame_base.populate_defaults(Base, removed_method=True) + def func_removed_method(self, a, **kwargs): + return dict(kwargs, a=a) + + @frame_base.args_to_kwargs(Base, removed_args=['c']) + @frame_base.populate_defaults(Base, removed_args=['c']) + def func_removed_args(self, a, c, **kwargs): return dict(kwargs, a=a) proxy = Proxy() - # pylint: disable=too-many-function-args + # pylint: disable=too-many-function-args,no-value-for-parameter self.assertEqual(proxy.func(), {'a': 1, 'c': 1000}) self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000}) self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6}) @@ -114,8 +122,12 @@ def func_ignore(self, a, **kwargs): self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6}) with self.assertRaises(TypeError): # missing 1 required positional argument - proxy.func_ignore() - self.assertEqual(proxy.func_ignore(12, c=100), {'a': 12, 'c': 100}) + proxy.func_removed_method() + self.assertEqual(proxy.func_removed_method(12, c=100), {'a': 12, 'c': 100}) + + with self.assertRaises(TypeError): # missing 1 required positional argument + proxy.func_removed_args() + self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100}) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 65befe52977c..c8f10969c236 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -1336,9 +1336,9 @@ def keys(self): frame_base.wont_implement_method( pd.Series, 'shape', reason="non-deferred-result")) - @frame_base.with_docs_from(pd.Series, ignore=PD_VERSION >= (2, 0)) - @frame_base.args_to_kwargs(pd.Series, ignore=PD_VERSION >= (2, 0)) - @frame_base.populate_defaults(pd.Series, ignore=PD_VERSION >= (2, 0)) + @frame_base.with_docs_from(pd.Series, removed_method=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.Series, removed_method=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.Series, removed_method=PD_VERSION >= (2, 0)) def append(self, to_append, ignore_index, verify_integrity, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" @@ -2595,9 +2595,9 @@ def align(self, other, join, axis, copy, level, method, **kwargs): requires_partition_by=requires_partition_by, preserves_partition_by=partitionings.Arbitrary())) - @frame_base.with_docs_from(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) - @frame_base.args_to_kwargs(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) - @frame_base.populate_defaults(pd.DataFrame, ignore=PD_VERSION >= (2, 0)) + @frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) def append(self, other, ignore_index, verify_integrity, sort, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" From de643c57cfa8dab6f724da1678f8f283fbcee70d Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 19:11:20 +0000 Subject: [PATCH 07/20] Delete removed_args arg from doc part that wasn't using it --- sdks/python/apache_beam/dataframe/frame_base.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index ba866e36262b..6c777b9f2b28 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -553,10 +553,7 @@ def wrapper(*args, **kwargs): def with_docs_from( - base_type: object, - name: str = None, - removed_method: bool = False, - ignored_args: list[str] | None = None): + base_type: object, name: str = None, removed_method: bool = False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. From bfa0ae8a8a7e9d41e85e9c332d1e03414083deb8 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 19:27:41 +0000 Subject: [PATCH 08/20] Fix up frame_base --- sdks/python/apache_beam/dataframe/frame_base.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 6c777b9f2b28..92cebe4b0c6a 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -475,10 +475,7 @@ def wrapper(self, inplace=False, **kwargs): return wrapper -def args_to_kwargs( - base_type: object, - removed_method: bool = False, - removed_args: list[str] | None = None): +def args_to_kwargs(base_type, removed_method=False, removed_args=None): """Convert all args to kwargs before calling the decorated function. When applied to a function, this decorator creates a new function @@ -552,8 +549,7 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from( - base_type: object, name: str = None, removed_method: bool = False): +def with_docs_from(base_type: object, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. @@ -624,11 +620,7 @@ def format_section(header): return wrap -def populate_defaults( - base_type: object, - removed_method: bool = False, - removed_args: list[str] | None = None, - ignore: bool = False): +def populate_defaults(base_type, removed_method=False, removed_args=None): """Populate default values for keyword arguments in decorated function. When applied to a function, this decorator creates a new function From a8cf5d19925b00513e5049afa603972b17750dd4 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 19:37:51 +0000 Subject: [PATCH 09/20] Fix with new removed_args argument --- sdks/python/apache_beam/dataframe/frames.py | 24 ++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index ea8511f83725..c7ef43c35439 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -914,10 +914,10 @@ def sort_index(self, axis, **kwargs): )) @frame_base.with_docs_from(pd.DataFrame) - @frame_base.args_to_kwargs(pd.DataFrame) - @frame_base.populate_defaults(pd.DataFrame) + @frame_base.args_to_kwargs(pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2,0) else None) + @frame_base.populate_defaults(pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2,0) else None) @frame_base.maybe_inplace - def where(self, cond, other, **kwargs): + def where(self, cond, other, errors, **kwargs): """where is not parallelizable when ``errors="ignore"`` is specified.""" requires = partitionings.Arbitrary() deferred_args = {} @@ -1606,9 +1606,9 @@ def mean(self, skipna, **kwargs): return self.sum(skipna=skipna, **kwargs) / size @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series) - @frame_base.populate_defaults(pd.Series) - def var(self, axis, skipna, ddof, **kwargs): + @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + def var(self, axis, skipna, level, ddof, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment @@ -1676,9 +1676,9 @@ def corr(self, other, method, min_periods): requires_partition_by=partitionings.Singleton(reason=reason))) @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series) - @frame_base.populate_defaults(pd.Series) - def skew(self, axis, skipna, numeric_only, **kwargs): + @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + def skew(self, axis, skipna, level, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment # See the online, numerically stable formulae at @@ -1738,9 +1738,9 @@ def combine_moments(data): requires_partition_by=partitionings.Singleton())) @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series) - @frame_base.populate_defaults(pd.Series) - def kurtosis(self, axis, skipna, numeric_only, **kwargs): + @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + def kurtosis(self, axis, skipna, level, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment From c20205708ed48c99875cbe5f03b50da2cb7fc92f Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 19:43:17 +0000 Subject: [PATCH 10/20] Fix line lengths --- .../apache_beam/dataframe/frame_base.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 92cebe4b0c6a..123fce4ff9bf 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -484,9 +484,9 @@ def args_to_kwargs(base_type, removed_method=False, removed_args=None): determine the name to use for arguments that are converted to keyword arguments. - removed_method used in cases where a method has been removed in a later version - of Pandas. removed_args used in cases where a method has had arguments removed - in a later version of Pandas. + removed_method used in cases where a method has been removed in a later + version of Pandas. removed_args used in cases where a method has had + arguments removed in a later version of Pandas. For internal use only. No backwards compatibility guarantees. """ @@ -506,8 +506,8 @@ def wrap(func): if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) - set(removed_arg_names)): raise TypeError( - f"Beam definition of {func.__name__} has arguments that are not found in the base version of the function: {not_found}" - ) + f"Beam definition of {func.__name__} has arguments that are not found" + f" in the base version of the function: {not_found}") @functools.wraps(func) def wrapper(*args, **kwargs): @@ -556,12 +556,13 @@ def with_docs_from(base_type: object, name=None, removed_method=False): Any docstring on the original function will be included in the new function under a "Differences from pandas" heading. - removed_method used in cases where a method has been removed in a later version - of Pandas. + removed_method used in cases where a method has been removed in a later + version of Pandas. """ def wrap(func): if removed_method: - func.__doc__ = "This method has been removed in the current version of Pandas." + func.__doc__ = ( + "This method has been removed in the current version of Pandas.") return func fn_name = name or func.__name__ @@ -627,9 +628,9 @@ def populate_defaults(base_type, removed_method=False, removed_args=None): with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. - removed_method used in cases where a method has been removed in a later version - of Pandas. removed_args used in cases where a method has had arguments removed - in a later version of Pandas. + removed_method used in cases where a method has been removed in a later + version of Pandas. removed_args used in cases where a method has had + arguments removed in a later version of Pandas. For internal use only. No backwards compatibility guarantees. """ From 2a8b489e2522c4427258cfae09132df2c96362df Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Mon, 11 Sep 2023 20:02:02 +0000 Subject: [PATCH 11/20] Fix formatting --- sdks/python/apache_beam/dataframe/frames.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index c8f10969c236..6c82b3b258cd 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -2597,7 +2597,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs): @frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) @frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) - @frame_base.populate_defaults(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.DataFrame, + removed_method=PD_VERSION >= (2, 0)) def append(self, other, ignore_index, verify_integrity, sort, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" From 944770ed1fb6af831f335409a3fac20dc39e1974 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 10:40:20 +0000 Subject: [PATCH 12/20] Add test for append deprecation --- sdks/python/apache_beam/dataframe/frames_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 69e7f4b19d6f..ae6439a94001 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -813,6 +813,13 @@ def test_append_sort(self): self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2) self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2) + @unittest.skipIf(PD_VERSION < (2, 0), 'append removed in Pandas 2.0') + def test_append_raises_pandas_2(self): + df = pd.DataFrame({'A': [1, 1]}) + + with self.assertRaises(NotImplementedError): + df.append(df) + def test_smallest_largest(self): df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]}) self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df) From ee7a8587ecc8a51ad9c80081bb70daae6af99c38 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 10:42:29 +0000 Subject: [PATCH 13/20] Only populate kwargs if not there --- sdks/python/apache_beam/dataframe/frame_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 123fce4ff9bf..ba06b925ac4c 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -520,7 +520,8 @@ def wrapper(*args, **kwargs): # Still have to populate these for the Beam function signature. if removed_args: for name in removed_args: - kwargs[name] = None + if not name in kwargs: + kwargs[name] = None return func(**kwargs) return wrapper From d0a758516ac8b06213f6b7dd14f2a98f63e12086 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 11:08:41 +0000 Subject: [PATCH 14/20] Formatting --- sdks/python/apache_beam/dataframe/frames.py | 24 ++++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index cd285d50c192..e2390bda28be 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -914,8 +914,10 @@ def sort_index(self, axis, **kwargs): )) @frame_base.with_docs_from(pd.DataFrame) - @frame_base.args_to_kwargs(pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2,0) else None) - @frame_base.populate_defaults(pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2,0) else None) + @frame_base.args_to_kwargs( + pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None) + @frame_base.populate_defaults( + pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None) @frame_base.maybe_inplace def where(self, cond, other, errors, **kwargs): """where is not parallelizable when ``errors="ignore"`` is specified.""" @@ -1608,8 +1610,10 @@ def mean(self, skipna, **kwargs): return self.sum(skipna=skipna, **kwargs) / size @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) - @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.args_to_kwargs( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) + @frame_base.populate_defaults( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) def var(self, axis, skipna, level, ddof, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment @@ -1678,8 +1682,10 @@ def corr(self, other, method, min_periods): requires_partition_by=partitionings.Singleton(reason=reason))) @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) - @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.args_to_kwargs( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) + @frame_base.populate_defaults( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) def skew(self, axis, skipna, level, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment @@ -1740,8 +1746,10 @@ def combine_moments(data): requires_partition_by=partitionings.Singleton())) @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) - @frame_base.populate_defaults(pd.Series, removed_args=["level"] if PD_VERSION >= (2,0) else None) + @frame_base.args_to_kwargs( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) + @frame_base.populate_defaults( + pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None) def kurtosis(self, axis, skipna, level, numeric_only, **kwargs): if skipna is None or skipna: self = self.dropna() # pylint: disable=self-cls-assignment From 035fdfd2ab71456c2ff5de09fc3676e704ff841b Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 16:33:33 +0000 Subject: [PATCH 15/20] Fix frames.py to work with kw only args. Pandas 2 has made most args of most methods kw only. This still supports Pandas 1 while allowing for those. --- .../apache_beam/dataframe/frame_base.py | 24 +++++++--- .../apache_beam/dataframe/frame_base_test.py | 44 ++++++++++++++++++- sdks/python/apache_beam/dataframe/frames.py | 4 +- 3 files changed, 62 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index ba06b925ac4c..f2356c8b75a8 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -497,6 +497,8 @@ def wrap(func): removed_arg_names = removed_args if removed_args is not None else [] + # TODO: Better handle position only arguments if they are ever a thing + # in Pandas (as of 2.1 currently they aren't). base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__))) base_arg_names = base_arg_spec.args # Some arguments are keyword only and we still want to check against those. @@ -511,6 +513,9 @@ def wrap(func): @functools.wraps(func) def wrapper(*args, **kwargs): + if len(args) > len(base_arg_names): + raise TypeError(f"{func.__name__} got too many positioned arguments.") + for name, value in zip(base_arg_names, args): if name in kwargs: raise TypeError( @@ -520,7 +525,7 @@ def wrapper(*args, **kwargs): # Still have to populate these for the Beam function signature. if removed_args: for name in removed_args: - if not name in kwargs: + if name not in kwargs: kwargs[name] = None return func(**kwargs) @@ -550,7 +555,7 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type: object, name=None, removed_method=False): +def with_docs_from(base_type, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. @@ -640,13 +645,18 @@ def wrap(func): return func base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__))) - if not base_argspec.defaults: + if not base_argspec.defaults and not base_argspec.kwonlydefaults: return func - arg_to_default = dict( - zip( - base_argspec.args[-len(base_argspec.defaults):], - base_argspec.defaults)) + arg_to_default = {} + if base_argspec.defaults: + arg_to_default.update( + zip( + base_argspec.args[-len(base_argspec.defaults):], + base_argspec.defaults)) + + if base_argspec.kwonlydefaults: + arg_to_default.update(base_argspec.kwonlydefaults) unwrapped_func = unwrap(func) # args that do not have defaults in func, but do have defaults in base diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index 2d16d02ba1ea..5962bb2fb72b 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -72,7 +72,7 @@ def add_one(frame): def test_args_to_kwargs(self): class Base(object): - def func(self, a=1, b=2, c=3): + def func(self, a=1, b=2, c=3, *, kw_only=4): pass class Proxy(object): @@ -87,6 +87,9 @@ def func(self, **kwargs): self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6}) self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6}) self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6}) + self.assertEqual(proxy.func(2, kw_only=20), {'a': 2, 'kw_only': 20}) + with self.assertRaises(TypeError): # got too many positioned arguments + proxy.func(2, 4, 6, 8) def test_args_to_kwargs_populates_defaults(self): class Base(object): @@ -129,6 +132,45 @@ def func_removed_args(self, a, c, **kwargs): proxy.func_removed_args() self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100}) + def test_args_to_kwargs_populates_default_handles_kw_only(self): + class Base(object): + def func(self, a=1, b=2, c=3, *, kw_only=4): + pass + + class ProxyUsesKwOnly(object): + @frame_base.args_to_kwargs(Base) + @frame_base.populate_defaults(Base) + def func(self, a, kw_only, **kwargs): + return dict(kwargs, a=a, kw_only=kw_only) + + proxy = ProxyUsesKwOnly() + + # pylint: disable=too-many-function-args,no-value-for-parameter + self.assertEqual(proxy.func(), {'a': 1, 'kw_only': 4}) + self.assertEqual(proxy.func(100), {'a': 100, 'kw_only': 4}) + self.assertEqual( + proxy.func(2, 4, 6, kw_only=8), { + 'a': 2, 'b': 4, 'c': 6, 'kw_only': 8 + }) + with self.assertRaises(TypeError): + proxy.func(2, 4, 6, 8) # got too many positioned arguments + + class ProxyDoesntUseKwOnly(object): + @frame_base.args_to_kwargs(Base) + @frame_base.populate_defaults(Base) + def func(self, a, **kwargs): + return dict(kwargs, a=a) + + proxy = ProxyDoesntUseKwOnly() + + # pylint: disable=too-many-function-args,no-value-for-parameter + self.assertEqual(proxy.func(), {'a': 1}) + self.assertEqual(proxy.func(100), {'a': 100}) + self.assertEqual( + proxy.func(2, 4, 6, kw_only=8), { + 'a': 2, 'b': 4, 'c': 6, 'kw_only': 8 + }) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index e2390bda28be..a7a278fbf62a 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -907,7 +907,7 @@ def sort_index(self, axis, **kwargs): return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'sort_index', - lambda df: df.sort_index(axis, **kwargs), + lambda df: df.sort_index(axis=axis, **kwargs), [self._expr], requires_partition_by=partitionings.Arbitrary(), preserves_partition_by=partitionings.Arbitrary(), @@ -2687,7 +2687,7 @@ def set_axis(self, labels, axis, **kwargs): return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'set_axis', - lambda df: df.set_axis(labels, axis, **kwargs), + lambda df: df.set_axis(labels, axis=axis, **kwargs), [self._expr], requires_partition_by=partitionings.Arbitrary(), preserves_partition_by=partitionings.Arbitrary())) From 3ed2822e98e601ab10ed5a13b84df946e8ea01b3 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 16:37:27 +0000 Subject: [PATCH 16/20] Fix arg. --- sdks/python/apache_beam/dataframe/frame_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index ba06b925ac4c..88e6eb6a060b 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -550,7 +550,7 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type: object, name=None, removed_method=False): +def with_docs_from(base_type, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. From fb38399cda8d7c7bd8aac77a5d6faba51e64fa10 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Tue, 12 Sep 2023 16:37:27 +0000 Subject: [PATCH 17/20] Fix arg. --- sdks/python/apache_beam/dataframe/frame_base.py | 2 +- sdks/python/apache_beam/dataframe/frames_test.py | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index ba06b925ac4c..88e6eb6a060b 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -550,7 +550,7 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type: object, name=None, removed_method=False): +def with_docs_from(base_type, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index ae6439a94001..69e7f4b19d6f 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -813,13 +813,6 @@ def test_append_sort(self): self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2) self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2) - @unittest.skipIf(PD_VERSION < (2, 0), 'append removed in Pandas 2.0') - def test_append_raises_pandas_2(self): - df = pd.DataFrame({'A': [1, 1]}) - - with self.assertRaises(NotImplementedError): - df.append(df) - def test_smallest_largest(self): df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]}) self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df) From d3d07bb36eea974be7bd85bdbb547bf9b3bb536d Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Wed, 13 Sep 2023 09:50:43 +0000 Subject: [PATCH 18/20] Fix comments --- .../apache_beam/dataframe/frame_base.py | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 88e6eb6a060b..1da12ececff6 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -484,11 +484,14 @@ def args_to_kwargs(base_type, removed_method=False, removed_args=None): determine the name to use for arguments that are converted to keyword arguments. - removed_method used in cases where a method has been removed in a later - version of Pandas. removed_args used in cases where a method has had - arguments removed in a later version of Pandas. - For internal use only. No backwards compatibility guarantees. + + Args: + base_type: The pandas type of the method that this is trying to replicate. + removed_method: Whether this method has been removed in the running + Pandas version. + removed_args: If not empty, which arguments have been dropped in the + running Pandas version. """ def wrap(func): if removed_method: @@ -629,11 +632,14 @@ def populate_defaults(base_type, removed_method=False, removed_args=None): with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. - removed_method used in cases where a method has been removed in a later - version of Pandas. removed_args used in cases where a method has had - arguments removed in a later version of Pandas. - For internal use only. No backwards compatibility guarantees. + + Args: + base_type: The pandas type of the method that this is trying to replicate. + removed_method: Whether this method has been removed in the running + Pandas version. + removed_args: If not empty, which arguments have been dropped in the + running Pandas version. """ def wrap(func): if removed_method: From 88087749bea9e2cf73bb1c33d0eaad68e7c3923c Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 15 Sep 2023 17:24:52 +0000 Subject: [PATCH 19/20] Address comments --- sdks/python/apache_beam/dataframe/frame_base.py | 2 +- sdks/python/apache_beam/dataframe/frame_base_test.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index b64e8e2d53c2..48a4c29d0589 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -500,7 +500,7 @@ def wrap(func): removed_arg_names = removed_args if removed_args is not None else [] - # TODO: Better handle position only arguments if they are ever a thing + # We would need to add position only arguments if they ever become a thing # in Pandas (as of 2.1 currently they aren't). base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__))) base_arg_names = base_arg_spec.args diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index 5962bb2fb72b..7ce4ba022acd 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -134,7 +134,7 @@ def func_removed_args(self, a, c, **kwargs): def test_args_to_kwargs_populates_default_handles_kw_only(self): class Base(object): - def func(self, a=1, b=2, c=3, *, kw_only=4): + def func(self, a, b=2, c=3, *, kw_only=4): pass class ProxyUsesKwOnly(object): @@ -146,7 +146,9 @@ def func(self, a, kw_only, **kwargs): proxy = ProxyUsesKwOnly() # pylint: disable=too-many-function-args,no-value-for-parameter - self.assertEqual(proxy.func(), {'a': 1, 'kw_only': 4}) + with self.assertRaises(TypeError): # missing 1 require positional argument + proxy.func() + self.assertEqual(proxy.func(100), {'a': 100, 'kw_only': 4}) self.assertEqual( proxy.func(2, 4, 6, kw_only=8), { @@ -164,7 +166,8 @@ def func(self, a, **kwargs): proxy = ProxyDoesntUseKwOnly() # pylint: disable=too-many-function-args,no-value-for-parameter - self.assertEqual(proxy.func(), {'a': 1}) + with self.assertRaises(TypeError): # missing 1 required positional argument + proxy.func() self.assertEqual(proxy.func(100), {'a': 100}) self.assertEqual( proxy.func(2, 4, 6, kw_only=8), { From a5c3a0e17c12e84d33eb502a1e802dc0cb06e7d5 Mon Sep 17 00:00:00 2001 From: Chris Neffshade Date: Fri, 15 Sep 2023 18:08:46 +0000 Subject: [PATCH 20/20] typo --- sdks/python/apache_beam/dataframe/frame_base_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index 7ce4ba022acd..b3077320720f 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -146,7 +146,7 @@ def func(self, a, kw_only, **kwargs): proxy = ProxyUsesKwOnly() # pylint: disable=too-many-function-args,no-value-for-parameter - with self.assertRaises(TypeError): # missing 1 require positional argument + with self.assertRaises(TypeError): # missing 1 required positional argument proxy.func() self.assertEqual(proxy.func(100), {'a': 100, 'kw_only': 4})