From 402212cf79668cb77749f947626a41fdfc3011c8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 5 Nov 2023 03:22:49 +0100 Subject: [PATCH] fix(swarm): implement `ConnectionHandler::poll_close` for combinators Follow-up to https://github.com/libp2p/rust-libp2p/pull/4076. This is especially relevant since `libp2p-swarm-derive` uses `SelectConnectionHandler`. Pull-Request: #4794. --- swarm/src/behaviour/toggle.rs | 8 ++++++++ swarm/src/handler.rs | 3 +++ swarm/src/handler/either.rs | 9 +++++++++ swarm/src/handler/map_in.rs | 4 ++++ swarm/src/handler/map_out.rs | 9 +++++++++ swarm/src/handler/multi.rs | 13 ++++++++++++- swarm/src/handler/select.rs | 14 +++++++++++++- 7 files changed, 58 insertions(+), 2 deletions(-) diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 5c23ee099a3..e81c5343701 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -363,4 +363,12 @@ where } } } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + let Some(inner) = self.inner.as_mut() else { + return Poll::Ready(None); + }; + + inner.poll_close(cx) + } } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index be0ca67ab48..31d2c91e391 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -160,6 +160,9 @@ pub trait ConnectionHandler: Send + 'static { /// We therefore cannot guarantee that performing IO within here will succeed. /// /// To signal completion, [`Poll::Ready(None)`] should be returned. + /// + /// Implementations MUST have a [`fuse`](futures::StreamExt::fuse)-like behaviour. + /// That is, [`Poll::Ready(None)`] MUST be returned on repeated calls to [`ConnectionHandler::poll_close`]. fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(None) } diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index b48b7cdcb15..a5aab9b5fee 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -134,6 +134,15 @@ where Poll::Ready(event) } + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + let event = match self { + Either::Left(handler) => futures::ready!(handler.poll_close(cx)).map(Either::Left), + Either::Right(handler) => futures::ready!(handler.poll_close(cx)).map(Either::Right), + }; + + Poll::Ready(event) + } + fn on_connection_event( &mut self, event: ConnectionEvent< diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index bd45eee4d97..9316ef4d2ce 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -80,6 +80,10 @@ where self.inner.poll(cx) } + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) + } + fn on_connection_event( &mut self, event: ConnectionEvent< diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 8ef8bad61b3..f877bfa6f64 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -21,6 +21,7 @@ use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, }; +use futures::ready; use std::fmt::Debug; use std::task::{Context, Poll}; @@ -83,6 +84,14 @@ where }) } + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + let Some(e) = ready!(self.inner.poll_close(cx)) else { + return Poll::Ready(None); + }; + + Poll::Ready(Some((self.map)(e))) + } + fn on_connection_event( &mut self, event: ConnectionEvent< diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index fc1cd750763..0b4549ed733 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -27,7 +27,7 @@ use crate::handler::{ }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::Stream; -use futures::{future::BoxFuture, prelude::*}; +use futures::{future::BoxFuture, prelude::*, ready}; use rand::Rng; use std::{ cmp, @@ -271,6 +271,17 @@ where Poll::Pending } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + for (k, h) in self.handlers.iter_mut() { + let Some(e) = ready!(h.poll_close(cx)) else { + continue; + }; + return Poll::Ready(Some((k.clone(), e))); + } + + Poll::Ready(None) + } } /// Split [`MultiHandler`] into parts. diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index fc470ff803e..e049252d448 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -25,7 +25,7 @@ use crate::handler::{ }; use crate::upgrade::SendWrapper; use either::Either; -use futures::future; +use futures::{future, ready}; use libp2p_core::upgrade::SelectUpgrade; use std::{cmp, task::Context, task::Poll}; @@ -259,6 +259,18 @@ where Poll::Pending } + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(e) = ready!(self.proto1.poll_close(cx)) { + return Poll::Ready(Some(Either::Left(e))); + } + + if let Some(e) = ready!(self.proto2.poll_close(cx)) { + return Poll::Ready(Some(Either::Right(e))); + } + + Poll::Ready(None) + } + fn on_connection_event( &mut self, event: ConnectionEvent<