diff --git a/src/persist-client/src/cache.rs b/src/persist-client/src/cache.rs
index 2fcb803ef0db9..f70b991eb54bb 100644
--- a/src/persist-client/src/cache.rs
+++ b/src/persist-client/src/cache.rs
@@ -33,6 +33,7 @@ use tracing::{debug, instrument};
 
 use crate::async_runtime::IsolatedRuntime;
 use crate::error::{CodecConcreteType, CodecMismatch};
+use crate::internal::cache::BlobMemCache;
 use crate::internal::machine::retry_external;
 use crate::internal::metrics::{LockMetrics, Metrics, MetricsBlob, MetricsConsensus, ShardMetrics};
 use crate::internal::state::TypedState;
@@ -218,6 +219,9 @@ impl PersistClientCache {
                     Self::PROMETHEUS_SCRAPE_INTERVAL,
                 )
                 .await;
+                // This is intentionally "outside" (wrapping) MetricsBlob so
+                // that we don't include cached responses in blob metrics.
+                let blob = BlobMemCache::new(&self.cfg, Arc::clone(&self.metrics), blob);
                 Arc::clone(&x.insert((RttLatencyTask(task.abort_on_drop()), blob)).1)
             }
         };
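The wrapping order above is what buys the metric attribution. To make that concrete, here is a test-style sketch (not part of this change; `CountingBlob` is a hypothetical stand-in for `MetricsBlob`): because `BlobMemCache` wraps the inner layer, two reads of the same key reach it exactly once. The trait method signatures are the ones implemented in the new file below.

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Stand-in for MetricsBlob: counts how many gets reach the inner layer.
    #[derive(Debug, Default)]
    struct CountingBlob {
        gets: AtomicUsize,
    }

    #[async_trait]
    impl Blob for CountingBlob {
        async fn get(&self, _key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
            self.gets.fetch_add(1, Ordering::SeqCst);
            Ok(Some(SegmentedBytes::from(Bytes::from(vec![0u8; 8]))))
        }
        async fn list_keys_and_metadata(
            &self,
            _key_prefix: &str,
            _f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
        ) -> Result<(), ExternalError> {
            Ok(())
        }
        async fn set(&self, _key: &str, _value: Bytes, _atomic: Atomicity) -> Result<(), ExternalError> {
            Ok(())
        }
        async fn delete(&self, _key: &str) -> Result<Option<usize>, ExternalError> {
            Ok(None)
        }
        async fn restore(&self, _key: &str) -> Result<(), ExternalError> {
            Ok(())
        }
    }

    // Inside an async test body, with `cfg: PersistConfig` and
    // `metrics: Arc<Metrics>` available (construction elided):
    let counting = Arc::new(CountingBlob::default());
    let inner: Arc<dyn Blob + Send + Sync> = Arc::clone(&counting);
    let cached = BlobMemCache::new(&cfg, metrics, inner);
    let _ = cached.get("k").await.expect("get succeeds"); // miss: reaches CountingBlob
    let _ = cached.get("k").await.expect("get succeeds"); // hit: served from memory
    assert_eq!(counting.gets.load(Ordering::SeqCst), 1);

diff --git a/src/persist-client/src/internal/cache.rs b/src/persist-client/src/internal/cache.rs
new file mode 100644
index 0000000000000..a9def6ae82805
--- /dev/null
+++ b/src/persist-client/src/internal/cache.rs
@@ -0,0 +1,390 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! In-process caches of [Blob].
+
+use std::sync::{Arc, Mutex};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use mz_ore::bytes::SegmentedBytes;
+use mz_ore::cast::CastFrom;
+use mz_persist::location::{Atomicity, Blob, BlobMetadata, ExternalError};
+
+use crate::cfg::PersistConfig;
+use crate::internal::metrics::Metrics;
+
+/// In-memory cache for [Blob].
+#[derive(Debug)]
+pub struct BlobMemCache {
+    metrics: Arc<Metrics>,
+    cache: Mutex<lru::Lru<String, SegmentedBytes>>,
+    blob: Arc<dyn Blob + Send + Sync>,
+}
+
+impl BlobMemCache {
+    pub fn new(
+        cfg: &PersistConfig,
+        metrics: Arc<Metrics>,
+        blob: Arc<dyn Blob + Send + Sync>,
+    ) -> Arc<dyn Blob + Send + Sync> {
+        // TODO: Wire up the `evictions` counter:
+        // let eviction_metrics = Arc::clone(&metrics);
+        // TODO: Make this react dynamically to changes in configuration.
+        let cache = lru::Lru::new(cfg.dynamic.blob_cache_mem_limit_bytes());
+        let blob = BlobMemCache {
+            metrics,
+            cache: Mutex::new(cache),
+            blob,
+        };
+        Arc::new(blob)
+    }
+
+    fn update_size_metrics(&self) {
+        // TODO: Skip this second lock acquisition by updating the gauges
+        // while the caller still holds the lock.
+        let cache = self.cache.lock().expect("lock poisoned");
+        self.metrics
+            .blob_cache_mem
+            .size_blobs
+            .set(u64::cast_from(cache.entry_count()));
+        self.metrics
+            .blob_cache_mem
+            .size_bytes
+            .set(u64::cast_from(cache.weighted_size()));
+    }
+}
+
+#[async_trait]
+impl Blob for BlobMemCache {
+    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
+        // First check if the blob is in the cache. If it is, return it. If
+        // not, fetch it and put it in the cache.
+        //
+        // Blobs are write-once modify-never, so we don't have to worry about
+        // any races or cache invalidations here. If the value is in the
+        // cache, it's also what's in s3 (if not, then there's a horrible bug
+        // somewhere else).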
+        if let Some(cached_value) = self.cache.lock().expect("lock poisoned").get(key) {
+            self.metrics.blob_cache_mem.hits_blobs.inc();
+            self.metrics
+                .blob_cache_mem
+                .hits_bytes
+                .inc_by(u64::cast_from(cached_value.len()));
+            return Ok(Some(cached_value.clone()));
+        }
+
+        let res = self.blob.get(key).await?;
+        if let Some(blob) = res.as_ref() {
+            self.cache
+                .lock()
+                .expect("lock poisoned")
+                .insert(key.to_owned(), blob.clone(), blob.len());
+            self.update_size_metrics();
+        }
+        Ok(res)
+    }
+
+    async fn list_keys_and_metadata(
+        &self,
+        key_prefix: &str,
+        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
+    ) -> Result<(), ExternalError> {
+        self.blob.list_keys_and_metadata(key_prefix, f).await
+    }
+
+    async fn set(&self, key: &str, value: Bytes, atomic: Atomicity) -> Result<(), ExternalError> {
+        let () = self.blob.set(key, value.clone(), atomic).await?;
+        let weight = value.len();
+        self.cache
+            .lock()
+            .expect("lock poisoned")
+            .insert(key.to_owned(), SegmentedBytes::from(value), weight);
+        self.update_size_metrics();
+        Ok(())
+    }
+
+    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
+        let res = self.blob.delete(key).await;
+        self.cache.lock().expect("lock poisoned").remove(key);
+        self.update_size_metrics();
+        res
+    }
+
+    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
+        self.blob.restore(key).await
+    }
+}
+
+mod lru_list {
+    use mz_ore::collections::HashMap;
+
+    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
+    pub struct NodeId(u64);
+
+    pub struct ListNode<T> {
+        next: Option<NodeId>,
+        prev: Option<NodeId>,
+
+        payload: T,
+    }
+
+    impl<T> std::fmt::Debug for ListNode<T> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("ListNode")
+                .field("next", &self.next)
+                .field("prev", &self.prev)
+                .finish_non_exhaustive()
+        }
+    }
+
+    pub struct List<T> {
+        next_id: NodeId,
+        head: Option<NodeId>,
+        tail: Option<NodeId>,
+        nodes: HashMap<NodeId, ListNode<T>>,
+    }
+
+    impl<T> std::fmt::Debug for List<T> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("List")
+                .field("next_id", &self.next_id)
+                .field("head", &self.head)
+                .field("tail", &self.tail)
+                .field("nodes", &self.nodes)
+                .finish()
+        }
+    }
+
+    impl<T> Default for List<T> {
+        fn default() -> Self {
+            Self {
+                next_id: NodeId(0),
+                nodes: HashMap::new(),
+                head: None,
+                tail: None,
+            }
+        }
+    }
+
+    impl<T> List<T> {
+        pub fn next_id(&mut self) -> NodeId {
+            let id = self.next_id.clone();
+            self.next_id.0 += 1;
+            id
+        }
+
+        pub fn peek_tail(&self) -> Option<(&NodeId, &T)> {
+            let id = self.tail.as_ref()?;
+            let node = self.nodes.get(id).expect("tail node exists");
+            Some((id, &node.payload))
+        }
+
+        pub fn push_head(&mut self, id: NodeId, payload: T) {
+            let next = match self.head.as_mut() {
+                Some(head) => {
+                    self.nodes.get_mut(head).expect("head node exists").prev = Some(id.clone());
+                    let next = head.clone();
+                    *head = id.clone();
+                    Some(next)
+                }
+                None => {
+                    self.head = Some(id.clone());
+                    self.tail = Some(id.clone());
+                    None
+                }
+            };
+            let node = ListNode {
+                next,
+                prev: None,
+                payload,
+            };
+            self.nodes.insert(id.clone(), node);
+        }
+
+        pub fn get(&self, id: &NodeId) -> Option<&T> {
+            self.nodes.get(id).map(|x| &x.payload)
+        }
+
+        pub fn remove(&mut self, id: &NodeId) -> Option<T> {
+            let node = self.nodes.remove(id)?;
+            if let Some(next_id) = node.next.as_ref() {
+                self.nodes.get_mut(next_id).expect("next node exists").prev = node.prev.clone();
+            } else {
+                // No next means this was the tail
+                assert_eq!(self.tail.as_ref(), Some(id));
+                self.tail = node.prev.clone();
+            }
+            if let Some(prev_id) = node.prev.as_ref() {
+                self.nodes.get_mut(prev_id).expect("prev node exists").next = node.next.clone();
+            } else {
+                // No prev means this was the head
+                assert_eq!(self.head.as_ref(), Some(id));
+                self.head = node.next.clone();
+            }
+            Some(node.payload)
+        }
+
+        pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &T)> {
+            ListIter {
+                list: self,
+                next: self.head.as_ref(),
+            }
+            .map(|(id, node)| (id, &node.payload))
+        }
+
+        pub fn validate(&self) {
+            assert_eq!(self.nodes.is_empty(), self.head.is_none());
+            assert_eq!(self.nodes.is_empty(), self.tail.is_none());
+
+            let mut count = 0;
+            let mut expected_prev = None;
+            let mut iter = ListIter {
+                list: self,
+                next: self.head.as_ref(),
+            };
+            while let Some((id, node)) = iter.next() {
+                count += 1;
+                assert_eq!(node.prev.as_ref(), expected_prev);
+                expected_prev = Some(id);
+            }
+            assert_eq!(self.tail.as_ref(), expected_prev);
+            assert_eq!(count, self.nodes.len());
+        }
+    }
+
+    struct ListIter<'a, T> {
+        list: &'a List<T>,
+        next: Option<&'a NodeId>,
+    }
+
+    impl<'a, T> Iterator for ListIter<'a, T> {
+        type Item = (&'a NodeId, &'a ListNode<T>);
+
+        fn next(&mut self) -> Option<Self::Item> {
+            let id = self.next?;
+            let node = self.list.nodes.get(id).expect("linked node exists");
+            self.next = node.next.as_ref();
+            Some((id, node))
+        }
+    }
+}
+
+mod lru {
+    use std::borrow::Borrow;
+    use std::hash::Hash;
+
+    use mz_ore::collections::HashMap;
+
+    use crate::internal::cache::lru_list;
+
+    pub struct LruNode<K, V> {
+        key: K,
+        val: V,
+        weight: usize,
+    }
+
+    pub struct Lru<K, V> {
+        capacity: usize,
+        total_weight: usize,
+
+        nodes: lru_list::List<LruNode<K, V>>,
+        by_key: HashMap<K, lru_list::NodeId>,
+    }
+
+    impl<K: std::fmt::Debug, V> std::fmt::Debug for Lru<K, V> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("Lru")
+                .field("capacity", &self.capacity)
+                .field("total_weight", &self.total_weight)
+                // TODO: Decide whether the full node list is worth printing.
+                // .field("nodes", &self.nodes)
+                .field("by_key", &self.by_key)
+                .finish()
+        }
+    }
+
+    impl<K: Hash + Eq + Clone, V> Lru<K, V> {
+        pub fn new(capacity: usize) -> Self {
+            Lru {
+                capacity,
+                total_weight: 0,
+                nodes: lru_list::List::default(),
+                by_key: HashMap::new(),
+            }
+        }
+
+        pub fn get<Q>(&mut self, key: &Q) -> Option<&V>
+        where
+            K: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            let id = self.by_key.get(key)?;
+            let payload = self.nodes.remove(id).expect("node for cached key exists");
+            self.nodes.push_head(id.clone(), payload);
+            self.validate();
+            let payload = self.nodes.get(id).expect("node was just pushed");
+            Some(&payload.val)
+        }
+
+        pub fn insert(&mut self, key: K, val: V, weight: usize) {
+            let id = self.nodes.next_id();
+            let payload = LruNode { key, val, weight };
+
+            let _ = self.remove(&payload.key);
+            self.total_weight += weight;
+            self.by_key.insert(payload.key.clone(), id.clone());
+            self.nodes.push_head(id, payload);
+            self.resize();
+            self.validate();
+        }
+
+        pub fn entry_count(&self) -> usize {
+            self.by_key.len()
+        }
+
+        pub fn weighted_size(&self) -> usize {
+            self.total_weight
+        }
+
+        pub fn remove<Q>(&mut self, k: &Q) -> Option<LruNode<K, V>>
+        where
+            K: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            let id = self.by_key.remove(k)?;
+            let ret = self.remove_id(id);
+            self.validate();
+            ret
+        }
+
+        fn remove_id(&mut self, id: lru_list::NodeId) -> Option<LruNode<K, V>> {
+            let payload = self.nodes.remove(&id)?;
+            self.total_weight -= payload.weight;
+            Some(payload)
+        }
+
+        fn resize(&mut self) {
+            while self.total_weight > self.capacity {
+                let (id, node) = self.nodes.peek_tail().expect("non-empty while over capacity");
+                self.by_key.remove(&node.key);
+                self.remove_id(id.clone());
+            }
+        }
+
+        fn validate(&self) {
+            self.nodes.validate();
+            assert!(self.total_weight <= self.capacity);
+
+            let mut w = 0;
+            for (id, payload) in self.nodes.iter() {
+                w += payload.weight;
+                assert_eq!(self.by_key.get(&payload.key), Some(id));
+            }
+            assert_eq!(w, self.total_weight);
+        }
+    }
+}
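The weighted-LRU semantics above are easy to get subtly wrong, so here is a sketch of a unit test that pins them down (illustrative only; no such test is included in this diff, and `lru` is private, so it would live in this file's test module). The capacity passed to `Lru::new` is a weight budget, not an entry count, and eviction starts from the least recently used entry:

    #[test]
    fn lru_weighted_eviction() {
        let mut cache: lru::Lru<&str, &str> = lru::Lru::new(10);
        cache.insert("a", "aaaa", 4);
        cache.insert("b", "bbbb", 4);
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 8);

        // Touching "a" makes "b" the least recently used entry.
        assert_eq!(cache.get("a"), Some(&"aaaa"));

        // Four more bytes of weight exceed the budget of 10, evicting "b".
        cache.insert("c", "cccc", 4);
        assert_eq!(cache.get("b"), None);
        assert_eq!(cache.get("a"), Some(&"aaaa"));
        assert_eq!(cache.get("c"), Some(&"cccc"));
        assert_eq!(cache.weighted_size(), 8);
    }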
diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index a995dfc9089bf..05c0e8e24e11c 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -2261,12 +2261,12 @@ impl ConsolidationMetrics {
 }
 
 #[derive(Debug)]
-#[allow(dead_code)] // TODO: Remove this when we reintroduce the cache.
 pub struct BlobMemCache {
     pub(crate) size_blobs: UIntGauge,
     pub(crate) size_bytes: UIntGauge,
     pub(crate) hits_blobs: IntCounter,
     pub(crate) hits_bytes: IntCounter,
+    #[allow(dead_code)]
     pub(crate) evictions: IntCounter,
 }
 
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index e09157a59d5e4..ca6e129c954f2 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -153,6 +153,7 @@ pub mod write;
 
 /// An implementation of the public crate interface.
 mod internal {
     pub mod apply;
+    pub mod cache;
     pub mod compact;
     pub mod encoding;
     pub mod gc;
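The `evictions` counter keeps its `#[allow(dead_code)]` because nothing increments it yet. One possible way to wire it up later (a sketch under that assumption, not part of this change) is to have `Lru::resize` report how many entries it dropped, with `insert` threading the count back to `BlobMemCache`:

    // In `lru::Lru`: return the number of evicted entries from `resize`.
    fn resize(&mut self) -> usize {
        let mut evicted = 0;
        while self.total_weight > self.capacity {
            let (id, node) = self.nodes.peek_tail().expect("non-empty while over capacity");
            self.by_key.remove(&node.key);
            self.remove_id(id.clone());
            evicted += 1;
        }
        evicted
    }

    // In `BlobMemCache`, after an insert into the cache:
    //     self.metrics.blob_cache_mem.evictions.inc_by(u64::cast_from(evicted));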