Skip to content

Commit

Permalink
Upgrade hyper to 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
LMG committed Mar 9, 2024
1 parent 5a650df commit 078e373
Show file tree
Hide file tree
Showing 11 changed files with 727 additions and 477 deletions.
538 changes: 331 additions & 207 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion hyper_cgi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ repository = "https://github.com/josh-project/josh"
readme = "README.md"

[dependencies]
bytes = "1.5.0"
futures = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
hyper = { version = "0.14.28", features = ["stream", "tcp", "server", "http1"] }
hyper = { version = "1.1.0", features = ["server", "http1"] }
hyper-util = { version = "0.1.2" }
http-body = "1.0.0"
http-body-util = "0.1.0"

clap = { version = "4.5.2", optional = true }
base64 = { version = "0.21.7", optional = true }
Expand Down
76 changes: 46 additions & 30 deletions hyper_cgi/src/bin/hyper-cgi-test-server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
#[macro_use]
extern crate lazy_static;
use bytes::Bytes;
use core::iter;
use core::str::from_utf8;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::{TokioIo, TokioTimer};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::str::FromStr;
use std::net::SocketAddr;
use tokio::net::TcpListener;

use futures::FutureExt;

use hyper::server::Server;
use hyper::body::Incoming;

// Import the base64 crate Engine trait anonymously so we can
// call its methods without adding to the namespace.
Expand Down Expand Up @@ -44,7 +50,7 @@ pub struct ServerState {
users: Vec<(String, String)>,
}

