-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ack and compression parameters for Kafka #1359 #1712
Add ack and compression parameters for Kafka #1359 #1712
Conversation
@yurishkuro, I have made all the required changes along with the test. Kindly review. |
48519aa
to
464acd0
Compare
plugin/storage/kafka/options.go
Outdated
defaultRequiredAcks = "local" | ||
defaultCompression = "none" | ||
defaultCompressionLevel = -1000 | ||
suffixProtocolVersion = ".protocol-version" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please keep suffix
constants together
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
func getCompressionLevel(compressionMode string, compressionLevel int) (int, error) { | ||
compressionModeData := compressionModes[compressionMode] | ||
|
||
if compressionLevel == defaultCompressionLevel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is confusing, I suggest if compressionLevel == 0 {
, which makes it obvious that default type's default value is being converted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a snippet from sarama library. this is the reason why I made defaultCompressionLevel = -1000
// CompressionLevelDefault is the constant to use in CompressionLevel
// to have the default compression level for any codec. The value is picked
// that we don't use any existing compression levels.
const CompressionLevelDefault = -1000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's not the point. The meaning of "default" in sarama is different from what we have here. When dealing with CLI, if you want to use the default value for the respective compressor you simply not specify compression-level flag. Both 0 and -1000 are invalid values for most compressors except for zstd, where BOTH are valid and neither is the default. So I don't see a reason why this bad parameter design from sarama should be carried over to our flags. 0 is the natural null value for int field. It makes no sense to be playing tricks with -1000, which achieve the same results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, i will make the changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i prefer if we compare with 0 explicitly, not with defaultCompressionLevel
which happens to be 0, because it makes it easier to understand the logic (you're replacing the zero value).
plugin/storage/kafka/options.go
Outdated
|
||
// getCompressionLevel to get compression level from compression type | ||
func getCompressionLevel(compressionMode string, compressionLevel int) (int, error) { | ||
compressionModeData := compressionModes[compressionMode] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the other functions you use ToLower, should it be consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
plugin/storage/kafka/options.go
Outdated
return compressionModeData.defaultCompressionLevel, nil | ||
} | ||
|
||
if compressionModeData.minCompressionLevel <= compressionLevel && compressionModeData.maxCompressionLevel >= compressionLevel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest reversing the condition and returning an error "compression level is not within valid range"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
assert.Equal(t, val.compressor, sarama.CompressionSnappy) | ||
} | ||
|
||
func TestRequiredAcks(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test is useful, it just tests that map[] works as expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed how? what is it testing? It's just a map lookup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad. I have removed it now.
plugin/storage/kafka/options_test.go
Outdated
require.Error(t, err) | ||
} | ||
|
||
func TestCompressionModes(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test is useful, it just tests that map[] works as expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
85e1a37
to
6974ec5
Compare
725f179
to
464acd0
Compare
56b9891
to
464acd0
Compare
plugin/storage/kafka/options.go
Outdated
} | ||
|
||
if compressionModeData.minCompressionLevel > compressionLevel || compressionModeData.maxCompressionLevel < compressionLevel { | ||
return 0, fmt.Errorf("compression level is not within valid range") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could make the error more descriptive:
fmt.Errorf("compression level %d for '%s' is not within valid range [%d, %d]", level, mode, min, max)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
plugin/storage/kafka/options.go
Outdated
} | ||
|
||
//getCompressionModes maps input modes to sarama CompressionCodec | ||
func getCompressionModes(mode string) (sarama.CompressionCodec, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/getCompressionModes/getCompressionMode/ (it returns a singular value, not plural)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
plugin/storage/kafka/options_test.go
Outdated
|
||
func TestCompressionModesFailures(t *testing.T) { | ||
_, err := getCompressionModes("test") | ||
require.Error(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should be assert, not require (you CAN continue the test)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
7c410c9
to
f43a8cd
Compare
Codecov Report
@@ Coverage Diff @@
## master #1712 +/- ##
=========================================
Coverage ? 98.17%
=========================================
Files ? 195
Lines ? 9602
Branches ? 0
=========================================
Hits ? 9427
Misses ? 137
Partials ? 38
Continue to review full report at Codecov.
|
505a16c
to
2491996
Compare
@yurishkuro Kindly review. |
plugin/storage/kafka/options.go
Outdated
suffixEncoding = ".encoding" | ||
suffixRequiredAcks = ".required-acks" | ||
suffixCompression = ".compression" | ||
suffixCompressionLevel = ".compression.level" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change this to compression-level
otherwise it will cause problems when specifying the options in yaml due to the previous option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
plugin/storage/kafka/options_test.go
Outdated
_, err := getCompressionMode("test") | ||
assert.Error(t, err) | ||
|
||
_, err = getCompressionMode("temp") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why testing test
and temp
- isn't this one redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
assert.Error(t, err) | ||
} | ||
|
||
func TestRequiredAcksFailures(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about testing the valid values aswell?
Also same comment as above related to test
and temp
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it. Also added test for valid values.
@yurishkuro How do you propose to test it? I am not able to figure it out. |
94e6e9b
to
464acd0
Compare
plugin/storage/kafka/options_test.go
Outdated
"--kafka.producer.encoding=protobuf", | ||
"--kafka.producer.required-acks=local", | ||
"--kafka.producer.compression=gzip", | ||
"--kafka.producer.compression.level=6"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have been changed to compression-level
- so not sure why the unit tests seems to be passing atm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such a stupid mistake from my side. :(
fixed and pushed it.
0c22184
to
8bd740b
Compare
plugin/storage/kafka/options_test.go
Outdated
"--kafka.producer.encoding=protobuf", | ||
"--kafka.producer.required-acks=local", | ||
"--kafka.producer.compression=gzip", | ||
"--kafka.producer.compression-level=6"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to change the value, as 6 is the default for gzip - so regardless of whether this flag is present, the test would currently pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
8bd740b
to
464acd0
Compare
Signed-off-by: chandresh-pancholi <[email protected]>
90e43d2
to
e5582d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chandresh-pancholi thanks.
For reference in the future, if you could avoid squashing commits during the review cycles, it makes it easier to review changes. The commits will then be squashed when the PR is merged.
@chandresh-pancholi To address the code coverage, you should create some additional tests, variations of |
@objectiser Thank you so much for the review. I didn't know about the squashing commit. I will keep that in mind in future commits. |
@objectiser I agree but how to test
|
is this PR mergable to master? |
@chandresh-pancholi Good point - possibly the best way would be to return the error and handle up the stack? From quick look the other storage plugins don't seem to validate the options there. @yurishkuro Thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. The coverage for the error handling inside InitFromViper
could be fixed with some hacking, but not sure it brings much value, as the underlying error conditions are being tested in isolation already.
Exactly why we, as a rule, don't use |
func getCompressionLevel(compressionMode string, compressionLevel int) (int, error) { | ||
compressionModeData := compressionModes[compressionMode] | ||
|
||
if compressionLevel == defaultCompressionLevel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i prefer if we compare with 0 explicitly, not with defaultCompressionLevel
which happens to be 0, because it makes it easier to understand the logic (you're replacing the zero value).
Which problem is this PR solving?
Short description of the changes