diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c02306f8af8b..ceeba1a115e2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -386,12 +386,22 @@ private XceiverClientReply sendCommandWithRetry( List datanodeList = null; DatanodeBlockID blockID = null; - if (request.getCmdType() == ContainerProtos.Type.GetBlock) { - blockID = request.getGetBlock().getBlockID(); - } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { - blockID = request.getReadChunk().getBlockID(); - } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) { - blockID = request.getGetSmallFile().getBlock().getBlockID(); + switch (request.getCmdType()) { + case GetBlock: + blockID = request.getGetBlock().getBlockID(); + break; + + case ReadChunk: + blockID = request.getReadChunk().getBlockID(); + break; + + case GetSmallFile: + blockID = request.getGetSmallFile().getBlock().getBlockID(); + break; + + case VerifyBlock: + blockID = request.getVerifyBlock().getBlockData().getBlockID(); + break; } if (blockID != null) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index a3f00030b7b3..c446e302ebd9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -385,6 +385,7 @@ public static boolean isReadOnly(ContainerCommandRequestProtoOrBuilder proto) { case ListContainer: case ListChunk: case GetCommittedBlockLength: + case VerifyBlock: return true; case CloseContainer: case WriteChunk: @@ -437,6 +438,7 @@ public static boolean requireBlockToken(ContainerProtos.Type cmdType) { case ReadChunk: case WriteChunk: case FinalizeBlock: + case VerifyBlock: return true; default: return false; @@ -528,6 +530,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) { blockID = msg.getFinalizeBlock().getBlockID(); } break; + case VerifyBlock: + if (msg.hasVerifyBlock()) { + blockID = msg.getVerifyBlock().getBlockData().getBlockID(); + } + break; default: break; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 3eae6172265f..65faa8291a4a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto; @@ -198,17 +200,9 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, Blo private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List validators, ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode, Map replicaIndexes) throws IOException { - String traceId = TracingUtil.exportCurrentSpan(); - if (traceId != null) { - builder.setTraceID(traceId); - } - final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); - int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); - if (replicaIndex > 0) { - datanodeBlockID.setReplicaIndex(replicaIndex); - } + DatanodeBlockID datanodeBlockID = buildDatanodeId(builder, blockID, datanode, replicaIndexes); final GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto.newBuilder() - .setBlockID(datanodeBlockID.build()); + .setBlockID(datanodeBlockID); final ContainerCommandRequestProto request = builder .setDatanodeUuid(datanode.getUuidString()) .setGetBlock(readBlockRequest).build(); @@ -849,4 +843,52 @@ private static ContainerCommandRequestProto buildReadContainerRequest(long conta return request.build(); } + + public static VerifyBlockResponseProto verifyBlock(XceiverClientSpi xceiverClient, + List validators, BlockID blockID, Token token, + Map replicaIndexes) throws IOException { + ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.VerifyBlock) + .setContainerID(blockID.getContainerID()); + + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); + } + + return tryEachDatanode(xceiverClient.getPipeline(), + d -> verifyBlock(xceiverClient, validators, builder, blockID, d, replicaIndexes), + d -> toErrorMessage(blockID, d)); + } + + private static VerifyBlockResponseProto verifyBlock(XceiverClientSpi xceiverClient, List validators, + ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode, + Map replicaIndexes) throws IOException { + + DatanodeBlockID datanodeBlockID = buildDatanodeId(builder, blockID, datanode, replicaIndexes); + VerifyBlockRequestProto verifyBlockResponse = VerifyBlockRequestProto.newBuilder() + .setBlockData(BlockData.newBuilder().setBlockID(datanodeBlockID).build()) + .build(); + ContainerCommandRequestProto request = builder + .setDatanodeUuid(datanode.getUuidString()) + .setVerifyBlock(verifyBlockResponse) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request, validators); + return response.getVerifyBlock(); + } + + private static DatanodeBlockID buildDatanodeId(ContainerCommandRequestProto.Builder builder, BlockID blockID, + DatanodeDetails datanode, Map replicaIndexes) { + String traceId = TracingUtil.exportCurrentSpan(); + if (traceId != null) { + builder.setTraceID(traceId); + } + final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); + if (replicaIndex > 0) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } + + return datanodeBlockID.build(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index f7a38e3dec8b..bd1e031ad712 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -41,7 +41,8 @@ public enum DNAction implements AuditAction { GET_COMMITTED_BLOCK_LENGTH, STREAM_INIT, FINALIZE_BLOCK, - ECHO; + ECHO, + VERIFY_BLOCK; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 449ea5d793a6..4ea969b05548 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -808,6 +808,8 @@ private static DNAction getAuditAction(Type cmdType) { return DNAction.FINALIZE_BLOCK; case Echo: return DNAction.ECHO; + case VerifyBlock: + return DNAction.VERIFY_BLOCK; default: LOG.debug("Invalid command type - {}", cmdType); return null; @@ -915,7 +917,12 @@ private static Map getAuditParams(ContainerCommandRequestProto m auditParams.put("blockData", BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID()) .toString()); - return auditParams; + return auditParams; + + case VerifyBlock: + auditParams.put( + "verifyBlock", + BlockID.getFromProtobuf(msg.getVerifyBlock().getBlockData().getBlockID()).toString()); default : LOG.debug("Invalid command type - {}", cmdType); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 07d635319010..0b818839aef0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -268,6 +268,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, Co return handler.handleFinalizeBlock(request, kvContainer); case Echo: return handler.handleEcho(request); + case VerifyBlock: + return handler.handleVerifyBlock(request); default: return null; } @@ -1357,6 +1359,17 @@ private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequest return null; } + private ContainerCommandResponseProto handleVerifyBlock(ContainerCommandRequestProto request) { + if (!request.hasVerifyBlock()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Verify Block request. trace ID: {}", request.getTraceID()); + } + return malformedRequest(request); + } + + return null; + } + public static Logger getLogger() { return LOG; }