Skip to content

Commit

Permalink
allow out of order updates for blocks
Browse files Browse the repository at this point in the history
In Netflix#1297, normalized values were buffered and were only
output when the buffered entry was flushed. This led to some user
complaints that data was delayed. The data still arrived within
the SLA, but queries whose end time was set to now, rather than
offset by the SLA for data to be actionable, would not see it
until a bit later than before.

With the compressed mutable block type introduced in Netflix#1320,
older blocks can now be updated because they are no longer
immutable. In this change, the block store checks have been
updated so that the previous block can be updated to account for
out-of-order values that may arrive. This should make the data
visible as early as it was before Netflix#1297.
  • Loading branch information
brharrington committed Jun 9, 2021
1 parent 8520de5 commit 5eb9273
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class MemoryBlockStore(step: Long, blockSize: Int, numBlocks: Int) extends Block

private val blockStep = step * blockSize

private[db] val blocks: Array[Block] = new Array[Block](numBlocks)
private[db] val blocks: Array[MutableBlock] = new Array[MutableBlock](numBlocks)

private[db] var currentPos: Int = 0

Expand Down Expand Up @@ -129,12 +129,27 @@ class MemoryBlockStore(step: Long, blockSize: Int, numBlocks: Int) extends Block
hasData = true
}
var pos = ((timestamp - currentBlock.start) / step).asInstanceOf[Int]
require(pos >= 0, "data is too old")
//require(pos >= 0, "data is too old")
if (pos >= blockSize) {
// Exceeded window of current block, create a new one for the next
// interval
newBlock(alignStart(timestamp), rollup)
pos = ((timestamp - currentBlock.start) / step).asInstanceOf[Int]
currentBlock.update(pos, value)
} else if (pos < 0) {
// Out of order update received for an older block, try to update the
// previous block
val previousPos = (currentPos - 1) % numBlocks
if (previousPos > 0 && blocks(previousPos) != null) {
val previousBlock = blocks(previousPos)
pos = ((timestamp - previousBlock.start) / step).asInstanceOf[Int]
if (pos >= 0 && pos < blockSize) {
previousBlock.update(pos, value)
}
}
} else {
currentBlock.update(pos, value)
}
currentBlock.update(pos, value)
}

def update(timestamp: Long): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,39 @@ class RollingValueFunction(
// Update the current entry or old entry that is still within range
val i = (t % size).toInt
values(i) = aggr(values(i), value)
writeValue(t, values(i))
} else if (delta == 1) {
// Flush the old entry and overwrite for the new time
val i = (t % size).toInt
writeValue((t - size) * step, values(i))
//writeValue((t - size) * step, values(i))
lastUpdateTime = t
values(i) = value
writeValue(t, values(i))
} else if (delta > 1) {
// Flush the old values, and update for most recent value
close()
//close()
val i = (t % size).toInt
lastUpdateTime = t
values(i) = value
writeValue(t, values(i))
}
}

override def close(): Unit = {
// Flush all the old entries
var ts = math.max(lastUpdateTime - size + 1, 0L)
/*var ts = math.max(lastUpdateTime - size + 1, 0L)
while (ts <= lastUpdateTime) {
val i = (ts % size).toInt
writeValue(ts * step, values(i))
values(i) = Double.NaN
ts += 1
}
}*/
lastUpdateTime = -1L
}

