Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: assorted cleanups of store #6577

Merged
merged 3 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ impl From<rocksdb::Error> for DBError {
}
}

impl Into<io::Error> for DBError {
fn into(self) -> io::Error {
io::Error::new(io::ErrorKind::Other, self)
impl From<DBError> for io::Error {
fn from(err: DBError) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
}

Expand Down
138 changes: 62 additions & 76 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::ops::Deref;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
Expand Down Expand Up @@ -53,31 +52,26 @@ impl Store {
Store { storage }
}

pub fn get(&self, column: DBCol, key: &[u8]) -> Result<Option<Vec<u8>>, io::Error> {
self.storage.get(column, key).map_err(|e| e.into())
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
self.storage.get(column, key).map_err(io::Error::from)
}

pub fn get_ser<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> Result<Option<T>, io::Error> {
match self.storage.get(column, key) {
Ok(Some(bytes)) => match T::try_from_slice(bytes.as_ref()) {
pub fn get_ser<T: BorshDeserialize>(&self, column: DBCol, key: &[u8]) -> io::Result<Option<T>> {
match self.get(column, key)? {
matklad marked this conversation as resolved.
Show resolved Hide resolved
Some(bytes) => match T::try_from_slice(&bytes) {
Ok(result) => Ok(Some(result)),
Err(e) => Err(e),
},
matklad marked this conversation as resolved.
Show resolved Hide resolved
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
None => Ok(None),
}
}

pub fn exists(&self, column: DBCol, key: &[u8]) -> Result<bool, io::Error> {
self.storage.get(column, key).map(|value| value.is_some()).map_err(|e| e.into())
pub fn exists(&self, column: DBCol, key: &[u8]) -> io::Result<bool> {
self.get(column, key).map(|value| value.is_some())
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(self.storage.clone())
StoreUpdate::new(Arc::clone(&self.storage))
}

pub fn iter<'a>(
Expand Down Expand Up @@ -106,16 +100,15 @@ impl Store {
&'a self,
column: DBCol,
key_prefix: &'a [u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, T), io::Error>> + 'a> {
Box::new(
self.storage
.iter_prefix(column, key_prefix)
.map(|(key, value)| Ok((key.to_vec(), T::try_from_slice(value.as_ref())?))),
)
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
self.storage
.iter_prefix(column, key_prefix)
.map(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))
}

pub fn save_to_file(&self, column: DBCol, filename: &Path) -> Result<(), std::io::Error> {
let mut file = File::create(filename)?;
pub fn save_to_file(&self, column: DBCol, filename: &Path) -> io::Result<()> {
let file = File::create(filename)?;
let mut file = BufWriter::new(file);
for (key, value) in self.storage.iter_without_rc_logic(column) {
file.write_u32::<LittleEndian>(key.len() as u32)?;
file.write_all(&key)?;
Expand All @@ -125,7 +118,7 @@ impl Store {
Ok(())
}

pub fn load_from_file(&self, column: DBCol, filename: &Path) -> Result<(), std::io::Error> {
pub fn load_from_file(&self, column: DBCol, filename: &Path) -> io::Result<()> {
let file = File::open(filename)?;
let mut file = BufReader::new(file);
let mut transaction = self.storage.transaction();
Expand All @@ -134,7 +127,7 @@ impl Store {
loop {
let key_len = match file.read_u32::<LittleEndian>() {
Ok(key_len) => key_len as usize,
Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
Err(err) => return Err(err),
};
key.resize(key_len, 0);
Expand All @@ -146,7 +139,7 @@ impl Store {

transaction.put(column, &key, &value);
}
self.storage.write(transaction).map_err(|e| e.into())
self.storage.write(transaction).map_err(io::Error::from)
}

pub fn get_rocksdb(&self) -> Option<&RocksDB> {
Expand All @@ -173,7 +166,7 @@ impl StoreUpdate {
}

pub fn new_with_tries(tries: ShardTries) -> Self {
let storage = tries.get_store().storage.clone();
let storage = Arc::clone(&tries.get_store().storage);
let transaction = storage.transaction();
StoreUpdate { storage, transaction, tries: Some(tries) }
}
Expand All @@ -193,7 +186,7 @@ impl StoreUpdate {
column: DBCol,
key: &[u8],
value: &T,
) -> Result<(), io::Error> {
) -> io::Result<()> {
debug_assert!(!column.is_rc());
let data = value.try_to_vec()?;
self.set(column, key, &data);
Expand All @@ -210,19 +203,17 @@ impl StoreUpdate {

/// Merge another store update into this one.
pub fn merge(&mut self, other: StoreUpdate) {
if let Some(tries) = other.tries {
if self.tries.is_none() {
self.tries = Some(tries);
} else {
debug_assert!(self.tries.as_ref().unwrap().is_same(&tries));
}
match (&self.tries, other.tries) {
(None | Some(_), None) => (),
matklad marked this conversation as resolved.
Show resolved Hide resolved
(None, Some(tries)) => self.tries = Some(tries),
(Some(t1), Some(t2)) => debug_assert!(t1.is_same(&t2)),
}

self.merge_transaction(other.transaction);
}

/// Merge DB Transaction.
pub fn merge_transaction(&mut self, transaction: DBTransaction) {
fn merge_transaction(&mut self, transaction: DBTransaction) {
for op in transaction.ops {
match op {
DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value),
Expand All @@ -235,18 +226,18 @@ impl StoreUpdate {
}
}

pub fn commit(self) -> Result<(), io::Error> {
pub fn commit(self) -> io::Result<()> {
debug_assert!(
{
let non_refcount_keys = self
.transaction
.ops
.iter()
.filter_map(|op| match op {
DBOp::Insert { col, key, .. } => Some((*col as u8, key)),
DBOp::Delete { col, key } => Some((*col as u8, key)),
DBOp::UpdateRefcount { .. } => None,
DBOp::DeleteAll { .. } => None,
DBOp::Insert { col, key, .. } | DBOp::Delete { col, key } => {
Some((*col as u8, key))
}
DBOp::UpdateRefcount { .. } | DBOp::DeleteAll { .. } => None,
})
.collect::<Vec<_>>();
non_refcount_keys.len()
Expand All @@ -256,13 +247,13 @@ impl StoreUpdate {
self
);
if let Some(tries) = self.tries {
assert_eq!(
tries.get_store().storage.deref() as *const _,
self.storage.deref() as *const _
);
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
self.storage.write(self.transaction).map_err(|e| e.into())
self.storage.write(self.transaction).map_err(io::Error::from)
}
}

Expand All @@ -289,22 +280,18 @@ pub fn read_with_cache<'a, T: BorshDeserialize + 'a>(
cache: &'a mut LruCache<Vec<u8>, T>,
key: &[u8],
) -> io::Result<Option<&'a T>> {
let key_vec = key.to_vec();
if cache.get(&key_vec).is_some() {
return Ok(Some(cache.get(&key_vec).unwrap()));
// Note: Due to `&mut -> &` conversions, it's not possible to avoid double
// hash map lookups here.
if cache.contains(key) {
return Ok(cache.get(key));
}
if let Some(result) = storage.get_ser(col, key)? {
cache.put(key.to_vec(), result);
return Ok(cache.get(&key_vec));
return Ok(cache.get(key));
}
Ok(None)
}

pub fn create_store(path: &Path) -> Store {
let db = Arc::new(RocksDB::new(path).expect("Failed to open the database"));
Store::new(db)
}

#[derive(Default, Debug)]
pub struct StoreConfig {
/// Attempted writes to the DB will fail. Doesn't require a `LOCK` file.
Expand All @@ -314,17 +301,19 @@ pub struct StoreConfig {
pub enable_statistics: bool,
}

pub fn create_store(path: &Path) -> Store {
create_store_with_config(path, StoreConfig::default())
}

pub fn create_store_with_config(path: &Path, store_config: StoreConfig) -> Store {
let mut opts = RocksDBOptions::default();
if store_config.enable_statistics {
opts = opts.enable_statistics();
}

let db = Arc::new(
(if store_config.read_only { opts.read_only(path) } else { opts.read_write(path) })
.expect("Failed to open the database"),
);
Store::new(db)
let db = if store_config.read_only { opts.read_only(path) } else { opts.read_write(path) }
.expect("Failed to open the database");
Store::new(Arc::new(db))
}

/// Reads an object from Trie.
Expand All @@ -334,18 +323,15 @@ pub fn get<T: BorshDeserialize>(
state_update: &TrieUpdate,
key: &TrieKey,
) -> Result<Option<T>, StorageError> {
state_update.get(key).and_then(|opt| {
opt.map_or_else(
|| Ok(None),
|data| {
T::try_from_slice(&data)
.map_err(|_| {
StorageError::StorageInconsistentState("Failed to deserialize".to_string())
})
.map(Some)
},
)
})
match state_update.get(key)? {
None => Ok(None),
Some(data) => match T::try_from_slice(&data) {
Err(_err) => {
Err(StorageError::StorageInconsistentState("Failed to deserialize".to_string()))
}
Ok(value) => Ok(Some(value)),
},
}
}

/// Writes an object into Trie.
Expand Down Expand Up @@ -509,11 +495,11 @@ pub fn remove_account(
Ok(())
}

pub fn get_genesis_state_roots(store: &Store) -> Result<Option<Vec<StateRoot>>, std::io::Error> {
pub fn get_genesis_state_roots(store: &Store) -> io::Result<Option<Vec<StateRoot>>> {
store.get_ser::<Vec<StateRoot>>(DBCol::ColBlockMisc, GENESIS_STATE_ROOTS_KEY)
}

pub fn get_genesis_hash(store: &Store) -> Result<Option<CryptoHash>, std::io::Error> {
pub fn get_genesis_hash(store: &Store) -> io::Result<Option<CryptoHash>> {
store.get_ser::<CryptoHash>(DBCol::ColBlockMisc, GENESIS_JSON_HASH_KEY)
}

Expand All @@ -538,13 +524,13 @@ pub struct StoreCompiledContractCache {
/// Key must take into account VM being used and its configuration, so that
/// we don't cache non-gas metered binaries, for example.
impl CompiledContractCache for StoreCompiledContractCache {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> {
fn put(&self, key: &[u8], value: &[u8]) -> io::Result<()> {
let mut store_update = self.store.store_update();
store_update.set(DBCol::ColCachedContractCode, key, value);
store_update.commit()
}

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
self.store.get(DBCol::ColCachedContractCode, key)
}
}
Expand Down