-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encryption support producer #560
Encryption support producer #560
Conversation
- use base crypto package for encryption
pulsar/internal/commands.go
Outdated
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer, | |||
wb.PutUint32(checksum, checksumIdx) | |||
} | |||
|
|||
// copy of the method serializeBatch(....) with an extension to encrypt payload | |||
func serializeBatchWithEncryption(wb Buffer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the differences between serializeBatch and serializeBatchWithEncryption? It looks like a lot of duplicate code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I preferred to put this in separate function instead of modifying the exiting one. Let me check if I can remove duplicate code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reused the serializeBatch
pulsar/producer.go
Outdated
@@ -163,6 +165,19 @@ type ProducerOptions struct { | |||
// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions | |||
// Default is 1 minute | |||
PartitionsAutoDiscoveryInterval time.Duration | |||
|
|||
// EncryptionKeys list of encryption key names to encrypt session key | |||
EncryptionKeys []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we wrap these into a struct? Maybe something like this?
type Encryption {
Keys []string
KeyReader crypto.KeyReader
etc
]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -14,10 +14,7 @@ require ( | |||
github.com/google/uuid v1.1.2 | |||
github.com/inconshreveable/mousetrap v1.0.0 // indirect | |||
github.com/klauspost/compress v1.10.8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why is the mod and sum file changing? Can these changes be done in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me recheck again :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced with master branch
pulsar/internal/commands.go
Outdated
wb.PutUint32(checksum, checksumIdx) | ||
} | ||
|
||
func encryptPayload(msgMetadata *pb.MessageMetadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this return an error?
pulsar/internal/commands.go
Outdated
|
||
// there was a error in encrypting the payload and | ||
// crypto failure action is set to crypto.ProducerCryptoFailureActionFail | ||
if encryptedPayload == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure panic is the correct behavior here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its better to throw an exception because client is setting ProducerCryptoFailureActionFail
to fail.
pulsar/internal/commands.go
Outdated
crypto.NewMessageMetadataSupplier(msgMetadata), | ||
compressedPayload) | ||
|
||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this error be log somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] | ||
|
||
// compress the payload | ||
compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to compress before encrypting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per java implementation => Yes compress and then encrypt.
pulsar/producer_partition.go
Outdated
} | ||
} | ||
|
||
func (p *partitionProducer) updateDataKey() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be thread safe? Also, can we move the key refreshing to another MR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can move it to next MR.
Just ask: why this to be moved to another MR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/producer_partition.go
Outdated
} else if p.batchBuilder == nil { | ||
provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this? The p
masks the partitionProducer and make the code harder to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just tried to remove duplicate code :) If it is not ok, I'll revert it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
pulsar/internal/batch_builder.go
Outdated
) | ||
|
||
return &bc, nil | ||
} | ||
|
||
// UseEncryptionKeys encryption key names to use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about removing these config options and passing an Encryptor interface/struct. I think the config functions work better when things are scoped in their own package - it make it name spacing clearer. Also, it doesn't really match the rest of the code.
type Encrypter interface {
Encrypt(msgMetadata, payload []byte) ([]byte, error)
}
If encryption is not provided there can be a noop encryter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@cckellogg @GPrabhudas Any update for this? We are ready to initiate a new version of the release, hoping to include this new change in the new version (because some of the code has been merged in), can we consider advancing the progress here. |
I'll work on it and address the review suggestions. |
@wolfstudy what is your timeframe for a new release? |
What I want is to start the release process after merging the change related to this function. If the development of this feature takes some time, we can consider including it in the next version. |
There are additional MRs (besides this one) to get this feature working. Currently, just the crypto package has been merged. It's probably better to cut a release now and then add the crypto implementation (producer and consumer) into the next release. To me that is better than rushing to get MRs in. |
…Fanatics/pulsar-client-go into encryption-support-ext-producer
// Encrypt producer encryptor | ||
func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata crypto.MessageMetadataSupplier) ([]byte, error) { | ||
// encryption is enabled but KeyReader interface is not implemented | ||
if e.keyReader == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be detected and an error raised while setting up the producer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/producer_partition.go
Outdated
logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID) | ||
messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger) | ||
if err != nil { | ||
logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will there be more context in the err of why this failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Error is returned if generation of data key is failed. Here we are logging the same error with other information.
pulsar/encryption.go
Outdated
// ProducerEncryptionInfo encryption related fields required by the producer | ||
type ProducerEncryptionInfo struct { | ||
// KeyReader read RSA public/private key pairs | ||
Keyreader crypto.KeyReader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KeyReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pulsar/internal/crypto/encryptor.go
Outdated
|
||
// Encryptor support encryption | ||
type Encryptor interface { | ||
Encrypt([]byte, crypto.MessageMetadataSupplier) ([]byte, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is internal we can pass the *pb.MessageMetadata and avoid having to create a supplier for each message when encryption is not being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// send unencrypted message | ||
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend { | ||
e.logger.Errorf("Encryption of payload failed : %v", err) | ||
e.logger.Warn("ProducerCryptoFailureAction is set to send, sending unecrypted message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would log only one warning message here.
logger.WithError(err).Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
pulsar/producer_partition.go
Outdated
func (p *partitionProducer) generateDataKey() error { | ||
if p.options.Encryption != nil { | ||
if p.options.Encryption.KeyReader != nil { | ||
return p.options.Encryption.MessageCrypto.AddPublicKeyCipher(p.options.Encryption.Keys, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the MessageCrypto be nil here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. It won't be nil.
If not provided, it is getting initialized with default one.
https://github.com/Fanatics/pulsar-client-go/blob/encryption-support-ext-producer/pulsar/producer_partition.go#L141
} | ||
|
||
if encryption.MessageCrypto == nil { | ||
logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this can be update in another MR. It might be better to create a context logger instead of sending a string
cl := logger.WithFields(log.Fields{
"topic": topic,
"producer": producerName,
"producerID", producerID
})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
if b != nil { | ||
batchesData[idx] = b | ||
sequenceIDs[idx] = s | ||
callbacks[idx] = c | ||
} else if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the error be set with the other arrays so they are always the same length? the only way b
can be nil is if there are no messages right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cckellogg can this PR me merged if all looks good :)
Breakdown of PR #552
This PR includes the encryption changes at producer side.