Skip to content

Commit

Permalink
Server changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulksnv committed Aug 28, 2023
1 parent a32be33 commit ab23f54
Showing 1 changed file with 158 additions and 70 deletions.
228 changes: 158 additions & 70 deletions crates/sc-subspace-block-relay/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ use sc_client_api::{BlockBackend, HeaderBackend};
use sc_network::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use sc_network::types::ProtocolName;
use sc_network::{OutboundFailure, PeerId, RequestFailure};
use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
use sc_network_common::sync::message::{
BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
};
use sc_network_sync::block_relay_protocol::{
BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer,
};
use sc_service::SpawnTaskHandle;
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One, Zero};
use sp_runtime::Justifications;
use std::num::NonZeroUsize;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace, warn};

type BlockHash<Block> = <Block as BlockT>::Hash;
type BlockHeader<Block> = <Block as BlockT>::Header;
Expand All @@ -37,6 +38,12 @@ const SYNC_PROTOCOL: &str = "/subspace/consensus-block-relay/1";
const NUM_PEER_HINT: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");
const TRANSACTION_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(512).expect("Not zero; qed");

/// These are the same limits used by substrate block handler.
/// Maximum response size (bytes).
const MAX_RESPONSE_SIZE: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("Not zero; qed");
/// Maximum blocks in the response.
const MAX_RESPONSE_BLOCKS: NonZeroU32 = NonZeroU32::new(128).expect("Not zero; qed");

