Skip to content

Commit

Permalink
Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanzlenko committed Sep 5, 2024
1 parent 79f8a6a commit 84ff2fb
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,22 @@ private XceiverClientReply sendCommandWithRetry(
List<DatanodeDetails> datanodeList = null;

DatanodeBlockID blockID = null;
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
blockID = request.getGetBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
switch (request.getCmdType()) {
case GetBlock:
blockID = request.getGetBlock().getBlockID();
break;

case ReadChunk:
blockID = request.getReadChunk().getBlockID();
break;

case GetSmallFile:
blockID = request.getGetSmallFile().getBlock().getBlockID();
break;

case VerifyBlock:
blockID = request.getVerifyBlock().getBlockData().getBlockID();
break;
}

if (blockID != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public static boolean isReadOnly(ContainerCommandRequestProtoOrBuilder proto) {
case ListContainer:
case ListChunk:
case GetCommittedBlockLength:
case VerifyBlock:
return true;
case CloseContainer:
case WriteChunk:
Expand Down Expand Up @@ -437,6 +438,7 @@ public static boolean requireBlockToken(ContainerProtos.Type cmdType) {
case ReadChunk:
case WriteChunk:
case FinalizeBlock:
case VerifyBlock:
return true;
default:
return false;
Expand Down Expand Up @@ -528,6 +530,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
blockID = msg.getFinalizeBlock().getBlockID();
}
break;
case VerifyBlock:
if (msg.hasVerifyBlock()) {
blockID = msg.getVerifyBlock().getBlockData().getBlockID();
}
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto;
Expand Down Expand Up @@ -198,17 +200,9 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, Blo
private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List<Validator> validators,
ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
if (replicaIndex > 0) {
datanodeBlockID.setReplicaIndex(replicaIndex);
}
DatanodeBlockID datanodeBlockID = buildDatanodeId(builder, blockID, datanode, replicaIndexes);
final GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto.newBuilder()
.setBlockID(datanodeBlockID.build());
.setBlockID(datanodeBlockID);
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString())
.setGetBlock(readBlockRequest).build();
Expand Down Expand Up @@ -849,4 +843,52 @@ private static ContainerCommandRequestProto buildReadContainerRequest(long conta

return request.build();
}

public static VerifyBlockResponseProto verifyBlock(XceiverClientSpi xceiverClient,
List<Validator> validators, BlockID blockID, Token<? extends TokenIdentifier> token,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.VerifyBlock)
.setContainerID(blockID.getContainerID());

if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}

return tryEachDatanode(xceiverClient.getPipeline(),
d -> verifyBlock(xceiverClient, validators, builder, blockID, d, replicaIndexes),
d -> toErrorMessage(blockID, d));
}

private static VerifyBlockResponseProto verifyBlock(XceiverClientSpi xceiverClient, List<Validator> validators,
ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {

DatanodeBlockID datanodeBlockID = buildDatanodeId(builder, blockID, datanode, replicaIndexes);
VerifyBlockRequestProto verifyBlockResponse = VerifyBlockRequestProto.newBuilder()
.setBlockData(BlockData.newBuilder().setBlockID(datanodeBlockID).build())
.build();
ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString())
.setVerifyBlock(verifyBlockResponse)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request, validators);
return response.getVerifyBlock();
}

private static DatanodeBlockID buildDatanodeId(ContainerCommandRequestProto.Builder builder, BlockID blockID,
DatanodeDetails datanode, Map<DatanodeDetails, Integer> replicaIndexes) {
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
if (replicaIndex > 0) {
datanodeBlockID.setReplicaIndex(replicaIndex);
}

return datanodeBlockID.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum DNAction implements AuditAction {
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
FINALIZE_BLOCK,
ECHO;
ECHO,
VERIFY_BLOCK;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,8 @@ private static DNAction getAuditAction(Type cmdType) {
return DNAction.FINALIZE_BLOCK;
case Echo:
return DNAction.ECHO;
case VerifyBlock:
return DNAction.VERIFY_BLOCK;
default:
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down Expand Up @@ -915,7 +917,12 @@ private static Map<String, String> getAuditParams(ContainerCommandRequestProto m
auditParams.put("blockData",
BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID())
.toString());
return auditParams;
return auditParams;

case VerifyBlock:
auditParams.put(
"verifyBlock",
BlockID.getFromProtobuf(msg.getVerifyBlock().getBlockData().getBlockID()).toString());

default :
LOG.debug("Invalid command type - {}", cmdType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, Co
return handler.handleFinalizeBlock(request, kvContainer);
case Echo:
return handler.handleEcho(request);
case VerifyBlock:
return handler.handleVerifyBlock(request);
default:
return null;
}
Expand Down Expand Up @@ -1357,6 +1359,17 @@ private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequest
return null;
}

private ContainerCommandResponseProto handleVerifyBlock(ContainerCommandRequestProto request) {
if (!request.hasVerifyBlock()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Verify Block request. trace ID: {}", request.getTraceID());
}
return malformedRequest(request);
}

return null;
}

public static Logger getLogger() {
return LOG;
}
Expand Down

0 comments on commit 84ff2fb

Please sign in to comment.