Skip to content

Commit

Permalink
identify: Replace FuturesUnordered with FuturesStream (#302)
Browse files Browse the repository at this point in the history
This PR replaces the identify `FuturesUnordered` with `FuturesStream`.
This effectively fixes delays in processing outbound events.
- ensure that identify warns if the transport service is closed
(produces no events).
- identify no longer exits on pending outbound events

Related to:
- #287
- #300

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Dmitry Markin <[email protected]>
  • Loading branch information
lexnv and dmitry-markin authored Dec 11, 2024
1 parent 4df9f14 commit e9a009f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use crate::{
substream::Substream,
transport::Endpoint,
types::{protocol::ProtocolName, SubstreamId},
utils::futures_stream::FuturesStream,
PeerId, DEFAULT_CHANNEL_SIZE,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures::{future::BoxFuture, Stream, StreamExt};
use multiaddr::Multiaddr;
use prost::Message;
use tokio::sync::mpsc::{channel, Sender};
Expand Down Expand Up @@ -181,10 +182,10 @@ pub(crate) struct Identify {
protocols: Vec<String>,

/// Pending outbound substreams.
pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
pending_outbound: FuturesStream<BoxFuture<'static, crate::Result<IdentifyResponse>>>,

/// Pending inbound substreams.
pending_inbound: FuturesUnordered<BoxFuture<'static, ()>>,
pending_inbound: FuturesStream<BoxFuture<'static, ()>>,
}

impl Identify {
Expand All @@ -197,8 +198,8 @@ impl Identify {
public: config.public.expect("public key to be supplied"),
protocol_version: config.protocol_version,
user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
pending_inbound: FuturesUnordered::new(),
pending_outbound: FuturesUnordered::new(),
pending_inbound: FuturesStream::new(),
pending_outbound: FuturesStream::new(),
protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
}
}
Expand Down Expand Up @@ -356,7 +357,10 @@ impl Identify {
loop {
tokio::select! {
event = self.service.next() => match event {
None => return,
None => {
tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop");
return
},
Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
let _ = self.on_connection_established(peer, endpoint);
}
Expand Down Expand Up @@ -390,7 +394,7 @@ impl Identify {
.await;
}
Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
None => return,
None => {}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/utils/futures_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ impl<F> FuturesStream<F> {
self.futures.len()
}

/// Check if the stream is empty.
pub fn is_empty(&self) -> bool {
self.futures.is_empty()
}

/// Push a future for processing.
pub fn push(&mut self, future: F) {
self.futures.push(future);
Expand Down

0 comments on commit e9a009f

Please sign in to comment.