Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#17 sync substream #65

Merged
merged 10 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.limechain.chain.ChainService;
import com.limechain.config.HostConfig;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.sync.SyncService;
import com.limechain.network.substream.lightclient.LightMessagesService;
import io.ipfs.multihash.Multihash;
import io.libp2p.core.Host;
Expand Down Expand Up @@ -32,6 +33,7 @@ public class Network {
private static final int TEN_SECONDS_IN_MS = 10000;
private static final int HOST_PORT = 1001;
private static Network network;
public SyncService syncService;
public LightMessagesService lightMessagesService;
public KademliaService kademliaService;
private HostBuilder hostBuilder;
Expand All @@ -51,12 +53,18 @@ private Network(ChainService chainService, HostConfig hostConfig) {
hostBuilder = (new HostBuilder()).generateIdentity().listenLocalhost(HOST_PORT);
Multihash hostId = Multihash.deserialize(hostBuilder.getPeerId().getBytes());

kademliaService = new KademliaService("/dot/kad", hostId, isLocalEnabled);
String chainId = chainService.getGenesis().getProtocolId();
String legacyKadProtocolId = String.format("/%s/kad", chainId);
String syncProtocolId = String.format("/%s/sync/2", chainId);
kademliaService = new KademliaService(legacyKadProtocolId, hostId, isLocalEnabled);
lightMessagesService = new LightMessagesService();
syncService = new SyncService(syncProtocolId);

hostBuilder.addProtocols(
List.of(new Ping(), kademliaService.getDht(),
lightMessagesService.getLightMessages()));
lightMessagesService.getLightMessages(),
syncService.getSyncMessages()
));

