Skip to content

Commit

Permalink
Add --blob-cache-dir arg use to generate raw blob cache and meta
Browse files Browse the repository at this point in the history
generate blob cache and blob meta through the —-blob-cache-dir parameters,
so that nydusd can be started directly from these two files without
going to the backend to download. this can improve the performance
of data loading in localfs mode.

Signed-off-by: zyfjeff <[email protected]>
  • Loading branch information
zyfjeff committed Oct 9, 2023
1 parent b777564 commit 1a0660f
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 15 deletions.
19 changes: 15 additions & 4 deletions builder/src/core/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
// SPDX-License-Identifier: Apache-2.0

use std::borrow::Cow;
use std::io::Write;
use std::io::{Seek, Write};
use std::mem::size_of;
use std::slice;

use anyhow::{Context, Result};
use nydus_rafs::metadata::RAFS_MAX_CHUNK_SIZE;
use nydus_storage::device::BlobFeatures;
use nydus_storage::meta::{toc, BlobMetaChunkArray};
use nydus_storage::meta::{toc, BlobCompressionContextHeader, BlobMetaChunkArray};
use nydus_utils::digest::{self, DigestHasher, RafsDigest};
use nydus_utils::{compress, crypt};
use nydus_utils::{compress, crypt, try_round_up_4k};
use sha2::digest::Digest;

use super::layout::BlobLayout;
Expand Down Expand Up @@ -194,7 +195,6 @@ impl Blob {
} else if ctx.blob_tar_reader.is_some() {
header.set_separate_blob(true);
};

let mut compressor = Self::get_compression_algorithm_for_meta(ctx);
let (compressed_data, compressed) = compress::compress(ci_data, compressor)
.with_context(|| "failed to compress blob chunk info array".to_string())?;
Expand Down Expand Up @@ -223,6 +223,17 @@ impl Blob {
}

blob_ctx.blob_meta_header = header;
if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
let mut guard = blob_cache.lock().unwrap();
let meta_write = guard.get_blob_meta_writer_mut();
let aligned_uncompressed_size = try_round_up_4k(uncompressed_size as u64).unwrap();
meta_write.set_len(
aligned_uncompressed_size + size_of::<BlobCompressionContextHeader>() as u64,
)?;
meta_write.write_all(ci_data)?;
meta_write.seek(std::io::SeekFrom::Start(aligned_uncompressed_size))?;
meta_write.write_all(header.as_bytes())?;
}
let encrypted_header =
crypt::encrypt_with_context(header.as_bytes(), cipher_obj, cipher_ctx, encrypt)?;
let header_size = encrypted_header.len();
Expand Down
48 changes: 46 additions & 2 deletions builder/src/core/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,13 @@ impl Write for ArtifactMemoryWriter {
}
}

struct ArtifactFileWriter(ArtifactWriter);
pub struct ArtifactFileWriter(pub ArtifactWriter);

impl ArtifactFileWriter {
pub fn finalize(&mut self, name: Option<String>) -> Result<()> {
self.0.finalize(name)
}
}

