Skip to content

Commit

Permalink
assets: add support for configurable bufsize for FileHandler (#649)
Browse files Browse the repository at this point in the history
* assets: add support for configurable bufsize for FileHandler

The current implementation uses the file system block size as the buffer size.
In many use cases, that size is too small and limits throughput.
This commit adds a new option, `buffer_size`, to `FileOptions`, which,
when set, overrides the default value.

Also included is a cargo criterion benchmark, which can be run with

```
cargo bench --bench file_handler
```

The benchmark shows major performance improvements with larger buffers
when serving large files.

Signed-off-by: Janne Pelkonen <[email protected]>

* Fix formatting

Signed-off-by: Janne Pelkonen <[email protected]>

---------

Signed-off-by: Janne Pelkonen <[email protected]>
  • Loading branch information
jpelkonen authored Apr 10, 2024
1 parent f6e467e commit 82b5a03
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 2 deletions.
7 changes: 7 additions & 0 deletions gotham/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@ tokio-rustls = { version = "0.23", optional = true }
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["cargo_bench_support", "plotters", "rayon", "async_futures", "async_tokio"] }
futures-executor = "0.3.14"
reqwest = "0.12.2"
tempfile = "3.10.1"
tokio = { version = "1.11.0", features = ["macros", "test-util"] }

[package.metadata.docs.rs]
all-features = true

[[bench]]
name = "file_handler"
harness = false
152 changes: 152 additions & 0 deletions gotham/benches/file_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::{
collections::HashMap,
fs::File,
io::{BufWriter, Write},
net::{SocketAddr, ToSocketAddrs},
sync::atomic::{AtomicU64, Ordering::Relaxed},
time::{Duration, SystemTime},
};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures_util::future;
use gotham::{
bind_server,
handler::FileOptions,
router::{
build_simple_router,
builder::{DefineSingleRoute, DrawRoutes},
},
};
use tempfile::TempDir;
use tokio::{
net::TcpListener,
runtime::{self, Runtime},
};

/// A running file server together with the fixtures the benchmark needs to
/// build request URLs against it.
struct BenchServer {
// Tokio runtime on which the server task is spawned; the benchmark also
// drives its client futures on it.
runtime: Runtime,
// Address the listener is actually bound to (the port is picked by the OS).
addr: SocketAddr,
// Never read after construction: it is stored only so that the temporary
// directory holding the test files stays alive as long as the server does
// (`TempDir` removes the directory when dropped).
#[allow(dead_code)]
tmp: TempDir,
// Sizes, in bytes, of the test files that were created; each file is named
// after its size.
sizes: Vec<u64>,
// Maps a URL path prefix to the buffer size configured for the route mounted
// there; `None` means the handler's default is used.
buf_paths: HashMap<String, Option<usize>>,
}

impl BenchServer {
    /// Creates the test files, builds a router that serves them under one
    /// route per buffer-size configuration, and starts the server on a free
    /// local port.
    ///
    /// # Errors
    ///
    /// Returns an error if the temporary directory or any of the test files
    /// cannot be created, if the Tokio runtime cannot be built, or if the
    /// listener cannot be bound. Failing loudly here matters: a test file
    /// that is silently missing would make its benchmark cases disappear or
    /// measure error responses.
    fn new() -> anyhow::Result<Self> {
        let tmp = TempDir::new()?;
        // temporary datafiles of 2^10, 2^17 and 2^24 bytes
        let sizes = [10u32, 17, 24]
            .iter()
            .map(|exp| mk_tmp(&tmp, 1u64 << exp))
            .collect::<anyhow::Result<Vec<u64>>>()?;
        // one route per buffer configuration: the handler default and 128 KiB
        let buf_paths = HashMap::from([
            ("default".to_string(), None),
            ("128k".to_string(), Some(1 << 17)),
        ]);

        let router = build_simple_router(|route| {
            for (path, sz) in &buf_paths {
                let mut opts = FileOptions::from(tmp.path().to_owned());
                if let Some(size) = sz {
                    opts.with_buffer_size(*size);
                }
                route
                    .get(format!("/{path}/*").as_str())
                    .to_dir(opts.to_owned())
            }
        });
        let runtime = runtime::Builder::new_multi_thread()
            .worker_threads(num_cpus::get())
            .thread_name("file_handler-bench")
            .enable_all()
            .build()?;
        // build server manually so that we can capture the actual port instead of 0
        let addr = "127.0.0.1:0".to_socket_addrs()?.next().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::AddrNotAvailable,
                "127.0.0.1:0 did not resolve to a socket address",
            )
        })?;
        let listener = runtime.block_on(TcpListener::bind(addr))?;
        // use any free port
        let addr = listener.local_addr()?;
        let _ = runtime.spawn(async move {
            bind_server(listener, router, future::ok).await;
        });
        // NOTE(review): presumably gives the spawned server task time to start
        // accepting before the first request is sent.
        std::thread::sleep(Duration::from_millis(100));
        Ok(Self {
            runtime,
            addr,
            tmp,
            sizes,
            buf_paths,
        })
    }
}

