Skip to content

Commit

Permalink
fix(config): remove system params from config file (risingwavelabs#8366)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR authored Mar 15, 2023
1 parent 24fe1e8 commit b0f276b
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 198 deletions.
93 changes: 0 additions & 93 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ pub struct RwConfig {
#[serde(default)]
pub storage: StorageConfig,

#[serde(default)]
pub backup: BackupConfig,

#[serde(flatten)]
pub unrecognized: HashMap<String, Value>,
}
Expand Down Expand Up @@ -252,18 +249,10 @@ impl Default for BatchConfig {
/// The section `[streaming]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StreamingConfig {
/// The interval of periodic barrier.
#[serde(default = "default::streaming::barrier_interval_ms")]
pub barrier_interval_ms: u32,

/// The maximum number of barriers in-flight in the compute nodes.
#[serde(default = "default::streaming::in_flight_barrier_nums")]
pub in_flight_barrier_nums: usize,

/// There will be a checkpoint for every n barriers
#[serde(default = "default::streaming::checkpoint_frequency")]
pub checkpoint_frequency: usize,

/// The thread number of the streaming actor runtime in the compute node. The default value is
/// decided by `tokio`.
#[serde(default)]
Expand Down Expand Up @@ -297,24 +286,6 @@ impl Default for StreamingConfig {
/// The section `[storage]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorageConfig {
// TODO(zhidong): Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// Target size of the Sstable.
#[serde(default = "default::storage::sst_size_mb")]
pub sstable_size_mb: u32,

// TODO(zhidong): Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// Size of each block in bytes in SST.
#[serde(default = "default::storage::block_size_kb")]
pub block_size_kb: u32,

// TODO(zhidong): Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// False positive probability of bloom filter.
#[serde(default = "default::storage::bloom_false_positive")]
pub bloom_false_positive: f64,

/// parallelism while syncing share buffers into L0 SST. Should NOT be 0.
#[serde(default = "default::storage::share_buffers_sync_parallelism")]
pub share_buffers_sync_parallelism: u32,
Expand All @@ -329,12 +300,6 @@ pub struct StorageConfig {
#[serde(default = "default::storage::shared_buffer_capacity_mb")]
pub shared_buffer_capacity_mb: usize,

// TODO(zhidong): Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// Remote directory for storing data and metadata objects.
#[serde(default = "default::storage::data_directory")]
pub data_directory: String,

/// Whether to enable write conflict detection
#[serde(default = "default::storage::write_conflict_detection_enabled")]
pub write_conflict_detection_enabled: bool,
Expand Down Expand Up @@ -486,30 +451,6 @@ impl Default for DeveloperConfig {
}
}

/// Configs for meta node backup
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BackupConfig {
// TODO: Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// Remote storage url for storing snapshots.
#[serde(default = "default::backup::storage_url")]
pub storage_url: String,
// TODO: Remove in 0.1.18 release
// NOTE: It is now a system parameter and should not be used directly.
/// Remote directory for storing snapshots.
#[serde(default = "default::backup::storage_directory")]
pub storage_directory: String,

#[serde(flatten)]
pub unrecognized: HashMap<String, Value>,
}

impl Default for BackupConfig {
fn default() -> Self {
toml::from_str("").unwrap()
}
}

mod default {
pub mod meta {
use crate::config::MetaBackend;
Expand Down Expand Up @@ -576,18 +517,6 @@ mod default {

pub mod storage {

pub fn sst_size_mb() -> u32 {
256
}

pub fn block_size_kb() -> u32 {
64
}

pub fn bloom_false_positive() -> f64 {
0.001
}

pub fn share_buffers_sync_parallelism() -> u32 {
1
}
Expand All @@ -600,10 +529,6 @@ mod default {
1024
}

pub fn data_directory() -> String {
"hummock_001".to_string()
}

pub fn write_conflict_detection_enabled() -> bool {
cfg!(debug_assertions)
}
Expand Down Expand Up @@ -657,20 +582,12 @@ mod default {
pub mod streaming {
use crate::config::AsyncStackTraceOption;

pub fn barrier_interval_ms() -> u32 {
1000
}

pub fn in_flight_barrier_nums() -> usize {
// quick fix
// TODO: remove this limitation from code
10000
}

pub fn checkpoint_frequency() -> usize {
10
}

pub fn enable_jaegar_tracing() -> bool {
false
}
Expand Down Expand Up @@ -745,14 +662,4 @@ mod default {
1024
}
}

pub mod backup {
pub fn storage_url() -> String {
"memory".to_string()
}

pub fn storage_directory() -> String {
"backup".to_string()
}
}
}
6 changes: 6 additions & 0 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use arc_swap::ArcSwap;
use risingwave_pb::meta::SystemParams;
use tokio::sync::watch::{channel, Receiver, Sender};

use super::default_system_params;
use super::reader::SystemParamsReader;

pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
Expand All @@ -29,6 +30,7 @@ pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;
/// - `get_params` returns a reference to the latest parameters that is atomically updated.
/// - `watch_params` returns a channel on which calling `recv` will get the latest parameters.
/// Compared with `get_params`, the caller can be explicitly notified of parameter change.
#[derive(Debug)]
pub struct LocalSystemParamsManager {
/// The latest parameters.
params: SystemParamsReaderRef,
Expand All @@ -44,6 +46,10 @@ impl LocalSystemParamsManager {
Self { params, tx }
}

pub fn for_test() -> Self {
Self::new(default_system_params().into())
}

pub fn get_params(&self) -> SystemParamsReaderRef {
self.params.clone()
}
Expand Down
2 changes: 2 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub async fn compute_node_serve(
));
monitor_cache(memory_collector, &registry).unwrap();
let backup_reader = storage.backup_reader();
let system_params_manager = system_params_manager.clone();
tokio::spawn(async move {
backup_reader
.watch_config_change(system_params_manager.watch_params())
Expand Down Expand Up @@ -295,6 +296,7 @@ pub async fn compute_node_serve(
worker_id,
state_store,
dml_mgr,
system_params_manager,
source_metrics,
);

Expand Down
4 changes: 0 additions & 4 deletions src/config/ci-compaction-test-meta.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ checkpoint_frequency = 99999999

[storage]
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
compactor_memory_limit_mb = 5120
Expand Down
4 changes: 0 additions & 4 deletions src/config/ci-compaction-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ checkpoint_frequency = 10

[storage]
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
compactor_memory_limit_mb = 5120
Expand Down
4 changes: 0 additions & 4 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ checkpoint_frequency = 10

[storage]
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
compactor_memory_limit_mb = 5120
Expand Down
80 changes: 41 additions & 39 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use std::time::Duration;

use clap::Parser;
pub use error::{MetaError, MetaResult};
use risingwave_common::system_param::default;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -106,11 +107,42 @@ pub struct MetaNodeOpts {
#[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
prometheus_endpoint: Option<String>,

// TODO(zhidong): Make it required in v0.1.18
/// State store url.
#[clap(long, env = "RW_STATE_STORE")]
state_store: Option<String>,

/// The interval of periodic barrier.
#[clap(long, env = "RW_BARRIER_INTERVAL_MS", default_value_t = default::barrier_interval_ms())]
barrier_interval_ms: u32,

/// There will be a checkpoint for every n barriers
#[clap(long, env = "RW_CHECKPOINT_FREQUENCY", default_value_t = default::checkpoint_frequency())]
pub checkpoint_frequency: u64,

/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB", default_value_t = default::sstable_size_mb())]
sstable_size_mb: u32,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB", default_value_t = default::block_size_kb())]
block_size_kb: u32,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE", default_value_t = default::bloom_false_positive())]
bloom_false_positive: f64,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY", default_value_t = default::data_directory())]
data_directory: String,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL", default_value_t = default::backup_storage_url())]
backup_storage_url: String,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_STORAGE_DIRECTORY", default_value_t = default::backup_storage_directory())]
backup_storage_directory: String,

/// Endpoint of the connector node, there will be a sidecar connector node
/// colocated with Meta node in the cloud environment
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
Expand All @@ -132,36 +164,6 @@ pub struct OverrideConfigOpts {
#[clap(long, env = "RW_BACKEND", value_enum)]
#[override_opts(path = meta.backend)]
backend: Option<MetaBackend>,

/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB")]
#[override_opts(path = storage.sstable_size_mb)]
sstable_size_mb: Option<u32>,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB")]
#[override_opts(path = storage.block_size_kb)]
block_size_kb: Option<u32>,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")]
#[override_opts(path = storage.bloom_false_positive)]
bloom_false_positive: Option<f64>,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY")]
#[override_opts(path = storage.data_directory)]
data_directory: Option<String>,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL")]
#[override_opts(path = backup.storage_url)]
backup_storage_url: Option<String>,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_STORAGE_DIRECTORY")]
#[override_opts(path = backup.storage_directory)]
backup_storage_directory: Option<String>,
}

use std::future::Future;
Expand Down Expand Up @@ -246,15 +248,15 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.periodic_ttl_reclaim_compaction_interval_sec,
},
SystemParams {
barrier_interval_ms: Some(config.streaming.barrier_interval_ms),
checkpoint_frequency: Some(config.streaming.checkpoint_frequency as u64),
sstable_size_mb: Some(config.storage.sstable_size_mb),
block_size_kb: Some(config.storage.block_size_kb),
bloom_false_positive: Some(config.storage.bloom_false_positive),
barrier_interval_ms: Some(opts.barrier_interval_ms),
checkpoint_frequency: Some(opts.checkpoint_frequency),
sstable_size_mb: Some(opts.sstable_size_mb),
block_size_kb: Some(opts.block_size_kb),
bloom_false_positive: Some(opts.bloom_false_positive),
state_store: Some(opts.state_store.unwrap_or_default()),
data_directory: Some(config.storage.data_directory),
backup_storage_url: Some(config.backup.storage_url),
backup_storage_directory: Some(config.backup.storage_directory),
data_directory: Some(opts.data_directory),
backup_storage_url: Some(opts.backup_storage_url),
backup_storage_directory: Some(opts.backup_storage_directory),
},
)
.await
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::{default, set_system_param};
use risingwave_common::system_param::set_system_param;
use risingwave_common::{for_all_undeprecated_params, key_of};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<S: MetaStore> SystemParamsManager<S> {
// 2. Some, Some: Check equality and warn if they differ.
// 3. None, Some: A new version of RW cluster is launched for the first time and newly introduced
// params are not set. Use init value.
// 4. None, None: Same as 3, but the init param is not from CLI. Use default value.
// 4. None, None: Impossible.
macro_rules! impl_merge_params {
($({ $field:ident, $type:ty, $default:expr },)*) => {
fn merge_params(mut persisted: SystemParams, init: SystemParams) -> SystemParams {
Expand All @@ -157,7 +157,7 @@ macro_rules! impl_merge_params {
}
},
(None, Some(init)) => persisted.$field = Some(init),
(None, None) => { persisted.$field = Some(default::$field()) },
(None, None) => unreachable!(),
_ => {},
}
)*
Expand Down
Loading

0 comments on commit b0f276b

Please sign in to comment.