private def writeValue(timestamp: Long, value: Double): Unit = {
if (!value.isNaN) {
next(timestamp, value)
next(timestamp * step, value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,20 @@ class MemoryBlockStoreSuite extends AnyFunSuite {
test("update, old data") {
val bs = new MemoryBlockStore(1, 1, 40)
bs.update(0, List(1.0, 2.0, 3.0))
intercept[IllegalArgumentException] {
bs.update(0, List(4.0, 5.0))
}
assert(bs.fetch(0, 2, Block.Sum).toList === List(1.0, 2.0, 3.0))
bs.update(0, List(4.0, 5.0))
// previous block can still be updated, but older updates will be ignored
assert(bs.fetch(0, 2, Block.Sum).toList === List(1.0, 5.0, 3.0))
}

test("update, old data hour transition") {
val bs = new MemoryBlockStore(1, 60, 3)
val input = (0 to 61).map(_.toDouble).toList
bs.update(0, input)
assert(bs.fetch(0, 61, Block.Sum).toList === input)

bs.update(58, 60.0)
bs.update(59, 60.0)
assert(bs.fetch(58, 61, Block.Sum).toList === List(60.0, 60.0, 60.0, 61.0))
}

test("update, overwrite") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,164 +28,164 @@ class MaxValueFunctionSuite extends AnyFunSuite {

test("basic") {
val n = newFunction(10)
assert(n.update(5, 1.0) === Nil)
assert(n.update(15, 2.0) === Nil)
assert(n.update(25, 2.0) === List(10 -> 1.0))
assert(n.update(35, 1.0) === List(20 -> 2.0))
assert(n.update(85, 1.0) === List(30 -> 2.0, 40 -> 1.0))
assert(n.update(95, 2.0) === Nil)
assert(n.update(105, 2.0) === List(90 -> 1.0))
assert(n.update(5, 1.0) === List(10 -> 1.0))
assert(n.update(15, 2.0) === List(20 -> 2.0))
assert(n.update(25, 2.0) === List(30 -> 2.0))
assert(n.update(35, 1.0) === List(40 -> 1.0))
assert(n.update(85, 1.0) === List(90 -> 1.0))
assert(n.update(95, 2.0) === List(100 -> 2.0))
assert(n.update(105, 2.0) === List(110 -> 2.0))
n.close()
assert(n.result() === List(100 -> 2.0, 110 -> 2.0))
assert(n.result() === Nil)
}

test("already normalized updates") {
val n = newFunction(10)
assert(n.update(0, 1.0) === Nil)
assert(n.update(10, 2.0) === Nil)
assert(n.update(20, 3.0) === List(0 -> 1.0))
assert(n.update(30, 1.0) === List(10 -> 2.0))
assert(n.update(0, 1.0) === List(0 -> 1.0))
assert(n.update(10, 2.0) === List(10 -> 2.0))
assert(n.update(20, 3.0) === List(20 -> 3.0))
assert(n.update(30, 1.0) === List(30 -> 1.0))
n.close()
assert(n.result() === List(20 -> 3.0, 30 -> 1.0))
assert(n.result() === Nil)
}

test("already normalized updates, skip 1") {
val n = newFunction(10)
assert(n.update(0, 1.0) === Nil)
assert(n.update(10, 1.0) === Nil)
assert(n.update(30, 1.0) === List(0 -> 1.0, 10 -> 1.0))
assert(n.update(0, 1.0) === List(0 -> 1.0))
assert(n.update(10, 1.0) === List(10 -> 1.0))
assert(n.update(30, 1.0) === List(30 -> 1.0))
n.close()
assert(n.result() === List(30 -> 1.0))
assert(n.result() === Nil)
}

test("already normalized updates, miss heartbeat") {
val n = newFunction(10)
assert(n.update(0, 1.0) === Nil)
assert(n.update(10, 2.0) === Nil)
assert(n.update(30, 1.0) === List(0 -> 1.0, 10 -> 2.0))
assert(n.update(60, 4.0) === List(30 -> 1.0))
assert(n.update(70, 2.0) === Nil)
assert(n.update(0, 1.0) === List(0 -> 1.0))
assert(n.update(10, 2.0) === List(10 -> 2.0))
assert(n.update(30, 1.0) === List(30 -> 1.0))
assert(n.update(60, 4.0) === List(60 -> 4.0))
assert(n.update(70, 2.0) === List(70 -> 2.0))
n.close()
assert(n.result() === List(60 -> 4.0, 70 -> 2.0))
assert(n.result() === Nil)
}

test("random offset") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
n.close()
assert(n.result() === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.result() === Nil)
}

test("random offset, skip 1") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(5, 13), 1.0) === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
assert(n.update(t(5, 13), 1.0) === List(t(6, 0) -> 1.0))
n.close()
assert(n.result() === List(t(6, 0) -> 1.0))
assert(n.result() === Nil)
}

test("random offset, skip 2") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(6, 13), 1.0) === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
assert(n.update(t(6, 13), 1.0) === List(t(7, 0) -> 1.0))
n.close()
assert(n.result() === List(t(7, 0) -> 1.0))
assert(n.result() === Nil)
}

test("random offset, skip almost 2") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(6, 5), 1.0) === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
assert(n.update(t(6, 5), 1.0) === List(t(7, 0) -> 1.0))
n.close()
assert(n.result() === List(t(7, 0) -> 1.0))
assert(n.result() === Nil)
}

