Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce allocations for parsing payloads #1099

Merged
merged 1 commit into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2014-2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.akka

import java.io.InputStream

import akka.util.ByteString

/**
* Wraps a `ByteString` to allow it to be read from code expecting an `InputStream`. This
* can be used to avoid allocating a temporary array and using `ByteArrayInputStream`.
*/
class ByteStringInputStream(data: ByteString) extends InputStream {
private var pos = -1
private val length = data.length

override def read(): Int = {
pos += 1
if (pos >= length) -1 else data(pos) & 255
}

override def available(): Int = {
if (pos >= length) 0 else length - pos - 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ object CustomDirectives {
def json[T: Manifest]: MediaType => ByteString => T = { mediaType => bs =>
{
if (isSmile(mediaType))
Json.smileDecode[T](bs.toArray)
Json.smileDecode[T](new ByteStringInputStream(bs))
else
Json.decode[T](bs.toArray)
Json.decode[T](new ByteStringInputStream(bs))
}
}

Expand All @@ -82,9 +82,9 @@ object CustomDirectives {
{
val p =
if (isSmile(mediaType))
Json.newSmileParser(bs.toArray)
Json.newSmileParser(new ByteStringInputStream(bs))
else
Json.newJsonParser(bs.toArray)
Json.newJsonParser(new ByteStringInputStream(bs))
try decoder(p)
finally p.close()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2014-2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.akka

import java.io.ByteArrayInputStream
import java.util.Random

import akka.util.ByteString
import org.scalatest.FunSuite

class ByteStringInputStreamSuite extends FunSuite {
test("read() and available()") {
val random = new Random(42)
val data = new Array[Byte](4096)
random.nextBytes(data)

val bais = new ByteArrayInputStream(data)
val bsis = new ByteStringInputStream(ByteString(data))

data.indices.foreach { i =>
assert(bais.available() === data.length - i)
assert(bsis.available() === data.length - i)
assert(bais.read() === bsis.read())
}

assert(bais.read() === bsis.read())
}

test("read(buffer, offset, length)") {
val random = new Random(42)
val data = new Array[Byte](4096)
random.nextBytes(data)

val bais = new ByteArrayInputStream(data)
val bsis = new ByteStringInputStream(ByteString(data))

val b1 = new Array[Byte](13)
val b2 = new Array[Byte](13)
var i = 0
while (i < data.length) {
val len1 = bais.read(b1)
val len2 = bsis.read(b2)
assert(len1 === len2)
assert(b1 === b2)
i += len2
}

assert(bais.read(b1) === bsis.read(b2))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2014-2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.akka

import java.util.Random

import akka.util.ByteString
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.infra.Blackhole

/**
* Results:
*
* ```
* Benchmark Mode Cnt Score Error Units
* toArray thrpt 10 13.051 ± 0.549 ops/s
* inputStream thrpt 10 14.363 ± 0.420 ops/s
*
* Benchmark Mode Cnt Score Error Units
* toArray gc.alloc.rate.norm 10 104857643.179 ± 0.278 B/op
* inputStream gc.alloc.rate.norm 10 4138.877 ± 0.191 B/op
* ```
*/
@State(Scope.Benchmark)
class ByteStringReading {

private val random = new Random()
private val byteArray = new Array[Byte](1024 * 1024 * 100)
random.nextBytes(byteArray)
private val byteString = ByteString(byteArray)

@Benchmark
def toArray(bh: Blackhole): Unit = {
bh.consume(byteString.toArray)
}

@Benchmark
def inputStream(bh: Blackhole): Unit = {
val in = new ByteStringInputStream(byteString)
val buffer = new Array[Byte](4096)
var len = in.read(buffer)
while (len > 0) {
bh.consume(buffer)
len = in.read(buffer)
}
}
}