From 877daf1c37786fa9c081babf1e058e2a98353d9d Mon Sep 17 00:00:00 2001
From: Daniel Harrison <52528+danhhz@users.noreply.github.com>
Date: Tue, 30 May 2023 11:10:10 -0700
Subject: [PATCH] persist: introduce a very small in-mem blob cache

A one-time (skunkworks) experiment showed that an environment running
our demo "auction" source + mv got 90%+ cache hits with a 1 MiB cache.
This doesn't scale up to prod data sizes and doesn't help with
multi-process replicas, but the memory usage seems unobjectionable
enough to have it for the cases that it does help.

Possibly, a decent chunk of why this is true is pubsub. With the low
pubsub latencies, we might write some blob to s3, then within
milliseconds notify everyone in-process interested in that blob, waking
them up and fetching it. This means even a very small cache is useful
because things stay in it just long enough for them to get fetched by
everyone that immediately needs them.

1 MiB is enough to fit things like state rollups, remap shard writes,
and likely many MVs (probably less so for sources, but atm those still
happen in another cluster).

Touches #19225
---
 Cargo.lock                                  |   2 +
 src/adapter/src/catalog.rs                  |   1 +
 src/persist-client/Cargo.toml               |   1 +
 src/persist-client/src/cache.rs             |   4 +
 src/persist-client/src/cfg.proto            |   1 +
 src/persist-client/src/cfg.rs               |  35 ++++++
 src/persist-client/src/internal/cache.rs    | 133 +++++++++++++++++++++
 src/persist-client/src/internal/metrics.rs  |  44 +++++++
 src/persist-client/src/lib.rs               |   1 +
 src/sql/src/session/vars.rs                 |  16 +++
 src/workspace-hack/Cargo.toml               |   2 +
 11 files changed, 240 insertions(+)
 create mode 100644 src/persist-client/src/internal/cache.rs

diff --git a/Cargo.lock b/Cargo.lock
index ada5d722cb764..0731928b2ed83 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4353,6 +4353,7 @@ dependencies = [
  "futures-util",
  "h2",
  "humantime",
+ "moka",
  "mz-build-info",
  "mz-http-util",
  "mz-orchestrator-tracing",
@@ -8560,6 +8561,7 @@ dependencies = [
  "log",
  "lru",
  "memchr",
+ "moka",
  "native-tls",
  "nix",
  "nom",
diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs
index 80f0a9c461988..7b7218eeabeae 100644
--- a/src/adapter/src/catalog.rs
+++ b/src/adapter/src/catalog.rs
@@ -6898,6 +6898,7 @@ impl Catalog {
         let config = self.system_config();
         PersistParameters {
             blob_target_size: Some(config.persist_blob_target_size()),
+            blob_cache_mem_limit_bytes: Some(config.persist_blob_cache_mem_limit_bytes()),
             compaction_minimum_timeout: Some(config.persist_compaction_minimum_timeout()),
             consensus_connect_timeout: Some(config.crdb_connect_timeout()),
             consensus_tcp_user_timeout: Some(config.crdb_tcp_user_timeout()),
diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml
index e87d0434a75a8..b31924964ea48 100644
--- a/src/persist-client/Cargo.toml
+++ b/src/persist-client/Cargo.toml
@@ -38,6 +38,7 @@ differential-dataflow = "0.12.0"
 futures = "0.3.25"
 futures-util = "0.3"
 h2 = "0.3.13"
+moka = { version = "0.9.6", default-features = false, features = ["sync"] }
 mz-build-info = { path = "../build-info" }
 mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] }
 mz-persist = { path = "../persist" }
diff --git a/src/persist-client/src/cache.rs b/src/persist-client/src/cache.rs
index 50491ad126a24..09976ae0d91e1 100644
--- a/src/persist-client/src/cache.rs
+++ b/src/persist-client/src/cache.rs
@@ -33,6 +33,7 @@ use tracing::{debug, instrument};
 
 use crate::async_runtime::CpuHeavyRuntime;
 use crate::error::{CodecConcreteType, CodecMismatch};
+use crate::internal::cache::BlobMemCache;
 use crate::internal::machine::retry_external;
 use crate::internal::metrics::{LockMetrics, Metrics, MetricsBlob, MetricsConsensus, ShardMetrics};
 use crate::internal::state::TypedState;
@@ -199,6 +200,9 @@ impl PersistClientCache {
                     Self::PROMETHEUS_SCRAPE_INTERVAL,
                 )
                 .await;
+                // This is intentionally "outside" (wrapping) MetricsBlob so
+                // that we don't include cached responses in blob metrics.
+                let blob = BlobMemCache::new(&self.cfg, Arc::clone(&self.metrics), blob);
                 Arc::clone(&x.insert((RttLatencyTask(task), blob)).1)
             }
         };
diff --git a/src/persist-client/src/cfg.proto b/src/persist-client/src/cfg.proto
index dbc69e7694b84..d56852fa9f7a8 100644
--- a/src/persist-client/src/cfg.proto
+++ b/src/persist-client/src/cfg.proto
@@ -27,6 +27,7 @@ message ProtoPersistParameters {
   optional bool pubsub_push_diff_enabled = 11;
   mz_proto.ProtoDuration consensus_tcp_user_timeout = 12;
   optional uint64 rollup_threshold = 13;
+  optional uint64 blob_cache_mem_limit_bytes = 14;
 }
 
 message ProtoRetryParameters {
diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index 180022ffc4af0..797dcfd871489 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -131,6 +131,9 @@ impl PersistConfig {
             dynamic: Arc::new(DynamicConfig {
                 batch_builder_max_outstanding_parts: AtomicUsize::new(2),
                 blob_target_size: AtomicUsize::new(Self::DEFAULT_BLOB_TARGET_SIZE),
+                blob_cache_mem_limit_bytes: AtomicUsize::new(
+                    Self::DEFAULT_BLOB_CACHE_MEM_LIMIT_BYTES,
+                ),
                 compaction_heuristic_min_inputs: AtomicUsize::new(8),
                 compaction_heuristic_min_parts: AtomicUsize::new(8),
                 compaction_heuristic_min_updates: AtomicUsize::new(1024),
@@ -245,6 +248,15 @@ impl PersistConfig {
 
     pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
 
+    /// Default value for [`DynamicConfig::blob_cache_mem_limit_bytes`].
+    ///
+    /// This initial value was tuned via a one-time experiment that showed an
+    /// environment running our demo "auction" source + mv got 90%+ cache hits
+    /// with a 1 MiB cache. This doesn't scale up to prod data sizes and
+    /// doesn't help with multi-process replicas, but the memory usage seems
+    /// unobjectionable enough to have it for the cases that it does help.
+    pub const DEFAULT_BLOB_CACHE_MEM_LIMIT_BYTES: usize = 1024 * 1024;
+
     // Move this to a PersistConfig field when we actually have read leases.
     //
     // MIGRATION: Remove this once we remove the ReaderState <->
@@ -313,6 +325,7 @@ impl ConsensusKnobs for PersistConfig {
 pub struct DynamicConfig {
     batch_builder_max_outstanding_parts: AtomicUsize,
     blob_target_size: AtomicUsize,
+    blob_cache_mem_limit_bytes: AtomicUsize,
     compaction_heuristic_min_inputs: AtomicUsize,
     compaction_heuristic_min_parts: AtomicUsize,
     compaction_heuristic_min_updates: AtomicUsize,
@@ -408,6 +421,11 @@ impl DynamicConfig {
         self.blob_target_size.load(Self::LOAD_ORDERING)
     }
 
+    /// Capacity of in-mem blob cache in bytes.
+    pub fn blob_cache_mem_limit_bytes(&self) -> usize {
+        self.blob_cache_mem_limit_bytes.load(Self::LOAD_ORDERING)
+    }
+
     /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
     /// if the number of inputs is at least this many. Compaction is performed
     /// if any of the heuristic criteria are met (they are OR'd).
@@ -606,6 +624,8 @@ impl BlobKnobs for PersistConfig {
 pub struct PersistParameters {
     /// Configures [`DynamicConfig::blob_target_size`].
     pub blob_target_size: Option<usize>,
+    /// Configures [`DynamicConfig::blob_cache_mem_limit_bytes`].
+    pub blob_cache_mem_limit_bytes: Option<usize>,
     /// Configures [`DynamicConfig::compaction_minimum_timeout`].
     pub compaction_minimum_timeout: Option<Duration>,
     /// Configures [`DynamicConfig::consensus_connect_timeout`].
     pub consensus_connect_timeout: Option<Duration>,
@@ -639,6 +659,7 @@ impl PersistParameters {
         // are added.
         let Self {
             blob_target_size: self_blob_target_size,
+            blob_cache_mem_limit_bytes: self_blob_cache_mem_limit_bytes,
             compaction_minimum_timeout: self_compaction_minimum_timeout,
             consensus_connect_timeout: self_consensus_connect_timeout,
             consensus_tcp_user_timeout: self_consensus_tcp_user_timeout,
@@ -654,6 +675,7 @@
         } = self;
         let Self {
             blob_target_size: other_blob_target_size,
+            blob_cache_mem_limit_bytes: other_blob_cache_mem_limit_bytes,
             compaction_minimum_timeout: other_compaction_minimum_timeout,
             consensus_connect_timeout: other_consensus_connect_timeout,
             consensus_tcp_user_timeout: other_consensus_tcp_user_timeout,
@@ -670,6 +692,9 @@
         if let Some(v) = other_blob_target_size {
             *self_blob_target_size = Some(v);
         }
+        if let Some(v) = other_blob_cache_mem_limit_bytes {
+            *self_blob_cache_mem_limit_bytes = Some(v);
+        }
         if let Some(v) = other_compaction_minimum_timeout {
             *self_compaction_minimum_timeout = Some(v);
         }
@@ -716,6 +741,7 @@
         // Deconstruct self so we get a compile failure if new fields are added.
         let Self {
             blob_target_size,
+            blob_cache_mem_limit_bytes,
             compaction_minimum_timeout,
             consensus_connect_timeout,
             consensus_tcp_user_timeout,
@@ -730,6 +756,7 @@
             rollup_threshold,
         } = self;
         blob_target_size.is_none()
+            && blob_cache_mem_limit_bytes.is_none()
            && compaction_minimum_timeout.is_none()
            && consensus_connect_timeout.is_none()
            && consensus_tcp_user_timeout.is_none()
@@ -753,6 +780,7 @@
         // Deconstruct self so we get a compile failure if new fields are added.
         let Self {
             blob_target_size,
+            blob_cache_mem_limit_bytes,
             compaction_minimum_timeout,
             consensus_connect_timeout,
             consensus_tcp_user_timeout,
@@ -771,6 +799,11 @@
                 .blob_target_size
                 .store(*blob_target_size, DynamicConfig::STORE_ORDERING);
         }
+        if let Some(blob_cache_mem_limit_bytes) = blob_cache_mem_limit_bytes {
+            cfg.dynamic
+                .blob_cache_mem_limit_bytes
+                .store(*blob_cache_mem_limit_bytes, DynamicConfig::STORE_ORDERING);
+        }
         if let Some(_compaction_minimum_timeout) = compaction_minimum_timeout {
             // TODO: Figure out how to represent Durations in DynamicConfig.
         }
@@ -846,6 +879,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
     fn into_proto(&self) -> ProtoPersistParameters {
         ProtoPersistParameters {
             blob_target_size: self.blob_target_size.into_proto(),
+            blob_cache_mem_limit_bytes: self.blob_cache_mem_limit_bytes.into_proto(),
             compaction_minimum_timeout: self.compaction_minimum_timeout.into_proto(),
             consensus_connect_timeout: self.consensus_connect_timeout.into_proto(),
             consensus_tcp_user_timeout: self.consensus_tcp_user_timeout.into_proto(),
@@ -866,6 +900,7 @@
     fn from_proto(proto: ProtoPersistParameters) -> Result<Self, TryFromProtoError> {
         Ok(Self {
             blob_target_size: proto.blob_target_size.into_rust()?,
+            blob_cache_mem_limit_bytes: proto.blob_cache_mem_limit_bytes.into_rust()?,
             compaction_minimum_timeout: proto.compaction_minimum_timeout.into_rust()?,
             consensus_connect_timeout: proto.consensus_connect_timeout.into_rust()?,
             consensus_tcp_user_timeout: proto.consensus_tcp_user_timeout.into_rust()?,
diff --git a/src/persist-client/src/internal/cache.rs b/src/persist-client/src/internal/cache.rs
new file mode 100644
index 0000000000000..f9ac246af9aa7
--- /dev/null
+++ b/src/persist-client/src/internal/cache.rs
@@ -0,0 +1,133 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! In-process caches of [Blob].
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use moka::notification::RemovalCause;
+use moka::sync::Cache;
+use mz_ore::bytes::SegmentedBytes;
+use mz_ore::cast::CastFrom;
+use mz_persist::location::{Atomicity, Blob, BlobMetadata, ExternalError};
+use tracing::error;
+
+use crate::cfg::PersistConfig;
+use crate::internal::metrics::Metrics;
+
+// In-memory cache for [Blob].
+#[derive(Debug)]
+pub struct BlobMemCache {
+    metrics: Arc<Metrics>,
+    cache: Cache<String, SegmentedBytes>,
+    blob: Arc<dyn Blob + Send + Sync>,
+}
+
+impl BlobMemCache {
+    pub fn new(
+        cfg: &PersistConfig,
+        metrics: Arc<Metrics>,
+        blob: Arc<dyn Blob + Send + Sync>,
+    ) -> Arc<dyn Blob + Send + Sync> {
+        let eviction_metrics = Arc::clone(&metrics);
+        // TODO: Make this react dynamically to changes in configuration.
+        let cache = Cache::<String, SegmentedBytes>::builder()
+            .max_capacity(u64::cast_from(cfg.dynamic.blob_cache_mem_limit_bytes()))
+            .weigher(|k, v| {
+                u32::try_from(v.len()).unwrap_or_else(|_| {
+                    // We chunk off blobs at 128MiB, so the length should easily
+                    // fit in a u32.
+                    error!(
+                        "unexpectedly large blob in persist cache {} bytes: {}",
+                        v.len(),
+                        k
+                    );
+                    u32::MAX
+                })
+            })
+            .eviction_listener(move |_k, _v, cause| match cause {
+                RemovalCause::Size => eviction_metrics.blob_cache_mem.evictions.inc(),
+                RemovalCause::Expired | RemovalCause::Explicit | RemovalCause::Replaced => {}
+            })
+            .build();
+        let blob = BlobMemCache {
+            metrics,
+            cache,
+            blob,
+        };
+        Arc::new(blob)
+    }
+
+    fn update_size_metrics(&self) {
+        self.metrics
+            .blob_cache_mem
+            .size_blobs
+            .set(self.cache.entry_count());
+        self.metrics
+            .blob_cache_mem
+            .size_bytes
+            .set(self.cache.weighted_size());
+    }
+}
+
+#[async_trait]
+impl Blob for BlobMemCache {
+    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
+        // First check if the blob is in the cache. If it is, return it. If
+        // not, fetch it and put it in the cache.
+        //
+        // Blobs are write-once modify-never, so we don't have to worry about
+        // any races or cache invalidations here. If the value is in the cache,
+        // it's also what's in s3 (if not, then there's a horrible bug somewhere
+        // else).
+        if let Some(cached_value) = self.cache.get(key) {
+            self.metrics.blob_cache_mem.hits_blobs.inc();
+            self.metrics
+                .blob_cache_mem
+                .hits_bytes
+                .inc_by(u64::cast_from(cached_value.len()));
+            return Ok(Some(cached_value));
+        }
+
+        // This could maybe use moka's async cache to unify any concurrent
+        // fetches for the same key? That's not particularly expected in
+        // persist's workload, so punt for now.
+        let res = self.blob.get(key).await?;
+        if let Some(blob) = res.as_ref() {
+            self.cache.insert(key.to_owned(), blob.clone());
+            self.update_size_metrics();
+        }
+        Ok(res)
+    }
+
+    async fn list_keys_and_metadata(
+        &self,
+        key_prefix: &str,
+        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
+    ) -> Result<(), ExternalError> {
+        self.blob.list_keys_and_metadata(key_prefix, f).await
+    }
+
+    async fn set(&self, key: &str, value: Bytes, atomic: Atomicity) -> Result<(), ExternalError> {
+        let () = self.blob.set(key, value.clone(), atomic).await?;
+        self.cache
+            .insert(key.to_owned(), SegmentedBytes::from(value));
+        self.update_size_metrics();
+        Ok(())
+    }
+
+    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
+        let res = self.blob.delete(key).await;
+        self.cache.invalidate(key);
+        self.update_size_metrics();
+        res
+    }
+}
diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index 59067a2c45062..e36220f20b32b 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -79,6 +79,8 @@ pub struct Metrics {
     pub pubsub_client: PubSubClientMetrics,
     /// Metrics for mfp/filter pushdown.
     pub pushdown: PushdownMetrics,
+    /// Metrics for blob caching.
+    pub blob_cache_mem: BlobMemCache,
     /// Metrics for the persist sink.
     pub sink: SinkMetrics,
@@ -123,6 +125,7 @@ impl Metrics {
             watch: WatchMetrics::new(registry),
             pubsub_client: PubSubClientMetrics::new(registry),
             pushdown: PushdownMetrics::new(registry),
+            blob_cache_mem: BlobMemCache::new(registry),
             sink: SinkMetrics::new(registry),
             s3_blob: S3BlobMetrics::new(registry),
             postgres_consensus: PostgresConsensusMetrics::new(registry),
@@ -1903,6 +1906,47 @@ impl PushdownMetrics {
     }
 }
 
+#[derive(Debug)]
+pub struct BlobMemCache {
+    pub(crate) size_blobs: UIntGauge,
+    pub(crate) size_bytes: UIntGauge,
+    pub(crate) hits_blobs: IntCounter,
+    pub(crate) hits_bytes: IntCounter,
+    pub(crate) evictions: IntCounter,
+}
+
+impl BlobMemCache {
+    fn new(registry: &MetricsRegistry) -> Self {
+        BlobMemCache {
+            size_blobs: registry.register(metric!(
+                name: "mz_persist_blob_cache_size_blobs",
+                help: "count of blobs in the cache",
+                const_labels: {"cache" => "mem"},
+            )),
+            size_bytes: registry.register(metric!(
+                name: "mz_persist_blob_cache_size_bytes",
+                help: "total size of blobs in the cache",
+                const_labels: {"cache" => "mem"},
+            )),
+            hits_blobs: registry.register(metric!(
+                name: "mz_persist_blob_cache_hits_blobs",
+                help: "count of blobs served via cache instead of s3",
+                const_labels: {"cache" => "mem"},
+            )),
+            hits_bytes: registry.register(metric!(
+                name: "mz_persist_blob_cache_hits_bytes",
+                help: "total size of blobs served via cache instead of s3",
+                const_labels: {"cache" => "mem"},
+            )),
+            evictions: registry.register(metric!(
+                name: "mz_persist_blob_cache_evictions",
+                help: "count of capacity-based cache evictions",
+                const_labels: {"cache" => "mem"},
+            )),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct ExternalOpMetrics {
     started: IntCounter,
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 4ad91422f82e6..74c72dba7bcd0 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -143,6 +143,7 @@ pub mod write;
 /// An implementation of the public crate interface.
 mod internal {
     pub mod apply;
+    pub mod cache;
     pub mod compact;
     pub mod encoding;
     pub mod gc;
diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs
index e59dcac26c073..a6036bea22207 100644
--- a/src/sql/src/session/vars.rs
+++ b/src/sql/src/session/vars.rs
@@ -533,6 +533,15 @@ const PERSIST_BLOB_TARGET_SIZE: ServerVar<usize> = ServerVar {
     internal: true,
 };
 
+/// Controls [`mz_persist_client::cfg::DynamicConfig::blob_cache_mem_limit_bytes`].
+const PERSIST_BLOB_CACHE_MEM_LIMIT_BYTES: ServerVar<usize> = ServerVar {
+    name: UncasedStr::new("persist_blob_cache_mem_limit_bytes"),
+    value: &PersistConfig::DEFAULT_BLOB_CACHE_MEM_LIMIT_BYTES,
+    description:
+        "Capacity of in-mem blob cache in bytes. Only takes effect on restart (Materialize).",
+    internal: true,
+};
+
 /// Controls [`mz_persist_client::cfg::DynamicConfig::compaction_minimum_timeout`].
 const PERSIST_COMPACTION_MINIMUM_TIMEOUT: ServerVar<Duration> = ServerVar {
     name: UncasedStr::new("persist_compaction_minimum_timeout"),
@@ -1652,6 +1661,7 @@ impl SystemVars {
             .with_var(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
             .with_var(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
             .with_var(&PERSIST_BLOB_TARGET_SIZE)
+            .with_var(&PERSIST_BLOB_CACHE_MEM_LIMIT_BYTES)
             .with_var(&PERSIST_COMPACTION_MINIMUM_TIMEOUT)
             .with_var(&CRDB_CONNECT_TIMEOUT)
             .with_var(&CRDB_TCP_USER_TIMEOUT)
@@ -2019,6 +2029,11 @@ impl SystemVars {
         *self.expect_value(&PERSIST_BLOB_TARGET_SIZE)
     }
 
+    /// Returns the `persist_blob_cache_mem_limit_bytes` configuration parameter.
+    pub fn persist_blob_cache_mem_limit_bytes(&self) -> usize {
+        *self.expect_value(&PERSIST_BLOB_CACHE_MEM_LIMIT_BYTES)
+    }
+
     /// Returns the `persist_next_listen_batch_retryer_initial_backoff` configuration parameter.
     pub fn persist_next_listen_batch_retryer_initial_backoff(&self) -> Duration {
         *self.expect_value(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
     }
@@ -3504,6 +3519,7 @@ fn is_upsert_rocksdb_config_var(name: &str) -> bool {
 /// Returns whether the named variable is a persist configuration parameter.
 fn is_persist_config_var(name: &str) -> bool {
     name == PERSIST_BLOB_TARGET_SIZE.name()
+        || name == PERSIST_BLOB_CACHE_MEM_LIMIT_BYTES.name()
         || name == PERSIST_COMPACTION_MINIMUM_TIMEOUT.name()
         || name == CRDB_CONNECT_TIMEOUT.name()
         || name == CRDB_TCP_USER_TIMEOUT.name()
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 6aec84a090d0e..8426abe7031b1 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -59,6 +59,7 @@ libc = { version = "0.2.142", features = ["extra_traits", "use_std"] }
 log = { version = "0.4.17", default-features = false, features = ["std"] }
 lru = { version = "0.8.1" }
 memchr = { version = "2.5.0", features = ["use_std"] }
+moka = { version = "0.9.6", default-features = false, features = ["atomic64", "sync"] }
 native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] }
 nix = { version = "0.26.1" }
 nom = { version = "7.1.2" }
@@ -156,6 +157,7 @@ libc = { version = "0.2.142", features = ["extra_traits", "use_std"] }
 log = { version = "0.4.17", default-features = false, features = ["std"] }
 lru = { version = "0.8.1" }
 memchr = { version = "2.5.0", features = ["use_std"] }
+moka = { version = "0.9.6", default-features = false, features = ["atomic64", "sync"] }
 native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] }
 nix = { version = "0.26.1" }
 nom = { version = "7.1.2" }
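
For reviewers unfamiliar with moka, here is a minimal standalone sketch (not
part of the patch; the keys, sizes, and println bodies are illustrative) of the
pattern BlobMemCache builds on: a moka::sync::Cache whose max_capacity is
measured in whatever units its weigher returns, here bytes, plus an eviction
listener that fires on capacity-based removals.

use moka::notification::RemovalCause;
use moka::sync::Cache;

fn main() {
    // `max_capacity` is interpreted in weigher units; the weigher below
    // returns bytes, so this is a 1 KiB cache (the patch's default is 1 MiB).
    let cache: Cache<String, Vec<u8>> = Cache::builder()
        .max_capacity(1024)
        .weigher(|_key, value: &Vec<u8>| u32::try_from(value.len()).unwrap_or(u32::MAX))
        .eviction_listener(|key, _value, cause| match cause {
            // Mirrors the patch: only capacity-based evictions are of interest.
            RemovalCause::Size => println!("evicted {} due to capacity", key),
            _ => {}
        })
        .build();

    cache.insert("rollup-1".to_string(), vec![0u8; 700]);
    // The second insert pushes total weight past 1024, so moka will evict one
    // entry, possibly asynchronously: its housekeeping is amortized.
    cache.insert("rollup-2".to_string(), vec![0u8; 700]);

    // Same accessors the patch uses for its size gauges; both are eventually
    // consistent with respect to pending housekeeping, so the gauges set in
    // update_size_metrics are best-effort snapshots.
    println!("entries={} bytes={}", cache.entry_count(), cache.weighted_size());
}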
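
The comment in cache.rs about the cache being intentionally "outside"
(wrapping) MetricsBlob is worth unpacking. The sketch below (again
illustrative: a simplified synchronous stand-in for persist's real async Blob
trait and types) shows why the layering order matters: with the cache
outermost, a hit returns before the metrics decorator runs, so blob metrics
count only fetches that actually reach the backing store.

use std::cell::{Cell, RefCell};
use std::collections::HashMap;

// Simplified, synchronous stand-in for persist's `Blob` trait.
trait Blob {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
}

// Stand-in for MetricsBlob: counts every request that reaches it.
struct MetricsBlob<B: Blob> {
    fetches: Cell<u64>,
    inner: B,
}

impl<B: Blob> Blob for MetricsBlob<B> {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.fetches.set(self.fetches.get() + 1);
        self.inner.get(key)
    }
}

// Stand-in for BlobMemCache: serves repeated gets from memory.
struct MemCache<B: Blob> {
    cache: RefCell<HashMap<String, Vec<u8>>>,
    inner: B,
}

impl<B: Blob> Blob for MemCache<B> {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(v) = self.cache.borrow().get(key) {
            return Some(v.clone()); // hit: the metrics layer never sees this call
        }
        let v = self.inner.get(key)?;
        self.cache.borrow_mut().insert(key.to_owned(), v.clone());
        Some(v)
    }
}

// Innermost layer: pretend this is S3.
struct FakeS3(HashMap<String, Vec<u8>>);

impl Blob for FakeS3 {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

fn main() {
    let s3 = FakeS3(HashMap::from([("k".to_string(), vec![1, 2, 3])]));
    // Cache wraps metrics, which wrap the backing store, as in the patch.
    let blob = MemCache {
        cache: RefCell::new(HashMap::new()),
        inner: MetricsBlob { fetches: Cell::new(0), inner: s3 },
    };
    blob.get("k"); // miss: fetches becomes 1
    blob.get("k"); // hit: fetches stays 1
    assert_eq!(blob.inner.fetches.get(), 1);
}

Swap the two layers and every cache hit would still bump the fetch counter,
making the blob metrics overstate traffic to the backing store.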