From 9ceed8f18f43aa3d5deb4c5a946da590f7ba90d6 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 4 Jun 2024 14:04:59 -0700 Subject: [PATCH] KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the add voter, remove voter, and update voter RPCs for KIP-853. This is just adding the RPC handling; the current implementation in RaftManager just throws UnsupportedVersionException. Reviewers: Andrew Schofield , José Armando García Sancio --- .../clients/admin/AddRaftVoterOptions.java | 26 +++++ .../clients/admin/AddRaftVoterResult.java | 42 ++++++++ .../org/apache/kafka/clients/admin/Admin.java | 56 ++++++++++ .../kafka/clients/admin/ForwardingAdmin.java | 10 ++ .../kafka/clients/admin/KafkaAdminClient.java | 100 ++++++++++++++++++ .../clients/admin/RaftVoterEndpoint.java | 100 ++++++++++++++++++ .../clients/admin/RemoveRaftVoterOptions.java | 26 +++++ .../clients/admin/RemoveRaftVoterResult.java | 42 ++++++++ .../apache/kafka/common/protocol/ApiKeys.java | 5 +- .../common/requests/AbstractRequest.java | 6 ++ .../common/requests/AbstractResponse.java | 6 ++ .../common/requests/AddRaftVoterRequest.java | 75 +++++++++++++ .../common/requests/AddRaftVoterResponse.java | 65 ++++++++++++ .../requests/RemoveRaftVoterRequest.java | 75 +++++++++++++ .../requests/RemoveRaftVoterResponse.java | 65 ++++++++++++ .../requests/UpdateRaftVoterRequest.java | 73 +++++++++++++ .../requests/UpdateRaftVoterResponse.java | 65 ++++++++++++ .../common/message/AddRaftVoterRequest.json | 40 +++++++ .../common/message/AddRaftVoterResponse.json | 30 ++++++ .../message/RemoveRaftVoterRequest.json | 30 ++++++ .../message/RemoveRaftVoterResponse.json | 30 ++++++ .../message/UpdateRaftVoterRequest.json | 46 ++++++++ .../message/UpdateRaftVoterResponse.json | 28 +++++ .../kafka/clients/admin/MockAdminClient.java | 10 ++ .../common/requests/RequestResponseTest.java | 64 +++++++++++ .../kafka/network/RequestConvertToJson.scala | 6 ++ .../main/scala/kafka/raft/RaftManager.scala | 5 +- .../scala/kafka/server/ControllerApis.scala | 18 ++++ .../main/scala/kafka/server/KafkaApis.scala | 2 + .../unit/kafka/server/RequestQuotaTest.scala | 9 ++ .../unit/kafka/tools/StorageToolTest.scala | 10 +- 31 files changed, 1161 insertions(+), 4 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterResult.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java create mode 100644 clients/src/main/resources/common/message/AddRaftVoterRequest.json create mode 100644 clients/src/main/resources/common/message/AddRaftVoterResponse.json create mode 100644 clients/src/main/resources/common/message/RemoveRaftVoterRequest.json create mode 100644 clients/src/main/resources/common/message/RemoveRaftVoterResponse.json create mode 100644 clients/src/main/resources/common/message/UpdateRaftVoterRequest.json create mode 100644 clients/src/main/resources/common/message/UpdateRaftVoterResponse.json diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java new file mode 100644 index 0000000000000..d917c09fa1e30 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link Admin#addRaftVoter}. + */ +@InterfaceStability.Stable +public class AddRaftVoterOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java new file mode 100644 index 0000000000000..d42204c5e4e79 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The result of {@link org.apache.kafka.clients.admin.Admin#addRaftVoter(int, org.apache.kafka.common.Uuid, java.util.Set, org.apache.kafka.clients.admin.AddRaftVoterOptions)}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Stable +public class AddRaftVoterResult { + private final KafkaFuture result; + + AddRaftVoterResult(KafkaFuture result) { + this.result = result; + } + + /** + * Returns a future that completes when the voter has been added. + */ + public KafkaFuture all() { + return result; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 291250aae9160..8d62069b279f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1711,6 +1711,62 @@ default ListClientMetricsResourcesResult listClientMetricsResources() { */ Uuid clientInstanceId(Duration timeout); + /** + * Add a new voter node to the KRaft metadata quorum. + * + * @param voterId The node ID of the voter. + * @param voterDirectoryId The directory ID of the voter. + * @param endpoints The endpoints that the new voter has. + */ + default AddRaftVoterResult addRaftVoter( + int voterId, + Uuid voterDirectoryId, + Set endpoints + ) { + return addRaftVoter(voterId, voterDirectoryId, endpoints, new AddRaftVoterOptions()); + } + + /** + * Add a new voter node to the KRaft metadata quorum. + * + * @param voterId The node ID of the voter. + * @param voterDirectoryId The directory ID of the voter. + * @param endpoints The endpoints that the new voter has. + * @param options The options to use when adding the new voter node. + */ + AddRaftVoterResult addRaftVoter( + int voterId, + Uuid voterDirectoryId, + Set endpoints, + AddRaftVoterOptions options + ); + + /** + * Remove a voter node from the KRaft metadata quorum. + * + * @param voterId The node ID of the voter. + * @param voterDirectoryId The directory ID of the voter. + */ + default RemoveRaftVoterResult removeRaftVoter( + int voterId, + Uuid voterDirectoryId + ) { + return removeRaftVoter(voterId, voterDirectoryId, new RemoveRaftVoterOptions()); + } + + /** + * Remove a voter node from the KRaft metadata quorum. + * + * @param voterId The node ID of the voter. + * @param voterDirectoryId The directory ID of the voter. + * @param options The options to use when removing the voter node. + */ + RemoveRaftVoterResult removeRaftVoter( + int voterId, + Uuid voterDirectoryId, + RemoveRaftVoterOptions options + ); + /** * Get the metrics kept by the adminClient */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 9fc809dbddd81..ad15c22498f7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -288,6 +288,16 @@ public Uuid clientInstanceId(Duration timeout) { return delegate.clientInstanceId(timeout); } + @Override + public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set endpoints, AddRaftVoterOptions options) { + return delegate.addRaftVoter(voterId, voterDirectoryId, endpoints, options); + } + + @Override + public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId, RemoveRaftVoterOptions options) { + return delegate.removeRaftVoter(voterId, voterDirectoryId, options); + } + @Override public Map metrics() { return delegate.metrics(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 92ba6ad3d6c1f..5c8d9ebb7990e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -97,6 +97,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; @@ -154,6 +155,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.UnregisterBrokerRequestData; import org.apache.kafka.common.message.UpdateFeaturesRequestData; @@ -170,6 +172,8 @@ import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AddRaftVoterRequest; +import org.apache.kafka.common.requests.AddRaftVoterResponse; import org.apache.kafka.common.requests.AlterClientQuotasRequest; import org.apache.kafka.common.requests.AlterClientQuotasResponse; import org.apache.kafka.common.requests.AlterConfigsRequest; @@ -232,6 +236,7 @@ import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RemoveRaftVoterRequest; import org.apache.kafka.common.requests.RenewDelegationTokenRequest; import org.apache.kafka.common.requests.RenewDelegationTokenResponse; import org.apache.kafka.common.requests.UnregisterBrokerRequest; @@ -4604,6 +4609,101 @@ void handleFailure(Throwable throwable) { return new ListClientMetricsResourcesResult(future); } + @Override + public AddRaftVoterResult addRaftVoter( + int voterId, + Uuid voterDirectoryId, + Set endpoints, + AddRaftVoterOptions options + ) { + NodeProvider provider = new LeastLoadedBrokerOrActiveKController(); + + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + AddRaftVoterRequest.Builder createRequest(int timeoutMs) { + AddRaftVoterRequestData.ListenerCollection listeners = + new AddRaftVoterRequestData.ListenerCollection(); + endpoints.forEach(endpoint -> + listeners.add(new AddRaftVoterRequestData.Listener(). + setName(endpoint.name()). + setHost(endpoint.host()). + setPort(endpoint.port()))); + return new AddRaftVoterRequest.Builder( + new AddRaftVoterRequestData(). + setVoterId(voterId) . + setVoterDirectoryId(voterDirectoryId). + setListeners(listeners)); + } + + @Override + void handleResponse(AbstractResponse response) { + AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response; + if (addResponse.data().errorCode() != Errors.NONE.code()) { + ApiError error = new ApiError( + addResponse.data().errorCode(), + addResponse.data().errorMessage()); + future.completeExceptionally(error.exception()); + } else { + future.complete(null); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new AddRaftVoterResult(future); + } + + @Override + public RemoveRaftVoterResult removeRaftVoter( + int voterId, + Uuid voterDirectoryId, + RemoveRaftVoterOptions options + ) { + NodeProvider provider = new LeastLoadedBrokerOrActiveKController(); + + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final Call call = new Call( + "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) { + return new RemoveRaftVoterRequest.Builder( + new RemoveRaftVoterRequestData(). + setVoterId(voterId) . + setVoterDirectoryId(voterDirectoryId)); + } + + @Override + void handleResponse(AbstractResponse response) { + AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response; + if (addResponse.data().errorCode() != Errors.NONE.code()) { + ApiError error = new ApiError( + addResponse.data().errorCode(), + addResponse.data().errorMessage()); + future.completeExceptionally(error.exception()); + } else { + future.complete(null); + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new RemoveRaftVoterResult(future); + } + @Override public Uuid clientInstanceId(Duration timeout) { if (timeout.isNegative()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java new file mode 100644 index 0000000000000..ec9cbfd1fb058 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Locale; +import java.util.Objects; + +/** + * An endpoint for a raft quorum voter. + */ +@InterfaceStability.Stable +public class RaftVoterEndpoint { + private final String name; + private final String host; + private final int port; + + static String requireNonNullAllCapsNonEmpty(String input) { + if (input == null) { + throw new IllegalArgumentException("Null argument not allowed."); + } + if (!input.trim().equals(input)) { + throw new IllegalArgumentException("Leading or trailing whitespace is not allowed."); + } + if (input.isEmpty()) { + throw new IllegalArgumentException("Empty string is not allowed."); + } + if (!input.toUpperCase(Locale.ROOT).equals(input)) { + throw new IllegalArgumentException("String must be UPPERCASE."); + } + return input; + } + + /** + * Create an endpoint for a metadata quorum voter. + * + * @param name The human-readable name for this endpoint. For example, CONTROLLER. + * @param host The DNS hostname for this endpoint. + * @param port The network port for this endpoint. + */ + public RaftVoterEndpoint( + String name, + String host, + int port + ) { + this.name = requireNonNullAllCapsNonEmpty(name); + this.host = Objects.requireNonNull(host); + this.port = port; + } + + public String name() { + return name; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + @Override + public boolean equals(Object o) { + if (o == null || (!o.getClass().equals(getClass()))) return false; + RaftVoterEndpoint other = (RaftVoterEndpoint) o; + return name.equals(other.name) && + host.equals(other.host) && + port == other.port; + } + + @Override + public int hashCode() { + return Objects.hash(name, host, port); + } + + @Override + public String toString() { + return "RaftVoterEndpoint" + + "(name=" + name + + ", host=" + host + + ", port=" + port + + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java new file mode 100644 index 0000000000000..afffdcc2d86b8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for {@link Admin#removeRaftVoter}. + */ +@InterfaceStability.Stable +public class RemoveRaftVoterOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterResult.java new file mode 100644 index 0000000000000..8e8e99ddbfc21 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The result of {@link org.apache.kafka.clients.admin.Admin#removeRaftVoter(int, org.apache.kafka.common.Uuid, org.apache.kafka.clients.admin.RemoveRaftVoterOptions)}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Stable +public class RemoveRaftVoterResult { + private final KafkaFuture result; + + RemoveRaftVoterResult(KafkaFuture result) { + this.result = result; + } + + /** + * Returns a future that completes when the voter has been removed. + */ + public KafkaFuture all() { + return result; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index ffd5737ca3162..7607c1845a391 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -122,7 +122,10 @@ public enum ApiKeys { SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT), SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE), SHARE_FETCH(ApiMessageType.SHARE_FETCH), - SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE); + SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE), + ADD_RAFT_VOTER(ApiMessageType.ADD_RAFT_VOTER), + REMOVE_RAFT_VOTER(ApiMessageType.REMOVE_RAFT_VOTER), + UPDATE_RAFT_VOTER(ApiMessageType.UPDATE_RAFT_VOTER); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 589e163992b22..eaa5e6dcb8bbd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -334,6 +334,12 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return ShareFetchRequest.parse(buffer, apiVersion); case SHARE_ACKNOWLEDGE: return ShareAcknowledgeRequest.parse(buffer, apiVersion); + case ADD_RAFT_VOTER: + return AddRaftVoterRequest.parse(buffer, apiVersion); + case REMOVE_RAFT_VOTER: + return RemoveRaftVoterRequest.parse(buffer, apiVersion); + case UPDATE_RAFT_VOTER: + return UpdateRaftVoterRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 5534168098e9d..8210fb2a7f6b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -271,6 +271,12 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return ShareFetchResponse.parse(responseBuffer, version); case SHARE_ACKNOWLEDGE: return ShareAcknowledgeResponse.parse(responseBuffer, version); + case ADD_RAFT_VOTER: + return AddRaftVoterResponse.parse(responseBuffer, version); + case REMOVE_RAFT_VOTER: + return RemoveRaftVoterResponse.parse(responseBuffer, version); + case UPDATE_RAFT_VOTER: + return UpdateRaftVoterResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterRequest.java new file mode 100644 index 0000000000000..2d385d861c4ac --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class AddRaftVoterRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final AddRaftVoterRequestData data; + + public Builder(AddRaftVoterRequestData data) { + super(ApiKeys.ADD_RAFT_VOTER); + this.data = data; + } + + @Override + public AddRaftVoterRequest build(short version) { + return new AddRaftVoterRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + + } + + private final AddRaftVoterRequestData data; + + public AddRaftVoterRequest(AddRaftVoterRequestData data, short version) { + super(ApiKeys.ADD_RAFT_VOTER, version); + this.data = data; + } + + @Override + public AddRaftVoterRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + return new AddRaftVoterResponse(new AddRaftVoterResponseData(). + setErrorCode(error.code()). + setErrorMessage(error.message()). + setThrottleTimeMs(throttleTimeMs)); + } + + public static AddRaftVoterRequest parse(ByteBuffer buffer, short version) { + return new AddRaftVoterRequest( + new AddRaftVoterRequestData(new ByteBufferAccessor(buffer), version), + version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java new file mode 100644 index 0000000000000..3b931702d4419 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +public class AddRaftVoterResponse extends AbstractResponse { + private final AddRaftVoterResponseData data; + + public AddRaftVoterResponse(AddRaftVoterResponseData data) { + super(ApiKeys.ADD_RAFT_VOTER); + this.data = data; + } + + @Override + public AddRaftVoterResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + // not supported + } + + @Override + public Map errorCounts() { + if (data.errorCode() != Errors.NONE.code()) { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } else { + return Collections.emptyMap(); + } + } + + public static AddRaftVoterResponse parse(ByteBuffer buffer, short version) { + return new AddRaftVoterResponse( + new AddRaftVoterResponseData(new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterRequest.java new file mode 100644 index 0000000000000..cf5f1dc0ce20b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.RemoveRaftVoterRequestData; +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class RemoveRaftVoterRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final RemoveRaftVoterRequestData data; + + public Builder(RemoveRaftVoterRequestData data) { + super(ApiKeys.REMOVE_RAFT_VOTER); + this.data = data; + } + + @Override + public RemoveRaftVoterRequest build(short version) { + return new RemoveRaftVoterRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + + } + + private final RemoveRaftVoterRequestData data; + + public RemoveRaftVoterRequest(RemoveRaftVoterRequestData data, short version) { + super(ApiKeys.REMOVE_RAFT_VOTER, version); + this.data = data; + } + + @Override + public RemoveRaftVoterRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + return new RemoveRaftVoterResponse(new RemoveRaftVoterResponseData(). + setErrorCode(error.code()). + setErrorMessage(error.message()). + setThrottleTimeMs(throttleTimeMs)); + } + + public static RemoveRaftVoterRequest parse(ByteBuffer buffer, short version) { + return new RemoveRaftVoterRequest( + new RemoveRaftVoterRequestData(new ByteBufferAccessor(buffer), version), + version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java new file mode 100644 index 0000000000000..a74711e2f836f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +public class RemoveRaftVoterResponse extends AbstractResponse { + private final RemoveRaftVoterResponseData data; + + public RemoveRaftVoterResponse(RemoveRaftVoterResponseData data) { + super(ApiKeys.REMOVE_RAFT_VOTER); + this.data = data; + } + + @Override + public RemoveRaftVoterResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + // not supported + } + + @Override + public Map errorCounts() { + if (data.errorCode() != Errors.NONE.code()) { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } else { + return Collections.emptyMap(); + } + } + + public static RemoveRaftVoterResponse parse(ByteBuffer buffer, short version) { + return new RemoveRaftVoterResponse( + new RemoveRaftVoterResponseData(new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterRequest.java new file mode 100644 index 0000000000000..eb02673e1da99 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class UpdateRaftVoterRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final UpdateRaftVoterRequestData data; + + public Builder(UpdateRaftVoterRequestData data) { + super(ApiKeys.UPDATE_RAFT_VOTER); + this.data = data; + } + + @Override + public UpdateRaftVoterRequest build(short version) { + return new UpdateRaftVoterRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + + } + + private final UpdateRaftVoterRequestData data; + + public UpdateRaftVoterRequest(UpdateRaftVoterRequestData data, short version) { + super(ApiKeys.UPDATE_RAFT_VOTER, version); + this.data = data; + } + + @Override + public UpdateRaftVoterRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new UpdateRaftVoterResponse(new UpdateRaftVoterResponseData(). + setErrorCode(Errors.forException(e).code()). + setThrottleTimeMs(throttleTimeMs)); + } + + public static UpdateRaftVoterRequest parse(ByteBuffer buffer, short version) { + return new UpdateRaftVoterRequest( + new UpdateRaftVoterRequestData(new ByteBufferAccessor(buffer), version), + version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java new file mode 100644 index 0000000000000..5c89caed2ef94 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +public class UpdateRaftVoterResponse extends AbstractResponse { + private final UpdateRaftVoterResponseData data; + + public UpdateRaftVoterResponse(UpdateRaftVoterResponseData data) { + super(ApiKeys.UPDATE_RAFT_VOTER); + this.data = data; + } + + @Override + public UpdateRaftVoterResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + // not supported + } + + @Override + public Map errorCounts() { + if (data.errorCode() != Errors.NONE.code()) { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } else { + return Collections.emptyMap(); + } + } + + public static UpdateRaftVoterResponse parse(ByteBuffer buffer, short version) { + return new UpdateRaftVoterResponse( + new UpdateRaftVoterResponseData(new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/resources/common/message/AddRaftVoterRequest.json b/clients/src/main/resources/common/message/AddRaftVoterRequest.json new file mode 100644 index 0000000000000..e2f129b13353a --- /dev/null +++ b/clients/src/main/resources/common/message/AddRaftVoterRequest.json @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 80, + "type": "request", + "listeners": ["controller", "broker"], + "name": "AddRaftVoterRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ClusterId", "type": "string", "versions": "0+" }, + { "name": "TimeoutMs", "type": "int32", "versions": "0+" }, + { "name": "VoterId", "type": "int32", "versions": "0+", + "about": "The replica id of the voter getting added to the topic partition" }, + { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", + "about": "The directory id of the voter getting added to the topic partition" }, + { "name": "Listeners", "type": "[]Listener", "versions": "0+", + "about": "The endpoints that can be used to communicate with the voter", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the endpoint" }, + { "name": "Host", "type": "string", "versions": "0+", + "about": "The hostname" }, + { "name": "Port", "type": "uint16", "versions": "0+", + "about": "The port" } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/AddRaftVoterResponse.json b/clients/src/main/resources/common/message/AddRaftVoterResponse.json new file mode 100644 index 0000000000000..3173f0d4d3a57 --- /dev/null +++ b/clients/src/main/resources/common/message/AddRaftVoterResponse.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 80, + "type": "response", + "name": "AddRaftVoterResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, + "about": "The error message, or null if there was no error." } + ] +} diff --git a/clients/src/main/resources/common/message/RemoveRaftVoterRequest.json b/clients/src/main/resources/common/message/RemoveRaftVoterRequest.json new file mode 100644 index 0000000000000..182c8647db629 --- /dev/null +++ b/clients/src/main/resources/common/message/RemoveRaftVoterRequest.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 81, + "type": "request", + "listeners": ["controller", "broker"], + "name": "RemoveRaftVoterRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ClusterId", "type": "string", "versions": "0+" }, + { "name": "VoterId", "type": "int32", "versions": "0+", + "about": "The replica id of the voter getting removed from the topic partition" }, + { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", + "about": "The directory id of the voter getting removed from the topic partition" } + ] +} diff --git a/clients/src/main/resources/common/message/RemoveRaftVoterResponse.json b/clients/src/main/resources/common/message/RemoveRaftVoterResponse.json new file mode 100644 index 0000000000000..5f62059f35047 --- /dev/null +++ b/clients/src/main/resources/common/message/RemoveRaftVoterResponse.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 81, + "type": "response", + "name": "RemoveRaftVoterResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, + "about": "The error message, or null if there was no error." } + ] +} diff --git a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json new file mode 100644 index 0000000000000..80ee58a43a3d6 --- /dev/null +++ b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 82, + "type": "request", + "listeners": ["controller"], + "name": "UpdateRaftVoterRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ClusterId", "type": "string", "versions": "0+" }, + { "name": "VoterId", "type": "int32", "versions": "0+", + "about": "The replica id of the voter getting updated in the topic partition" }, + { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", + "about": "The directory id of the voter getting updated in the topic partition" }, + { "name": "Listeners", "type": "[]Listener", "versions": "0+", + "about": "The endpoint that can be used to communicate with the leader", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the endpoint" }, + { "name": "Host", "type": "string", "versions": "0+", + "about": "The hostname" }, + { "name": "Port", "type": "uint16", "versions": "0+", + "about": "The port" } + ]}, + { "name": "KRaftVersionFeature", "type": "KRaftVersionFeature", "versions": "0+", + "about": "The range of versions of the protocol that the replica supports", "fields": [ + { "name": "MinSupportedVersion", "type": "int16", "versions": "0+", + "about": "The minimum supported KRaft protocol version" }, + { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+", + "about": "The maximum supported KRaft protocol version" } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json new file mode 100644 index 0000000000000..64816406c7426 --- /dev/null +++ b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 82, + "type": "response", + "name": "UpdateRaftVoterResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error" } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index f72362715e26e..b88847227abea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1320,6 +1320,16 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet return new ListClientMetricsResourcesResult(future); } + @Override + public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set endpoints, AddRaftVoterOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId, RemoveRaftVoterOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override synchronized public void close(Duration timeout) {} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7352d7a129e8e..b53d2cedd080d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -44,6 +44,8 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection; +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterClientQuotasResponseData; @@ -205,6 +207,8 @@ import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.message.PushTelemetryRequestData; import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.message.RemoveRaftVoterRequestData; +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.RenewDelegationTokenResponseData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; @@ -233,6 +237,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; import org.apache.kafka.common.message.UpdateMetadataResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.network.ListenerName; @@ -1098,6 +1104,9 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) { case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeRequest(version); case SHARE_FETCH: return createShareFetchRequest(version); case SHARE_ACKNOWLEDGE: return createShareAcknowledgeRequest(version); + case ADD_RAFT_VOTER: return createAddRaftVoterRequest(version); + case REMOVE_RAFT_VOTER: return createRemoveRaftVoterRequest(version); + case UPDATE_RAFT_VOTER: return createUpdateRaftVoterRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1184,6 +1193,9 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case SHARE_GROUP_DESCRIBE: return createShareGroupDescribeResponse(); case SHARE_FETCH: return createShareFetchResponse(); case SHARE_ACKNOWLEDGE: return createShareAcknowledgeResponse(); + case ADD_RAFT_VOTER: return createAddRaftVoterResponse(); + case REMOVE_RAFT_VOTER: return createRemoveRaftVoterResponse(); + case UPDATE_RAFT_VOTER: return createUpdateRaftVoterResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1284,6 +1296,58 @@ private DescribeTopicPartitionsRequest createDescribeTopicPartitionsRequest(shor return new DescribeTopicPartitionsRequest.Builder(data).build(version); } + private AddRaftVoterRequest createAddRaftVoterRequest(short version) { + return new AddRaftVoterRequest(new AddRaftVoterRequestData(). + setClusterId("FmRMoH-iTCSFNnzgpkWT2A"). + setTimeoutMs(60_000). + setVoterId(1). + setVoterDirectoryId(Uuid.fromString("DZG26STKRxaelDpg2wqsXw")). + setListeners(new AddRaftVoterRequestData.ListenerCollection( + Collections.singletonList(new AddRaftVoterRequestData.Listener(). + setName("CONTROLLER"). + setHost("localhost"). + setPort(8080)).iterator()) + ), version); + } + + private AddRaftVoterResponse createAddRaftVoterResponse() { + return new AddRaftVoterResponse(new AddRaftVoterResponseData(). + setErrorCode((short) 0). + setErrorMessage(null)); + } + + private RemoveRaftVoterRequest createRemoveRaftVoterRequest(short version) { + return new RemoveRaftVoterRequest(new RemoveRaftVoterRequestData(). + setClusterId("FmRMoH-iTCSFNnzgpkWT2A"). + setVoterId(1). + setVoterDirectoryId(Uuid.fromString("DZG26STKRxaelDpg2wqsXw")), + version); + } + + private RemoveRaftVoterResponse createRemoveRaftVoterResponse() { + return new RemoveRaftVoterResponse(new RemoveRaftVoterResponseData(). + setErrorCode((short) 0). + setErrorMessage(null)); + } + + private UpdateRaftVoterRequest createUpdateRaftVoterRequest(short version) { + return new UpdateRaftVoterRequest(new UpdateRaftVoterRequestData(). + setClusterId("FmRMoH-iTCSFNnzgpkWT2A"). + setVoterId(1). + setVoterDirectoryId(Uuid.fromString("DZG26STKRxaelDpg2wqsXw")). + setListeners(new UpdateRaftVoterRequestData.ListenerCollection( + Collections.singletonList(new UpdateRaftVoterRequestData.Listener(). + setName("CONTROLLER"). + setHost("localhost"). + setPort(8080)).iterator())), + version); + } + + private UpdateRaftVoterResponse createUpdateRaftVoterResponse() { + return new UpdateRaftVoterResponse(new UpdateRaftVoterResponseData(). + setErrorCode((short) 0)); + } + private DescribeTopicPartitionsResponse createDescribeTopicPartitionsResponse() { DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection collection = new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection(); diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 0900b94ef9f4f..daed7f4e38c46 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -107,6 +107,9 @@ object RequestConvertToJson { case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version) case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version) case req: WriteTxnMarkersRequest => WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version) + case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version) + case req: RemoveRaftVoterRequest => RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version) + case req: UpdateRaftVoterRequest => UpdateRaftVoterRequestDataJsonConverter.write(req.data, request.version) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so.") } @@ -194,6 +197,9 @@ object RequestConvertToJson { case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version) case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version) case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version) + case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version) + case res: RemoveRaftVoterResponse => RemoveRaftVoterResponseDataJsonConverter.write(res.data, version) + case res: UpdateRaftVoterResponse => UpdateRaftVoterResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so.") } diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index 0c45734593b36..48646e0b4d7d4 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -31,7 +31,10 @@ import kafka.utils.CoreUtils import kafka.utils.FileLock import kafka.utils.Logging import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient} -import org.apache.kafka.common.{KafkaException, Node, TopicPartition, Uuid} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.Node +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.Uuid import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index e81c37f96b74f..f4c0ba89f81f3 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -129,6 +129,9 @@ class ControllerApis( case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.CONTROLLER_REGISTRATION => handleControllerRegistration(request) case ApiKeys.ASSIGN_REPLICAS_TO_DIRS => handleAssignReplicasToDirs(request) + case ApiKeys.ADD_RAFT_VOTER => handleAddRaftVoter(request) + case ApiKeys.REMOVE_RAFT_VOTER => handleRemoveRaftVoter(request) + case ApiKeys.UPDATE_RAFT_VOTER => handleUpdateRaftVoter(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } @@ -1080,4 +1083,19 @@ class ControllerApis( requestThrottleMs => new AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs))) } } + + def handleAddRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { + authHelper.authorizeClusterOperation(request, ALTER) + throw new UnsupportedVersionException("handleAddRaftVoter is not supported yet.") + } + + def handleRemoveRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { + authHelper.authorizeClusterOperation(request, ALTER) + throw new UnsupportedVersionException("handleRemoveRaftVoter is not supported yet.") + } + + def handleUpdateRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + throw new UnsupportedVersionException("handleUpdateRaftVoter is not supported yet.") + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b76ebff59cb78..8c9aada899862 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -256,6 +256,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request) + case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request) + case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 97efd9bcf4cc0..cdceafcac7ba0 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -730,6 +730,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.SHARE_ACKNOWLEDGE => new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData(), true) + case ApiKeys.ADD_RAFT_VOTER => + new AddRaftVoterRequest.Builder(new AddRaftVoterRequestData()) + + case ApiKeys.REMOVE_RAFT_VOTER => + new RemoveRaftVoterRequest.Builder(new RemoveRaftVoterRequestData()) + + case ApiKeys.UPDATE_RAFT_VOTER => + new UpdateRaftVoterRequest.Builder(new UpdateRaftVoterRequestData()) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 60ddda5bc3196..9091b8cf26056 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -182,7 +182,7 @@ Found problem: val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command") assertEquals(0, StorageTool. formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) - assertTrue(stream.toString().split("\\r?\\n").exists(_.startsWith("Formatting %s".format(tempDir)))) + assertTrue(stringAfterFirstLine(stream.toString()).startsWith("Formatting %s".format(tempDir))) try assertEquals(1, StorageTool. formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) catch { @@ -194,10 +194,15 @@ Found problem: val stream2 = new ByteArrayOutputStream() assertEquals(0, StorageTool. formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = true)) - assertEquals(1, stream2.toString().split("\\r?\\n").count(_.startsWith("All of the log directories are already formatted"))) + assertEquals("All of the log directories are already formatted.%n".format(), stringAfterFirstLine(stream2.toString())) } finally Utils.delete(tempDir) } + def stringAfterFirstLine(input: String): String = { + val firstNewline = input.indexOf("\n") + input.substring(firstNewline + 1) + } + private def runFormatCommand(stream: ByteArrayOutputStream, directories: Seq[String], ignoreFormatted: Boolean = false): Int = { val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). @@ -215,6 +220,7 @@ Found problem: assertEquals(0, runFormatCommand(stream, availableDirs)) val actual = stream.toString().split("\\r?\\n") val expect = availableDirs.map("Formatting %s".format(_)) + assertEquals(availableDirs.size + 1, actual.size) expect.foreach(dir => { assertEquals(1, actual.count(_.startsWith(dir))) })