Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: forbid DFLYCLUSTER commads set for emulated cluster mode #3307

Merged
merged 13 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,18 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(kClusterDisabled);
}

if (sub_cmd == "KEYSLOT") {
return KeySlot(args, cntx);
}

if (args.size() > 1) {
return cntx->SendError(WrongNumArgsError(absl::StrCat("CLUSTER ", sub_cmd)));
}

if (sub_cmd == "HELP") {
return ClusterHelp(cntx);
} else if (sub_cmd == "MYID") {
return ClusterMyId(cntx);
} else if (sub_cmd == "SHARDS") {
return ClusterShards(cntx);
} else if (sub_cmd == "SLOTS") {
Expand All @@ -379,8 +389,6 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
return ClusterNodes(cntx);
} else if (sub_cmd == "INFO") {
return ClusterInfo(cntx);
} else if (sub_cmd == "KEYSLOT") {
return KeySlot(args, cntx);
} else {
return cntx->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
}
Expand All @@ -401,11 +409,15 @@ void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) {
}

void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
if (!IsClusterEnabledOrEmulated()) {
return cntx->SendError(kClusterDisabled);
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) {
return cntx->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
}

VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;
if (cntx->conn()) {
VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;
} else {
VLOG(2) << "Got DFLYCLUSTER command (NO_CLIENT_ID): " << args;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which flow do you get null connection?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh this is the journal emulated mode

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
client_info = cntx->conn()? ntx->conn()->GetClientId() : "stub";
VLOG(2) << "Got DFLYCLUSTER command (" << client_info << args;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in journal_emulated

}
Comment on lines +416 to +420
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (cntx->conn()) {
VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;
} else {
VLOG(2) << "Got DFLYCLUSTER command (NO_CLIENT_ID): " << args;
}
auto print_conn = [](auto* conn) { return conn ? conn->GetClientId() : "NO_CLIENT_ID" };
VLOG(2) << "Got DFLYCLUSTER command (" << print_conn(cntx->conn()) << "): " << args;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's different types. int and char[]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could return a string.. no need to duplicate code
(a string < 16 bytes will not do dynamic allocations)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change it next time when do some changes in this file


ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
Expand All @@ -414,8 +426,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterGetSlotInfo(args, cntx);
} else if (sub_cmd == "CONFIG") {
return DflyClusterConfig(args, cntx);
} else if (sub_cmd == "MYID") {
return DflyClusterMyId(args, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
Expand All @@ -425,12 +435,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
}

void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
if (!args.empty()) {
return cntx->SendError(WrongNumArgsError("DFLYCLUSTER MYID"));
}
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->SendBulkString(id_);
void ClusterFamily::ClusterMyId(ConnectionContext* cntx) {
cntx->SendSimpleString(id_);
}

namespace {
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ClusterFamily {
void ClusterSlots(ConnectionContext* cntx);
void ClusterNodes(ConnectionContext* cntx);
void ClusterInfo(ConnectionContext* cntx);
void ClusterMyId(ConnectionContext* cntx);

void KeySlot(CmdArgList args, ConnectionContext* cntx);

Expand All @@ -56,7 +57,6 @@ class ClusterFamily {
void DflyCluster(CmdArgList args, ConnectionContext* cntx);
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx);
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);

private: // Slots migration section
Expand Down
11 changes: 8 additions & 3 deletions src/server/cluster/cluster_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ClusterFamilyTest : public BaseFamilyTest {
static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration";

string GetMyId() {
return RunPrivileged({"dflycluster", "myid"}).GetString();
return Run({"cluster", "myid"}).GetString();
}

void ConfigSingleNodeCluster(string id) {
Expand Down Expand Up @@ -735,8 +735,13 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterSlots) {
}

TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) {
EXPECT_THAT(Run({"cluster", "nodes"}),
GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n");
auto res = Run({"cluster", "nodes"});
EXPECT_THAT(res, GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n");
}

TEST_F(ClusterFamilyEmulatedTest, ForbidenCommands) {
auto res = Run({"DFLYCLUSTER", "GETSLOTINFO", "SLOTS", "1"});
EXPECT_THAT(res, ErrArg("Cluster is disabled. Use --cluster_mode=yes to enable."));
}

} // namespace
Expand Down
31 changes: 16 additions & 15 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ class NodeInfo:


async def create_node_info(instance):
admin_client = instance.admin_client()
client = instance.client()
node_id = await get_node_id(client)
ninfo = NodeInfo(
instance=instance,
client=instance.client(),
admin_client=admin_client,
client=client,
admin_client=instance.admin_client(),
slots=[],
migrations=[],
id=await get_node_id(admin_client),
id=node_id,
)
return ninfo

Expand Down Expand Up @@ -169,8 +170,8 @@ def key_slot(key_str) -> int:
return crc_hqx(key, 0) % 16384


async def get_node_id(admin_connection):
id = await admin_connection.execute_command("DFLYCLUSTER MYID")
async def get_node_id(connection):
id = await connection.execute_command("CLUSTER MYID")
assert isinstance(id, str)
return id

Expand Down Expand Up @@ -257,11 +258,11 @@ async def test_emulated_cluster_with_replicas(df_factory):
df_factory.start_all([master, *replicas])

c_master = aioredis.Redis(port=master.port)
master_id = (await c_master.execute_command("dflycluster myid")).decode("utf-8")
master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8")

c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
replica_ids = [
(await c_replica.execute_command("dflycluster myid")).decode("utf-8")
(await c_replica.execute_command("CLUSTER MYID")).decode("utf-8")
for c_replica in c_replicas
]

Expand Down Expand Up @@ -403,7 +404,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
c_nodes = [node.client() for node in nodes]
c_nodes_admin = [node.admin_client() for node in nodes]

node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes))

config = f"""
[
Expand Down Expand Up @@ -529,8 +530,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
df_factory.start_all([master, replica])

async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
master_id = await get_node_id(c_master_admin)
replica_id = await get_node_id(c_replica_admin)
master_id = await get_node_id(c_master)
replica_id = await get_node_id(c_replica)

config = f"""
[
Expand Down Expand Up @@ -640,11 +641,11 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF

c_master = master.client()
c_master_admin = master.admin_client()
master_id = await get_node_id(c_master_admin)
master_id = await get_node_id(c_master)

c_replica = replica.client()
c_replica_admin = replica.admin_client()
replica_id = await get_node_id(c_replica_admin)
replica_id = await get_node_id(c_replica)

config = f"""
[
Expand Down Expand Up @@ -748,7 +749,7 @@ async def test_cluster_blocking_command(df_server):
config = [
{
"slot_ranges": [{"start": 0, "end": 8000}],
"master": {"id": await get_node_id(c_master_admin), "ip": "10.0.0.1", "port": 7000},
"master": {"id": await get_node_id(c_master), "ip": "10.0.0.1", "port": 7000},
"replicas": [],
},
{
Expand Down Expand Up @@ -820,7 +821,7 @@ async def test_cluster_native_client(
c_replicas = [replica.client() for replica in replicas]
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
c_replicas_admin = [replica.admin_client() for replica in replicas]
replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin))
replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas))

config = f"""
[
Expand Down
3 changes: 0 additions & 3 deletions tests/dragonfly/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ def __init__(self, host, port, remote_host, remote_port):
self.stop_connections = []
self.server = None

async def __del__(self):
await self.close()

async def handle(self, reader, writer):
remote_reader, remote_writer = await asyncio.open_connection(
self.remote_host, self.remote_port
Expand Down
Loading