Skip to content

Commit

Permalink
Sync: validate block responses for required data (paritytech#5052)
Browse files Browse the repository at this point in the history
* Less verbose state-db logging

* Validate block responses for block bodies

* Update client/network/src/protocol.rs

Co-Authored-By: Bastian Köcher <[email protected]>

* Added validation test

* Disconnect on missing header as well

* Typo

Co-Authored-By: André Silva <[email protected]>

Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: André Silva <[email protected]>
  • Loading branch information
3 people authored and General-Beck committed Mar 4, 2020
1 parent 9fb3bba commit 2d52e62
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 24 deletions.
31 changes: 27 additions & 4 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ mod rep {
pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
/// Peer role does not match (e.g. light peer connecting to another light peer).
pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role");
/// Peer response data does not have requested bits.
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

// Lock must always be taken in order declared here.
Expand Down Expand Up @@ -701,12 +703,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
peer: PeerId,
request: message::BlockRequest<B>
) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}",
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
request.id,
peer,
request.from,
request.to,
request.max);
request.max,
request.fields,
);

// sending block requests to the node that is unable to serve it is considered a bad behavior
if !self.config.roles.is_full() {
Expand Down Expand Up @@ -754,6 +758,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
message_queue: None,
justification,
};
// Stop if we don't have requested block body
if get_body && block_data.body.is_none() {
trace!(target: "sync", "Missing data for block request.");
break;
}
blocks.push(block_data);
match request.direction {
message::Direction::Ascending => id = BlockId::Number(number + One::one()),
Expand Down Expand Up @@ -784,7 +793,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) -> CustomMessageOutcome<B> {
let blocks_range = match (
let blocks_range = || match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
Expand All @@ -796,7 +805,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
response.id,
peer,
response.blocks.len(),
blocks_range
blocks_range(),
);

if request.fields == message::BlockAttributes::JUSTIFICATION {
Expand All @@ -811,6 +820,20 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
} else {
// Validate fields against the request.
if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) {
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing header for a block");
return CustomMessageOutcome::None
}
if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) {
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing body for a block");
return CustomMessageOutcome::None
}

match self.sync.on_block_data(peer, Some(request), response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ impl<B: BlockT> ChainSync<B> {
| PeerSyncState::DownloadingFinalityProof(..) => Vec::new()
}
} else {
// When request.is_none() just accept blocks
// When request.is_none() this is a block announcement. Just accept blocks.
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
Expand Down
71 changes: 57 additions & 14 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl<D> Peer<D> {
where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block
{
let best_hash = self.client.info().best_hash;
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block)
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false)
}

/// Add blocks to the peer -- edit the block before adding. The chain will
Expand All @@ -247,7 +247,8 @@ impl<D> Peer<D> {
at: BlockId<Block>,
count: usize,
origin: BlockOrigin,
mut edit_block: F
mut edit_block: F,
headers_only: bool,
) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block {
let full_client = self.client.as_full()
.expect("blocks could only be generated by full clients");
Expand All @@ -272,7 +273,7 @@ impl<D> Peer<D> {
origin,
header.clone(),
None,
Some(block.extrinsics)
if headers_only { None } else { Some(block.extrinsics) },
).unwrap();
let cache = if let Some(cache) = cache {
cache.into_iter().collect()
Expand All @@ -294,28 +295,46 @@ impl<D> Peer<D> {
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
}

/// Push blocks to the peer (simplified: with or without a TX)
pub fn push_headers(&mut self, count: usize) -> H256 {
let best_hash = self.client.info().best_hash;
self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true)
}

/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
self.generate_tx_blocks_at(at, count, with_tx, false)
}

/// Push blocks/headers to the peer (simplified: with or without a TX) starting from
/// given hash.
fn generate_tx_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool, headers_only:bool) -> H256 {
let mut nonce = 0;
if with_tx {
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Alice.into(),
amount: 1,
nonce,
};
builder.push(transfer.into_signed_tx()).unwrap();
nonce = nonce + 1;
builder.build().unwrap().block
})
self.generate_blocks_at(
at,
count,
BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Alice.into(),
amount: 1,
nonce,
};
builder.push(transfer.into_signed_tx()).unwrap();
nonce = nonce + 1;
builder.build().unwrap().block
},
headers_only
)
} else {
self.generate_blocks_at(
at,
count,
BlockOrigin::File,
|builder| builder.build().unwrap().block,
headers_only,
)
}
}
Expand Down Expand Up @@ -748,13 +767,37 @@ pub trait TestNetFactory: Sized {
Async::Ready(())
}

/// Polls the testnet until theres' no activiy of any kind.
///
/// Must be executed in a task context.
fn poll_until_idle(&mut self) -> Async<()> {
self.poll();

for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
}
if peer.network.num_sync_requests() != 0 {
return Async::NotReady
}
}
Async::Ready(())
}

/// Blocks the current thread until we are sync'ed.
///
/// Calls `poll_until_sync` repeatedly with the runtime passed as parameter.
fn block_until_sync(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap();
}

/// Blocks the current thread until there are no pending packets.
///
/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
fn block_until_idle(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_idle()))).unwrap();
}

/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
fn poll(&mut self) {
self.mut_peers(|peers| {
Expand Down
21 changes: 21 additions & 0 deletions client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,24 @@ fn does_not_sync_announced_old_best_block() {
})).unwrap();
assert!(!net.peer(1).is_major_syncing());
}

#[test]
fn full_sync_requires_block_body() {
// Check that we don't sync headers-only in full mode.
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);

net.peer(0).push_headers(1);
// Wait for nodes to connect
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
net.block_until_idle(&mut runtime);
assert_eq!(net.peer(1).client.info().best_number, 0);
}
6 changes: 3 additions & 3 deletions client/state-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
{
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db", "Pinned block: {:?}", hash);
trace!(target: "state-db-pin", "Pinned block: {:?}", hash);
self.non_canonical.pin(hash);
}
*refs += 1;
Expand All @@ -357,11 +357,11 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
Entry::Occupied(mut entry) => {
*entry.get_mut() -= 1;
if *entry.get() == 0 {
trace!(target: "state-db", "Unpinned block: {:?}", hash);
trace!(target: "state-db-pin", "Unpinned block: {:?}", hash);
entry.remove();
self.non_canonical.unpin(hash);
} else {
trace!(target: "state-db", "Releasing reference for {:?}", hash);
trace!(target: "state-db-pin", "Releasing reference for {:?}", hash);
}
},
Entry::Vacant(_) => {},
Expand Down
4 changes: 2 additions & 2 deletions client/state-db/src/noncanonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
while let Some(hash) = parent {
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db", "Pinned non-canon block: {:?}", hash);
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
}
*refs += 1;
parent = self.parents.get(hash);
Expand All @@ -455,7 +455,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
if *entry.get() == 0 {
entry.remove();
if let Some(inserted) = self.pinned_insertions.remove(&hash) {
trace!(target: "state-db", "Discarding unpinned non-canon block: {:?}", hash);
trace!(target: "state-db-pin", "Discarding unpinned non-canon block: {:?}", hash);
discard_values(&mut self.values, inserted);
self.parents.remove(&hash);
}
Expand Down

0 comments on commit 2d52e62

Please sign in to comment.