Skip to content

Commit

Permalink
test: fix test_disconnect_replica (#3442)
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev authored Aug 5, 2024
1 parent 6da445f commit faea4ee
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) {
}

if (!tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, tx_data.command);
return;
}
Expand Down
52 changes: 24 additions & 28 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
]


@pytest.mark.skip(reason="Failing on github regression action")
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases)
async def test_disconnect_replica(
Expand All @@ -204,9 +203,14 @@ async def test_disconnect_replica(
t_disonnect,
n_keys,
):
master = df_factory.create(proactor_threads=t_master)
master = df_factory.create(
proactor_threads=t_master, vmodule="replica=2,dflycmd=2,server_family=2"
)
replicas = [
(df_factory.create(proactor_threads=t), crash_fs)
(
df_factory.create(proactor_threads=t, vmodule="replica=2,dflycmd=2,server_family=2"),
crash_fs,
)
for i, (t, crash_fs) in enumerate(
chain(
zip(t_crash_fs, repeat(DISCONNECT_CRASH_FULL_SYNC)),
Expand All @@ -216,33 +220,32 @@ async def test_disconnect_replica(
)
]

# Start master
logging.debug("Start master")
master.start()
c_master = master.client(single_connection_client=True)

# Start replicas and create clients
logging.debug("Start replicas and create clients")
df_factory.start_all([replica for replica, _ in replicas])

c_replicas = [(replica, replica.client(), crash_type) for replica, crash_type in replicas]

def replicas_of_type(tfunc):
return [args for args in c_replicas if tfunc(args[2])]

# Start data fill loop
logging.debug("Start data fill loop")
seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)
fill_task = asyncio.create_task(seeder.run())

# Run full sync
logging.debug("Run full sync")

async def full_sync(replica: DflyInstance, c_replica, crash_type):
c_replica = replica.client(single_connection_client=True)
await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
if crash_type == 0:
await asyncio.sleep(random.random() / 100 + 0.01)
await c_replica.close()
replica.stop(kill=True)
else:
await wait_available_async(c_replica)
await c_replica.close()

await asyncio.gather(*(full_sync(*args) for args in c_replicas))

Expand All @@ -256,10 +259,11 @@ async def full_sync(replica: DflyInstance, c_replica, crash_type):
for _, c_replica, _ in replicas_of_type(lambda t: t > 0):
assert await c_replica.ping()

# Run stable state crashes
logging.debug("Run stable state crashes")

async def stable_sync(replica, c_replica, crash_type):
await asyncio.sleep(random.random() / 100)
await c_replica.connection_pool.disconnect()
await c_replica.close()
replica.stop(kill=True)

await asyncio.gather(*(stable_sync(*args) for args in replicas_of_type(lambda t: t == 1)))
Expand All @@ -271,37 +275,29 @@ async def stable_sync(replica, c_replica, crash_type):
for _, c_replica, _ in replicas_of_type(lambda t: t > 1):
assert await c_replica.ping()

# Stop streaming
seeder.stop()
await fill_task

# Check master survived all crashes
logging.debug("Check master survived all crashes")
assert await c_master.ping()

# Check phase 3 replicas are up-to-date and there is no gap or lag
await seeder.run(target_ops=2000)
await asyncio.sleep(1.0)

capture = await seeder.capture()
for replica, _, _ in replicas_of_type(lambda t: t > 1):
assert await seeder.compare(capture, port=replica.port)

# Check disconnects
async def disconnect(replica, c_replica, crash_type):
await asyncio.sleep(random.random() / 100)
await c_replica.execute_command("REPLICAOF NO ONE")

logging.debug("disconnect replicas")
await asyncio.gather(*(disconnect(*args) for args in replicas_of_type(lambda t: t == 2)))

await asyncio.sleep(0.5)

# Check phase 3 replica survived
logging.debug("Check phase 3 replica survived")
for replica, c_replica, _ in replicas_of_type(lambda t: t == 2):
assert await c_replica.ping()
assert await seeder.compare(capture, port=replica.port)
await c_replica.connection_pool.disconnect()
await c_replica.close()

logging.debug("Stop streaming")
seeder.stop()
await fill_task

# Check master survived all disconnects
logging.debug("Check master survived all disconnects")
assert await c_master.ping()
await c_master.close()

Expand Down

0 comments on commit faea4ee

Please sign in to comment.