From b8484391e72ff425b22f73b27bf6c35d00a948bc Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 11 Nov 2022 15:18:44 +0100 Subject: [PATCH] Remove some multistream-select round-trips (#2984) cc https://github.com/paritytech/smoldot/issues/2983 This PR removes some extra networking round-trips caused by multistream-select. The way multistream-select works is: we tell the remote that we would like to use a specific protocol, and then the remote answers either yes or no. No is only ever returned if the remote doesn't support the protocol at all, which isn't supposed to happen unless we're talking to a buggy node or to a libp2p-compatible-but-not-Substrate node. Before this PR, we wait for the remote to send back yes or no. After this PR, we don't wait and simply start sending the protocol-specific data immediately after the request for a protocol, and assume that the remote is going to answer yes. The negotiation is still properly finished afterwards, so if it turns out that the remote answers no, then we'll still get an error locally. The drawback is that, if the remote answers no, the protocol-specific data that we have eagerly sent will be interpreted as being from the multistream-select protocol, which can lead to confusing decoding errors. However, the trade-off is worth it. I've done this change only for substreams after the connection has been opened. There are two other unnecessary round-trips during the connection opening, but considering that we don't open connections that often they're actually less important. For this reason, I'll leave https://github.com/paritytech/smoldot/issues/2983 open. Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- bin/wasm-node/CHANGELOG.md | 4 + .../connection/established/substream.rs | 644 ++++++++---------- src/libp2p/connection/multistream_select.rs | 14 + .../connection/single_stream_handshake.rs | 1 + 4 files changed, 291 insertions(+), 372 deletions(-) diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 65e0f49e70..d0aa5f4af1 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Changed + +- Reduced the number of networking round-trips after a connection has been opened by assuming that the remote supports the desired networking protocols instead of waiting for its confirmation. ([#2984](https://github.com/paritytech/smoldot/pull/2984)) + ## 0.7.6 - 2022-11-04 ### Fixed diff --git a/src/libp2p/connection/established/substream.rs b/src/libp2p/connection/established/substream.rs index 32e5cf3559..e26272a3ca 100644 --- a/src/libp2p/connection/established/substream.rs +++ b/src/libp2p/connection/established/substream.rs @@ -52,26 +52,15 @@ enum SubstreamInner { /// ignored. InboundFailed, - /// Negotiating a protocol for a notifications protocol substream. - NotificationsOutNegotiating { - /// When the opening will time out in the absence of response. - timeout: TNow, - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Maximum allowed size for the remote's handshake. - max_handshake_size: usize, - /// Bytes of the handshake to send after the substream is open. - handshake_out: Vec, - /// Data passed by the user to [`Substream::notifications_out`]. - user_data: TNotifUd, - }, /// Failure to negotiate an outbound notifications substream. NotificationsOutNegotiationFailed, - /// A notifications protocol has been negotiated on a substream. Either a successful handshake - /// or an abrupt closing is now expected. + /// A notifications protocol is being negotiated or has been negotiated on a substream. Either + /// a successful handshake or an abrupt closing is now expected. NotificationsOutHandshakeRecv { /// When the opening will time out in the absence of response. timeout: TNow, + /// State of the protocol negotiation. `None` if the handshake has already finished. + negotiation: Option, String>>, /// Buffer for the incoming handshake. handshake_in: leb128::FramedInProgress, /// Handshake payload to write out. @@ -131,27 +120,12 @@ enum SubstreamInner { /// An inbound notifications protocol was open, but then the remote closed its writing side. NotificationsInClosed, - /// Negotiating a protocol for an outgoing request. - RequestOutNegotiating { - /// When the request will time out in the absence of response. - timeout: TNow, - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Bytes of the request to send after the substream is open. - /// - /// If `None`, nothing should be sent on the substream at all, not even the length prefix. - /// This contrasts with `Some(empty_vec)` where a `0` length prefix must be sent. - request: Option>, - /// Maximum allowed size for the response. - max_response_size: usize, - /// Data passed by the user to [`Substream::request_out`]. - user_data: TRqUd, - }, - /// Outgoing request has been sent out or is queued for send out, and a response from the - /// remote is now expected. Substream has been closed. + /// Outgoing request. RequestOut { /// When the request will time out in the absence of response. timeout: TNow, + /// State of the protocol negotiation. `None` if the negotiation has finished. + negotiation: Option, String>>, /// Request payload to write out. request: VecDeque, /// Data passed by the user to [`Substream::request_out`]. @@ -188,21 +162,6 @@ enum SubstreamInner { payload_out: VecDeque, }, - /// Negotiating a protocol for an outgoing ping substream. - /// - /// Note that the negotiation process doesn't have any timeout. Individual outgoing ping - /// requests *will* time out. - PingOutNegotiating { - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Payload of the queued pings that remains to write out. Since the substream is still - /// negotiating, no ping has been sent out, and this is thus always equal to 32 times the - /// number of queued pings. - outgoing_payload: VecDeque, - /// FIFO queue of pings waiting to be answered. For each ping, when the ping will time - /// out, or `None` if the timeout has already occurred. - queued_pings: smallvec::SmallVec<[Option; 1]>, - }, /// Failed to negotiate a protocol for an outgoing ping substream. PingOutFailed { /// FIFO queue of pings that will immediately fail. @@ -210,6 +169,8 @@ enum SubstreamInner { }, /// Outbound ping substream. PingOut { + /// State of the protocol negotiation. `None` if the handshake is already finished. + negotiation: Option, String>>, /// Payload of the queued pings that remains to write out. outgoing_payload: VecDeque, /// Data waiting to be received from the remote. Any mismatch will cause an error. @@ -282,12 +243,19 @@ where requested_protocol, }); + let handshake_out = { + let handshake_len = handshake.len(); + leb128::encode_usize(handshake_len) + .chain(handshake.into_iter()) + .collect::>() + }; + Substream { - inner: SubstreamInner::NotificationsOutNegotiating { + inner: SubstreamInner::NotificationsOutHandshakeRecv { timeout, - negotiation, - max_handshake_size, - handshake_out: handshake, + negotiation: Some(negotiation), + handshake_in: leb128::FramedInProgress::new(max_handshake_size), + handshake_out, user_data, }, } @@ -312,12 +280,21 @@ where requested_protocol, }); + let request_payload = if let Some(request) = request { + let request_len = request.len(); + leb128::encode_usize(request_len) + .chain(request.into_iter()) + .collect::>() + } else { + VecDeque::new() + }; + Substream { - inner: SubstreamInner::RequestOutNegotiating { + inner: SubstreamInner::RequestOut { timeout, - negotiation, - request, - max_response_size, + negotiation: Some(negotiation), + request: request_payload, + response: leb128::FramedInProgress::new(max_response_size), user_data, }, } @@ -341,9 +318,10 @@ where }); Substream { - inner: SubstreamInner::PingOutNegotiating { - negotiation, + inner: SubstreamInner::PingOut { + negotiation: Some(negotiation), outgoing_payload: VecDeque::with_capacity(32), + expected_payload: VecDeque::with_capacity(32), queued_pings: smallvec::SmallVec::new(), }, } @@ -354,7 +332,6 @@ where /// Returns `None` if the substream isn't a request substream. pub fn request_substream_user_data_mut(&mut self) -> Option<&mut TRqUd> { match &mut self.inner { - SubstreamInner::RequestOutNegotiating { user_data, .. } => Some(user_data), SubstreamInner::RequestOut { user_data, .. } => Some(user_data), _ => None, } @@ -365,7 +342,6 @@ where /// Returns `None` if the substream isn't a notifications substream. pub fn notifications_substream_user_data_mut(&mut self) -> Option<&mut TNotifUd> { match &mut self.inner { - SubstreamInner::NotificationsOutNegotiating { user_data, .. } => Some(user_data), SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => Some(user_data), SubstreamInner::NotificationsOut { user_data, .. } => Some(user_data), SubstreamInner::NotificationsIn { user_data, .. } => Some(user_data), @@ -428,67 +404,6 @@ where } } - SubstreamInner::NotificationsOutNegotiating { - negotiation, - timeout, - max_handshake_size, - handshake_out, - user_data, - } => { - if timeout < read_write.now { - return ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::Timeout, user_data)), - }), - ); - } - - read_write.wake_up_after(&timeout); - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::NotificationsOutNegotiating { - negotiation: nego, - timeout, - max_handshake_size, - handshake_out, - user_data, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => { - let handshake_out = { - let handshake_len = handshake_out.len(); - leb128::encode_usize(handshake_len) - .chain(handshake_out.into_iter()) - .collect::>() - }; - - ( - Some(SubstreamInner::NotificationsOutHandshakeRecv { - timeout, - handshake_in: leb128::FramedInProgress::new(max_handshake_size), - handshake_out, - user_data, - }), - None, - ) - } - Ok(multistream_select::Negotiation::NotAvailable) => ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::ProtocolNotAvailable, user_data)), - }), - ), - Err(err) => ( - None, - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::NegotiationError(err), user_data)), - }), - ), - } - } SubstreamInner::NotificationsOutNegotiationFailed => { // Substream has failed to negotiate a protocol. The substream is expected to // close soon. @@ -505,6 +420,7 @@ where } SubstreamInner::NotificationsOutHandshakeRecv { timeout, + mut negotiation, handshake_in, mut handshake_out, user_data, @@ -520,67 +436,121 @@ where read_write.wake_up_after(&timeout); - read_write.write_from_vec_deque(&mut handshake_out); - - let incoming_buffer = match read_write.incoming_buffer { - Some(buf) => buf, - None => { - return ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::RefusedHandshake, user_data)), - }), - ); + if let Some(extracted_negotiation) = negotiation.take() { + match extracted_negotiation.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return ( + Some(SubstreamInner::NotificationsOutNegotiationFailed), + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::ProtocolNotAvailable, + user_data, + )), + }), + ) + } + Err(err) => { + return ( + None, + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::NegotiationError(err), + user_data, + )), + }), + ) + } } - }; + } - // Don't actually process incoming data before handshake is sent out, in order to - // not accidentally perform a state transition. - if !handshake_out.is_empty() { - return ( - Some(SubstreamInner::NotificationsOutHandshakeRecv { - timeout, - handshake_in, - handshake_out, - user_data, - }), - None, - ); + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + read_write.write_from_vec_deque(&mut handshake_out); } - match handshake_in.update(incoming_buffer) { - Ok((num_read, leb128::Framed::Finished(remote_handshake))) => { - read_write.advance_read(num_read); + if negotiation.is_none() { + let incoming_buffer = match read_write.incoming_buffer { + Some(buf) => buf, + None => { + return ( + Some(SubstreamInner::NotificationsOutNegotiationFailed), + Some(Event::NotificationsOutResult { + result: Err((NotificationsOutErr::RefusedHandshake, user_data)), + }), + ); + } + }; - ( - Some(SubstreamInner::NotificationsOut { - notifications: VecDeque::new(), - user_data, - close_demanded_by_remote: false, - }), - Some(Event::NotificationsOutResult { - result: Ok(remote_handshake), - }), - ) - } - Ok((num_read, leb128::Framed::InProgress(handshake_in))) => { - read_write.advance_read(num_read); - ( + // Don't actually process incoming data before handshake is sent out, in order + // to not accidentally perform a state transition. + if !handshake_out.is_empty() { + return ( Some(SubstreamInner::NotificationsOutHandshakeRecv { timeout, + negotiation, handshake_in, handshake_out, user_data, }), None, - ) + ); } - Err(err) => ( - None, - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::HandshakeRecvError(err), user_data)), + + match handshake_in.update(incoming_buffer) { + Ok((num_read, leb128::Framed::Finished(remote_handshake))) => { + read_write.advance_read(num_read); + + ( + Some(SubstreamInner::NotificationsOut { + notifications: VecDeque::new(), + user_data, + close_demanded_by_remote: false, + }), + Some(Event::NotificationsOutResult { + result: Ok(remote_handshake), + }), + ) + } + Ok((num_read, leb128::Framed::InProgress(handshake_in))) => { + read_write.advance_read(num_read); + ( + Some(SubstreamInner::NotificationsOutHandshakeRecv { + timeout, + negotiation, + handshake_in, + handshake_out, + user_data, + }), + None, + ) + } + Err(err) => ( + None, + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::HandshakeRecvError(err), + user_data, + )), + }), + ), + } + } else { + ( + Some(SubstreamInner::NotificationsOutHandshakeRecv { + timeout, + negotiation, + handshake_in, + handshake_out, + user_data, }), - ), + None, + ) } } SubstreamInner::NotificationsOut { @@ -629,78 +599,9 @@ where ) } - SubstreamInner::RequestOutNegotiating { - negotiation, - timeout, - request, - max_response_size, - user_data, - } => { - // Note that this might trigger timeouts for requests whose response is available - // in `incoming_buffer`. This is intentional, as from the perspective of - // `read_write` the response arrived after the timeout. It is the responsibility - // of the user to call `read_write` in an appropriate way for this to not happen. - if timeout < read_write.now { - read_write.close_write_if_empty(); - return ( - None, - Some(Event::Response { - response: Err(RequestError::Timeout), - user_data, - }), - ); - } - read_write.wake_up_after(&timeout); - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::RequestOutNegotiating { - negotiation: nego, - timeout, - request, - max_response_size, - user_data, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => { - let request_payload = if let Some(request) = request { - let request_len = request.len(); - leb128::encode_usize(request_len) - .chain(request.into_iter()) - .collect::>() - } else { - VecDeque::new() - }; - - ( - Some(SubstreamInner::RequestOut { - timeout, - request: request_payload, - user_data, - response: leb128::FramedInProgress::new(max_response_size), - }), - None, - ) - } - Ok(multistream_select::Negotiation::NotAvailable) => ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::ProtocolNotAvailable), - }), - ), - Err(err) => ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::NegotiationError(err)), - }), - ), - } - } SubstreamInner::RequestOut { timeout, + mut negotiation, mut request, user_data, response, @@ -719,60 +620,105 @@ where }), ); } - read_write.wake_up_after(&timeout); - if request.is_empty() { - read_write.close_write_if_empty(); - } else { - read_write.write_from_vec_deque(&mut request); + if let Some(extracted_nego) = negotiation.take() { + match extracted_nego.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::ProtocolNotAvailable), + }), + ) + } + Err(err) => { + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::NegotiationError(err)), + }), + ) + } + } } - let incoming_buffer = match read_write.incoming_buffer { - Some(buf) => buf, - None => { + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + if request.is_empty() { read_write.close_write_if_empty(); - return ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::SubstreamClosed), - }), - ); + } else { + read_write.write_from_vec_deque(&mut request); } - }; + } - match response.update(incoming_buffer) { - Ok((num_read, leb128::Framed::Finished(response))) => { - read_write.advance_read(num_read); - read_write.close_write_if_empty(); - ( + if negotiation.is_none() { + let incoming_buffer = match read_write.incoming_buffer { + Some(buf) => buf, + None => { + read_write.close_write_if_empty(); + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::SubstreamClosed), + }), + ); + } + }; + + match response.update(incoming_buffer) { + Ok((num_read, leb128::Framed::Finished(response))) => { + read_write.advance_read(num_read); + read_write.close_write_if_empty(); + ( + None, + Some(Event::Response { + user_data, + response: Ok(response), + }), + ) + } + Ok((num_read, leb128::Framed::InProgress(response))) => { + read_write.advance_read(num_read); + ( + Some(SubstreamInner::RequestOut { + timeout, + negotiation, + request, + user_data, + response, + }), + None, + ) + } + Err(err) => ( None, Some(Event::Response { user_data, - response: Ok(response), + response: Err(RequestError::ResponseLebError(err)), }), - ) - } - Ok((num_read, leb128::Framed::InProgress(response))) => { - read_write.advance_read(num_read); - ( - Some(SubstreamInner::RequestOut { - timeout, - request, - user_data, - response, - }), - None, - ) + ), } - Err(err) => ( - None, - Some(Event::Response { + } else { + ( + Some(SubstreamInner::RequestOut { + timeout, + negotiation, + request, user_data, - response: Err(RequestError::ResponseLebError(err)), + response, }), - ), + None, + ) } } @@ -1016,61 +962,6 @@ where ) } - SubstreamInner::PingOutNegotiating { - negotiation, - mut queued_pings, - mut outgoing_payload, - } => { - for timeout in queued_pings.iter_mut() { - if timeout.as_ref().map_or(false, |t| *t < read_write.now) { - *timeout = None; - return ( - Some(SubstreamInner::PingOutNegotiating { - negotiation, - outgoing_payload, - queued_pings, - }), - Some(Event::PingOutError { - num_pings: NonZeroUsize::new(1).unwrap(), - }), - ); - } - - if let Some(timeout) = timeout { - read_write.wake_up_after(timeout); - } - } - - while queued_pings.get(0).map_or(false, |p| p.is_none()) { - queued_pings.remove(0); - for _ in 0..32 { - outgoing_payload.pop_front(); - } - } - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::PingOutNegotiating { - negotiation: nego, - outgoing_payload, - queued_pings, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => ( - Some(SubstreamInner::PingOut { - outgoing_payload: outgoing_payload.clone(), - expected_payload: outgoing_payload, - queued_pings, - }), - None, - ), - Ok(multistream_select::Negotiation::NotAvailable) => { - (Some(SubstreamInner::PingOutFailed { queued_pings }), None) - } - Err(_) => (Some(SubstreamInner::PingOutFailed { queued_pings }), None), - } - } SubstreamInner::PingOutFailed { mut queued_pings } => { read_write.close_write_if_empty(); if !queued_pings.is_empty() { @@ -1086,11 +977,32 @@ where } } SubstreamInner::PingOut { + mut negotiation, mut queued_pings, mut outgoing_payload, mut expected_payload, } => { - read_write.write_from_vec_deque(&mut outgoing_payload); + if let Some(extracted_negotiation) = negotiation.take() { + match extracted_negotiation.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None) + } + Err(_) => { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None) + } + } + } + + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + read_write.write_from_vec_deque(&mut outgoing_payload); + } // We check the timeouts before checking the incoming data, as otherwise pings // might succeed after their timeout. @@ -1099,6 +1011,7 @@ where *timeout = None; return ( Some(SubstreamInner::PingOut { + negotiation, expected_payload, outgoing_payload, queued_pings, @@ -1114,30 +1027,34 @@ where } } - for actual_byte in read_write.incoming_bytes_iter() { - if expected_payload.pop_front() != Some(actual_byte) { - return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); - } + if negotiation.is_none() { + for actual_byte in read_write.incoming_bytes_iter() { + if expected_payload.pop_front() != Some(actual_byte) { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); + } - // When a ping has been fully answered is determined based on the number of - // bytes in `expected_payload`. - if expected_payload.len() % 32 == 0 { - debug_assert!(!queued_pings.is_empty()); // `expected_payload.pop_front()` should have returned `None` above otherwise - if queued_pings.remove(0).is_some() { - return ( - Some(SubstreamInner::PingOut { - expected_payload, - outgoing_payload, - queued_pings, - }), - Some(Event::PingOutSuccess), - ); + // When a ping has been fully answered is determined based on the number of + // bytes in `expected_payload`. + if expected_payload.len() % 32 == 0 { + debug_assert!(!queued_pings.is_empty()); // `expected_payload.pop_front()` should have returned `None` above otherwise + if queued_pings.remove(0).is_some() { + return ( + Some(SubstreamInner::PingOut { + negotiation, + expected_payload, + outgoing_payload, + queued_pings, + }), + Some(Event::PingOutSuccess), + ); + } } } } ( Some(SubstreamInner::PingOut { + negotiation, expected_payload, outgoing_payload, queued_pings, @@ -1153,8 +1070,7 @@ where SubstreamInner::InboundNegotiating(_) => None, SubstreamInner::InboundNegotiatingApiWait => None, SubstreamInner::InboundFailed => None, - SubstreamInner::RequestOutNegotiating { user_data, .. } - | SubstreamInner::RequestOut { user_data, .. } => Some(Event::Response { + SubstreamInner::RequestOut { user_data, .. } => Some(Event::Response { user_data, response: Err(RequestError::SubstreamReset), }), @@ -1165,8 +1081,7 @@ where }), SubstreamInner::NotificationsInRefused => None, SubstreamInner::NotificationsInClosed => None, - SubstreamInner::NotificationsOutNegotiating { user_data, .. } - | SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => { + SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => { Some(Event::NotificationsOutResult { result: Err((NotificationsOutErr::SubstreamReset, user_data)), }) @@ -1182,7 +1097,6 @@ where SubstreamInner::RequestInApiWait => None, SubstreamInner::RequestInRespond { .. } => None, SubstreamInner::PingOut { queued_pings, .. } - | SubstreamInner::PingOutNegotiating { queued_pings, .. } | SubstreamInner::PingOutFailed { queued_pings, .. } => { NonZeroUsize::new(queued_pings.len()) .map(|num_pings| Event::PingOutError { num_pings }) @@ -1293,8 +1207,7 @@ where /// pub fn close_notifications_substream(&mut self) { match &mut self.inner { - SubstreamInner::NotificationsOutNegotiating { .. } - | SubstreamInner::NotificationsOutHandshakeRecv { .. } + SubstreamInner::NotificationsOutHandshakeRecv { .. } | SubstreamInner::NotificationsOut { .. } => { self.inner = SubstreamInner::NotificationsOutClosed; } @@ -1315,7 +1228,6 @@ where pub fn queue_ping(&mut self, payload: &[u8; 32], timeout: TNow) { match &mut self.inner { SubstreamInner::PingOut { queued_pings, .. } - | SubstreamInner::PingOutNegotiating { queued_pings, .. } | SubstreamInner::PingOutFailed { queued_pings, .. } => { queued_pings.push(Some(timeout)); } @@ -1331,11 +1243,6 @@ where outgoing_payload.extend(payload.iter().copied()); expected_payload.extend(payload.iter().copied()); } - SubstreamInner::PingOutNegotiating { - outgoing_payload, .. - } => { - outgoing_payload.extend(payload.iter().copied()); - } SubstreamInner::PingOutFailed { .. } => {} _ => panic!(), } @@ -1427,9 +1334,6 @@ where SubstreamInner::InboundNegotiatingApiWait => { f.debug_tuple("incoming-negotiated-api-wait").finish() } - SubstreamInner::NotificationsOutNegotiating { .. } => { - f.debug_tuple("notifications-out-negotiating").finish() - } SubstreamInner::NotificationsOutHandshakeRecv { .. } => { f.debug_tuple("notifications-out-handshake-recv").finish() } @@ -1454,8 +1358,7 @@ where SubstreamInner::NotificationsInClosed => { f.debug_tuple("notifications-in-closed").finish() } - SubstreamInner::RequestOutNegotiating { user_data, .. } - | SubstreamInner::RequestOut { user_data, .. } => { + SubstreamInner::RequestOut { user_data, .. } => { f.debug_tuple("request-out").field(&user_data).finish() } SubstreamInner::RequestInRecv { protocol_index, .. } @@ -1465,9 +1368,6 @@ where SubstreamInner::RequestInRespond { .. } => f.debug_tuple("request-in-respond").finish(), SubstreamInner::RequestInApiWait => f.debug_tuple("request-in").finish(), SubstreamInner::PingIn { .. } => f.debug_tuple("ping-in").finish(), - SubstreamInner::PingOutNegotiating { .. } => { - f.debug_tuple("ping-out-negotiating").finish() - } SubstreamInner::PingOutFailed { .. } => f.debug_tuple("ping-out-failed").finish(), SubstreamInner::PingOut { .. } => f.debug_tuple("ping-out").finish(), } diff --git a/src/libp2p/connection/multistream_select.rs b/src/libp2p/connection/multistream_select.rs index 146f44627d..ea59153dd9 100644 --- a/src/libp2p/connection/multistream_select.rs +++ b/src/libp2p/connection/multistream_select.rs @@ -190,6 +190,20 @@ where } } + /// If this function returns true, then the multistream-select handshake has finished writing + /// all its data, and the API user can now start writing the protocol-specific data if it + /// desires, even though the multistream-handshake isn't finished. + /// + /// If the remote supports the requested protocol, then doing so will save one networking + /// round-trip. If however the remote doesn't support the requested protocol, then doing so + /// will lead to confusing errors on the remote, as it will interpret the protocol-specific + /// data as being from the multistream-select protocol, and the substream will be rendered + /// unusable. Overall, saving a round-trip is usually seen as preferable over confusing + /// errors. + pub fn can_write_protocol_data(&self) -> bool { + matches!(self.state, InProgressState::ProtocolRequestAnswerExpected) + } + /// Feeds data coming from a socket, updates the internal state machine, and writes data /// destined to the socket. /// diff --git a/src/libp2p/connection/single_stream_handshake.rs b/src/libp2p/connection/single_stream_handshake.rs index 6002d8e662..78704cad42 100644 --- a/src/libp2p/connection/single_stream_handshake.rs +++ b/src/libp2p/connection/single_stream_handshake.rs @@ -30,6 +30,7 @@ //! TCP handshake), depending on the strategy used for the multistream-select protocol. // TODO: finish commenting on the number of round trips +// TODO: some round-trips can be removed: the multistream-select ones, and maybe also a Noise one, but it's complicated use super::{ super::peer_id::PeerId,