Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulksnv committed Aug 29, 2023
1 parent 72cf71d commit 602170f
Showing 1 changed file with 59 additions and 58 deletions.
117 changes: 59 additions & 58 deletions crates/sc-subspace-block-relay/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sp_runtime::Justifications;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace, warn};

type BlockHash<Block> = <Block as BlockT>::Hash;
type BlockHeader<Block> = <Block as BlockT>::Header;
Expand Down Expand Up @@ -116,24 +117,25 @@ impl<Block: BlockT> PartialBlock<Block> {
}

/// The message to the server
#[allow(clippy::enum_variant_names)]
#[derive(Encode, Decode)]
enum ServerRequest<Block: BlockT, ProtocolRequest> {
enum ServerMessage<Block: BlockT, ProtocolRequest> {
/// Initial message, to be handled both by the client
/// and the protocol
Initial(InitialRequest<Block, ProtocolRequest>),
InitialRequest(InitialRequest<Block, ProtocolRequest>),

/// Message to be handled by the protocol
Protocol(ProtocolRequest),
ProtocolRequest(ProtocolRequest),

/// Full download request.
FullDownload(FullDownloadRequest<Block>),
FullDownloadRequest(FullDownloadRequest<Block>),
}

impl<Block: BlockT, ProtocolRequest> From<ProtocolRequest>
for ServerRequest<Block, ProtocolRequest>
for ServerMessage<Block, ProtocolRequest>
{
fn from(inner: ProtocolRequest) -> ServerRequest<Block, ProtocolRequest> {
ServerRequest::Protocol(inner)
fn from(inner: ProtocolRequest) -> ServerMessage<Block, ProtocolRequest> {
ServerMessage::ProtocolRequest(inner)
}
}

Expand Down Expand Up @@ -161,7 +163,7 @@ where
&self,
who: PeerId,
request: BlockRequest<Block>,
) -> Result<Vec<u8>, RelayError> {
) -> Result<Vec<BlockData<Block>>, RelayError> {
let start_ts = Instant::now();
let network_peer_handle = self
.network
Expand All @@ -177,16 +179,16 @@ where
protocol_request: self.protocol_client.build_initial_request(),
};
let initial_response = network_peer_handle
.request::<_, InitialResponse<Block, ProtoClient::Response>>(ServerRequest::Initial(
initial_request,
))
.request::<_, InitialResponse<Block, ProtoClient::Response>>(
ServerMessage::InitialRequest(initial_request),
)
.await?;

// Resolve the protocol response to get the extrinsics
let (body, local_miss) = if let Some(protocol_response) = initial_response.protocol_response
{
let (body, local_miss) = self
.resolve_extrinsics::<ServerRequest<Block, ProtoClient::Request>>(
.resolve_extrinsics::<ServerMessage<Block, ProtoClient::Request>>(
protocol_response,
&network_peer_handle,
)
Expand All @@ -197,18 +199,47 @@ where
};

// Assemble the final response
let downloaded = vec![initial_response.partial_block.block_data(body)].encode();
tracing::debug!(
let downloaded = vec![initial_response.partial_block.block_data(body)];
debug!(
target: LOG_TARGET,
block_hash = ?initial_response.block_hash,
download_len = %downloaded.len(),
download_bytes = %downloaded.encoded_size(),
%local_miss,
duration = ?start_ts.elapsed(),
"block_download",
);
Ok(downloaded)
}

/// Downloads the requested blocks from the peer, without using the relay protocol.
async fn full_download(
&self,
who: PeerId,
request: BlockRequest<Block>,
) -> Result<Vec<BlockData<Block>>, RelayError> {
let start_ts = Instant::now();
let network_peer_handle = self
.network
.network_peer_handle(self.protocol_name.clone(), who)?;

let server_request: ServerMessage<Block, ProtoClient::Request> =
ServerMessage::FullDownloadRequest(FullDownloadRequest(request.clone()));
let full_response = network_peer_handle
.request::<_, FullDownloadResponse<Block>>(server_request)
.await?;
let downloaded = full_response.0;

debug!(
target: LOG_TARGET,
?request,
download_blocks = %downloaded.len(),
download_bytes = %downloaded.encoded_size(),
duration = ?start_ts.elapsed(),
"full_download",
);
Ok(downloaded)
}

/// Resolves the extrinsics from the initial response
async fn resolve_extrinsics<Request>(
&self,
Expand All @@ -228,7 +259,7 @@ where
.map(|entry| {
let encoded = entry.protocol_unit.encode();
if !entry.locally_resolved {
tracing::trace!(
trace!(
target: LOG_TARGET,
?block_hash,
tx_hash = ?entry.protocol_unit_id,
Expand All @@ -242,36 +273,6 @@ where
.collect();
Ok((extrinsics, local_miss))
}

/// Downloads the requested blocks from the peer, without using the relay protocol.
async fn full_download(
&self,
who: PeerId,
request: BlockRequest<Block>,
) -> Result<Vec<u8>, RelayError> {
let start_ts = Instant::now();
let network_peer_handle = self
.network
.network_peer_handle(self.protocol_name.clone(), who)?;

let server_request: ServerRequest<Block, ProtoClient::Request> =
ServerRequest::FullDownload(FullDownloadRequest(request.clone()));
let full_response = network_peer_handle
.request::<_, FullDownloadResponse<Block>>(server_request)
.await?;
let downloaded_blocks = full_response.0.len();
let downloaded = full_response.0.encode();

tracing::debug!(
target: LOG_TARGET,
?request,
%downloaded_blocks,
download_len = %downloaded.len(),
duration = ?start_ts.elapsed(),
"full_download",
);
Ok(downloaded)
}
}

#[async_trait]
Expand All @@ -294,9 +295,9 @@ where
self.download(who, request.clone()).await
};
match ret {
Ok(downloaded) => Ok(Ok(downloaded)),
Ok(blocks) => Ok(Ok(blocks.encode())),
Err(error) => {
tracing::debug!(
debug!(
target: LOG_TARGET,
peer=?who,
?request,
Expand Down Expand Up @@ -364,11 +365,11 @@ where
payload,
pending_response,
} = request;
let server_msg: ServerRequest<Block, ProtoServer::Request> =
let server_msg: ServerMessage<Block, ProtoServer::Request> =
match Decode::decode(&mut payload.as_ref()) {
Ok(msg) => msg,
Err(err) => {
tracing::warn!(
warn!(
target: LOG_TARGET,
?peer,
?err,
Expand All @@ -379,22 +380,22 @@ where
};

let ret = match server_msg {
ServerRequest::Initial(req) => self.on_initial_request(req),
ServerRequest::Protocol(req) => self.on_protocol_request(req),
ServerRequest::FullDownload(req) => self.on_full_download_request(req),
ServerMessage::InitialRequest(req) => self.on_initial_request(req),
ServerMessage::ProtocolRequest(req) => self.on_protocol_request(req),
ServerMessage::FullDownloadRequest(req) => self.on_full_download_request(req),
};

match ret {
Ok(response) => {
self.send_response(peer, response, pending_response);
tracing::trace!(
trace!(
target: LOG_TARGET,
?peer,
"server: request processed from"
);
}
Err(error) => {
tracing::debug!(
debug!(
target: LOG_TARGET,
?peer,
?error,
Expand Down Expand Up @@ -566,7 +567,7 @@ where
sent_feedback: None,
};
if sender.send(response).is_err() {
tracing::warn!(
warn!(
target: LOG_TARGET,
?peer,
"Failed to send response"
Expand All @@ -584,7 +585,7 @@ where
ProtoServer: ProtocolServer<BlockHash<Block>> + Send,
{
async fn run(&mut self) {
tracing::info!(
info!(
target: LOG_TARGET,
"relay::consensus block server: starting"
);
Expand Down Expand Up @@ -681,7 +682,7 @@ where
return Ok(Some(extrinsic));
}
}
tracing::trace!(
trace!(
target: LOG_TARGET,
?tx_hash,
?block_hash,
Expand Down

0 comments on commit 602170f

Please sign in to comment.