[Bug]: numpy.int64 types are not serialized correctly #33020

Closed
1 of 17 tasks
jrmccluskey opened this issue Nov 5, 2024 · 6 comments · Fixed by #33137
@jrmccluskey
Contributor

What happened?

Relevant repro of the problem:

[Screenshot of the repro; image not preserved]

Relevant error:

WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'numpy.int64'>' in '[7]: Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey'.
ERROR:apache_beam.runners.common:Unable to deterministically encode '0' of type '<class 'numpy.int64'>', please provide a type hint for the input of '[7]: Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey' [while running '[7]: Create/Map(decode)']

The problem appears to be that the coder does not know how to handle the numpy int64 type. The fallback coder (PickleCoder, I believe) cannot encode the type deterministically, so it clobbers the value to the type's base value of 0.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@shoyer
Contributor

shoyer commented Nov 5, 2024

Here's another case which is clearly broken:

>>> [(np.int64(1), 'first'), (np.int64(2), 'second')] | beam.GroupByKey()
[(0, ['first', 'second'])]

@jrmccluskey
Contributor Author

It's the same serialization problem. NumPy types as a whole don't seem to be handled correctly by the coders and wind up clobbered to their base value.
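
Until the coders handle NumPy scalars, one possible workaround is to convert keys to built-in Python types before they reach Beam. The sketch below is only an illustration: `to_builtin` and `normalize_keys` are hypothetical helper names, not part of Beam or NumPy, and the approach assumes the keys are NumPy scalars, whose `.item()` method returns the equivalent built-in Python object.

```python
from typing import Any, Iterable, List, Tuple


def to_builtin(value: Any) -> Any:
    """Return value.item() if the object has a callable .item(), else value.

    Hypothetical helper. NumPy scalars such as numpy.int64 expose .item();
    objects without it (int, str, ...) are returned unchanged.
    """
    item = getattr(value, "item", None)
    return item() if callable(item) else value


def normalize_keys(pairs: Iterable[Tuple[Any, Any]]) -> List[Tuple[Any, Any]]:
    """Convert only the key of each (key, value) pair; values are untouched."""
    return [(to_builtin(key), val) for key, val in pairs]


# Plain Python keys pass through unchanged.
print(normalize_keys([(1, "first"), (2, "second")]))
```

With NumPy installed, `normalize_keys([(np.int64(1), 'first'), (np.int64(2), 'second')])` yields keys of type `int`, which could then be passed to `beam.GroupByKey()`. Note that `.item()` on a multi-element array raises, so this is only meant for scalar keys.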

@liferoad
Collaborator

liferoad commented Nov 9, 2024

Tested this with different conditions.

  • Beam versions do not matter here.
  • This is somehow caused by the Python version.

Test code:

import apache_beam as beam
import numpy as np

with beam.Pipeline() as pipeline:
    indata = pipeline | "Create" >> beam.Create([(a, int(a)) for a in np.arange(3)])

    # Apply CombinePerKey to sum the values for each key.
    outdata = indata | "CombinePerKey" >> beam.CombinePerKey(sum) | beam.Map(print)

Run this with Python 3.10 and Beam raises the expected error:

WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'numpy.int64'>' in 'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey'.
ERROR:apache_beam.runners.common:Unable to deterministically encode '0' of type '<class 'numpy.int64'>', please provide a type hint for the input of 'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey' [while running 'Create/Map(decode)']
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1687, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1800, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 205, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 210, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 262, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1493, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1504, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1053, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 377, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 457, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 518, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '0' of type '<class 'numpy.int64'>', please provide a type hint for the input of 'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey'

With Python 3.11 and Python 3.12, Beam logs the warnings but keeps running and produces an incorrect result:

WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'numpy.int64'>' in 'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey'.
WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'numpy.int64'>' in 'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey'.
(0, 3)

@liferoad liferoad self-assigned this Nov 17, 2024
@liferoad
Collaborator

More notes:

Looks like __getstate__ exists on numpy.int64 with Python 3.12 but not with Python 3.10. So on Python 3.12 we stop raising the type error because of this line:

elif hasattr(value, "__getstate__"):

Python 3.12.7 (main, Oct 11 2024, 17:03:50) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import numpy as np
>>> value = np.int64(1)
>>> hasattr(value, "__setstate__")
True
>>> hasattr(value, "__getstate__")
True
>>>
Python 3.10.7 (main, Oct 11 2024, 17:41:30) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import numpy as np
>>> value = np.int64(1)
>>> hasattr(value, "__getstate__")
False
>>> hasattr(value, "__setstate__")
True
>>>
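
The difference is not specific to NumPy: Python 3.11 added a default `__getstate__` to `object`, so a `hasattr` check like the one quoted above changes its answer with the interpreter version. The sketch below uses only the standard library. `pick_branch` is a hypothetical stand-in for the coder's decision, not Beam's actual code, and the branch labels are made up for illustration.

```python
import sys


def has_getstate(value) -> bool:
    """Mirror the hasattr check quoted above."""
    return hasattr(value, "__getstate__")


def pick_branch(value) -> str:
    """Hypothetical stand-in for the coder's decision, not Beam's real code."""
    return "encode-via-getstate" if has_getstate(value) else "raise-TypeError"


plain = object()
# On Python 3.11+ every object inherits a default __getstate__;
# on 3.10 and earlier a bare object() has none.
print(sys.version_info[:2], has_getstate(plain), pick_branch(plain))
```

On Python 3.11 and later, `object().__getstate__()` returns `None` for an instance with no `__dict__` and no `__slots__`. That may be consistent with the value being lost when the fallback path relies on the returned state, though this thread does not confirm it.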

@liferoad
Collaborator

For numpy, a plain pickle round trip restores an array correctly (unpickling goes through __setstate__):

import numpy as np
import pickle

# Create a NumPy array
arr = np.array([[1, 2], [3, 4]])

# Pickle the array
serialized_arr = pickle.dumps(arr)

# Unpickle the array (implicitly calls __setstate__)
restored_arr = pickle.loads(serialized_arr)

print(restored_arr)
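
To make the `__getstate__` / `__setstate__` handshake visible without NumPy, here is a minimal standard-library sketch. `Counter` is a hypothetical class written for this illustration. It records whether `__setstate__` ran during unpickling.

```python
import pickle


class Counter:
    """Toy class that controls its own pickled state."""

    def __init__(self, value):
        self.value = value
        self.restored_via_setstate = False

    def __getstate__(self):
        # Only this dict is written into the pickle.
        return {"value": self.value}

    def __setstate__(self, state):
        # Called by pickle.loads instead of __init__.
        self.value = state["value"]
        self.restored_via_setstate = True


original = Counter(7)
restored = pickle.loads(pickle.dumps(original))
print(restored.value, restored.restored_via_setstate)
```

The restored object carries only what `__getstate__` returned, so a state that omits the actual value cannot be recovered on the other side.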

@github-actions github-actions bot added this to the 2.62.0 Release milestone Nov 18, 2024