Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejhirsz committed Jun 15, 2021
1 parent 8cb237b commit 88f4614
Show file tree
Hide file tree
Showing 11 changed files with 568 additions and 356 deletions.
42 changes: 32 additions & 10 deletions examples/autobahn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// See https://github.com/crossbario/autobahn-testsuite for details.

use futures::io::{BufReader, BufWriter};
use soketto::{BoxedError, connection, handshake};
use soketto::{connection, handshake, BoxedError};
use std::str::FromStr;
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
Expand All @@ -25,7 +25,7 @@ const SOKETTO_VERSION: &str = env!("CARGO_PKG_VERSION");
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
let n = num_of_cases().await?;
for i in 1 ..= n {
for i in 1..=n {
if let Err(e) = run_case(i).await {
log::error!("case {}: {:?}", i, e)
}
Expand All @@ -37,7 +37,10 @@ async fn main() -> Result<(), BoxedError> {
async fn num_of_cases() -> Result<usize, BoxedError> {
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, "/getCaseCount");
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
assert!(matches!(
client.handshake().await?,
handshake::ServerResponse::Accepted { .. }
));
let (_, mut receiver) = client.into_builder().finish();
let mut data = Vec::new();
let kind = receiver.receive_data(&mut data).await?;
Expand All @@ -52,7 +55,10 @@ async fn run_case(n: usize) -> Result<(), BoxedError> {
let resource = format!("/runCase?case={}&agent=soketto-{}", n, SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
assert!(matches!(
client.handshake().await?,
handshake::ServerResponse::Accepted { .. }
));
let (mut sender, mut receiver) = client.into_builder().finish();
let mut message = Vec::new();
loop {
Expand All @@ -69,7 +75,7 @@ async fn run_case(n: usize) -> Result<(), BoxedError> {
sender.flush().await?
}
Err(connection::Error::Closed) => return Ok(()),
Err(e) => return Err(e.into())
Err(e) => return Err(e.into()),
}
}
}
Expand All @@ -79,19 +85,35 @@ async fn update_report() -> Result<(), BoxedError> {
let resource = format!("/updateReports?agent=soketto-{}", SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
assert!(matches!(
client.handshake().await?,
handshake::ServerResponse::Accepted { .. }
));
client.into_builder().finish().0.close().await?;
Ok(())
}

#[cfg(not(feature = "deflate"))]
fn new_client(socket: TcpStream, path: &str) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Client::new(BufReader::new(BufWriter::new(socket.compat())), "127.0.0.1:9001", path)
fn new_client(
socket: TcpStream,
path: &str,
) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Client::new(
BufReader::new(BufWriter::new(socket.compat())),
"127.0.0.1:9001",
path,
)
}

#[cfg(feature = "deflate")]
fn new_client(socket: TcpStream, path: &str) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(64 * 1024, socket.compat()));
fn new_client(
socket: TcpStream,
path: &str,
) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(
8 * 1024,
BufWriter::with_capacity(64 * 1024, socket.compat()),
);
let mut client = handshake::Client::new(socket, "127.0.0.1:9001", path);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Client);
client.add_extension(Box::new(deflate));
Expand Down
26 changes: 18 additions & 8 deletions examples/autobahn_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// See https://github.com/crossbario/autobahn-testsuite for details.

use futures::io::{BufReader, BufWriter};
use soketto::{BoxedError, connection, handshake};
use soketto::{connection, handshake, BoxedError};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
let listener = TcpListener::bind("127.0.0.1:9001").await?;
Expand All @@ -29,7 +29,10 @@ async fn main() -> Result<(), BoxedError> {
let req = server.receive_request().await?;
req.key()
};
let accept = handshake::server::Response::Accept { key, protocol: None };
let accept = handshake::server::Response::Accept {
key,
protocol: None,
};
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut message = Vec::new();
Expand All @@ -47,13 +50,13 @@ async fn main() -> Result<(), BoxedError> {
sender.send_text(txt).await?;
sender.flush().await?
} else {
break
break;
}
}
Err(connection::Error::Closed) => break,
Err(e) => {
log::error!("connection error: {}", e);
break
break;
}
}
}
Expand All @@ -62,13 +65,20 @@ async fn main() -> Result<(), BoxedError> {
}

#[cfg(not(feature = "deflate"))]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
fn new_server<'a>(
socket: TcpStream,
) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
}

#[cfg(feature = "deflate")]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(16 * 1024, socket.compat()));
fn new_server<'a>(
socket: TcpStream,
) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(
8 * 1024,
BufWriter::with_capacity(16 * 1024, socket.compat()),
);
let mut server = handshake::Server::new(socket);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
server.add_extension(Box::new(deflate));
Expand Down
Loading

0 comments on commit 88f4614

Please sign in to comment.