diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index d39f458ff407..8b8b8ca3841f 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -333,7 +333,13 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou timeout := 3 * time.Minute timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { + gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) { + if exactlyOnceDelivery { + if _, err := m.AckWithResult().Get(ctx); err != nil { + t.Fatalf("failed to ack message with exactly once delivery: %v", err) + } + return + } m.Ack() }) if err != nil { @@ -2003,16 +2009,13 @@ func TestIntegration_TopicRetention(t *testing.T) { } } -func TestExactlyOnceDelivery_PublishReceive(t *testing.T) { +func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) { ctx := context.Background() client := integrationTestClient(ctx, t) for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0) } - - // Tests for large messages (larger than the 4MB gRPC limit). - testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024) } func TestIntegration_TopicUpdateSchema(t *testing.T) { diff --git a/pubsub/message.go b/pubsub/message.go index 94010cd0cd22..33d87735cb41 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -157,20 +157,20 @@ func (ah *psAckHandler) OnNack() { } func (ah *psAckHandler) OnAckWithResult() *AckResult { + // call done with true to indicate ack. + ah.done(true) if !ah.exactlyOnceDelivery { return newSuccessAckResult() } - // call done with true to indicate ack. - ah.done(true) return ah.ackResult } func (ah *psAckHandler) OnNackWithResult() *AckResult { + // call done with false to indicate nack. + ah.done(false) if !ah.exactlyOnceDelivery { return newSuccessAckResult() } - // call done with false to indicate nack. - ah.done(false) return ah.ackResult } diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index a5a48a1e17ff..de832f1c7152 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -66,7 +66,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) { func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) { sub := client.Subscription("S") - gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) @@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) { sub := client.Subscription("S") sub.ReceiveSettings.NumGoroutines = 1 - gotMsgs, err := pullN(context.Background(), sub, len(testMessages), func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) @@ -297,7 +297,7 @@ func TestStreamingPullConcurrent(t *testing.T) { sub := client.Subscription("S") ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - gotMsgs, err := pullN(ctx, sub, nMessages, func(ctx context.Context, m *Message) { + gotMsgs, err := pullN(ctx, sub, nMessages, 0, func(ctx context.Context, m *Message) { m.Ack() }) if c := status.Convert(err); err != nil && c.Code() != codes.Canceled { @@ -513,7 +513,8 @@ func newMock(t *testing.T) (*Client, *mockServer) { } // pullN calls sub.Receive until at least n messages are received. -func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context, *Message)) ([]*Message, error) { +// Wait a provided duration before cancelling. +func pullN(ctx context.Context, sub *Subscription, n int, wait time.Duration, f func(context.Context, *Message)) ([]*Message, error) { var ( mu sync.Mutex msgs []*Message @@ -526,6 +527,9 @@ func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context mu.Unlock() f(ctx, m) if nSeen >= n { + // Wait a specified amount of time so that for exactly once delivery, + // Acks aren't cancelled immediately. + time.Sleep(wait) cancel() } }) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0f7c23804d86..7edd7ad2d4c2 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -397,8 +397,8 @@ type SubscriptionConfig struct { // by Pub/Sub and have distinct MessageID values. // // Lastly, to guarantee messages have been acked or nacked properly, you must - // call Message.AckWithResponse() or Message.NackWithResponse(). These return an - // AckResponse which will be ready if the message has been acked (or failed to be acked). + // call Message.AckWithResult() or Message.NackWithResult(). These return an + // AckResult which will be ready if the message has been acked (or failed to be acked). EnableExactlyOnceDelivery bool // State indicates whether or not the subscription can receive messages. diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index bcbbd1f355d2..ed8c865480a6 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -296,7 +296,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) { srv.Publish(topic.name, []byte{byte(i)}, nil) } sub.ReceiveSettings.Synchronous = synchronous - msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) { + msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) { if exactlyOnceDelivery { ar := m.AckWithResult() // Don't use the above ctx here since that will get cancelled.