test("random offset, out of order") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(1, 12), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(2, 10), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(3, 11), 1.0) === Nil)
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(1, 12), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(2, 10), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
assert(n.update(t(3, 11), 1.0) === List(t(4, 0) -> 1.0))
n.close()
assert(n.result() === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.result() === Nil)
}

test("random offset, dual reporting") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(1, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(2, 13), 1.0) === Nil)
assert(n.update(t(3, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === Nil)
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(1, 13), 1.0) === List(t(2, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(2, 13), 1.0) === List(t(3, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
assert(n.update(t(3, 13), 1.0) === List(t(4, 0) -> 1.0))
n.close()
assert(n.result() === List(t(3, 0) -> 1.0, t(4, 0) -> 1.0))
assert(n.result() === Nil)
}

test("init, 17") {

def t(m: Int, s: Int) = (m * 60 + s) * 1000L
val n = newFunction(60000)
val v = 1.0 / 60.0
assert(n.update(t(8, 17), v) === Nil)
assert(n.update(t(9, 17), 0.0) === Nil)
assert(n.update(t(10, 17), 0.0) === List(t(9, 0) -> v))
assert(n.update(t(8, 17), v) === List(t(9, 0) -> v))
assert(n.update(t(9, 17), 0.0) === List(t(10, 0) -> 0.0))
assert(n.update(t(10, 17), 0.0) === List(t(11, 0) -> 0.0))
n.close()
assert(n.result() === List(t(10, 0) -> 0.0, t(11, 0) -> 0.0))
assert(n.result() === Nil)
}

test("frequent updates") {
val n = newFunction(10)
assert(n.update(0, 1.0) === Nil)
assert(n.update(2, 2.0) === Nil)
assert(n.update(4, 4.0) === Nil)
assert(n.update(8, 8.0) === Nil)
assert(n.update(12, 2.0) === List(0 -> 1.0))
assert(n.update(40, 3.0) === List(10 -> 8.0, 20 -> 2.0))
assert(n.update(0, 1.0) === List(0 -> 1.0))
assert(n.update(2, 2.0) === List(10 -> 2.0))
assert(n.update(4, 4.0) === List(10 -> 4.0))
assert(n.update(8, 8.0) === List(10 -> 8.0))
assert(n.update(12, 2.0) === List(20 -> 2.0))
assert(n.update(40, 3.0) === List(40 -> 3.0))
n.close()
assert(n.result() === List(40 -> 3.0))
assert(n.result() === Nil)
}

test("multi-node updates") {
val n = newFunction(10)

// Node 1: if shutting down it can flush an interval early
assert(n.update(0, 1.0) === Nil)
assert(n.update(10, 2.0) === Nil)
assert(n.update(0, 1.0) === List(0 -> 1.0))
assert(n.update(10, 2.0) === List(10 -> 2.0))

// Other nodes: report around the same time. Need to ensure that the flush
// from the node shutting down doesn't block the updates from the other nodes
assert(n.update(0, 3.0) === Nil)
assert(n.update(0, 4.0) === Nil)
assert(n.update(0, 5.0) === Nil)
assert(n.update(0, 3.0) === List(0 -> 3.0))
assert(n.update(0, 4.0) === List(0 -> 4.0))
assert(n.update(0, 5.0) === List(0 -> 5.0))

assert(n.update(10, 6.0) === Nil)
assert(n.update(10, 7.0) === Nil)
assert(n.update(10, 8.0) === Nil)
assert(n.update(10, 6.0) === List(10 -> 6.0))
assert(n.update(10, 7.0) === List(10 -> 7.0))
assert(n.update(10, 8.0) === List(10 -> 8.0))

n.close()
assert(n.result() === List(0 -> 5.0, 10 -> 8.0))
assert(n.result() === Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ class NormalizationCacheSuite extends AnyFunSuite with BeforeAndAfter {
clock.setWallTime(41)
cache.updateGauge(dp(40, 2.0))

// Force expiration to check that entry will be closed. Update a different entry
// to trigger the check
clock.setWallTime(100)
cache.updateGauge(dp(100, 1.0).copy(tags = Map("name" -> "test")))

val expected = List(
dp(10, 0.0),
dp(20, 1.0),
Expand Down
Loading

0 comments on commit 5eb9273

Please sign in to comment.