host = hostBuilder.build();
kademliaService.setHost(host);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/limechain/network/kad/KademliaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ private void initialize(String protocolId, Multihash hostId, boolean localEnable
* Connects to boot nodes to the Kademlia dht
*
* @param bootNodes boot nodes set in ChainService
* @return the number of successfully connected nodes
*/
public void connectBootNodes(String[] bootNodes) {
public int connectBootNodes(String[] bootNodes) {
var bootstrapMultiAddress = List.of(bootNodes).stream()
.map(KademliaService::dnsNodeToIp4)
.map(MultiAddress::new)
.collect(Collectors.toList());
int successfulBootNodes = dht.bootstrapRoutingTable(host, bootstrapMultiAddress, addr -> !addr.contains("wss"));
log.log(Level.INFO, "Successfully connected to " + successfulBootNodes + " boot nodes");
return successfulBootNodes;
}

/**
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/com/limechain/network/protobuf/SyncMessage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = "proto3";

package com.limechain.network.substream.sync.pb;
option java_package = "com.limechain.network.substream.sync.pb";

// Schema definition for light client messages.
// Copied from https://github.com/paritytech/substrate/blob/9b08105b8c7106d723c4f470304ad9e2868569d9/client/network/src/schema/api.v1.proto

// Block enumeration direction.
enum Direction {
// Enumerate in ascending order (from child to parent).
Ascending = 0;
// Enumerate in descending order (from parent to canonical child).
Descending = 1;
}

// Request block data from a peer.
message BlockRequest {
// Bits of block data to request.
uint32 fields = 1;
// Start from this block.
oneof from_block {
// Start with given hash.
bytes hash = 2;
// Start with given block number.
bytes number = 3;
}
// End at this block. An implementation defined maximum is used when unspecified.
bytes to_block = 4; // optional
// Sequence direction.
Direction direction = 5;
// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
uint32 max_blocks = 6; // optional
}

// Response to `BlockRequest`
message BlockResponse {
// Block data for the requested sequence.
repeated BlockData blocks = 1;
}

// Block data sent in the response.
message BlockData {
// Block header hash.
bytes hash = 1;
// Block header if requested.
bytes header = 2; // optional
// Block body if requested.
repeated bytes body = 3; // optional
// Block receipt if requested.
bytes receipt = 4; // optional
// Block message queue if requested.
bytes message_queue = 5; // optional
// Justification if requested.
bytes justification = 6; // optional
// True if justification should be treated as present but empty.
// This hack is unfortunately necessary because shortcomings in the protobuf format otherwise
// doesn't make in possible to differentiate between a lack of justification and an empty
// justification.
bool is_empty_justification = 7; // optional, false if absent
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.limechain.network.protocol.sync;

import com.google.protobuf.ByteString;
import com.limechain.network.substream.sync.pb.SyncMessage;

import java.util.concurrent.CompletableFuture;

import static java.util.Objects.isNull;

public interface SyncController {
CompletableFuture<SyncMessage.BlockResponse> send(SyncMessage.BlockRequest msg);

/**
* Converts block number to bytes to encode them for the sync message
* @param blockNumber
* @return byte array made from the block number
*/
private static byte[] blockNumberToByteArray(int blockNumber) {
byte byte1 = (byte) (blockNumber);
byte byte2 = (byte) (blockNumber >>> 8);
byte byte3 = (byte) (blockNumber >>> 16);
byte byte4 = (byte) (blockNumber >>> 24);
byte byte5 = (byte) (blockNumber >>> 32);
return new byte[]{byte1, byte2, byte3, byte4, byte5};
}

default CompletableFuture<SyncMessage.BlockResponse> sendBlockRequest(int fields,
String fromHash,
Integer fromNumber,
Integer toBlockNumber,
SyncMessage.Direction direction,
int maxBlocks) {
var syncMessage = SyncMessage.BlockRequest.newBuilder()
.setFields(fields)
.setDirection(direction)
.setMaxBlocks(maxBlocks);
if (!isNull(fromHash))
syncMessage = syncMessage.setHash(ByteString.fromHex(fromHash));
if (!isNull(fromNumber))
syncMessage = syncMessage.setNumber(ByteString.copyFrom(blockNumberToByteArray(fromNumber)));
if (!isNull(toBlockNumber))
syncMessage = syncMessage.setToBlock(ByteString.copyFrom(blockNumberToByteArray(toBlockNumber)));

var builtSyncMessage = syncMessage.build();
return send(builtSyncMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.limechain.network.protocol.sync;

import com.limechain.network.substream.sync.pb.SyncMessage;
import io.libp2p.core.AddressBook;
import io.libp2p.core.Host;
import io.libp2p.core.PeerId;
import io.libp2p.core.multiformats.Multiaddr;
import io.libp2p.core.multistream.StrictProtocolBinding;
import lombok.extern.java.Log;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

@Log
public class SyncMessages extends StrictProtocolBinding<SyncController> {
public SyncMessages(String protocolId, SyncProtocol protocol){
super(protocolId, protocol);
}

public SyncMessage.BlockResponse remoteBlockRequest(Host us, AddressBook addrs, PeerId peer,
int fields,
String hash,
Integer number,
Integer toBlock,
SyncMessage.Direction direction,
int maxBlocks){
SyncController controller = dialPeer(us,peer,addrs);
try{
SyncMessage.BlockResponse response = controller
.sendBlockRequest(fields, hash, number, toBlock, direction, maxBlocks)
.get(2, TimeUnit.SECONDS);
log.log(Level.INFO, "Received response: " + response.toString());
return response;
} catch (Exception e){
log.log(Level.SEVERE, "Error while sending remote call request: ", e);
throw new RuntimeException(e);
}
}

private SyncController dialPeer(Host us, PeerId peer, AddressBook addrs){
Multiaddr[] addr = addrs.get(peer).join().toArray(new Multiaddr[0]);
if(addr.length == 0)
throw new IllegalStateException("No addresses known for peer " + peer);
return dial(us, peer, addr).getController().join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.limechain.network.protocol.sync;

import com.limechain.network.substream.sync.pb.SyncMessage;
import io.libp2p.core.ConnectionClosedException;
import io.libp2p.core.Stream;
import io.libp2p.protocol.ProtocolHandler;
import io.libp2p.protocol.ProtocolMessageHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import java.util.concurrent.CompletableFuture;

public class SyncProtocol extends ProtocolHandler<SyncController> {
public static final int MAX_REQUEST_SIZE = 1024 * 512;
public static final int MAX_RESPONSE_SIZE = 10 * 1024 * 1024;

public SyncProtocol() {
super(MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE);
}

@Override
protected CompletableFuture<SyncController> onStartInitiator(Stream stream) {
stream.pushHandler(new ProtobufVarint32FrameDecoder());
stream.pushHandler(new ProtobufDecoder(SyncMessage.BlockResponse.getDefaultInstance()));

stream.pushHandler((new ProtobufVarint32LengthFieldPrepender()));
stream.pushHandler((new ProtobufEncoder()));

Sender handler = new Sender(stream);
stream.pushHandler(handler);
return CompletableFuture.completedFuture(handler);
}

// Class for handling outgoing requests
static class Sender
implements ProtocolMessageHandler<SyncMessage.BlockResponse>,
SyncController {
private final CompletableFuture<SyncMessage.BlockResponse> resp = new CompletableFuture<>();
private final Stream stream;

public Sender(Stream stream) {
this.stream = stream;
}

@Override
public void onMessage(Stream stream, SyncMessage.BlockResponse msg) {
resp.complete(msg);
stream.closeWrite();
}

@Override
public CompletableFuture<SyncMessage.BlockResponse> send(SyncMessage.BlockRequest msg) {
stream.writeAndFlush(msg);
return resp;
}

@Override
public void onClosed(Stream stream) {
resp.completeExceptionally(new ConnectionClosedException());
}

@Override
public void onException(Throwable cause) {
resp.completeExceptionally(cause);
}

}
}
12 changes: 12 additions & 0 deletions src/main/java/com/limechain/network/protocol/sync/SyncService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.limechain.network.protocol.sync;

import lombok.Getter;

@Getter
public class SyncService {
private final SyncMessages syncMessages;

public SyncService(String protocolId) {
this.syncMessages = new SyncMessages(protocolId, new SyncProtocol());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,5 @@ public void onClosed(Stream stream) {
public void onException(Throwable cause) {
resp.completeExceptionally(cause);
}

}
}
Loading