impl RafsIoWrite for ArtifactFileWriter {
fn as_any(&self) -> &dyn Any {
Expand All @@ -215,6 +221,12 @@ impl RafsIoWrite for ArtifactFileWriter {
}
}

impl ArtifactFileWriter {
pub fn set_len(&mut self, s: u64) -> std::io::Result<()> {
self.0.file.get_mut().set_len(s)
}
}

impl Seek for ArtifactFileWriter {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
self.0.file.seek(pos)
Expand Down Expand Up @@ -367,6 +379,35 @@ impl ArtifactWriter {
}
}

pub struct BlobCacheGenerator {
blob_data: ArtifactFileWriter,
blob_meta: ArtifactFileWriter,
}

impl BlobCacheGenerator {
pub fn new(storage: ArtifactStorage) -> Result<Self> {
Ok(BlobCacheGenerator {
blob_data: ArtifactFileWriter(ArtifactWriter::new(storage.clone())?),
blob_meta: ArtifactFileWriter(ArtifactWriter::new(storage)?),
})
}

pub fn get_blob_data_writer_mut(&mut self) -> &mut ArtifactFileWriter {
&mut self.blob_data
}

pub fn get_blob_meta_writer_mut(&mut self) -> &mut ArtifactFileWriter {
&mut self.blob_meta
}

pub fn finalize(&mut self, name: &str) -> Result<()> {
let blob_data_name = format!("{}.blob.data", name);
self.blob_data.finalize(Some(blob_data_name))?;
let blob_meta_name = format!("{}.blob.meta", name);
self.blob_meta.finalize(Some(blob_meta_name))
}
}

/// BlobContext is used to hold the blob information of a layer during build.
pub struct BlobContext {
/// Blob id (user specified or sha256(blob)).
Expand Down Expand Up @@ -1182,6 +1223,8 @@ pub struct BuildContext {

pub features: Features,
pub configuration: Arc<ConfigV2>,
/// Generate the blob cache and blob meta
pub blob_cache_generator: Option<Mutex<BlobCacheGenerator>>,
}

impl BuildContext {
Expand Down Expand Up @@ -1221,7 +1264,6 @@ impl BuildContext {
} else {
crypt::Algorithm::None
};

BuildContext {
blob_id,
aligned_chunk,
Expand Down Expand Up @@ -1250,6 +1292,7 @@ impl BuildContext {

features,
configuration: Arc::new(ConfigV2::default()),
blob_cache_generator: None,
}
}

Expand Down Expand Up @@ -1299,6 +1342,7 @@ impl Default for BuildContext {
blob_inline_meta: false,
features: Features::new(),
configuration: Arc::new(ConfigV2::default()),
blob_cache_generator: None,
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion builder/src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display, Formatter, Result as FmtResult};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::io::{Read, Seek, Write};
use std::ops::Deref;
#[cfg(target_os = "linux")]
use std::os::linux::fs::MetadataExt;
Expand Down Expand Up @@ -462,6 +462,19 @@ impl Node {
chunk.set_compressed(is_compressed);
}

if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
let mut guard = blob_cache.lock().unwrap();
let write = guard.get_blob_data_writer_mut();
let curr_pos = write.seek(std::io::SeekFrom::End(0))?;
if curr_pos < chunk.uncompressed_offset() + aligned_d_size as u64 {
write.set_len(chunk.uncompressed_offset() + aligned_d_size as u64)?;
}

write.seek(std::io::SeekFrom::Start(chunk.uncompressed_offset()))?;
write
.write_all(&chunk_data)
.context("failed to write blob cache")?;
}
event_tracer!("blob_uncompressed_size", +d_size);

Ok(chunk_info)
Expand Down
9 changes: 7 additions & 2 deletions builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub use self::compact::BlobCompactor;
pub use self::core::bootstrap::Bootstrap;
pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict};
pub use self::core::context::{
ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapContext, BootstrapManager,
BuildContext, BuildOutput, ConversionType,
ArtifactFileWriter, ArtifactStorage, ArtifactWriter, BlobCacheGenerator, BlobContext,
BlobManager, BootstrapContext, BootstrapManager, BuildContext, BuildOutput, ConversionType,
};
pub use self::core::feature::{Feature, Features};
pub use self::core::node::{ChunkSource, NodeChunk};
Expand Down Expand Up @@ -237,6 +237,11 @@ fn finalize_blob(
// blob file.
if !is_tarfs {
blob_writer.finalize(Some(blob_meta_id))?;
if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
let mut guard = blob_cache.lock().unwrap();
guard.finalize(&blob_ctx.blob_id)?;
drop(guard);
}
}
}

Expand Down
120 changes: 120 additions & 0 deletions smoke/tests/blobcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package tests

import (
"fmt"
"io"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/containerd/containerd/log"
"github.com/dragonflyoss/image-service/smoke/tests/texture"
"github.com/dragonflyoss/image-service/smoke/tests/tool"
"github.com/dragonflyoss/image-service/smoke/tests/tool/test"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
)

type BlobCacheTestSuite struct {
T *testing.T
}

func (a *BlobCacheTestSuite) compareTwoFiles(t *testing.T, left, right string) {

lf, err := os.Open(left)
require.NoError(t, err)
defer lf.Close()
leftDigester, err := digest.FromReader(lf)
require.NoError(t, err)


rf, err := os.Open(right)
require.NoError(t, err)
defer rf.Close()
rightDigester, err := digest.FromReader(rf)
require.NoError(t, err)

require.Equal(t, leftDigester, rightDigester)
}