pub fn parse_auth(req: &hyper::Request<hyper::Body>) -> Option<(String, String)> {
pub fn parse_auth(req: &hyper::Request<Incoming>) -> Option<(String, String)> {
let line = some_or!(
req.headers()
.get("authorization")
Expand All @@ -69,9 +75,9 @@ pub fn parse_auth(req: &hyper::Request<hyper::Body>) -> Option<(String, String)>
}

fn auth_response(
req: &hyper::Request<hyper::Body>,
req: &hyper::Request<Incoming>,
users: &Vec<(String, String)>,
) -> Option<hyper::Response<hyper::Body>> {
) -> Option<hyper::Response<Full<Bytes>>> {
if users.len() == 0 {
return None;
}
Expand All @@ -83,7 +89,7 @@ fn auth_response(
let builder = hyper::Response::builder()
.header("WWW-Authenticate", "Basic realm=User Visible Realm")
.status(hyper::StatusCode::UNAUTHORIZED);
return Some(builder.body(hyper::Body::empty()).unwrap());
return Some(builder.body(Full::new(Bytes::new())).unwrap());
}
};

Expand All @@ -102,17 +108,16 @@ fn auth_response(
.status(hyper::StatusCode::UNAUTHORIZED);
return Some(
builder
.body(hyper::Body::empty())
.body(Full::new(Bytes::new()))
.unwrap_or(hyper::Response::default()),
);
}

async fn call(
serv: std::sync::Arc<std::sync::Mutex<ServerState>>,
req: hyper::Request<hyper::Body>,
) -> hyper::Response<hyper::Body> {
req: hyper::Request<Incoming>,
) -> hyper::Response<Full<Bytes>> {
println!("call {:?}", req.uri().path());
let mut req = req;

let path = req.uri().path();

Expand All @@ -133,21 +138,22 @@ async fn call(
for (u, p) in serv.lock().unwrap().users.iter() {
if username == *u {
password = p.clone();
return builder.body(hyper::Body::from(password)).unwrap();
return builder.body(Full::new(Bytes::from(password))).unwrap();
}
}
serv.lock()
.unwrap()
.users
.push((username, password.clone()));
println!("users: {:?}", serv.lock().unwrap().users);
return builder.body(hyper::Body::from(password)).unwrap();
return builder.body(Full::new(Bytes::from(password))).unwrap();
}

if let Some(response) = auth_response(&req, &serv.lock().unwrap().users) {
return response;
}

/*
if let Some(proxy) = &ARGS.get_one::<String>("proxy") {
for proxy in proxy.split(",") {
if let [proxy_path, proxy_target] = proxy.split("=").collect::<Vec<_>>().as_slice() {
Expand All @@ -159,13 +165,14 @@ async fn call(
Ok(response) => response,
Err(error) => hyper::Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::from(format!("Proxy error: {:?}", error)))
.body(format!("Proxy error: {:?}", error))
.unwrap(),
};
}
}
}
}
*/

let workdir = std::path::PathBuf::from(
ARGS.get_one::<String>("dir")
Expand All @@ -184,34 +191,43 @@ async fn call(
}

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let server_state = std::sync::Arc::new(std::sync::Mutex::new(ServerState { users: vec![] }));

let make_service = hyper::service::make_service_fn(move |_| {
let server_state = server_state.clone();

let service = hyper::service::service_fn(move |_req| {
let server_state = server_state.clone();

call(server_state, _req).map(Ok::<_, hyper::http::Error>)
});

futures::future::ok::<_, hyper::http::Error>(service)
});

let addr = format!(
let addr: SocketAddr = format!(
"0.0.0.0:{}",
ARGS.get_one::<String>("port")
.unwrap_or(&"8000".to_owned())
.to_owned()
)
.parse()
.unwrap();
let server = Server::bind(&addr).serve(make_service);

let listener = TcpListener::bind(addr).await?;
println!("Now listening on {}", addr);
let server_state = server_state.clone();

loop {
let (tcp, _) = listener.accept().await?;
let io = TokioIo::new(tcp);
let server_state = server_state.clone();

if let Err(e) = server.await {
eprintln!("server error: {}", e);
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.timer(TokioTimer::new())
.serve_connection(
io,
service_fn(move |_req| {
let server_state = server_state.clone();

call(server_state, _req).map(Ok::<_, hyper::http::Error>)
}),
)
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
}

Expand Down
53 changes: 29 additions & 24 deletions hyper_cgi/src/hyper_cgi.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
//! This module implements a do_cgi function, to run CGI scripts with hyper
use futures::stream::StreamExt;
use bytes::Bytes;
use futures::TryStreamExt;
use hyper::{Request, Response};
use http_body_util::{BodyStream, Full};
use hyper::{body::Body, Request, Response};
use std::io;
use std::process::Stdio;
use std::str::FromStr;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Command;

/// do_cgi is an async function that takes an hyper request and a CGI compatible
/// command, and passes the request to be executed to the command.
/// It then returns an hyper response and the stderr output of the command.
pub async fn do_cgi(
req: Request<hyper::Body>,
pub async fn do_cgi<B, E>(
req: Request<B>,
cmd: Command,
) -> (hyper::http::Response<hyper::Body>, Vec<u8>) {
) -> (hyper::http::Response<Full<Bytes>>, Vec<u8>)
where
B: hyper::body::Body<Error = E>,
E: std::error::Error + Sync + Send + 'static,
{
let mut cmd = cmd;
setup_cmd(&mut cmd, &req);

Expand Down Expand Up @@ -56,22 +63,23 @@ pub async fn do_cgi(
}
};

let req_body = req
.into_body()
.map(|result| {
result.map_err(|_error| std::io::Error::new(std::io::ErrorKind::Other, "Error!"))
})
.into_async_read();
let stream_of_frames = BodyStream::new(req.into_body());
let stream_of_bytes = stream_of_frames
.try_filter_map(|frame| async move { Ok(frame.into_data().ok()) })
.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let async_read = tokio_util::io::StreamReader::new(stream_of_bytes);
let mut req_body = std::pin::pin!(async_read);

let mut req_body = to_tokio_async_read(req_body);
let mut err_output = vec![];

let mut stdout = BufReader::new(stdout);

let mut data = vec![];
let write_stdin = async {
let mut stdin = stdin;
tokio::io::copy(&mut req_body, &mut stdin).await
let res = tokio::io::copy(&mut req_body, &mut stdin).await;
stdin.flush().await.unwrap();
res
};

let read_stderr = async {
Expand Down Expand Up @@ -105,7 +113,7 @@ pub async fn do_cgi(
line = String::new();
}
stdout.read_to_end(&mut data).await?;
convert_error_io_hyper(response.body(hyper::Body::from(data)))
convert_error_io_hyper(response.body(Full::new(Bytes::from(data))))
};

let wait_process = async { child.wait().await };
Expand All @@ -119,7 +127,7 @@ pub async fn do_cgi(
(error_response(), err_output)
}

fn setup_cmd(cmd: &mut Command, req: &Request<hyper::Body>) {
fn setup_cmd(cmd: &mut Command, req: &Request<impl Body>) {
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.stdin(Stdio::piped());
Expand Down Expand Up @@ -157,14 +165,10 @@ fn setup_cmd(cmd: &mut Command, req: &Request<hyper::Body>) {
);
}

fn to_tokio_async_read(r: impl futures::io::AsyncRead) -> impl tokio::io::AsyncRead {
tokio_util::compat::FuturesAsyncReadCompatExt::compat(r)
}

fn error_response() -> hyper::Response<hyper::Body> {
fn error_response() -> hyper::Response<Full<Bytes>> {
Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::empty())
.body(Full::new(Bytes::new()))
.unwrap()
}

Expand All @@ -177,7 +181,8 @@ fn convert_error_io_hyper<T>(res: Result<T, hyper::http::Error>) -> Result<T, st

#[cfg(test)]
mod tests {
use hyper::body::HttpBody;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};

#[tokio::test]
async fn run_cmd() {
Expand All @@ -193,7 +198,7 @@ mod tests {
.header("Accept-Encoding", "deflate, gzip, br")
.header("Accept-Language", "en-US, *;q=0.9")
.header("Pragma", "no-cache")
.body(hyper::Body::from("\r\na body"))
.body(Full::new(Bytes::from("\r\na body")))
.unwrap();

let mut cmd = tokio::process::Command::new("cat");
Expand All @@ -203,7 +208,7 @@ mod tests {

assert_eq!(resp.status(), hyper::StatusCode::OK);

let resp_string = resp.into_body().data().await.unwrap().unwrap().to_vec();
let resp_string = resp.into_body().collect().await.unwrap().to_bytes();
let resp_string = String::from_utf8_lossy(&resp_string);

assert_eq!("", std::str::from_utf8(&stderr).unwrap());
Expand Down
10 changes: 6 additions & 4 deletions josh-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ repository = "https://github.com/josh-project/josh"
version = "22.4.15"

[dependencies]
bytes = "1.5.0"
base64 = "0.21.7"
bytes = "1.5.0"
clap = { workspace = true }
futures = { workspace = true }
hyper = { version = "0.14.28", features = ["stream"] }
hyper = { version = "1.1.0" }
hyper-reverse-proxy = { workspace = true }
hyper-staticfile = "0.9.5"
hyper-tls = "0.5.0"
hyper-staticfile = "0.10.0"
hyper-tls = "0.6.0"
hyper_cgi = { path = "../hyper_cgi" }
hyper-util = { version = "0.1.2", features = ["client", "client-legacy", "http1", "tokio"] }
http-body-util = "0.1.0"
indoc = "2.0.4"
josh = {path = "../josh-core" }
lazy_static = { workspace = true }
Expand Down
18 changes: 10 additions & 8 deletions josh-proxy/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
// call its methods without adding to the namespace.
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::engine::Engine as _;
use bytes::Bytes;
use http_body_util::Empty;
use hyper::body::Incoming;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

lazy_static! {
static ref AUTH: std::sync::Mutex<std::collections::HashMap<Handle, Header>> =
Expand Down Expand Up @@ -90,7 +95,7 @@ pub async fn check_auth(url: &str, auth: &Handle, required: bool) -> josh::JoshR
tracing::trace!("no cached auth {:?}", *AUTH_TIMERS.lock()?);

let https = hyper_tls::HttpsConnector::new();
let client = hyper::Client::builder().build::<_, hyper::Body>(https);
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(https);

let password = AUTH
.lock()?
Expand All @@ -109,7 +114,7 @@ pub async fn check_auth(url: &str, auth: &Handle, required: bool) -> josh::JoshR
builder
};

let request = builder.body(hyper::Body::empty())?;
let request = builder.body(Empty::new())?;
let resp = client.request(request).await?;

let status = resp.status();
Expand All @@ -125,19 +130,16 @@ pub async fn check_auth(url: &str, auth: &Handle, required: bool) -> josh::JoshR
Ok(true)
} else if status == hyper::StatusCode::UNAUTHORIZED {
tracing::warn!("resp.status == 401: {:?}", &err_msg);
tracing::trace!(
"body: {:?}",
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
);
tracing::trace!("body: {:?}", resp.into_body());
Ok(false)
} else {
return Err(josh::josh_error(&err_msg));
}
}

pub fn strip_auth(
req: hyper::Request<hyper::Body>,
) -> josh::JoshResult<(Handle, hyper::Request<hyper::Body>)> {
req: hyper::Request<Incoming>,
) -> josh::JoshResult<(Handle, hyper::Request<Incoming>)> {
let mut req = req;
let header: Option<hyper::header::HeaderValue> =
req.headers_mut().remove(hyper::header::AUTHORIZATION);
Expand Down
Loading

0 comments on commit 078e373

Please sign in to comment.