persist: introduce a very small in-mem blob cache #19614

Merged · 1 commit · Jun 1, 2023
2 changes: 2 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
@@ -6898,6 +6898,7 @@ impl Catalog {
let config = self.system_config();
PersistParameters {
blob_target_size: Some(config.persist_blob_target_size()),
blob_cache_mem_limit_bytes: Some(config.persist_blob_cache_mem_limit_bytes()),
compaction_minimum_timeout: Some(config.persist_compaction_minimum_timeout()),
consensus_connect_timeout: Some(config.crdb_connect_timeout()),
consensus_tcp_user_timeout: Some(config.crdb_tcp_user_timeout()),
1 change: 1 addition & 0 deletions src/persist-client/Cargo.toml
@@ -38,6 +38,7 @@ differential-dataflow = "0.12.0"
futures = "0.3.25"
futures-util = "0.3"
h2 = "0.3.13"
moka = { version = "0.9.6", default-features = false, features = ["sync"] }
mz-build-info = { path = "../build-info" }
mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] }
mz-persist = { path = "../persist" }
4 changes: 4 additions & 0 deletions src/persist-client/src/cache.rs
@@ -33,6 +33,7 @@ use tracing::{debug, instrument};

use crate::async_runtime::CpuHeavyRuntime;
use crate::error::{CodecConcreteType, CodecMismatch};
use crate::internal::cache::BlobMemCache;
use crate::internal::machine::retry_external;
use crate::internal::metrics::{LockMetrics, Metrics, MetricsBlob, MetricsConsensus, ShardMetrics};
use crate::internal::state::TypedState;
@@ -199,6 +200,9 @@ impl PersistClientCache {
Self::PROMETHEUS_SCRAPE_INTERVAL,
)
.await;
// This is intentionally "outside" (wrapping) MetricsBlob so
// that we don't include cached responses in blob metrics.
let blob = BlobMemCache::new(&self.cfg, Arc::clone(&self.metrics), blob);
Arc::clone(&x.insert((RttLatencyTask(task), blob)).1)
}
};
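A note on the layering comment above: cache hits return before the request ever reaches MetricsBlob, so blob fetch counts and latency histograms only see real trips to the backing store. A minimal sketch of the resulting wrapper order (the inner constructor shapes are assumptions; only `BlobMemCache::new` appears in this diff):

```rust
// Wrapper order implied by the comment above (inner ctor shapes assumed):
//   raw store -> MetricsBlob -> BlobMemCache (outermost)
let blob: Arc<dyn Blob + Send + Sync> = raw_s3_blob;
let blob: Arc<dyn Blob + Send + Sync> = Arc::new(MetricsBlob::new(blob, &metrics));
// Cache hits short-circuit here and never touch MetricsBlob.
let blob = BlobMemCache::new(&cfg, Arc::clone(&metrics), blob);
```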
1 change: 1 addition & 0 deletions src/persist-client/src/cfg.proto
@@ -27,6 +27,7 @@ message ProtoPersistParameters {
optional bool pubsub_push_diff_enabled = 11;
mz_proto.ProtoDuration consensus_tcp_user_timeout = 12;
optional uint64 rollup_threshold = 13;
optional uint64 blob_cache_mem_limit_bytes = 14;
}

message ProtoRetryParameters {
35 changes: 35 additions & 0 deletions src/persist-client/src/cfg.rs
@@ -131,6 +131,9 @@ impl PersistConfig {
dynamic: Arc::new(DynamicConfig {
batch_builder_max_outstanding_parts: AtomicUsize::new(2),
blob_target_size: AtomicUsize::new(Self::DEFAULT_BLOB_TARGET_SIZE),
blob_cache_mem_limit_bytes: AtomicUsize::new(
Self::DEFAULT_BLOB_CACHE_MEM_LIMIT_BYTES,
),
compaction_heuristic_min_inputs: AtomicUsize::new(8),
compaction_heuristic_min_parts: AtomicUsize::new(8),
compaction_heuristic_min_updates: AtomicUsize::new(1024),
@@ -245,6 +248,15 @@ impl PersistConfig {

pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;

/// Default value for [`DynamicConfig::blob_cache_mem_limit_bytes`].
///
/// This initial value was tuned via a one-time experiment that showed an
/// environment running our demo "auction" source + mv got 90%+ cache hits
/// with a 1 MiB cache. This doesn't scale up to prod data sizes and doesn't
/// help with multi-process replicas, but the memory usage seems
/// unobjectionable enough to have it for the cases that it does help.
pub const DEFAULT_BLOB_CACHE_MEM_LIMIT_BYTES: usize = 1024 * 1024;
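For reference, a hedged sketch of how an override of this default would flow through the pieces added in this PR. The field and accessor names come from this diff; `Default` on `PersistParameters` and the `apply` method name are assumptions here, since their signatures are folded out of view:

```rust
// Hypothetical override flow, assuming PersistParameters: Default.
let mut params = PersistParameters::default();
params.blob_cache_mem_limit_bytes = Some(64 * 1024 * 1024); // 64 MiB
// Applying stores the value into DynamicConfig's AtomicUsize, where
// BlobMemCache::new later reads it via blob_cache_mem_limit_bytes().
params.apply(&persist_cfg);
```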

// Move this to a PersistConfig field when we actually have read leases.
//
// MIGRATION: Remove this once we remove the ReaderState <->
@@ -313,6 +325,7 @@ impl ConsensusKnobs for PersistConfig {
pub struct DynamicConfig {
batch_builder_max_outstanding_parts: AtomicUsize,
blob_target_size: AtomicUsize,
blob_cache_mem_limit_bytes: AtomicUsize,
compaction_heuristic_min_inputs: AtomicUsize,
compaction_heuristic_min_parts: AtomicUsize,
compaction_heuristic_min_updates: AtomicUsize,
@@ -408,6 +421,11 @@ impl DynamicConfig {
self.blob_target_size.load(Self::LOAD_ORDERING)
}

/// Capacity of in-mem blob cache in bytes.
pub fn blob_cache_mem_limit_bytes(&self) -> usize {
self.blob_cache_mem_limit_bytes.load(Self::LOAD_ORDERING)
}

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of inputs is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
@@ -606,6 +624,8 @@ impl BlobKnobs for PersistConfig {
pub struct PersistParameters {
/// Configures [`DynamicConfig::blob_target_size`].
pub blob_target_size: Option<usize>,
/// Configures [`DynamicConfig::blob_cache_mem_limit_bytes`].
pub blob_cache_mem_limit_bytes: Option<usize>,
/// Configures [`DynamicConfig::compaction_minimum_timeout`].
pub compaction_minimum_timeout: Option<Duration>,
/// Configures [`DynamicConfig::consensus_connect_timeout`].
@@ -639,6 +659,7 @@ impl PersistParameters {
// are added.
let Self {
blob_target_size: self_blob_target_size,
blob_cache_mem_limit_bytes: self_blob_cache_mem_limit_bytes,
compaction_minimum_timeout: self_compaction_minimum_timeout,
consensus_connect_timeout: self_consensus_connect_timeout,
consensus_tcp_user_timeout: self_consensus_tcp_user_timeout,
@@ -654,6 +675,7 @@
} = self;
let Self {
blob_target_size: other_blob_target_size,
blob_cache_mem_limit_bytes: other_blob_cache_mem_limit_bytes,
compaction_minimum_timeout: other_compaction_minimum_timeout,
consensus_connect_timeout: other_consensus_connect_timeout,
consensus_tcp_user_timeout: other_consensus_tcp_user_timeout,
@@ -670,6 +692,9 @@
if let Some(v) = other_blob_target_size {
*self_blob_target_size = Some(v);
}
if let Some(v) = other_blob_cache_mem_limit_bytes {
*self_blob_cache_mem_limit_bytes = Some(v);
}
if let Some(v) = other_compaction_minimum_timeout {
*self_compaction_minimum_timeout = Some(v);
}
@@ -716,6 +741,7 @@ impl PersistParameters {
// Deconstruct self so we get a compile failure if new fields are added.
let Self {
blob_target_size,
blob_cache_mem_limit_bytes,
compaction_minimum_timeout,
consensus_connect_timeout,
consensus_tcp_user_timeout,
@@ -730,6 +756,7 @@
rollup_threshold,
} = self;
blob_target_size.is_none()
&& blob_cache_mem_limit_bytes.is_none()
&& compaction_minimum_timeout.is_none()
&& consensus_connect_timeout.is_none()
&& consensus_tcp_user_timeout.is_none()
@@ -753,6 +780,7 @@
// Deconstruct self so we get a compile failure if new fields are added.
let Self {
blob_target_size,
blob_cache_mem_limit_bytes,
compaction_minimum_timeout,
consensus_connect_timeout,
consensus_tcp_user_timeout,
@@ -771,6 +799,11 @@
.blob_target_size
.store(*blob_target_size, DynamicConfig::STORE_ORDERING);
}
if let Some(blob_cache_mem_limit_bytes) = blob_cache_mem_limit_bytes {
cfg.dynamic
.blob_cache_mem_limit_bytes
.store(*blob_cache_mem_limit_bytes, DynamicConfig::STORE_ORDERING);
}
if let Some(_compaction_minimum_timeout) = compaction_minimum_timeout {
// TODO: Figure out how to represent Durations in DynamicConfig.
}
@@ -846,6 +879,7 @@ impl RustType<ProtoPersistParameters> for PersistParameters {
fn into_proto(&self) -> ProtoPersistParameters {
ProtoPersistParameters {
blob_target_size: self.blob_target_size.into_proto(),
blob_cache_mem_limit_bytes: self.blob_cache_mem_limit_bytes.into_proto(),
compaction_minimum_timeout: self.compaction_minimum_timeout.into_proto(),
consensus_connect_timeout: self.consensus_connect_timeout.into_proto(),
consensus_tcp_user_timeout: self.consensus_tcp_user_timeout.into_proto(),
@@ -866,6 +900,7 @@
fn from_proto(proto: ProtoPersistParameters) -> Result<Self, TryFromProtoError> {
Ok(Self {
blob_target_size: proto.blob_target_size.into_rust()?,
blob_cache_mem_limit_bytes: proto.blob_cache_mem_limit_bytes.into_rust()?,
compaction_minimum_timeout: proto.compaction_minimum_timeout.into_rust()?,
consensus_connect_timeout: proto.consensus_connect_timeout.into_rust()?,
consensus_tcp_user_timeout: proto.consensus_tcp_user_timeout.into_rust()?,
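One idiom in this file worth calling out: the parameter-merging, all-set check, and apply paths each destructure `self` with no `..` rest pattern ("Deconstruct self so we get a compile failure if new fields are added"), so a new `PersistParameters` field that isn't handled at every site is a compile error rather than a silent no-op. A minimal standalone illustration with a hypothetical `Params` type:

```rust
struct Params {
    a: Option<usize>,
    b: Option<usize>,
}

fn update(dst: &mut Params, src: Params) {
    // No `..` rest pattern: every field must be named, so adding a field
    // to Params without handling it here fails to compile.
    let Params { a, b } = src;
    if let Some(v) = a {
        dst.a = Some(v);
    }
    if let Some(v) = b {
        dst.b = Some(v);
    }
}
```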
133 changes: 133 additions & 0 deletions src/persist-client/src/internal/cache.rs
@@ -0,0 +1,133 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-process caches of [Blob].

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_persist::location::{Atomicity, Blob, BlobMetadata, ExternalError};
use tracing::error;

use crate::cfg::PersistConfig;
use crate::internal::metrics::Metrics;

/// In-memory cache for [Blob].
#[derive(Debug)]
pub struct BlobMemCache {
metrics: Arc<Metrics>,
cache: Cache<String, SegmentedBytes>,
blob: Arc<dyn Blob + Send + Sync>,
}

impl BlobMemCache {
pub fn new(
cfg: &PersistConfig,
metrics: Arc<Metrics>,
blob: Arc<dyn Blob + Send + Sync>,
) -> Arc<dyn Blob + Send + Sync> {
let eviction_metrics = Arc::clone(&metrics);
// TODO: Make this react dynamically to changes in configuration.
let cache = Cache::<String, SegmentedBytes>::builder()
.max_capacity(u64::cast_from(cfg.dynamic.blob_cache_mem_limit_bytes()))
.weigher(|k, v| {
u32::try_from(v.len()).unwrap_or_else(|_| {
// We chunk off blobs at 128MiB, so the length should easily
// fit in a u32.
error!(
"unexpectedly large blob in persist cache {} bytes: {}",
v.len(),
k
);
u32::MAX
})
})
.eviction_listener(move |_k, _v, cause| match cause {
RemovalCause::Size => eviction_metrics.blob_cache_mem.evictions.inc(),
RemovalCause::Expired | RemovalCause::Explicit | RemovalCause::Replaced => {}
})
.build();
Contributor: could be interesting to add a listener so we can track how many removals come from explicit delete calls vs size-based evictions

Contributor Author: great idea! will do

let blob = BlobMemCache {
metrics,
cache,
blob,
};
Arc::new(blob)
}
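Since moka's capacity semantics are easy to misread here: once a `weigher` is set, `max_capacity` is interpreted in the weigher's units (bytes, in this case), not as an entry count. A tiny self-contained sketch with hypothetical values:

```rust
use moka::sync::Cache;

// With a weigher, max_capacity is total weight (bytes), not entries.
let cache: Cache<String, Vec<u8>> = Cache::builder()
    .max_capacity(1024 * 1024) // 1 MiB of values, any number of entries
    .weigher(|_k, v: &Vec<u8>| u32::try_from(v.len()).unwrap_or(u32::MAX))
    .build();
cache.insert("blob-key".to_string(), vec![0u8; 512 * 1024]); // weighs 512 KiB
```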

fn update_size_metrics(&self) {
self.metrics
.blob_cache_mem
.size_blobs
.set(self.cache.entry_count());
self.metrics
.blob_cache_mem
.size_bytes
.set(self.cache.weighted_size());
}
}

#[async_trait]
impl Blob for BlobMemCache {
async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
// First check if the blob is in the cache. If it is, return it. If not,
// fetch it and put it in the cache.
//
// Blobs are write-once modify-never, so we don't have to worry about
// any races or cache invalidations here. If the value is in the cache,
// it's also what's in s3 (if not, then there's a horrible bug somewhere
// else).
if let Some(cached_value) = self.cache.get(key) {
self.metrics.blob_cache_mem.hits_blobs.inc();
Contributor: is the intent to calculate hit rate based on the delta between this vs existing blob metrics?

Contributor Author: yeah, tho we'll have to do hits / (hits + fetches) because I put this "around" the MetricsBlob wrapper so it wouldn't skew our latency histograms

self.metrics
.blob_cache_mem
.hits_bytes
.inc_by(u64::cast_from(cached_value.len()));
return Ok(Some(cached_value));
}

// This could maybe use moka's async cache to unify any concurrent
// fetches for the same key? That's not particularly expected in
// persist's workload, so punt for now.
Contributor: I am a little curious about this... I could imagine if multiple persist_source are waiting on the same blob, they could all cache miss because their timings might be so closely aligned. hm... any metrics that would give us insight into how often that might happen? 🤔

Contributor Author: i can't think of an easy way to measure this without just literally solving the problem. would prefer to punt this to followup work as well

let res = self.blob.get(key).await?;
if let Some(blob) = res.as_ref() {
self.cache.insert(key.to_owned(), blob.clone());
self.update_size_metrics();
}
Ok(res)
}
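On the coalescing question in the thread above, a hedged sketch of what the punted follow-up could look like with moka's future cache, which runs the init future once per key and has concurrent callers await the same result. This would require enabling moka's `future` feature (this PR enables only `sync`), and it sidesteps how a missing blob (`None`) would be represented; the helper below is hypothetical:

```rust
use std::sync::Arc;

use moka::future::Cache;

// Hypothetical: concurrent get_coalesced calls for the same key share one
// underlying fetch instead of all missing the cache at once.
async fn get_coalesced(
    cache: &Cache<String, Arc<Vec<u8>>>,
    key: String,
) -> Result<Arc<Vec<u8>>, Arc<std::io::Error>> {
    cache
        .try_get_with(key.clone(), async move { fetch(&key).await.map(Arc::new) })
        .await
}

// Stand-in for the real call to the backing blob store.
async fn fetch(key: &str) -> Result<Vec<u8>, std::io::Error> {
    Ok(key.as_bytes().to_vec())
}
```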

async fn list_keys_and_metadata(
&self,
key_prefix: &str,
f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
) -> Result<(), ExternalError> {
self.blob.list_keys_and_metadata(key_prefix, f).await
}

async fn set(&self, key: &str, value: Bytes, atomic: Atomicity) -> Result<(), ExternalError> {
let () = self.blob.set(key, value.clone(), atomic).await?;
self.cache
.insert(key.to_owned(), SegmentedBytes::from(value));
self.update_size_metrics();
Ok(())
}

async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
let res = self.blob.delete(key).await;
self.cache.invalidate(key);
self.update_size_metrics();
res
}
}
44 changes: 44 additions & 0 deletions src/persist-client/src/internal/metrics.rs
@@ -79,6 +79,8 @@ pub struct Metrics {
pub pubsub_client: PubSubClientMetrics,
/// Metrics for mfp/filter pushdown.
pub pushdown: PushdownMetrics,
/// Metrics for blob caching.
pub blob_cache_mem: BlobMemCache,

/// Metrics for the persist sink.
pub sink: SinkMetrics,
@@ -123,6 +125,7 @@ impl Metrics {
watch: WatchMetrics::new(registry),
pubsub_client: PubSubClientMetrics::new(registry),
pushdown: PushdownMetrics::new(registry),
blob_cache_mem: BlobMemCache::new(registry),
sink: SinkMetrics::new(registry),
s3_blob: S3BlobMetrics::new(registry),
postgres_consensus: PostgresConsensusMetrics::new(registry),
@@ -1903,6 +1906,47 @@ impl PushdownMetrics {
}
}

#[derive(Debug)]
pub struct BlobMemCache {
pub(crate) size_blobs: UIntGauge,
pub(crate) size_bytes: UIntGauge,
pub(crate) hits_blobs: IntCounter,
pub(crate) hits_bytes: IntCounter,
pub(crate) evictions: IntCounter,
}

impl BlobMemCache {
fn new(registry: &MetricsRegistry) -> Self {
BlobMemCache {
size_blobs: registry.register(metric!(
name: "mz_persist_blob_cache_size_blobs",
help: "count of blobs in the cache",
const_labels: {"cache" => "mem"},
)),
size_bytes: registry.register(metric!(
name: "mz_persist_blob_cache_size_bytes",
help: "total size of blobs in the cache",
const_labels: {"cache" => "mem"},
)),
hits_blobs: registry.register(metric!(
name: "mz_persist_blob_cache_hits_blobs",
Contributor: nit: elsewhere it's referred to as BlobMemCache vs the metric name blob_cache

Contributor Author: yup! did that on purpose to set up a hypothetical disk cache. this has a label of "cache" -> "mem"

Contributor: oh yeah, I totally saw that and forgot it when reading it over a second time

help: "count of blobs served via cache instead of s3",
const_labels: {"cache" => "mem"},
)),
hits_bytes: registry.register(metric!(
name: "mz_persist_blob_cache_hits_bytes",
help: "total size of blobs served via cache instead of s3",
const_labels: {"cache" => "mem"},
)),
evictions: registry.register(metric!(
name: "mz_persist_blob_cache_evictions",
help: "count of capacity-based cache evictions",
const_labels: {"cache" => "mem"},
)),
}
}
}

#[derive(Debug)]
pub struct ExternalOpMetrics {
started: IntCounter,
1 change: 1 addition & 0 deletions src/persist-client/src/lib.rs
@@ -143,6 +143,7 @@ pub mod write;
/// An implementation of the public crate interface.
mod internal {
pub mod apply;
pub mod cache;
pub mod compact;
pub mod encoding;
pub mod gc;