/// Creates a file named after `size` inside `tmp` and fills it with exactly
/// `size` bytes of filler data.
///
/// Returns `size` on success so callers can collect the sizes of the files
/// that now exist.
///
/// # Errors
///
/// Returns an error if the file cannot be created, written or flushed, or if
/// the system clock reports a time before the Unix epoch.
fn mk_tmp(tmp: &TempDir, size: u64) -> anyhow::Result<u64> {
    let filename = tmp.path().join(size.to_string());
    let file = File::create(filename)?;
    let mut w = BufWriter::with_capacity(2 << 16, file);
    // pseudo random data: time stamp as bytes
    let ts_data = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)?
        .as_nanos()
        .to_le_bytes();
    let mut remaining = size;
    while remaining > 0 {
        // Truncate the last chunk so the file is exactly `size` bytes even
        // when `size` is not a multiple of the pattern length. The chunk is
        // never longer than `ts_data`, so the cast back to usize is lossless.
        let chunk = remaining.min(ts_data.len() as u64) as usize;
        w.write_all(&ts_data[..chunk])?;
        remaining -= chunk as u64;
    }
    // Flush explicitly: dropping a BufWriter also flushes, but any error that
    // occurs then is ignored and the file could be left short.
    w.flush()?;
    Ok(size)
}

/// Benchmarks downloading every test file through every configured route of
/// a freshly started `BenchServer`.
///
/// Throughput is reported in bytes per iteration, based on the file size.
/// Requests that fail at the transport level are counted and summarised on
/// stdout once all groups have run; a response whose content length differs
/// from the file size aborts the benchmark.
pub fn filehandler_benchmark(c: &mut Criterion) {
    let server = BenchServer::new().unwrap();

    let rt = server.runtime;
    let http = reqwest::Client::builder().build().unwrap();
    let total = AtomicU64::new(0);
    let errors = AtomicU64::new(0);

    for file_size in server.sizes {
        let mut group = c.benchmark_group("server_bench");
        group.throughput(Throughput::Bytes(file_size));
        for (path, buf_size) in &server.buf_paths {
            // The request is built once; each iteration sends a clone of it.
            let template = http
                .get(format!("http://{}/{path}/{file_size}", server.addr))
                .build()
                .unwrap();
            let label = format!("filesize: {file_size}, bufsize: {buf_size:?}");
            let id = BenchmarkId::new("test_file_handler", label);
            group.bench_with_input(id, &template, |b, req| {
                b.to_async(&rt).iter(|| async {
                    let outcome = http.execute(req.try_clone().unwrap()).await;
                    total.fetch_add(1, Relaxed);
                    match outcome {
                        Ok(res) => {
                            // sanity check: the full file must have been announced
                            assert_eq!(res.content_length().unwrap(), file_size);
                            // drain the body so the whole transfer is measured
                            let _ = res.bytes().await.unwrap();
                        }
                        Err(_) => {
                            errors.fetch_add(1, Relaxed);
                        }
                    }
                });
            });
        }
    }
    println!("Errors {}/{}", errors.load(Relaxed), total.load(Relaxed));
}

// Benchmark group `file_handler`: runs `filehandler_benchmark` with a
// 10 second measurement window and a 10 millisecond warm-up per benchmark.
criterion_group! {
name = file_handler;
config = Criterion::default().measurement_time(Duration::from_millis(10_000)).warm_up_time(Duration::from_millis(10));
targets = filehandler_benchmark
}

// Generates the benchmark binary's entry point, which runs the group above.
criterion_main!(file_handler);
15 changes: 13 additions & 2 deletions gotham/src/handler/assets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct FileOptions {
cache_control: String,
gzip: bool,
brotli: bool,
buffer_size: Option<usize>,
}

impl FileOptions {
Expand All @@ -88,6 +89,7 @@ impl FileOptions {
cache_control: "public".to_string(),
gzip: false,
brotli: false,
buffer_size: None,
}
}

Expand All @@ -111,6 +113,13 @@ impl FileOptions {
self
}

/// Sets the maximum buffer size, in bytes, to be used when serving the file.
///
/// If unset, the default maximum buffer size corresponding to the file system
/// block size will be used.
///
/// Returns `&mut Self` so that further options can be chained.
pub fn with_buffer_size(&mut self, buf_sz: usize) -> &mut Self {
self.buffer_size = Some(buf_sz);
self
}

/// Clones `self` to return an owned value for passing to a handler.
pub fn build(&mut self) -> Self {
self.clone()
Expand Down Expand Up @@ -215,7 +224,9 @@ fn create_file_response(options: FileOptions, state: State) -> Pin<Box<HandlerFu
.body(Body::empty())
.unwrap());
}
let buf_size = optimal_buf_size(&meta);
let buf_size = options
.buffer_size
.unwrap_or_else(|| optimal_buf_size(&meta));
let (len, range_start) = match resolve_range(meta.len(), &headers) {
Ok((len, range_start)) => (len, range_start),
Err(e) => {
Expand All @@ -229,7 +240,7 @@ fn create_file_response(options: FileOptions, state: State) -> Pin<Box<HandlerFu
file.seek(SeekFrom::Start(seek_to)).await?;
};

let stream = file_stream(file, buf_size, len);
let stream = file_stream(file, cmp::min(buf_size, len as usize), len);
let body = Body::wrap_stream(stream.into_stream());
let mut response = hyper::Response::builder()
.status(StatusCode::OK)
Expand Down

0 comments on commit 82b5a03

Please sign in to comment.