diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 65e0f49e70..d0aa5f4af1 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Changed + +- Reduced the number of networking round-trips after a connection has been opened by assuming that the remote supports the desired networking protocols instead of waiting for its confirmation. ([#2984](https://github.com/paritytech/smoldot/pull/2984)) + ## 0.7.6 - 2022-11-04 ### Fixed diff --git a/src/libp2p/connection/established/substream.rs b/src/libp2p/connection/established/substream.rs index 32e5cf3559..e26272a3ca 100644 --- a/src/libp2p/connection/established/substream.rs +++ b/src/libp2p/connection/established/substream.rs @@ -52,26 +52,15 @@ enum SubstreamInner { /// ignored. InboundFailed, - /// Negotiating a protocol for a notifications protocol substream. - NotificationsOutNegotiating { - /// When the opening will time out in the absence of response. - timeout: TNow, - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Maximum allowed size for the remote's handshake. - max_handshake_size: usize, - /// Bytes of the handshake to send after the substream is open. - handshake_out: Vec, - /// Data passed by the user to [`Substream::notifications_out`]. - user_data: TNotifUd, - }, /// Failure to negotiate an outbound notifications substream. NotificationsOutNegotiationFailed, - /// A notifications protocol has been negotiated on a substream. Either a successful handshake - /// or an abrupt closing is now expected. + /// A notifications protocol is being negotiated or has been negotiated on a substream. Either + /// a successful handshake or an abrupt closing is now expected. NotificationsOutHandshakeRecv { /// When the opening will time out in the absence of response. timeout: TNow, + /// State of the protocol negotiation. `None` if the handshake has already finished. + negotiation: Option, String>>, /// Buffer for the incoming handshake. handshake_in: leb128::FramedInProgress, /// Handshake payload to write out. @@ -131,27 +120,12 @@ enum SubstreamInner { /// An inbound notifications protocol was open, but then the remote closed its writing side. NotificationsInClosed, - /// Negotiating a protocol for an outgoing request. - RequestOutNegotiating { - /// When the request will time out in the absence of response. - timeout: TNow, - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Bytes of the request to send after the substream is open. - /// - /// If `None`, nothing should be sent on the substream at all, not even the length prefix. - /// This contrasts with `Some(empty_vec)` where a `0` length prefix must be sent. - request: Option>, - /// Maximum allowed size for the response. - max_response_size: usize, - /// Data passed by the user to [`Substream::request_out`]. - user_data: TRqUd, - }, - /// Outgoing request has been sent out or is queued for send out, and a response from the - /// remote is now expected. Substream has been closed. + /// Outgoing request. RequestOut { /// When the request will time out in the absence of response. timeout: TNow, + /// State of the protocol negotiation. `None` if the negotiation has finished. + negotiation: Option, String>>, /// Request payload to write out. request: VecDeque, /// Data passed by the user to [`Substream::request_out`]. @@ -188,21 +162,6 @@ enum SubstreamInner { payload_out: VecDeque, }, - /// Negotiating a protocol for an outgoing ping substream. - /// - /// Note that the negotiation process doesn't have any timeout. Individual outgoing ping - /// requests *will* time out. - PingOutNegotiating { - /// State of the protocol negotiation. - negotiation: multistream_select::InProgress, String>, - /// Payload of the queued pings that remains to write out. Since the substream is still - /// negotiating, no ping has been sent out, and this is thus always equal to 32 times the - /// number of queued pings. - outgoing_payload: VecDeque, - /// FIFO queue of pings waiting to be answered. For each ping, when the ping will time - /// out, or `None` if the timeout has already occurred. - queued_pings: smallvec::SmallVec<[Option; 1]>, - }, /// Failed to negotiate a protocol for an outgoing ping substream. PingOutFailed { /// FIFO queue of pings that will immediately fail. @@ -210,6 +169,8 @@ enum SubstreamInner { }, /// Outbound ping substream. PingOut { + /// State of the protocol negotiation. `None` if the handshake is already finished. + negotiation: Option, String>>, /// Payload of the queued pings that remains to write out. outgoing_payload: VecDeque, /// Data waiting to be received from the remote. Any mismatch will cause an error. @@ -282,12 +243,19 @@ where requested_protocol, }); + let handshake_out = { + let handshake_len = handshake.len(); + leb128::encode_usize(handshake_len) + .chain(handshake.into_iter()) + .collect::>() + }; + Substream { - inner: SubstreamInner::NotificationsOutNegotiating { + inner: SubstreamInner::NotificationsOutHandshakeRecv { timeout, - negotiation, - max_handshake_size, - handshake_out: handshake, + negotiation: Some(negotiation), + handshake_in: leb128::FramedInProgress::new(max_handshake_size), + handshake_out, user_data, }, } @@ -312,12 +280,21 @@ where requested_protocol, }); + let request_payload = if let Some(request) = request { + let request_len = request.len(); + leb128::encode_usize(request_len) + .chain(request.into_iter()) + .collect::>() + } else { + VecDeque::new() + }; + Substream { - inner: SubstreamInner::RequestOutNegotiating { + inner: SubstreamInner::RequestOut { timeout, - negotiation, - request, - max_response_size, + negotiation: Some(negotiation), + request: request_payload, + response: leb128::FramedInProgress::new(max_response_size), user_data, }, } @@ -341,9 +318,10 @@ where }); Substream { - inner: SubstreamInner::PingOutNegotiating { - negotiation, + inner: SubstreamInner::PingOut { + negotiation: Some(negotiation), outgoing_payload: VecDeque::with_capacity(32), + expected_payload: VecDeque::with_capacity(32), queued_pings: smallvec::SmallVec::new(), }, } @@ -354,7 +332,6 @@ where /// Returns `None` if the substream isn't a request substream. pub fn request_substream_user_data_mut(&mut self) -> Option<&mut TRqUd> { match &mut self.inner { - SubstreamInner::RequestOutNegotiating { user_data, .. } => Some(user_data), SubstreamInner::RequestOut { user_data, .. } => Some(user_data), _ => None, } @@ -365,7 +342,6 @@ where /// Returns `None` if the substream isn't a notifications substream. pub fn notifications_substream_user_data_mut(&mut self) -> Option<&mut TNotifUd> { match &mut self.inner { - SubstreamInner::NotificationsOutNegotiating { user_data, .. } => Some(user_data), SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => Some(user_data), SubstreamInner::NotificationsOut { user_data, .. } => Some(user_data), SubstreamInner::NotificationsIn { user_data, .. } => Some(user_data), @@ -428,67 +404,6 @@ where } } - SubstreamInner::NotificationsOutNegotiating { - negotiation, - timeout, - max_handshake_size, - handshake_out, - user_data, - } => { - if timeout < read_write.now { - return ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::Timeout, user_data)), - }), - ); - } - - read_write.wake_up_after(&timeout); - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::NotificationsOutNegotiating { - negotiation: nego, - timeout, - max_handshake_size, - handshake_out, - user_data, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => { - let handshake_out = { - let handshake_len = handshake_out.len(); - leb128::encode_usize(handshake_len) - .chain(handshake_out.into_iter()) - .collect::>() - }; - - ( - Some(SubstreamInner::NotificationsOutHandshakeRecv { - timeout, - handshake_in: leb128::FramedInProgress::new(max_handshake_size), - handshake_out, - user_data, - }), - None, - ) - } - Ok(multistream_select::Negotiation::NotAvailable) => ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::ProtocolNotAvailable, user_data)), - }), - ), - Err(err) => ( - None, - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::NegotiationError(err), user_data)), - }), - ), - } - } SubstreamInner::NotificationsOutNegotiationFailed => { // Substream has failed to negotiate a protocol. The substream is expected to // close soon. @@ -505,6 +420,7 @@ where } SubstreamInner::NotificationsOutHandshakeRecv { timeout, + mut negotiation, handshake_in, mut handshake_out, user_data, @@ -520,67 +436,121 @@ where read_write.wake_up_after(&timeout); - read_write.write_from_vec_deque(&mut handshake_out); - - let incoming_buffer = match read_write.incoming_buffer { - Some(buf) => buf, - None => { - return ( - Some(SubstreamInner::NotificationsOutNegotiationFailed), - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::RefusedHandshake, user_data)), - }), - ); + if let Some(extracted_negotiation) = negotiation.take() { + match extracted_negotiation.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return ( + Some(SubstreamInner::NotificationsOutNegotiationFailed), + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::ProtocolNotAvailable, + user_data, + )), + }), + ) + } + Err(err) => { + return ( + None, + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::NegotiationError(err), + user_data, + )), + }), + ) + } } - }; + } - // Don't actually process incoming data before handshake is sent out, in order to - // not accidentally perform a state transition. - if !handshake_out.is_empty() { - return ( - Some(SubstreamInner::NotificationsOutHandshakeRecv { - timeout, - handshake_in, - handshake_out, - user_data, - }), - None, - ); + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + read_write.write_from_vec_deque(&mut handshake_out); } - match handshake_in.update(incoming_buffer) { - Ok((num_read, leb128::Framed::Finished(remote_handshake))) => { - read_write.advance_read(num_read); + if negotiation.is_none() { + let incoming_buffer = match read_write.incoming_buffer { + Some(buf) => buf, + None => { + return ( + Some(SubstreamInner::NotificationsOutNegotiationFailed), + Some(Event::NotificationsOutResult { + result: Err((NotificationsOutErr::RefusedHandshake, user_data)), + }), + ); + } + }; - ( - Some(SubstreamInner::NotificationsOut { - notifications: VecDeque::new(), - user_data, - close_demanded_by_remote: false, - }), - Some(Event::NotificationsOutResult { - result: Ok(remote_handshake), - }), - ) - } - Ok((num_read, leb128::Framed::InProgress(handshake_in))) => { - read_write.advance_read(num_read); - ( + // Don't actually process incoming data before handshake is sent out, in order + // to not accidentally perform a state transition. + if !handshake_out.is_empty() { + return ( Some(SubstreamInner::NotificationsOutHandshakeRecv { timeout, + negotiation, handshake_in, handshake_out, user_data, }), None, - ) + ); } - Err(err) => ( - None, - Some(Event::NotificationsOutResult { - result: Err((NotificationsOutErr::HandshakeRecvError(err), user_data)), + + match handshake_in.update(incoming_buffer) { + Ok((num_read, leb128::Framed::Finished(remote_handshake))) => { + read_write.advance_read(num_read); + + ( + Some(SubstreamInner::NotificationsOut { + notifications: VecDeque::new(), + user_data, + close_demanded_by_remote: false, + }), + Some(Event::NotificationsOutResult { + result: Ok(remote_handshake), + }), + ) + } + Ok((num_read, leb128::Framed::InProgress(handshake_in))) => { + read_write.advance_read(num_read); + ( + Some(SubstreamInner::NotificationsOutHandshakeRecv { + timeout, + negotiation, + handshake_in, + handshake_out, + user_data, + }), + None, + ) + } + Err(err) => ( + None, + Some(Event::NotificationsOutResult { + result: Err(( + NotificationsOutErr::HandshakeRecvError(err), + user_data, + )), + }), + ), + } + } else { + ( + Some(SubstreamInner::NotificationsOutHandshakeRecv { + timeout, + negotiation, + handshake_in, + handshake_out, + user_data, }), - ), + None, + ) } } SubstreamInner::NotificationsOut { @@ -629,78 +599,9 @@ where ) } - SubstreamInner::RequestOutNegotiating { - negotiation, - timeout, - request, - max_response_size, - user_data, - } => { - // Note that this might trigger timeouts for requests whose response is available - // in `incoming_buffer`. This is intentional, as from the perspective of - // `read_write` the response arrived after the timeout. It is the responsibility - // of the user to call `read_write` in an appropriate way for this to not happen. - if timeout < read_write.now { - read_write.close_write_if_empty(); - return ( - None, - Some(Event::Response { - response: Err(RequestError::Timeout), - user_data, - }), - ); - } - read_write.wake_up_after(&timeout); - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::RequestOutNegotiating { - negotiation: nego, - timeout, - request, - max_response_size, - user_data, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => { - let request_payload = if let Some(request) = request { - let request_len = request.len(); - leb128::encode_usize(request_len) - .chain(request.into_iter()) - .collect::>() - } else { - VecDeque::new() - }; - - ( - Some(SubstreamInner::RequestOut { - timeout, - request: request_payload, - user_data, - response: leb128::FramedInProgress::new(max_response_size), - }), - None, - ) - } - Ok(multistream_select::Negotiation::NotAvailable) => ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::ProtocolNotAvailable), - }), - ), - Err(err) => ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::NegotiationError(err)), - }), - ), - } - } SubstreamInner::RequestOut { timeout, + mut negotiation, mut request, user_data, response, @@ -719,60 +620,105 @@ where }), ); } - read_write.wake_up_after(&timeout); - if request.is_empty() { - read_write.close_write_if_empty(); - } else { - read_write.write_from_vec_deque(&mut request); + if let Some(extracted_nego) = negotiation.take() { + match extracted_nego.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::ProtocolNotAvailable), + }), + ) + } + Err(err) => { + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::NegotiationError(err)), + }), + ) + } + } } - let incoming_buffer = match read_write.incoming_buffer { - Some(buf) => buf, - None => { + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + if request.is_empty() { read_write.close_write_if_empty(); - return ( - None, - Some(Event::Response { - user_data, - response: Err(RequestError::SubstreamClosed), - }), - ); + } else { + read_write.write_from_vec_deque(&mut request); } - }; + } - match response.update(incoming_buffer) { - Ok((num_read, leb128::Framed::Finished(response))) => { - read_write.advance_read(num_read); - read_write.close_write_if_empty(); - ( + if negotiation.is_none() { + let incoming_buffer = match read_write.incoming_buffer { + Some(buf) => buf, + None => { + read_write.close_write_if_empty(); + return ( + None, + Some(Event::Response { + user_data, + response: Err(RequestError::SubstreamClosed), + }), + ); + } + }; + + match response.update(incoming_buffer) { + Ok((num_read, leb128::Framed::Finished(response))) => { + read_write.advance_read(num_read); + read_write.close_write_if_empty(); + ( + None, + Some(Event::Response { + user_data, + response: Ok(response), + }), + ) + } + Ok((num_read, leb128::Framed::InProgress(response))) => { + read_write.advance_read(num_read); + ( + Some(SubstreamInner::RequestOut { + timeout, + negotiation, + request, + user_data, + response, + }), + None, + ) + } + Err(err) => ( None, Some(Event::Response { user_data, - response: Ok(response), + response: Err(RequestError::ResponseLebError(err)), }), - ) - } - Ok((num_read, leb128::Framed::InProgress(response))) => { - read_write.advance_read(num_read); - ( - Some(SubstreamInner::RequestOut { - timeout, - request, - user_data, - response, - }), - None, - ) + ), } - Err(err) => ( - None, - Some(Event::Response { + } else { + ( + Some(SubstreamInner::RequestOut { + timeout, + negotiation, + request, user_data, - response: Err(RequestError::ResponseLebError(err)), + response, }), - ), + None, + ) } } @@ -1016,61 +962,6 @@ where ) } - SubstreamInner::PingOutNegotiating { - negotiation, - mut queued_pings, - mut outgoing_payload, - } => { - for timeout in queued_pings.iter_mut() { - if timeout.as_ref().map_or(false, |t| *t < read_write.now) { - *timeout = None; - return ( - Some(SubstreamInner::PingOutNegotiating { - negotiation, - outgoing_payload, - queued_pings, - }), - Some(Event::PingOutError { - num_pings: NonZeroUsize::new(1).unwrap(), - }), - ); - } - - if let Some(timeout) = timeout { - read_write.wake_up_after(timeout); - } - } - - while queued_pings.get(0).map_or(false, |p| p.is_none()) { - queued_pings.remove(0); - for _ in 0..32 { - outgoing_payload.pop_front(); - } - } - - match negotiation.read_write(read_write) { - Ok(multistream_select::Negotiation::InProgress(nego)) => ( - Some(SubstreamInner::PingOutNegotiating { - negotiation: nego, - outgoing_payload, - queued_pings, - }), - None, - ), - Ok(multistream_select::Negotiation::Success(_)) => ( - Some(SubstreamInner::PingOut { - outgoing_payload: outgoing_payload.clone(), - expected_payload: outgoing_payload, - queued_pings, - }), - None, - ), - Ok(multistream_select::Negotiation::NotAvailable) => { - (Some(SubstreamInner::PingOutFailed { queued_pings }), None) - } - Err(_) => (Some(SubstreamInner::PingOutFailed { queued_pings }), None), - } - } SubstreamInner::PingOutFailed { mut queued_pings } => { read_write.close_write_if_empty(); if !queued_pings.is_empty() { @@ -1086,11 +977,32 @@ where } } SubstreamInner::PingOut { + mut negotiation, mut queued_pings, mut outgoing_payload, mut expected_payload, } => { - read_write.write_from_vec_deque(&mut outgoing_payload); + if let Some(extracted_negotiation) = negotiation.take() { + match extracted_negotiation.read_write(read_write) { + Ok(multistream_select::Negotiation::InProgress(nego)) => { + negotiation = Some(nego) + } + Ok(multistream_select::Negotiation::Success(_)) => {} + Ok(multistream_select::Negotiation::NotAvailable) => { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None) + } + Err(_) => { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None) + } + } + } + + if negotiation + .as_ref() + .map_or(true, |n| n.can_write_protocol_data()) + { + read_write.write_from_vec_deque(&mut outgoing_payload); + } // We check the timeouts before checking the incoming data, as otherwise pings // might succeed after their timeout. @@ -1099,6 +1011,7 @@ where *timeout = None; return ( Some(SubstreamInner::PingOut { + negotiation, expected_payload, outgoing_payload, queued_pings, @@ -1114,30 +1027,34 @@ where } } - for actual_byte in read_write.incoming_bytes_iter() { - if expected_payload.pop_front() != Some(actual_byte) { - return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); - } + if negotiation.is_none() { + for actual_byte in read_write.incoming_bytes_iter() { + if expected_payload.pop_front() != Some(actual_byte) { + return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); + } - // When a ping has been fully answered is determined based on the number of - // bytes in `expected_payload`. - if expected_payload.len() % 32 == 0 { - debug_assert!(!queued_pings.is_empty()); // `expected_payload.pop_front()` should have returned `None` above otherwise - if queued_pings.remove(0).is_some() { - return ( - Some(SubstreamInner::PingOut { - expected_payload, - outgoing_payload, - queued_pings, - }), - Some(Event::PingOutSuccess), - ); + // When a ping has been fully answered is determined based on the number of + // bytes in `expected_payload`. + if expected_payload.len() % 32 == 0 { + debug_assert!(!queued_pings.is_empty()); // `expected_payload.pop_front()` should have returned `None` above otherwise + if queued_pings.remove(0).is_some() { + return ( + Some(SubstreamInner::PingOut { + negotiation, + expected_payload, + outgoing_payload, + queued_pings, + }), + Some(Event::PingOutSuccess), + ); + } } } } ( Some(SubstreamInner::PingOut { + negotiation, expected_payload, outgoing_payload, queued_pings, @@ -1153,8 +1070,7 @@ where SubstreamInner::InboundNegotiating(_) => None, SubstreamInner::InboundNegotiatingApiWait => None, SubstreamInner::InboundFailed => None, - SubstreamInner::RequestOutNegotiating { user_data, .. } - | SubstreamInner::RequestOut { user_data, .. } => Some(Event::Response { + SubstreamInner::RequestOut { user_data, .. } => Some(Event::Response { user_data, response: Err(RequestError::SubstreamReset), }), @@ -1165,8 +1081,7 @@ where }), SubstreamInner::NotificationsInRefused => None, SubstreamInner::NotificationsInClosed => None, - SubstreamInner::NotificationsOutNegotiating { user_data, .. } - | SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => { + SubstreamInner::NotificationsOutHandshakeRecv { user_data, .. } => { Some(Event::NotificationsOutResult { result: Err((NotificationsOutErr::SubstreamReset, user_data)), }) @@ -1182,7 +1097,6 @@ where SubstreamInner::RequestInApiWait => None, SubstreamInner::RequestInRespond { .. } => None, SubstreamInner::PingOut { queued_pings, .. } - | SubstreamInner::PingOutNegotiating { queued_pings, .. } | SubstreamInner::PingOutFailed { queued_pings, .. } => { NonZeroUsize::new(queued_pings.len()) .map(|num_pings| Event::PingOutError { num_pings }) @@ -1293,8 +1207,7 @@ where /// pub fn close_notifications_substream(&mut self) { match &mut self.inner { - SubstreamInner::NotificationsOutNegotiating { .. } - | SubstreamInner::NotificationsOutHandshakeRecv { .. } + SubstreamInner::NotificationsOutHandshakeRecv { .. } | SubstreamInner::NotificationsOut { .. } => { self.inner = SubstreamInner::NotificationsOutClosed; } @@ -1315,7 +1228,6 @@ where pub fn queue_ping(&mut self, payload: &[u8; 32], timeout: TNow) { match &mut self.inner { SubstreamInner::PingOut { queued_pings, .. } - | SubstreamInner::PingOutNegotiating { queued_pings, .. } | SubstreamInner::PingOutFailed { queued_pings, .. } => { queued_pings.push(Some(timeout)); } @@ -1331,11 +1243,6 @@ where outgoing_payload.extend(payload.iter().copied()); expected_payload.extend(payload.iter().copied()); } - SubstreamInner::PingOutNegotiating { - outgoing_payload, .. - } => { - outgoing_payload.extend(payload.iter().copied()); - } SubstreamInner::PingOutFailed { .. } => {} _ => panic!(), } @@ -1427,9 +1334,6 @@ where SubstreamInner::InboundNegotiatingApiWait => { f.debug_tuple("incoming-negotiated-api-wait").finish() } - SubstreamInner::NotificationsOutNegotiating { .. } => { - f.debug_tuple("notifications-out-negotiating").finish() - } SubstreamInner::NotificationsOutHandshakeRecv { .. } => { f.debug_tuple("notifications-out-handshake-recv").finish() } @@ -1454,8 +1358,7 @@ where SubstreamInner::NotificationsInClosed => { f.debug_tuple("notifications-in-closed").finish() } - SubstreamInner::RequestOutNegotiating { user_data, .. } - | SubstreamInner::RequestOut { user_data, .. } => { + SubstreamInner::RequestOut { user_data, .. } => { f.debug_tuple("request-out").field(&user_data).finish() } SubstreamInner::RequestInRecv { protocol_index, .. } @@ -1465,9 +1368,6 @@ where SubstreamInner::RequestInRespond { .. } => f.debug_tuple("request-in-respond").finish(), SubstreamInner::RequestInApiWait => f.debug_tuple("request-in").finish(), SubstreamInner::PingIn { .. } => f.debug_tuple("ping-in").finish(), - SubstreamInner::PingOutNegotiating { .. } => { - f.debug_tuple("ping-out-negotiating").finish() - } SubstreamInner::PingOutFailed { .. } => f.debug_tuple("ping-out-failed").finish(), SubstreamInner::PingOut { .. } => f.debug_tuple("ping-out").finish(), } diff --git a/src/libp2p/connection/multistream_select.rs b/src/libp2p/connection/multistream_select.rs index 146f44627d..ea59153dd9 100644 --- a/src/libp2p/connection/multistream_select.rs +++ b/src/libp2p/connection/multistream_select.rs @@ -190,6 +190,20 @@ where } } + /// If this function returns true, then the multistream-select handshake has finished writing + /// all its data, and the API user can now start writing the protocol-specific data if it + /// desires, even though the multistream-handshake isn't finished. + /// + /// If the remote supports the requested protocol, then doing so will save one networking + /// round-trip. If however the remote doesn't support the requested protocol, then doing so + /// will lead to confusing errors on the remote, as it will interpret the protocol-specific + /// data as being from the multistream-select protocol, and the substream will be rendered + /// unusable. Overall, saving a round-trip is usually seen as preferable over confusing + /// errors. + pub fn can_write_protocol_data(&self) -> bool { + matches!(self.state, InProgressState::ProtocolRequestAnswerExpected) + } + /// Feeds data coming from a socket, updates the internal state machine, and writes data /// destined to the socket. /// diff --git a/src/libp2p/connection/single_stream_handshake.rs b/src/libp2p/connection/single_stream_handshake.rs index 6002d8e662..78704cad42 100644 --- a/src/libp2p/connection/single_stream_handshake.rs +++ b/src/libp2p/connection/single_stream_handshake.rs @@ -30,6 +30,7 @@ //! TCP handshake), depending on the strategy used for the multistream-select protocol. // TODO: finish commenting on the number of round trips +// TODO: some round-trips can be removed: the multistream-select ones, and maybe also a Noise one, but it's complicated use super::{ super::peer_id::PeerId,