Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make sure we poll authority event stream until all events are process…
Browse files Browse the repository at this point in the history
…ed (#5608)

* Make sure we poll authority event stream until all events are processed

* Add test
  • Loading branch information
bkchr authored Apr 11, 2020
1 parent 18f0be7 commit 4af60fd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 16 deletions.
24 changes: 11 additions & 13 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,21 +538,19 @@ where
Ok(())
};

match inner() {
Ok(()) => {}

// Handle fatal errors.
//
// Given that the network likely terminated authority discovery should do the same.
Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()),
loop {
match inner() {
Ok(()) => return Poll::Pending,

// Handle non-fatal errors.
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
// Handle fatal errors.
//
// Given that the network likely terminated authority discovery should do the same.
Err(Error::DhtEventStreamTerminated) => return Poll::Ready(()),

// Return Poll::Pending as this is a long running task with the same lifetime as the node
// itself.
Poll::Pending
// Handle non-fatal errors.
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
}
}
}

Expand Down
78 changes: 75 additions & 3 deletions client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
use std::{iter::FromIterator, sync::{Arc, Mutex}};

use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::executor::{block_on, LocalPool};
use futures::future::{poll_fn, FutureExt};
use futures::sink::SinkExt;
use futures::task::LocalSpawn;
use futures::poll;
use libp2p::{kad, PeerId};

Expand Down Expand Up @@ -319,7 +321,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses,
signature: signature,
signature,
}
.encode(&mut signed_addresses)
.unwrap();
Expand Down Expand Up @@ -380,3 +382,73 @@ fn terminate_when_event_stream_terminates() {
);
});
}

#[test]
fn dont_stop_polling_when_error_is_returned() {
#[derive(PartialEq, Debug)]
enum Event {
Processed,
End,
};

let (mut dht_event_tx, dht_event_rx) = channel(1000);
let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
authorities: vec![],
});
let mut pool = LocalPool::new();

let mut authority_discovery = AuthorityDiscovery::new(
test_api,
network.clone(),
vec![],
key_store,
dht_event_rx.boxed(),
None,
);

// Spawn the authority discovery to make sure it is polled independently.
//
// As this is a local pool, only one future at a time will have the CPU and
// can make progress until the future returns `Pending`.
pool.spawner().spawn_local_obj(
futures::future::poll_fn(move |ctx| {
match std::pin::Pin::new(&mut authority_discovery).poll(ctx) {
Poll::Ready(()) => {},
Poll::Pending => {
discovery_update_tx.send(Event::Processed).now_or_never();
return Poll::Pending;
},
}
let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap();
Poll::Ready(())
}).boxed_local().into(),
).expect("Spawns authority discovery");

pool.run_until(
// The future that drives the event stream
async {
// Send an event that should generate an error
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
// Send the same event again to make sure that the event stream needs to be polled twice
// to be woken up again.
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();

// Now we call `await` and give the control to the authority discovery future.
assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);

// Drop the event rx to stop the authority discovery. If it was polled correctly, it should
// end properly.
drop(dht_event_tx);

assert!(
discovery_update_rx.collect::<Vec<Event>>()
.await
.into_iter()
.any(|evt| evt == Event::End), "The authority should have ended",
);
}
);
}

0 comments on commit 4af60fd

Please sign in to comment.