Merge pull request #1159 from jiangliu/tarfs
Add `export` subcommand to `nydus-image`
imeoer authored Mar 29, 2023
2 parents d719573 + fc3979e commit 819ccaf
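
As a rough usage sketch (not part of this commit): the new `BlockDevice::export` API added below could be driven from library code along these lines. The `nydus_service::block_device` module path is assumed to be public when the `block-device` feature is enabled, and the helper name is made up:

use nydus_api::BlobCacheEntry;
use nydus_service::block_device::BlockDevice;

// Hypothetical helper: export the RAFS v6 filesystem described by `entry`
// as a raw disk image named `<meta blob name>.disk` under `dir`, copying
// with up to four worker threads.
fn export_disk_image(entry: BlobCacheEntry, dir: &str) -> std::io::Result<()> {
    BlockDevice::export(entry, None, Some(dir.to_string()), 4)
}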
Showing 3 changed files with 501 additions and 177 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -57,7 +57,7 @@
nydus-api = { version = "0.2.2", path = "api", features = ["handler"] }
nydus-app = { version = "0.3.2", path = "app" }
nydus-error = { version = "0.2.3", path = "error" }
nydus-rafs = { version = "0.2.2", path = "rafs", features = ["builder"] }
-nydus-service = { version = "0.2.0", path = "service" }
+nydus-service = { version = "0.2.0", path = "service", features = ["block-device"] }
nydus-storage = { version = "0.6.2", path = "storage" }
nydus-utils = { version = "0.4.1", path = "utils" }

204 changes: 200 additions & 4 deletions service/src/block_device.rs
@@ -11,17 +11,25 @@
//! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
//! device, so it can be directly mounted by Linux EROFS fs driver.
-use std::cmp::min;
+use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::io::Result;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
use nydus_api::BlobCacheEntry;
use nydus_rafs::metadata::layout::v6::{
EROFS_BLOCK_BITS_12, EROFS_BLOCK_BITS_9, EROFS_BLOCK_SIZE_4096, EROFS_BLOCK_SIZE_512,
};
use nydus_storage::utils::alloc_buf;
use tokio_uring::buf::IoBufMut;

-use crate::blob_cache::{BlobCacheMgr, BlobConfig, DataBlob, MetaBlob};
+use crate::blob_cache::{generate_blob_key, BlobCacheMgr, BlobConfig, DataBlob, MetaBlob};

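// Export I/O is issued in batches of 512 KiB (0x80000 bytes).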
const BLOCK_DEVICE_EXPORT_BATCH_SIZE: usize = 0x80000;

enum BlockRange {
Hole,
@@ -81,7 +89,7 @@ impl BlockDevice {
meta_blob_config.blob_id()
))
})?;
-        ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone()));
+        ranges.update(&range, BlockRange::MetaBlob(meta_blob));

let mut pos = blocks;
let data_blobs = meta_blob_config.get_blobs();
@@ -272,6 +280,194 @@ impl BlockDevice {

(Ok(total_size), buf)
}

/// Export a RAFS filesystem as a raw block disk image.
pub fn export(
blob_entry: BlobCacheEntry,
output: Option<String>,
localfs_dir: Option<String>,
threads: u32,
) -> Result<()> {
let cache_mgr = Arc::new(BlobCacheMgr::new());
cache_mgr.add_blob_entry(&blob_entry).map_err(|e| {
eother!(format!(
"block_device: failed to add blob into CacheMgr, {}",
e
))
})?;
let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id);
let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone()).map_err(|e| {
eother!(format!(
"block_device: failed to create block device object, {}",
e
))
})?;
let block_device = Arc::new(block_device);

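        // Resolve the output path: use `output` when given, otherwise derive
        // `<meta blob file name>.disk` under `localfs_dir`.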
let path = match output {
Some(v) => PathBuf::from(v),
None => {
let path = match cache_mgr.get_config(&blob_id) {
Some(BlobConfig::MetaBlob(meta)) => meta.path().to_path_buf(),
_ => return Err(enoent!("block_device: failed to get meta blob")),
};
if !path.is_file() {
return Err(eother!(format!(
"block_device: meta blob {} is not a file",
path.display()
)));
}
let name = path
.file_name()
.ok_or_else(|| {
eother!(format!(
"block_device: failed to get file name from {}",
path.display()
))
})?
.to_str()
.ok_or_else(|| {
eother!(format!(
"block_device: failed to get file name from {}",
path.display()
))
})?;
let dir = localfs_dir
.ok_or_else(|| einval!("block_device: parameter `localfs_dir` is missing"))?;
let path = PathBuf::from(dir);
path.join(name.to_string() + ".disk")
}
};

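        // Create the image file exclusively so an existing file is never
        // clobbered; worker threads re-open it later for positioned writes.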
let output_file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.open(&path)
.map_err(|e| {
eother!(format!(
"block_device: failed to create output file {}, {}",
path.display(),
e
))
})?;
let output_file = Arc::new(tokio_uring::fs::File::from_std(output_file));

let blocks = block_device.blocks();
let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
assert_eq!(batch_size.count_ones(), 1);
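        // Clamp the worker count to [1, 32], then halve it until each worker
        // has at least one full batch of blocks to copy.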
let threads = max(threads, 1);
let mut threads = min(threads, 32);
while blocks / threads < batch_size && threads > 1 {
threads /= 2;
}

if threads == 1 {
tokio_uring::start(async move {
Self::do_export(block_device.clone(), output_file, 0, block_device.blocks()).await
})?;
} else {
let mut thread_handlers: Vec<JoinHandle<Result<()>>> =
Vec::with_capacity(threads as usize);
            // Split the blocks evenly across the workers, rounding each
            // worker's share up to a whole number of batches.
            let step = ((blocks + threads - 1) / threads + batch_size - 1) & !(batch_size - 1);
let mut pos = 0;

for _i in 0..threads {
let count = min(blocks - pos, step);
let mgr = cache_mgr.clone();
let id = blob_id.clone();
let path = path.to_path_buf();

let handler = thread::spawn(move || {
let output_file = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.map_err(|e| {
eother!(format!(
"block_device: failed to create output file {}, {}",
path.display(),
e
))
})?;
let file = Arc::new(tokio_uring::fs::File::from_std(output_file));
let block_device = BlockDevice::new(id, mgr).map_err(|e| {
eother!(format!(
"block_device: failed to create block device object, {}",
e
))
})?;
let device = Arc::new(block_device);

tokio_uring::start(
async move { Self::do_export(device, file, pos, count).await },
)?;
Ok(())
});
pos += count;
thread_handlers.push(handler);
}
assert_eq!(pos, blocks);
assert_eq!(thread_handlers.len(), threads as usize);

for handler in thread_handlers {
handler
.join()
.map_err(|e| {
eother!(format!(
"block_device: failed to wait for worker thread, {:?}",
e
))
})?
.map_err(|e| {
eother!(format!("block_device: failed to export disk image, {}", e))
})?;
}
}
Ok(())
}

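    /// Copy `blocks` blocks, starting at block `start`, from the block device
    /// into the disk image file in batches of `BLOCK_DEVICE_EXPORT_BATCH_SIZE`.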
async fn do_export(
block_device: Arc<BlockDevice>,
output_file: Arc<tokio_uring::fs::File>,
start: u32,
mut blocks: u32,
) -> Result<()> {
let batch_size = BLOCK_DEVICE_EXPORT_BATCH_SIZE as u32 / block_device.block_size() as u32;
let mut pos = start;
let mut buf = alloc_buf(BLOCK_DEVICE_EXPORT_BATCH_SIZE);

while blocks > 0 {
let count = min(batch_size, blocks);
let (res, buf1) = block_device.async_read(pos, count, buf).await;
let sz = res?;
if sz != count as usize * block_device.block_size() as usize {
return Err(eio!(
"block_device: failed to read data, got less data than requested"
));
}
buf = buf1;

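            // The last batch may be shorter than the buffer; shrink the buffer
            // so the trailing write does not append stale bytes to the image.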
if sz != buf.len() {
buf.resize(sz, 0);
}
let (res, buf2) = output_file
.write_at(buf, block_device.blocks_to_size(pos))
.await;
let sz1 = res?;
if sz1 != sz {
return Err(eio!(
"block_device: failed to write data to disk image file, written less data than requested"
));
}
buf = buf2;

pos += count;
blocks -= count;
}

Ok(())
}
}

#[cfg(test)]
@@ -337,7 +533,7 @@ mod tests {
assert!(mgr.get_config(&key).is_some());

let mgr = Arc::new(mgr);
-        let device = BlockDevice::new(blob_id.clone(), mgr).unwrap();
+        let device = BlockDevice::new(blob_id, mgr).unwrap();
assert_eq!(device.blocks(), 0x209);

tokio_uring::start(async move {