func (a *BlobCacheTestSuite) TestGenerateBlobcache(t *testing.T) {

ctx := tool.DefaultContext(t)

ctx.PrepareWorkDir(t)
defer ctx.Destroy(t)

rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))

rootfsReader := rootFs.ToOCITar(t);

ociBlobDigester := digest.Canonical.Digester()
ociBlob, err := ioutil.TempFile(ctx.Env.BlobDir, "oci-blob-")
require.NoError(t, err)

_, err = io.Copy(io.MultiWriter(ociBlobDigester.Hash(), ociBlob), rootfsReader)
require.NoError(t, err)

ociBlobDigest := ociBlobDigester.Digest()
err = os.Rename(ociBlob.Name(), filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex()))
require.NoError(t, err)

// use to generate blob.data and blob.meta
blobcacheDir := filepath.Join(ctx.Env.WorkDir, "blobcache")
err = os.MkdirAll(blobcacheDir, 0755)
require.NoError(t, err)

ctx.Env.BootstrapPath = filepath.Join(ctx.Env.WorkDir, "bootstrap")


tool.Run(t, fmt.Sprintf("%s create -t targz-ref --bootstrap %s --blob-dir %s --blob-cache-dir %s %s",
ctx.Binary.Builder, ctx.Env.BootstrapPath, ctx.Env.BlobDir, blobcacheDir,
filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex())));

nydusd, err := tool.NewNydusd(tool.NydusdConfig{
NydusdPath: ctx.Binary.Nydusd,
BootstrapPath: ctx.Env.BootstrapPath,
ConfigPath: filepath.Join(ctx.Env.WorkDir, "nydusd-config.fusedev.json"),
MountPath: ctx.Env.MountDir,
APISockPath: filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock"),
BackendType: "localfs",
BackendConfig: fmt.Sprintf(`{"dir": "%s"}`, ctx.Env.BlobDir),
EnablePrefetch: ctx.Runtime.EnablePrefetch,
BlobCacheDir: ctx.Env.CacheDir,
CacheType: ctx.Runtime.CacheType,
CacheCompressed: ctx.Runtime.CacheCompressed,
RafsMode: ctx.Runtime.RafsMode,
DigestValidate: false,
})
require.NoError(t, err)

err = nydusd.Mount()
require.NoError(t, err)
defer func() {
if err := nydusd.Umount(); err != nil {
log.L.WithError(err).Errorf("umount")
}
}()

// make sure blobcache ready
err = filepath.WalkDir(ctx.Env.MountDir, func(path string, entry fs.DirEntry, err error) error {
require.Nil(t, err)
if entry.Type().IsRegular() {
targetPath, err := filepath.Rel(ctx.Env.MountDir, path)
require.NoError(t, err)
_, _ = os.ReadFile(targetPath)
}
return nil
})
require.NoError(t, err)

a.compareTwoFiles(t, filepath.Join(blobcacheDir,fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())))
a.compareTwoFiles(t, filepath.Join(blobcacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())))
}


func TestBlobCache(t *testing.T) {
test.Run(t, &BlobCacheTestSuite{T: t})
}
6 changes: 3 additions & 3 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (l *Layer) TargetPath(t *testing.T, path string) string {

func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir string) digest.Digest {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand All @@ -141,7 +141,7 @@ func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir stri

func (l *Layer) PackRef(t *testing.T, ctx Context, blobDir string, compress bool) (digest.Digest, digest.Digest) {
// Output OCI tar stream
ociTar := l.toOCITar(t)
ociTar := l.ToOCITar(t)
defer ociTar.Close()
l.recordFileTree(t)

Expand Down Expand Up @@ -238,7 +238,7 @@ func (l *Layer) recordFileTree(t *testing.T) {
})
}

func (l *Layer) toOCITar(t *testing.T) io.ReadCloser {
func (l *Layer) ToOCITar(t *testing.T) io.ReadCloser {
return archive.Diff(context.Background(), "", l.workDir)
}

Expand Down
Loading

0 comments on commit 1a0660f

Please sign in to comment.