Skip to content

Commit

Permalink
#17 sync substream (#65)
Browse files Browse the repository at this point in the history
* Add initial commit with proto setup

* Add block requests

* Add block request by number

* Fix checkstyle errors

* Refactor

* Remove print line

* Fix comments

* Add block request timeout

* Add javadoc

---------

Co-authored-by: Boris Velkovski <[email protected]>
  • Loading branch information
bokoto000 and Boris Velkovski authored Apr 12, 2023
1 parent 17d598a commit 4e36fd1
Show file tree
Hide file tree
Showing 10 changed files with 3,848 additions and 4 deletions.
12 changes: 10 additions & 2 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.limechain.chain.ChainService;
import com.limechain.config.HostConfig;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.sync.SyncService;
import com.limechain.network.substream.lightclient.LightMessagesService;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.Host;
Expand Down Expand Up @@ -32,6 +33,7 @@ public class Network {
private static final int TEN_SECONDS_IN_MS = 10000;
private static final int HOST_PORT = 1001;
private static Network network;
public SyncService syncService;
public LightMessagesService lightMessagesService;
public KademliaService kademliaService;
private HostBuilder hostBuilder;
Expand All @@ -51,12 +53,18 @@ private Network(ChainService chainService, HostConfig hostConfig) {
hostBuilder = (new HostBuilder()).generateIdentity().listenLocalhost(HOST_PORT);
Multihash hostId = Multihash.deserialize(hostBuilder.getPeerId().getBytes());

kademliaService = new KademliaService("/dot/kad", hostId, isLocalEnabled);
String chainId = chainService.getGenesis().getProtocolId();
String legacyKadProtocolId = String.format("/%s/kad", chainId);
String syncProtocolId = String.format("/%s/sync/2", chainId);
kademliaService = new KademliaService(legacyKadProtocolId, hostId, isLocalEnabled);
lightMessagesService = new LightMessagesService();
syncService = new SyncService(syncProtocolId);

hostBuilder.addProtocols(
List.of(new Ping(), kademliaService.getDht(),
lightMessagesService.getLightMessages()));
lightMessagesService.getLightMessages(),
syncService.getSyncMessages()
));

