From 86ff7d3d1a4d469ee604ef7e6e8b2239bc936e1e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 20 Oct 2022 16:19:43 -0400 Subject: [PATCH 1/2] Immediately truncate full restriction on drain of periodic impulse --- sdks/python/apache_beam/transforms/periodicsequence.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 56075a5acc71..4f8f41e1fbed 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -49,6 +49,10 @@ def create_tracker(self, restriction): def restriction_size(self, unused_element, restriction): return restriction.size() + + # On drain, immediately stop emitting new elements + def truncate(self, unused_element, unused_restriction): + return None class ImpulseSeqGenDoFn(beam.DoFn): From 950d8a6ac3451d40a728dd55c09682e3e1468a21 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 20 Oct 2022 21:04:20 -0400 Subject: [PATCH 2/2] Whitespace format --- sdks/python/apache_beam/transforms/periodicsequence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 4f8f41e1fbed..6127aa8de555 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -49,7 +49,7 @@ def create_tracker(self, restriction): def restriction_size(self, unused_element, restriction): return restriction.size() - + # On drain, immediately stop emitting new elements def truncate(self, unused_element, unused_restriction): return None