diff --git a/Cargo.lock b/Cargo.lock index 316279b063d..522f5b06e7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,12 +155,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "build_const" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" - [[package]] name = "bumpalo" version = "3.12.0" @@ -299,13 +293,19 @@ dependencies = [ [[package]] name = "crc" -version = "1.8.1" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" dependencies = [ - "build_const", + "crc-catalog", ] +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.3.2" @@ -431,9 +431,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fallible-streaming-iterator" @@ -637,11 +637,11 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "gpt" -version = "3.0.0" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd7365d734a70ac5dd7be791b0c96083852188df015b8c665bb2dadb108a743" +checksum = "8283e7331b8c93b9756e0cfdbcfb90312852f953c6faf9bf741e684cc3b6ad69" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.0", "crc", "log", "uuid", @@ -934,9 +934,9 @@ checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "libsqlite3-sys" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" dependencies = [ "cc", "pkg-config", @@ -1324,8 +1324,11 @@ dependencies = [ "nix", "nydus-api", "nydus-utils", + "r2d2", + "r2d2_sqlite", "regex", "reqwest", + "rusqlite", "serde", "serde_json", "sha1", @@ -1513,6 +1516,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1555,6 +1564,58 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dc290b669d30e20751e813517bbe13662d020419c5c8818ff10b6e8bb7777f6" +dependencies = [ + "r2d2", + "rusqlite", + "uuid", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -1629,9 +1690,9 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2" +checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" dependencies = [ "bitflags 2.4.0", "fallible-iterator", @@ -1695,6 +1756,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -2096,11 +2166,12 @@ dependencies = [ [[package]] name = "uuid" -version = "0.8.2" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom", + "rand", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9e9c1d16e27..c85a07a741b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ log-panics = { version = "2.1.0", features = ["with-backtrace"] } mio = { version = "0.8", features = ["os-poll", "os-ext"] } nix = "0.24.0" rlimit = "0.9.0" -rusqlite = { version = "0.29.0", features = ["bundled"] } +rusqlite = { version = "0.30.0", features = ["bundled"] } serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.51" tar = "0.4.40" diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 0d75e9e8084..b35840dad26 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -24,6 +24,9 @@ libc = "0.2" log = "0.4.8" nix = "0.24" reqwest = { version = "0.11.14", features = ["blocking", "json"], optional = true } +rusqlite = { version = "0.30", features = ["bundled"], optional = true } +r2d2 = { version = "0.8", optional = true } +r2d2_sqlite = { version = "0.23", optional = true } serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" sha1 = { version = "0.10.5", optional = true } @@ -34,7 +37,7 @@ tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sy url = { version = "2.1.1", optional = true } vm-memory = "0.10" fuse-backend-rs = "^0.10.3" -gpt = { version = "3.0.0", optional = true } +gpt = { version = "3.1.0", optional = true } nydus-api = { version = "0.3", path = "../api" } nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] } @@ -46,6 +49,7 @@ regex = "1.7.0" toml = "0.5" [features] +default = ["dedup"] backend-localdisk = [] backend-localdisk-gpt = ["gpt", "backend-localdisk"] backend-localfs = [] @@ -53,6 +57,7 @@ backend-oss = ["base64", "httpdate", "hmac", "sha1", "reqwest", "url"] backend-registry = ["base64", "reqwest", "url"] backend-s3 = ["base64", "hmac", "http", "reqwest", "sha2", "time", "url"] backend-http-proxy = ["hyper", "hyperlocal", "http", "reqwest", "url"] +dedup = ["rusqlite", "r2d2", "r2d2_sqlite"] prefetch-rate-limit = ["leaky-bucket"] [package.metadata.docs.rs] diff --git a/storage/src/backend/localdisk.rs b/storage/src/backend/localdisk.rs index 329af02c35c..0c83bfba9db 100644 --- a/storage/src/backend/localdisk.rs +++ b/storage/src/backend/localdisk.rs @@ -297,7 +297,7 @@ impl LocalDisk { v.name.clone() } else { // The 64-byte blob_id is stored in two parts - v.name.clone() + guid.to_simple().to_string().as_str() + v.name.clone() + guid.simple().to_string().as_str() }; if name.is_empty() { diff --git a/storage/src/cache/dedup/db.rs b/storage/src/cache/dedup/db.rs new file mode 100644 index 00000000000..6daff37c70b --- /dev/null +++ b/storage/src/cache/dedup/db.rs @@ -0,0 +1,317 @@ +// Copyright (C) 2022-2023 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +#![allow(unused)] + +use std::path::Path; + +use r2d2::{Pool, PooledConnection}; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction}; + +use super::Result; + +pub struct CasDb { + pool: Pool, +} + +impl CasDb { + pub fn new(path: impl AsRef) -> Result { + let mut db_path = path.as_ref().to_owned(); + db_path.push("cas.db"); + Self::from_file(db_path) + } + + pub fn from_file(db_path: impl AsRef) -> Result { + let mgr = SqliteConnectionManager::file(db_path); + let pool = r2d2::Pool::new(mgr)?; + let conn = pool.get()?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS Blobs ( + BlobId INTEGER PRIMARY KEY, + FilePath TEXT NOT NULL UNIQUE + )", + (), + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS Chunks ( + ChunkId TEXT NOT NULL, + ChunkOffset INTEGER, + BlobId INTEGER, + UNIQUE(ChunkId, BlobId) ON CONFLICT IGNORE, + FOREIGN KEY(BlobId) REFERENCES Blobs(BlobId) + )", + (), + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS ChunkIndex ON Chunks(ChunkId)", + (), + )?; + + Ok(CasDb { pool }) + } + + pub fn get_blob_id_with_tx(tran: &Transaction, blob: &str) -> Result> { + let sql = "SELECT BlobId FROM Blobs WHERE FilePath = ?"; + + if let Some(id) = tran + .query_row(sql, [blob], |row| row.get::(0)) + .optional()? + { + return Ok(Some(id)); + } + + Ok(None) + } + + pub fn get_blob_id(&self, blob: &str) -> Result> { + let sql = "SELECT BlobId FROM Blobs WHERE FilePath = ?"; + + if let Some(id) = self + .get_connection()? + .query_row(sql, [blob], |row| row.get::(0)) + .optional()? + { + return Ok(Some(id)); + } + + Ok(None) + } + + pub fn get_blob_path(&self, id: u64) -> Result> { + let sql = "SELECT FilePath FROM Blobs WHERE BlobId = ?"; + + if let Some(path) = self + .get_connection()? + .query_row(sql, [id], |row| row.get::(0)) + .optional()? + { + return Ok(Some(path)); + }; + + Ok(None) + } + + pub fn get_all_blobs(&self) -> Result> { + let conn = self.get_connection()?; + let mut stmt = conn.prepare_cached("SELECT BlobId, FilePath FROM Blobs")?; + let rows = stmt.query_map([], |row| Ok((row.get::(0)?, row.get(1)?)))?; + let mut results: Vec<(u64, String)> = Vec::new(); + for row in rows { + results.push(row?); + } + Ok(results) + } + + pub fn add_blobs(&mut self, blobs: &[String]) -> Result<()> { + let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; + + for blob in blobs { + if let Err(e) = tran.execute(sql, [blob]) { + return Err(e.into()); + }; + } + tran.commit()?; + + Ok(()) + } + + pub fn add_blob(&self, blob: &str) -> Result { + let sql = "INSERT OR IGNORE INTO Blobs (FilePath) VALUES (?1)"; + let conn = self.get_connection()?; + conn.execute(sql, [blob])?; + Ok(conn.last_insert_rowid() as u64) + } + + pub fn delete_blobs(&mut self, blobs: &[String]) -> Result<()> { + let delete_blobs_sql = "DELETE FROM Blobs WHERE BlobId = (?1)"; + let delete_chunks_sql = "DELETE FROM Chunks WHERE BlobId = (?1)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; + + for blob in blobs { + if let Some(id) = Self::get_blob_id_with_tx(&tran, blob)? { + if let Err(e) = tran.execute(delete_chunks_sql, [id]) { + return Err(e.into()); + } + if let Err(e) = tran.execute(delete_blobs_sql, [id]) { + return Err(e.into()); + } + } + } + tran.commit()?; + + Ok(()) + } + + pub fn get_chunk_info(&self, chunk_id: &str) -> Result> { + let sql = "SELECT FilePath, ChunkOffset \ + FROM Chunks INDEXED BY ChunkIndex \ + JOIN Blobs ON Chunks.BlobId = Blobs.BlobId \ + WHERE ChunkId = ?\ + ORDER BY Blobs.BlobId LIMIT 1 OFFSET 0"; + + if let Some((new_blob_id, chunk_info)) = self + .get_connection()? + .query_row(sql, [chunk_id], |row| { + Ok((row.get(0)?, row.get::(1)?)) + }) + .optional()? + { + return Ok(Some((new_blob_id, chunk_info))); + } + + Ok(None) + } + + pub fn add_chunks(&mut self, chunks: &[(String, u64, String)]) -> Result<()> { + let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; + + for chunk in chunks { + match Self::get_blob_id_with_tx(&tran, &chunk.2) { + Err(e) => return Err(e), + Ok(id) => { + if let Err(e) = tran.execute(sql, (&chunk.0, &chunk.1, id)) { + return Err(e.into()); + } + } + } + } + tran.commit()?; + + Ok(()) + } + + pub fn add_chunk(&self, chunk_id: &str, chunk_offset: u64, blob_id: &str) -> Result<()> { + let sql = "INSERT OR IGNORE INTO Chunks (ChunkId, ChunkOffset, BlobId) VALUES (?1, ?2, ?3)"; + let mut conn = self.get_connection()?; + let tran = Self::begin_transaction(&mut conn)?; + + match Self::get_blob_id_with_tx(&tran, blob_id) { + Err(e) => return Err(e), + Ok(id) => { + if let Err(e) = tran.execute(sql, (chunk_id, chunk_offset, id)) { + return Err(e.into()); + } + } + } + tran.commit()?; + + Ok(()) + } + + fn begin_transaction( + conn: &mut PooledConnection, + ) -> Result { + let mut tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?; + tx.set_drop_behavior(DropBehavior::Rollback); + Ok(tx) + } + + fn get_connection(&self) -> Result> { + let conn = self.pool.get()?; + conn.busy_handler(Some(|_v| true))?; + Ok(conn) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use vmm_sys_util::tempdir::TempDir; + + #[test] + fn test_cas_blob() { + let tmpdir = TempDir::new().unwrap(); + + let mut cas_mgr = CasDb::new(tmpdir.as_path()).unwrap(); + cas_mgr + .add_blobs(&["/tmp/blob1".to_string(), "/tmp/blob2".to_string()]) + .unwrap(); + + let mut mgr2 = CasDb::new(tmpdir.as_path()).unwrap(); + assert_eq!(mgr2.add_blob("/tmp/blob3").unwrap(), 3); + + drop(cas_mgr); + + assert_eq!(mgr2.get_blob_id("/tmp/blob1").unwrap(), Some(1)); + assert_eq!(mgr2.get_blob_id("/tmp/blob2").unwrap(), Some(2)); + assert_eq!(mgr2.get_blob_id("/tmp/blob3").unwrap(), Some(3)); + assert_eq!(mgr2.get_blob_id("/tmp/blob4").unwrap(), None); + + assert_eq!( + mgr2.get_blob_path(1).unwrap(), + Some("/tmp/blob1".to_string()) + ); + assert_eq!( + mgr2.get_blob_path(2).unwrap(), + Some("/tmp/blob2".to_string()) + ); + assert_eq!( + mgr2.get_blob_path(3).unwrap(), + Some("/tmp/blob3".to_string()) + ); + assert_eq!(mgr2.get_blob_path(4).unwrap(), None); + + let blobs = mgr2.get_all_blobs().unwrap(); + assert_eq!(blobs.len(), 3); + + mgr2.delete_blobs(&["/tmp/blob1".to_string(), "/tmp/blob2".to_string()]) + .unwrap(); + assert_eq!(mgr2.get_blob_path(1).unwrap(), None); + assert_eq!(mgr2.get_blob_path(2).unwrap(), None); + assert_eq!( + mgr2.get_blob_path(3).unwrap(), + Some("/tmp/blob3".to_string()) + ); + + let blobs = mgr2.get_all_blobs().unwrap(); + assert_eq!(blobs.len(), 1); + } + + #[test] + fn test_cas_chunk() { + let tmpdir = TempDir::new().unwrap(); + let mut cas_mgr = CasDb::new(tmpdir.as_path()).unwrap(); + cas_mgr + .add_blobs(&["/tmp/blob1".to_string(), "/tmp/blob2".to_string()]) + .unwrap(); + + cas_mgr + .add_chunks(&[ + ("chunk1".to_string(), 4096, "/tmp/blob1".to_string()), + ("chunk2".to_string(), 0, "/tmp/blob2".to_string()), + ]) + .unwrap(); + + let (file, offset) = cas_mgr.get_chunk_info("chunk1").unwrap().unwrap(); + assert_eq!(&file, "/tmp/blob1"); + assert_eq!(offset, 4096); + let (file, offset) = cas_mgr.get_chunk_info("chunk2").unwrap().unwrap(); + assert_eq!(&file, "/tmp/blob2"); + assert_eq!(offset, 0); + + cas_mgr.add_chunk("chunk1", 8192, "/tmp/blob2").unwrap(); + let (file, offset) = cas_mgr.get_chunk_info("chunk1").unwrap().unwrap(); + assert_eq!(&file, "/tmp/blob1"); + assert_eq!(offset, 4096); + + cas_mgr.delete_blobs(&["/tmp/blob1".to_string()]).unwrap(); + let (file, offset) = cas_mgr.get_chunk_info("chunk1").unwrap().unwrap(); + assert_eq!(&file, "/tmp/blob2"); + assert_eq!(offset, 8192); + + cas_mgr.delete_blobs(&["/tmp/blob2".to_string()]).unwrap(); + let res = cas_mgr.get_chunk_info("chunk1").unwrap(); + assert!(res.is_none()); + let res = cas_mgr.get_chunk_info("chunk2").unwrap(); + assert!(res.is_none()); + } +} diff --git a/storage/src/cache/dedup/mod.rs b/storage/src/cache/dedup/mod.rs new file mode 100644 index 00000000000..f52a8fcc1de --- /dev/null +++ b/storage/src/cache/dedup/mod.rs @@ -0,0 +1,49 @@ +// Copyright (C) 2022-2023 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::fmt::{self, Display, Formatter}; +use std::io::Error; + +mod db; + +/// Error codes related to local cas. +#[derive(Debug)] +pub enum CasError { + Io(Error), + Db(rusqlite::Error), + R2D2(r2d2::Error), +} + +impl Display for CasError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + CasError::Io(e) => write!(f, "{}", e), + CasError::Db(e) => write!(f, "{}", e), + CasError::R2D2(e) => write!(f, "{}", e), + } + } +} + +impl std::error::Error for CasError {} + +impl From for CasError { + fn from(e: rusqlite::Error) -> Self { + CasError::Db(e) + } +} + +impl From for CasError { + fn from(e: r2d2::Error) -> Self { + CasError::R2D2(e) + } +} + +impl From for CasError { + fn from(e: Error) -> Self { + CasError::Io(e) + } +} + +/// Specialized `Result` for local cas. +type Result = std::result::Result; diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs index cc65f842919..1ae6dda497e 100644 --- a/storage/src/cache/mod.rs +++ b/storage/src/cache/mod.rs @@ -36,6 +36,8 @@ use crate::utils::{alloc_buf, check_digest}; use crate::{StorageResult, RAFS_MAX_CHUNK_SIZE}; mod cachedfile; +#[cfg(feature = "dedup")] +mod dedup; mod dummycache; mod filecache; #[cfg(target_os = "linux")]