Skip to content

Commit

Permalink
CLI command implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanzlenko committed Sep 12, 2024
1 parent d5de50a commit 5b6af2c
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import picocli.CommandLine;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;

/**
* The {@link OzoneFsckCommand} class is a command-line tool for performing file system checks within Ozone.
Expand Down Expand Up @@ -117,6 +119,27 @@ public OzoneConfiguration createOzoneConfiguration() {

@Override
protected void execute(OzoneClient client, OzoneAddress address) throws IOException, OzoneClientException {
OzoneFsckVerboseSettings verboseSettings = new OzoneFsckVerboseSettings();
try (Writer writer = new PrintWriter(System.out)) {
writer.write("EXECUTING OZONE FSCK\n");

boolean deleteCorruptedKeys = false;
OzoneConfiguration ozoneConfiguration = getConf();

OzoneFsckHandler handler = new OzoneFsckHandler(address, verboseSettings, writer, deleteCorruptedKeys, client,
ozoneConfiguration);

try {
handler.scan();
} catch (Exception e) {
writer.write(e.getMessage());
throw new RuntimeException(e);
} catch (Error e) {
throw e;
} finally {
writer.flush();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.shell.fsck;

import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.ContainerMultinodeApi;
import org.apache.hadoop.hdds.scm.storage.ContainerMultinodeApiImpl;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.shell.OzoneAddress;

import java.io.IOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;

public class OzoneFsckHandler implements AutoCloseable {
private final OzoneAddress address;

private final OzoneFsckVerboseSettings verboseSettings;

private final Writer writer;

private final boolean deleteCorruptedKeys;

private final OzoneClient client;

private final OzoneManagerProtocol omClient;

private final ContainerOperationClient containerOperationClient;

private final XceiverClientManager xceiverClientManager;

public OzoneFsckHandler(OzoneAddress address, OzoneFsckVerboseSettings verboseSettings, Writer writer,
boolean deleteCorruptedKeys, OzoneClient client, OzoneConfiguration ozoneConfiguration) throws IOException {
this.address = address;
this.verboseSettings = verboseSettings;
this.writer = writer;
this.deleteCorruptedKeys = deleteCorruptedKeys;
this.client = client;
this.omClient = client.getObjectStore().getClientProxy().getOzoneManagerClient();
this.containerOperationClient = new ContainerOperationClient(ozoneConfiguration);
this.xceiverClientManager = containerOperationClient.getXceiverClientManager();
}

public void scan() throws IOException, InterruptedException {
scanVolumes();
}

private void scanVolumes() throws IOException, InterruptedException {
Iterator<? extends OzoneVolume> volumes = client.getObjectStore().listVolumes(address.getVolumeName());

writer.write("Scanning volumes\n");

while (volumes.hasNext()) {
scanBuckets(volumes.next());
}
}

private void scanBuckets(OzoneVolume volume) throws IOException, InterruptedException {
Iterator<? extends OzoneBucket> buckets = volume.listBuckets(address.getBucketName());

writer.write("Scanning buckets for volume " + volume.getName() + "\n");

while (buckets.hasNext()) {
scanKeys(buckets.next());
}
}

private void scanKeys(OzoneBucket bucket) throws IOException, InterruptedException {
Iterator<? extends OzoneKey> keys = bucket.listKeys(address.getKeyName());

writer.write("Scanning keys for bucket " + bucket.getName() + "\n");

while (keys.hasNext()) {
scanKey(keys.next());
}
}

private void scanKey(OzoneKey key) throws IOException, InterruptedException {
writer.write("Scanning key " + key.getName() + "\n");

OmKeyArgs keyArgs = createKeyArgs(key);

KeyInfoWithVolumeContext keyInfoWithContext = omClient.getKeyInfo(keyArgs, false);

OmKeyInfo keyInfo = keyInfoWithContext.getKeyInfo();

List<OmKeyLocationInfo> locations = keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();

for (OmKeyLocationInfo location : locations) {
Pipeline keyPipeline = location.getPipeline();
boolean isECKey = keyPipeline.getReplicationConfig().getReplicationType() == HddsProtos.ReplicationType.EC;
Pipeline pipeline;
if (!isECKey && keyPipeline.getType() != STAND_ALONE) {
pipeline = Pipeline.newBuilder(keyPipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
.build();
} else {
pipeline = keyPipeline;
}

XceiverClientSpi xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);

try (ContainerMultinodeApi containerClient = new ContainerMultinodeApiImpl(xceiverClient)) {
Map<DatanodeDetails, VerifyBlockResponseProto> responses = containerClient.verifyBlock(
location.getBlockID().getDatanodeBlockIDProtobuf(),
location.getToken()
);
}
}
}

private OmKeyArgs createKeyArgs(OzoneKey key) {
return new OmKeyArgs.Builder()
.setVolumeName(key.getVolumeName())
.setBucketName(key.getBucketName())
.setKeyName(key.getName())
.build();
}

@Override
public void close() throws Exception {
this.xceiverClientManager.close();
this.containerOperationClient.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.hadoop.ozone.shell.fsck;

public class OzoneFsckVerboseSettings {

}

0 comments on commit 5b6af2c

Please sign in to comment.