[Bug]: Periodic Impulse does not drain on Dataflow #22776

Closed
Abacn opened this issue Aug 18, 2022 · 3 comments · Fixed by #23765 or #25716
Labels: bug, core, done & done, java, P2, python

Comments

@Abacn
Contributor

Abacn commented Aug 18, 2022

What happened?

Draining a pipeline whose source is PeriodicImpulse never completes. This happens in both the Java and Python SDKs.
In contrast, GenerateSequence, which provides similar functionality in the Java SDK, supports drain.
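The expected drain behavior can be illustrated with a small, self-contained toy model. This is not Beam code and does not reflect how PeriodicImpulse or the Dataflow runner are actually implemented; the class `DrainModel`, the method `emitted`, and its parameters are hypothetical. The sketch assumes that draining means the source stops claiming new ticks once the drain is requested, while ticks already emitted are still processed downstream.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, NOT Beam code: a periodic source emits the timestamps
 * start, start + interval, start + 2 * interval, ... and a drain is
 * requested at drainAtMillis. All names here are hypothetical.
 */
public final class DrainModel {

  /**
   * Returns the emitted timestamps. If honorsDrain is true, the source stops
   * claiming new ticks once the drain time is reached; otherwise it keeps
   * emitting until endMillis, which stands in for an effectively infinite end.
   */
  public static List<Long> emitted(long startMillis, long intervalMillis, long endMillis,
                                   long drainAtMillis, boolean honorsDrain) {
    List<Long> out = new ArrayList<>();
    for (long t = startMillis; t < endMillis; t += intervalMillis) {
      if (honorsDrain && t >= drainAtMillis) {
        break; // drain requested: give up the remaining, unclaimed ticks
      }
      out.add(t); // claim and emit this tick
    }
    return out;
  }

  public static void main(String[] args) {
    // 500 ms interval, as in the repro below; drain requested at t = 2000 ms.
    List<Long> drained = emitted(0, 500, 1_000_000, 2_000, true);
    List<Long> stuck = emitted(0, 500, 1_000_000, 2_000, false);
    System.out.println(drained);      // [0, 500, 1000, 1500]
    System.out.println(stuck.size()); // 2000
  }
}
```

In the second call the loop only ends because `endMillis` is finite in the toy; with an effectively unbounded end, a source that ignores the drain request would keep running, which matches the symptom reported here.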

Code snippets:

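import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PeriodicImpulseTest {
  public static void main(String[] argv) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // Emit the current timestamp every 500 ms; the resulting PCollection is unbounded.
    PCollection<Void> result =
        p.apply(PeriodicImpulse.create().withInterval(Duration.millis(500)))
            .apply(Reshuffle.viaRandomKey())
            .apply(ParDo.of(new DoFn<Instant, Void>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                System.out.println(c.element());
              }
            }));
    assertThat(result.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED));
    // Blocks until the job terminates; draining this job on Dataflow never completes.
    p.run().waitUntilFinish();
  }
}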

A comparable pipeline using GenerateSequence, which drains successfully:

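import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GenerateSequenceTest {
  public static void main(String[] argv) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // Emit one element per second, counting up from 0; this unbounded source drains as expected.
    PCollection<Long> input =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)));
    input.apply(ParDo.of(new DoFn<Long, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println(c.element());
      }
    }));
    p.run();
  }
}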

Issue Priority

Priority: 2

Issue Component

Component: sdk-java-core

@Abacn
Contributor Author

Abacn commented Aug 18, 2022

.add-labels python

@Abacn
Contributor Author

Abacn commented Nov 4, 2022

The Python SDK issue is resolved. This issue may stay open for the Java SDK.

@damccorm
Contributor

damccorm commented Nov 4, 2022

Oops, forgot this was happening in both places.

damccorm reopened this Nov 4, 2022
Abacn removed this from the 2.44.0 Release milestone Nov 22, 2022
github-actions bot added this to the 2.47.0 Release milestone Mar 4, 2023
tvalentyn added the done & done label (issue has been reviewed after it was closed for verification, followups, etc.) Mar 7, 2023
3 participants