host = hostBuilder.build();
kademliaService.setHost(host);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/limechain/network/kad/KademliaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ private void initialize(String protocolId, Multihash hostId, boolean localEnable
* Connects to boot nodes to the Kademlia dht
*
* @param bootNodes boot nodes set in ChainService
* @return the number of successfully connected nodes
*/
public void connectBootNodes(String[] bootNodes) {
public int connectBootNodes(String[] bootNodes) {
var bootstrapMultiAddress = List.of(bootNodes).stream()
.map(KademliaService::dnsNodeToIp4)
.map(MultiAddress::new)
.collect(Collectors.toList());
int successfulBootNodes = dht.bootstrapRoutingTable(host, bootstrapMultiAddress, addr -> !addr.contains("wss"));
log.log(Level.INFO, "Successfully connected to " + successfulBootNodes + " boot nodes");
return successfulBootNodes;
}

/**
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/com/limechain/network/protobuf/SyncMessage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = "proto3";

package com.limechain.network.substream.sync.pb;
option java_package = "com.limechain.network.substream.sync.pb";

// Schema definition for light client messages.
// Copied from https://github.com/paritytech/substrate/blob/9b08105b8c7106d723c4f470304ad9e2868569d9/client/network/src/schema/api.v1.proto

// Block enumeration direction.
enum Direction {
// Enumerate in ascending order (from child to parent).
Ascending = 0;
// Enumerate in descending order (from parent to canonical child).
Descending = 1;
}

// Request block data from a peer.
message BlockRequest {
// Bits of block data to request.
uint32 fields = 1;
// Start from this block.
oneof from_block {
// Start with given hash.
bytes hash = 2;
// Start with given block number.
bytes number = 3;
}
// End at this block. An implementation defined maximum is used when unspecified.
bytes to_block = 4; // optional
// Sequence direction.
Direction direction = 5;
// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
uint32 max_blocks = 6; // optional
}

// Response to `BlockRequest`
message BlockResponse {
// Block data for the requested sequence.
repeated BlockData blocks = 1;
}

// Block data sent in the response.
message BlockData {
// Block header hash.
bytes hash = 1;
// Block header if requested.
bytes header = 2; // optional
// Block body if requested.
repeated bytes body = 3; // optional
// Block receipt if requested.
bytes receipt = 4; // optional
// Block message queue if requested.
bytes message_queue = 5; // optional
// Justification if requested.
bytes justification = 6; // optional
// True if justification should be treated as present but empty.
// This hack is unfortunately necessary because shortcomings in the protobuf format otherwise
// doesn't make in possible to differentiate between a lack of justification and an empty
// justification.
bool is_empty_justification = 7; // optional, false if absent
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.limechain.network.protocol.sync;

import com.google.protobuf.ByteString;
import com.limechain.network.substream.sync.pb.SyncMessage;

import java.util.concurrent.CompletableFuture;

import static java.util.Objects.isNull;

public interface SyncController {
CompletableFuture<SyncMessage.BlockResponse> send(SyncMessage.BlockRequest msg);

/**
* Converts block number to bytes to encode them for the sync message
* @param blockNumber
* @return byte array made from the block number
*/
private static byte[] blockNumberToByteArray(int blockNumber) {
byte byte1 = (byte) (blockNumber);
byte byte2 = (byte) (blockNumber >>> 8);
byte byte3 = (byte) (blockNumber >>> 16);
byte byte4 = (byte) (blockNumber >>> 24);
byte byte5 = (byte) (blockNumber >>> 32);
return new byte[]{byte1, byte2, byte3, byte4, byte5};
}

default CompletableFuture<SyncMessage.BlockResponse> sendBlockRequest(int fields,
String fromHash,
Integer fromNumber,
Integer toBlockNumber,
SyncMessage.Direction direction,
int maxBlocks) {
var syncMessage = SyncMessage.BlockRequest.newBuilder()
.setFields(fields)
.setDirection(direction)
.setMaxBlocks(maxBlocks);
if (!isNull(fromHash))
syncMessage = syncMessage.setHash(ByteString.fromHex(fromHash));
if (!isNull(fromNumber))
syncMessage = syncMessage.setNumber(ByteString.copyFrom(blockNumberToByteArray(fromNumber)));
if (!isNull(toBlockNumber))
syncMessage = syncMessage.setToBlock(ByteString.copyFrom(blockNumberToByteArray(toBlockNumber)));

var builtSyncMessage = syncMessage.build();
return send(builtSyncMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.limechain.network.protocol.sync;

import com.limechain.network.substream.sync.pb.SyncMessage;
import io.libp2p.core.AddressBook;
import io.libp2p.core.Host;
import io.libp2p.core.PeerId;
import io.libp2p.core.multiformats.Multiaddr;
import io.libp2p.core.multistream.StrictProtocolBinding;
import lombok.extern.java.Log;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

@Log
public class SyncMessages extends StrictProtocolBinding<SyncController> {
public SyncMessages(String protocolId, SyncProtocol protocol){
super(protocolId, protocol);
}

public SyncMessage.BlockResponse remoteBlockRequest(Host us, AddressBook addrs, PeerId peer,
int fields,
String hash,
Integer number,
Integer toBlock,
SyncMessage.Direction direction,
int maxBlocks){
SyncController controller = dialPeer(us,peer,addrs);
try{
SyncMessage.BlockResponse response = controller
.sendBlockRequest(fields, hash, number, toBlock, direction, maxBlocks)
.get(2, TimeUnit.SECONDS);
log.log(Level.INFO, "Received response: " + response.toString());
return response;
} catch (Exception e){
log.log(Level.SEVERE, "Error while sending remote call request: ", e);
throw new RuntimeException(e);
}
}

private SyncController dialPeer(Host us, PeerId peer, AddressBook addrs){
Multiaddr[] addr = addrs.get(peer).join().toArray(new Multiaddr[0]);
if(addr.length == 0)
throw new IllegalStateException("No addresses known for peer " + peer);
return dial(us, peer, addr).getController().join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.limechain.network.protocol.sync;

import com.limechain.network.substream.sync.pb.SyncMessage;
import io.libp2p.core.ConnectionClosedException;
import io.libp2p.core.Stream;
import io.libp2p.protocol.ProtocolHandler;
import io.libp2p.protocol.ProtocolMessageHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import java.util.concurrent.CompletableFuture;

public class SyncProtocol extends ProtocolHandler<SyncController> {
public static final int MAX_REQUEST_SIZE = 1024 * 512;
public static final int MAX_RESPONSE_SIZE = 10 * 1024 * 1024;

public SyncProtocol() {
super(MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE);
}

@Override
protected CompletableFuture<SyncController> onStartInitiator(Stream stream) {
stream.pushHandler(new ProtobufVarint32FrameDecoder());
stream.pushHandler(new ProtobufDecoder(SyncMessage.BlockResponse.getDefaultInstance()));

stream.pushHandler((new ProtobufVarint32LengthFieldPrepender()));
stream.pushHandler((new ProtobufEncoder()));

Sender handler = new Sender(stream);
stream.pushHandler(handler);
return CompletableFuture.completedFuture(handler);
}

// Class for handling outgoing requests
static class Sender
implements ProtocolMessageHandler<SyncMessage.BlockResponse>,
SyncController {
private final CompletableFuture<SyncMessage.BlockResponse> resp = new CompletableFuture<>();
private final Stream stream;

public Sender(Stream stream) {
this.stream = stream;
}

@Override
public void onMessage(Stream stream, SyncMessage.BlockResponse msg) {
resp.complete(msg);
stream.closeWrite();
}

@Override
public CompletableFuture<SyncMessage.BlockResponse> send(SyncMessage.BlockRequest msg) {
stream.writeAndFlush(msg);
return resp;
}

@Override
public void onClosed(Stream stream) {
resp.completeExceptionally(new ConnectionClosedException());
}

@Override
public void onException(Throwable cause) {
resp.completeExceptionally(cause);
}

}
}
12 changes: 12 additions & 0 deletions src/main/java/com/limechain/network/protocol/sync/SyncService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.limechain.network.protocol.sync;

import lombok.Getter;

@Getter
public class SyncService {
private final SyncMessages syncMessages;

public SyncService(String protocolId) {
this.syncMessages = new SyncMessages(protocolId, new SyncProtocol());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,5 @@ public void onClosed(Stream stream) {
public void onException(Throwable cause) {
resp.completeExceptionally(cause);
}

}
}
Loading

0 comments on commit 4e36fd1

Please sign in to comment.