Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOSSIP][BREAKING] Replace floodPublish param with floodPublishMaxMessageSizeThreshold #391

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
* The default is 0 KiB (never flood publish).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = 0,
tbenr marked this conversation as resolved.
Show resolved Hide resolved
tbenr marked this conversation as resolved.
Show resolved Hide resolved

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,7 +242,7 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
tbenr marked this conversation as resolved.
Show resolved Hide resolved
*/
val iDontWantMinMessageSizeThreshold: Int = 16384,

Expand All @@ -260,6 +262,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,10 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.protobufMessage.data.size() <= params.floodPublishMaxMessageSizeThreshold
tbenr marked this conversation as resolved.
Show resolved Hide resolved

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class GossipParamsBuilder {

private var pruneBackoff: Duration? = null

private var floodPublish: Boolean? = null

private var gossipFactor: Double? = null

private var opportunisticGraftPeers: Int? = null
Expand Down Expand Up @@ -76,6 +74,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +90,7 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +141,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +183,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +203,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down Expand Up @@ -252,7 +252,7 @@ class GossipParamsBuilder {
check(seenTTL != null, { "seenTTL must not be null" })
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
check(pruneBackoff != null, { "pruneBackoff must not be null" })
check(floodPublish != null, { "floodPublish must not be null" })
check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" })
check(gossipFactor != null, { "gossipFactor must not be null" })
check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" })
check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = 0))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = 0)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testNotFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size() - 1)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand All @@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand All @@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() {

@Test
fun testFloodPublish() {
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, floodPublish = true)
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.protobufMessage.data.size())
val peerScoreParams = GossipPeerScoreParams(
appSpecificScore = { appScore.getValue(it) },
appSpecificWeight = 1.0
Expand All @@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() {
val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId }
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)

test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
test.gossipRouter.publish(message)

test.fuzz.timeController.addTime(50.millis)

Expand Down Expand Up @@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() {
3,
3,
DLazy = 3,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
gossipFactor = 0.5
)
val peerScoreParams = GossipPeerScoreParams(
Expand Down Expand Up @@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() {
@Test
fun testOutboundMeshQuotas1() {
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false)
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = 0)
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class SubscriptionsLimitTest : TwoGossipHostTestBase() {
override val params = GossipParams(maxSubscriptions = 5, floodPublish = true)
override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = 16384)

@Test
fun `new peer subscribed to many topics`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams(
DLazy = 8,

pruneBackoff = 1.minutes,
floodPublish = true,
floodPublishMaxMessageSizeThreshold = 16384,
gossipFactor = 0.25,
DScore = 4,
DOut = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BlobDecouplingSimulation(
val randomSeed: Long = 3L,
val rnd: Random = Random(randomSeed),

val floodPublish: Boolean = true,
val floodPublishMaxMessageSizeThreshold: Int = 16384,

val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100),

Expand Down Expand Up @@ -85,7 +85,7 @@ class BlobDecouplingSimulation(
val gossipParams = Eth2DefaultGossipParams
.copy(
// heartbeatInterval = 1.minutes
floodPublish = floodPublish
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold
)
val gossipScoreParams = Eth2DefaultScoreParams
val gossipRouterCtor = { _: Int ->
Expand Down Expand Up @@ -294,7 +294,7 @@ fun main() {
// logger = {},
nodeCount = 1000,
peerBands = band,
floodPublish = false,
floodPublishMaxMessageSizeThreshold = 0,
// randomSeed = 2
)

Expand Down
Loading