/// Initial request for a single block.
#[derive(Encode, Decode)]
struct InitialRequest<Block: BlockT, ProtocolRequest> {
Expand Down Expand Up @@ -80,15 +87,34 @@ struct FullDownloadRequest<Block: BlockT>(BlockRequest<Block>);
struct FullDownloadResponse<Block: BlockT>(Vec<BlockData<Block>>);

/// The partial block response from the server. It has all the fields
/// except the extrinsics. The extrinsics are handled by the protocol
/// except the extrinsics(and some other meta info). The extrinsics
/// are handled by the protocol.
#[derive(Encode, Decode)]
struct PartialBlock<Block: BlockT> {
hash: BlockHash<Block>,
parent_hash: BlockHash<Block>,
block_number: NumberFor<Block>,
block_header_hash: BlockHash<Block>,
header: Option<BlockHeader<Block>>,
indexed_body: Option<Vec<Vec<u8>>>,
justifications: Option<Justifications>,
}

impl<Block: BlockT> PartialBlock<Block> {
/// Builds the full block data.
fn block_data(self, body: Option<Vec<Extrinsic<Block>>>) -> BlockData<Block> {
BlockData::<Block> {
hash: self.block_header_hash,
header: self.header,
body,
indexed_body: self.indexed_body,
receipt: None,
message_queue: None,
justification: None,
justifications: self.justifications,
}
}
}

/// The message to the server
#[derive(Encode, Decode)]
enum ServerRequest<Block: BlockT, ProtocolRequest> {
Expand Down Expand Up @@ -171,24 +197,14 @@ where
};

// Assemble the final response
let downloaded = vec![BlockData::<Block> {
hash: initial_response.partial_block.hash,
header: initial_response.partial_block.header,
body,
indexed_body: initial_response.partial_block.indexed_body,
receipt: None,
message_queue: None,
justification: None,
justifications: initial_response.partial_block.justifications,
}]
.encode();
trace!(
let downloaded = vec![initial_response.partial_block.block_data(body)].encode();
tracing::debug!(
target: LOG_TARGET,
"relay::download: {:?} => {},{},{:?}",
initial_response.block_hash,
downloaded.len(),
local_miss,
start_ts.elapsed()
block_hash = ?initial_response.block_hash,
download_len = %downloaded.len(),
%local_miss,
duration = ?start_ts.elapsed(),
"block_download",
);
Ok(downloaded)
}
Expand All @@ -212,12 +228,12 @@ where
.map(|entry| {
let encoded = entry.protocol_unit.encode();
if !entry.locally_resolved {
trace!(
tracing::trace!(
target: LOG_TARGET,
"relay::download: local miss: {block_hash:?}/{:?}, \
size = {}",
entry.protocol_unit_id,
encoded.len()
?block_hash,
tx_hash = ?entry.protocol_unit_id,
tx_size = %encoded.len(),
"resolve_extrinsics: local miss"
);
local_miss += encoded.len();
}
Expand Down Expand Up @@ -246,11 +262,13 @@ where
let downloaded_blocks = full_response.0.len();
let downloaded = full_response.0.encode();

trace!(
tracing::debug!(
target: LOG_TARGET,
"relay::full_download: {request:?},{downloaded_blocks},{},{:?}",
downloaded.len(),
start_ts.elapsed()
?request,
%downloaded_blocks,
download_len = %downloaded.len(),
duration = ?start_ts.elapsed(),
"full_download",
);
Ok(downloaded)
}
Expand Down Expand Up @@ -278,9 +296,12 @@ where
match ret {
Ok(downloaded) => Ok(Ok(downloaded)),
Err(error) => {
debug!(
tracing::debug!(
target: LOG_TARGET,
"relay::download_block: error: {who:?}/{request:?}/{error:?}"
peer=?who,
?request,
?error,
"download_block failed"
);
match error {
RelayError::RequestResponse(error) => match error {
Expand Down Expand Up @@ -347,9 +368,11 @@ where
match Decode::decode(&mut payload.as_ref()) {
Ok(msg) => msg,
Err(err) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"relay::on_request: decode incoming: {peer}: {err:?}"
?peer,
?err,
"Decode failed"
);
return;
}
Expand All @@ -358,21 +381,24 @@ where
let ret = match server_msg {
ServerRequest::Initial(req) => self.on_initial_request(req),
ServerRequest::Protocol(req) => self.on_protocol_request(req),
_ => return,
ServerRequest::FullDownload(req) => self.on_full_download_request(req),
};

match ret {
Ok(response) => {
self.send_response(peer, response, pending_response);
trace!(
tracing::trace!(
target: LOG_TARGET,
"relay::consensus server: request processed from: {peer}"
?peer,
"server: request processed from"
);
}
Err(error) => {
debug!(
tracing::debug!(
target: LOG_TARGET,
"relay::consensus server: error: {peer}/{error:?}"
?peer,
?error,
"Server error"
);
}
}
Expand Down Expand Up @@ -414,6 +440,59 @@ where
Ok(response.encode())
}

/// Handles the full download request from the client
fn on_full_download_request(
&mut self,
full_download_request: FullDownloadRequest<Block>,
) -> Result<Vec<u8>, RelayError> {
let block_request = full_download_request.0;
let mut blocks = Vec::new();
let mut block_id = match block_request.from {
FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
FromBlock::Number(n) => BlockId::<Block>::Number(n),
};

let mut total_size: usize = 0;
let max_blocks = block_request.max.map_or(MAX_RESPONSE_BLOCKS.into(), |val| {
std::cmp::min(val, MAX_RESPONSE_BLOCKS.into())
});
while blocks.len() < max_blocks as usize {
let block_hash = self.block_hash(&block_id)?;
let partial_block = self.get_partial_block(&block_hash, block_request.fields)?;
let body = if block_request.fields.contains(BlockAttributes::BODY) {
Some(block_transactions(&block_hash, self.client.as_ref())?)
} else {
None
};

// Enforce the max limit on response size.
let mut bytes = body.as_ref().map_or(0, |extrinsics| {
extrinsics.iter().map(|ext| ext.encoded_size()).sum()
});
bytes += partial_block
.indexed_body
.as_ref()
.map_or(0, |entries| entries.iter().map(|entry| entry.len()).sum());
if !blocks.is_empty() && (total_size + bytes) > MAX_RESPONSE_SIZE.into() {
break;
}
total_size += bytes;

block_id = match block_request.direction {
Direction::Ascending => BlockId::Number(partial_block.block_number + One::one()),
Direction::Descending => {
if partial_block.block_number.is_zero() {
break;
}
BlockId::Hash(partial_block.parent_hash)
}
};
blocks.push(partial_block.block_data(body));
}

Ok(blocks.encode())
}

/// Builds the partial block response
fn get_partial_block(
&self,
Expand All @@ -429,7 +508,9 @@ where
}
Err(err) => return Err(RelayError::BlockHeader(format!("{block_hash:?}, {err:?}"))),
};
let hash = block_header.hash();
let parent_hash = *block_header.parent_hash();
let block_number = *block_header.number();
let block_header_hash = block_header.hash();

let header = if block_attributes.contains(BlockAttributes::HEADER) {
Some(block_header)
Expand All @@ -454,7 +535,9 @@ where
};

Ok(PartialBlock {
hash,
parent_hash,
block_number,
block_header_hash,
header,
indexed_body,
justifications,
Expand Down Expand Up @@ -483,9 +566,10 @@ where
sent_feedback: None,
};
if sender.send(response).is_err() {
warn!(
tracing::warn!(
target: LOG_TARGET,
"relay::send_response: failed to send to {peer}"
?peer,
"Failed to send response"
);
}
}
Expand All @@ -500,7 +584,7 @@ where
ProtoServer: ProtocolServer<BlockHash<Block>> + Send,
{
async fn run(&mut self) {
info!(
tracing::info!(
target: LOG_TARGET,
"relay::consensus block server: starting"
);
Expand Down Expand Up @@ -538,10 +622,6 @@ where
transaction_cache
.lock()
.put(hash.clone(), transaction.data().clone());
trace!(
target: LOG_TARGET,
"relay::backend: received import notification: {hash:?}"
);
}
}
})
Expand All @@ -559,10 +639,6 @@ where
self.transaction_cache
.lock()
.put(tx_hash.clone(), extrinsic.clone());
trace!(
target: LOG_TARGET,
"relay::backend: updated cache: {tx_hash:?}"
);
}
}

Expand All @@ -577,20 +653,11 @@ where
&self,
block_hash: &BlockHash<Block>,
) -> Result<Vec<(TxHash<Pool>, Extrinsic<Block>)>, RelayError> {
let maybe_extrinsics = self
.client
.block_body(*block_hash)
.map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
if let Some(extrinsics) = maybe_extrinsics {
Ok(extrinsics
.into_iter()
.map(|extrinsic| (self.transaction_pool.hash_of(&extrinsic), extrinsic))
.collect())
} else {
Err(RelayError::BlockExtrinsicsNotFound(format!(
"{block_hash:?}"
)))
}
let tx = block_transactions(block_hash, self.client.as_ref())?;
Ok(tx
.into_iter()
.map(|extrinsic| (self.transaction_pool.hash_of(&extrinsic), extrinsic))
.collect())
}

fn protocol_unit(
Expand All @@ -614,9 +681,12 @@ where
return Ok(Some(extrinsic));
}
}
trace!(
tracing::trace!(
target: LOG_TARGET,
"relay::protocol_unit: {tx_hash:?} not found in {block_hash:?}/{len}",
?tx_hash,
?block_hash,
%len,
"protocol_unit",
);
}
}
Expand All @@ -633,6 +703,24 @@ where
}
}

/// Retrieves the block transactions/tx hash from the backend.
fn block_transactions<Block, Client>(
block_hash: &BlockHash<Block>,
client: &Client,
) -> Result<Vec<Extrinsic<Block>>, RelayError>
where
Block: BlockT,
Client: HeaderBackend<Block> + BlockBackend<Block>,
{
let maybe_extrinsics = client
.block_body(*block_hash)
.map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
maybe_extrinsics.ok_or(RelayError::BlockExtrinsicsNotFound(format!(
"{block_hash:?}"
)))
}

/// Sets up the relay components.
pub fn build_consensus_relay<Block, Client, Pool>(
network: Arc<NetworkWrapper>,
client: Arc<Client>,
Expand Down

0 comments on commit ab23f54

Please sign in to comment.