Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] check if callback is nil before calling it #1036

Merged
merged 1 commit into from
Jul 3, 2023

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Jun 25, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

In pulsar/producer_partition.go, there are many cases that calling the callback without checking if it is nil or not, it will be panic if the input callback is nil at runtime.

Modifications

  • Modified pulsar/producer_partition.go

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@gunli
Copy link
Contributor Author

gunli commented Jun 26, 2023

I report an bug #1040 that can be fixed in this PR, if it is confirmed, I can add a commit to fix it here. @liangyepianzhou @RobertIndie @merlimat

@gunli gunli force-pushed the fix_nil_callback_check branch from 041fd4a to 189ac73 Compare June 28, 2023 06:31
@shibd
Copy link
Member

shibd commented Jun 29, 2023

@gunli Hi, Thanks for your contribution!

It is enough to check if the callback is nil when the user pass.

func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.internalSendAsync(ctx, msg, callback, false)
}

Callbacks elsewhere are coming in internally, so we don't need to check them.

BTW: Please add unit test.

@gunli
Copy link
Contributor Author

gunli commented Jun 29, 2023

No, the callback here is just the signature, users can input a nil one, and the input one can be passed everywhere.

func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error)) {
	p.getPartition(msg).SendAsync(ctx, msg, callback)
}

func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error)) {
	p.internalSendAsync(ctx, msg, callback, false)
}

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	// Register transaction operation to transaction and the transaction coordinator.
	var newCallback func(MessageID, *ProducerMessage, error)
	var txn *transaction
	if msg.Transaction != nil {
		transactionImpl := (msg.Transaction).(*transaction)
		txn = transactionImpl
		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			callback(nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			callback(nil, msg, err)
		}
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			callback(id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	} else {
		newCallback = callback
	}
	if p.getProducerState() != producerReady {
		// Producer is closing
		newCallback(nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}

@shibd
Copy link
Member

shibd commented Jun 30, 2023

Why not verify that callback cannot be nil at the beginning of the producer.SendAsync method?

@RobertIndie
Copy link
Member

Why not verify that callback cannot be nil at the beginning of the producer.SendAsync method?

Do you mean to throw an error when the callback is nil? The user is currently able to pass the nil as the callback. If we verify it and raise the error, then it will introduce the breaking change here.

I think it's helpful to allow the user to pass the nil callback in some cases.

@shibd shibd merged commit 6f01a7c into apache:master Jul 3, 2023
@RobertIndie RobertIndie added this to the v0.11.0 milestone Jul 4, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants