From fdfc0b73323100a902c046a4b9418ad9acdbba8c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 30 Sep 2024 16:42:05 +0800 Subject: [PATCH 1/9] add watch fn Signed-off-by: Bugen Zhao --- Cargo.lock | 173 ++++++++++++++++++++++++++++++------ src/meta/Cargo.toml | 1 + src/meta/src/manager/env.rs | 78 ++++++++++++++++ 3 files changed, 224 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e86fb54e5805..f6d83ef8ca44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2723,7 +2723,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.0", + "windows-targets 0.52.6", ] [[package]] @@ -4844,6 +4844,18 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox 0.1.3", + "windows-sys 0.59.0", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -5202,6 +5214,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "function_name" version = "0.3.0" @@ -6416,6 +6437,26 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.3" @@ -6689,6 +6730,26 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "krb5-src" version = "0.3.2+1.19.2" @@ -6849,6 +6910,17 @@ dependencies = [ "redox_syscall 0.4.1", ] +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.7", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -7776,6 +7848,25 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "npm_rs" version = "1.0.0" @@ -9856,6 +9947,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "redox_users" version = "0.4.4" @@ -9863,7 +9963,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" dependencies = [ "getrandom", - "libredox", + "libredox 0.0.1", "thiserror", ] @@ -11394,6 +11494,7 @@ dependencies = [ "maplit", "memcomparable", "mime_guess", + "notify", "num-integer", "num-traits", "otlp-embedded", @@ -15943,7 +16044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core 0.52.0", - "windows-targets 0.52.0", + "windows-targets 0.52.6", ] [[package]] @@ -15961,7 +16062,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.6", ] [[package]] @@ -15988,7 +16089,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", ] [[package]] @@ -16023,17 +16133,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -16050,9 +16161,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -16068,9 +16179,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -16086,9 +16197,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -16104,9 +16221,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -16122,9 +16239,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -16140,9 +16257,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -16158,9 +16275,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index a7f37bf50591..1558d83e9277 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -43,6 +43,7 @@ jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" +notify = { version = "6", no-default-features = true, features = ["macos_fsevent"] } num-integer = "0.1" num-traits = "0.2" otlp-embedded = { workspace = true } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index fded79bc7f8a..6d8c469d709f 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -13,13 +13,17 @@ // limitations under the License. use std::ops::Deref; +use std::path::Path; use std::sync::Arc; +use anyhow::Context; +use notify::Watcher; use risingwave_common::config::{ CompactionConfig, DefaultParallelism, MetaBackend, ObjectStoreConfig, }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::system_param::LICENSE_KEY_KEY; use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; @@ -27,6 +31,9 @@ use risingwave_rpc_client::{ FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef, }; use sea_orm::EntityTrait; +use thiserror_ext::AsReport; +use tokio::sync::watch; +use tokio::task::JoinHandle; use super::{ SessionParamsManager, SessionParamsManagerRef, SystemParamsManager, SystemParamsManagerRef, @@ -626,3 +633,74 @@ impl MetaSrvEnv { .unwrap() } } + +impl MetaSrvEnv { + pub fn start_watch_license_key_file( + &self, + path: impl AsRef, + ) -> MetaResult> { + let path = path.as_ref(); + let (changed_tx, mut changed_rx) = watch::channel(()); + + let mut watcher = + notify::recommended_watcher(move |event: Result| { + if let Err(e) = event { + tracing::warn!( + error = %e.as_report(), + "error occurred while watching license key file" + ); + return; + } + let _ = changed_tx.send(()); + }) + .context("failed to create license key file watcher")?; + + // This will spawn a new thread to watch the file, so no need to be concerned about blocking. + watcher + .watch(path, notify::RecursiveMode::NonRecursive) + .context("failed to watch license key file")?; + + let updater = { + let mgr = self.system_param_manager_impl.clone(); + let path = path.to_path_buf(); + async move { + // Let the watcher live until the end of the updater to prevent dropping (then stopping). + let _watcher = watcher; + + while changed_rx.changed().await.is_ok() { + let content = match tokio::fs::read_to_string(&path).await { + Ok(v) => v, + Err(e) => { + tracing::warn!( + path = %path.display(), + error = %e.as_report(), + "failed to read license key file" + ); + continue; + } + }; + let content = content.trim().to_owned(); + let value = (!content.is_empty()).then_some(content); + + let result = match &mgr { + SystemParamsManagerImpl::Kv(mgr) => { + mgr.set_param(LICENSE_KEY_KEY, value).await + } + SystemParamsManagerImpl::Sql(mgr) => { + mgr.set_param(LICENSE_KEY_KEY, value).await + } + }; + + if let Err(e) = result { + tracing::error!( + error = %e.as_report(), + "failed to set license key from file" + ); + } + } + } + }; + + Ok(tokio::spawn(updater)) + } +} From 67dc6906adae7443bb1a2486c62eecec89232d8a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 30 Sep 2024 16:51:56 +0800 Subject: [PATCH 2/9] watch license key file Signed-off-by: Bugen Zhao --- src/meta/node/src/lib.rs | 6 ++++++ src/meta/src/manager/env.rs | 20 ++++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 2adc2ee592ca..842f0dbc46a5 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -17,6 +17,7 @@ mod server; +use std::path::PathBuf; use std::time::Duration; use clap::Parser; @@ -192,6 +193,10 @@ pub struct MetaNodeOpts { #[override_opts(path = system.license_key)] pub license_key: Option, + /// The path of the license key file to be watched and hot-reloaded. + #[clap(long, env = "RW_LICENSE_KEY_PATH")] + pub license_key_file: Option, + /// 128-bit AES key for secret store in HEX format. #[educe(Debug(ignore))] // TODO: use newtype to redact debug impl #[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")] @@ -468,6 +473,7 @@ pub fn start( .meta .developer .actor_cnt_per_worker_parallelism_soft_limit, + license_key_path: opts.license_key_file, }, config.system.into_init_system_params(), Default::default(), diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 6d8c469d709f..1c5c9147b919 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::ops::Deref; -use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; @@ -305,6 +305,8 @@ pub struct MetaOpts { // Cluster limits pub actor_cnt_per_worker_parallelism_hard_limit: usize, pub actor_cnt_per_worker_parallelism_soft_limit: usize, + + pub license_key_path: Option, } impl MetaOpts { @@ -372,6 +374,7 @@ impl MetaOpts { table_info_statistic_history_times: 240, actor_cnt_per_worker_parallelism_hard_limit: usize::MAX, actor_cnt_per_worker_parallelism_soft_limit: usize::MAX, + license_key_path: None, } } } @@ -533,6 +536,9 @@ impl MetaSrvEnv { } } }; + + env.may_start_watch_license_key_file()?; + Ok(env) } @@ -635,11 +641,13 @@ impl MetaSrvEnv { } impl MetaSrvEnv { - pub fn start_watch_license_key_file( - &self, - path: impl AsRef, - ) -> MetaResult> { - let path = path.as_ref(); + /// Spawn background tasks to watch the license key file and update the system parameter, + /// if configured. + pub fn may_start_watch_license_key_file(&self) -> MetaResult> { + let Some(path) = self.opts.license_key_path.as_ref() else { + return Ok(()); + }; + let (changed_tx, mut changed_rx) = watch::channel(()); let mut watcher = From 00a1c33b4f3fa7eef4f8691d2744eaeee7840cdc Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 30 Sep 2024 17:33:04 +0800 Subject: [PATCH 3/9] minor fixes Signed-off-by: Bugen Zhao --- Cargo.lock | 1 - src/meta/Cargo.toml | 2 +- src/meta/node/src/server.rs | 1 + src/meta/src/manager/env.rs | 8 +++----- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6d83ef8ca44..b2ad41df9663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7855,7 +7855,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" dependencies = [ "bitflags 2.6.0", - "crossbeam-channel", "filetime", "fsevent-sys", "inotify", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 1558d83e9277..4d4086d4782b 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -43,7 +43,7 @@ jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" -notify = { version = "6", no-default-features = true, features = ["macos_fsevent"] } +notify = { version = "6", default-features = false, features = ["macos_fsevent"] } num-integer = "0.1" num-traits = "0.2" otlp-embedded = { workspace = true } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 11b22014f9f9..5f518a1076e0 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -402,6 +402,7 @@ pub async fn start_service_as_election_leader( meta_store_impl, ) .await?; + env.may_start_watch_license_key_file()?; let system_params_reader = env.system_params_reader().await; let data_directory = system_params_reader.data_directory(); diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 1c5c9147b919..a50c56c639f2 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -33,7 +33,6 @@ use risingwave_rpc_client::{ use sea_orm::EntityTrait; use thiserror_ext::AsReport; use tokio::sync::watch; -use tokio::task::JoinHandle; use super::{ SessionParamsManager, SessionParamsManagerRef, SystemParamsManager, SystemParamsManagerRef, @@ -537,8 +536,6 @@ impl MetaSrvEnv { } }; - env.may_start_watch_license_key_file()?; - Ok(env) } @@ -643,7 +640,7 @@ impl MetaSrvEnv { impl MetaSrvEnv { /// Spawn background tasks to watch the license key file and update the system parameter, /// if configured. - pub fn may_start_watch_license_key_file(&self) -> MetaResult> { + pub fn may_start_watch_license_key_file(&self) -> MetaResult<()> { let Some(path) = self.opts.license_key_path.as_ref() else { return Ok(()); }; @@ -708,7 +705,8 @@ impl MetaSrvEnv { } } }; + let _handle = tokio::spawn(updater); - Ok(tokio::spawn(updater)) + Ok(()) } } From 05749c59ad8787422a7c76338736154f07b5a51a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 30 Sep 2024 17:37:36 +0800 Subject: [PATCH 4/9] move to a separate file Signed-off-by: Bugen Zhao --- src/meta/src/manager/env.rs | 79 --------------------------- src/meta/src/manager/license.rs | 96 +++++++++++++++++++++++++++++++++ src/meta/src/manager/mod.rs | 1 + 3 files changed, 97 insertions(+), 79 deletions(-) create mode 100644 src/meta/src/manager/license.rs diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index a50c56c639f2..d6d053dae1b1 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -16,14 +16,11 @@ use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use anyhow::Context; -use notify::Watcher; use risingwave_common::config::{ CompactionConfig, DefaultParallelism, MetaBackend, ObjectStoreConfig, }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; -use risingwave_common::system_param::LICENSE_KEY_KEY; use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; @@ -31,8 +28,6 @@ use risingwave_rpc_client::{ FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef, }; use sea_orm::EntityTrait; -use thiserror_ext::AsReport; -use tokio::sync::watch; use super::{ SessionParamsManager, SessionParamsManagerRef, SystemParamsManager, SystemParamsManagerRef, @@ -636,77 +631,3 @@ impl MetaSrvEnv { .unwrap() } } - -impl MetaSrvEnv { - /// Spawn background tasks to watch the license key file and update the system parameter, - /// if configured. - pub fn may_start_watch_license_key_file(&self) -> MetaResult<()> { - let Some(path) = self.opts.license_key_path.as_ref() else { - return Ok(()); - }; - - let (changed_tx, mut changed_rx) = watch::channel(()); - - let mut watcher = - notify::recommended_watcher(move |event: Result| { - if let Err(e) = event { - tracing::warn!( - error = %e.as_report(), - "error occurred while watching license key file" - ); - return; - } - let _ = changed_tx.send(()); - }) - .context("failed to create license key file watcher")?; - - // This will spawn a new thread to watch the file, so no need to be concerned about blocking. - watcher - .watch(path, notify::RecursiveMode::NonRecursive) - .context("failed to watch license key file")?; - - let updater = { - let mgr = self.system_param_manager_impl.clone(); - let path = path.to_path_buf(); - async move { - // Let the watcher live until the end of the updater to prevent dropping (then stopping). - let _watcher = watcher; - - while changed_rx.changed().await.is_ok() { - let content = match tokio::fs::read_to_string(&path).await { - Ok(v) => v, - Err(e) => { - tracing::warn!( - path = %path.display(), - error = %e.as_report(), - "failed to read license key file" - ); - continue; - } - }; - let content = content.trim().to_owned(); - let value = (!content.is_empty()).then_some(content); - - let result = match &mgr { - SystemParamsManagerImpl::Kv(mgr) => { - mgr.set_param(LICENSE_KEY_KEY, value).await - } - SystemParamsManagerImpl::Sql(mgr) => { - mgr.set_param(LICENSE_KEY_KEY, value).await - } - }; - - if let Err(e) = result { - tracing::error!( - error = %e.as_report(), - "failed to set license key from file" - ); - } - } - } - }; - let _handle = tokio::spawn(updater); - - Ok(()) - } -} diff --git a/src/meta/src/manager/license.rs b/src/meta/src/manager/license.rs new file mode 100644 index 000000000000..74afde5783db --- /dev/null +++ b/src/meta/src/manager/license.rs @@ -0,0 +1,96 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use notify::Watcher; +use risingwave_common::system_param::LICENSE_KEY_KEY; +use thiserror_ext::AsReport; +use tokio::sync::watch; + +use super::{MetaSrvEnv, SystemParamsManagerImpl}; +use crate::MetaResult; + +impl MetaSrvEnv { + /// Spawn background tasks to watch the license key file and update the system parameter, + /// if configured. + pub fn may_start_watch_license_key_file(&self) -> MetaResult<()> { + let Some(path) = self.opts.license_key_path.as_ref() else { + return Ok(()); + }; + + let (changed_tx, mut changed_rx) = watch::channel(()); + + let mut watcher = + notify::recommended_watcher(move |event: Result| { + if let Err(e) = event { + tracing::warn!( + error = %e.as_report(), + "error occurred while watching license key file" + ); + return; + } + let _ = changed_tx.send(()); + }) + .context("failed to create license key file watcher")?; + + // This will spawn a new thread to watch the file, so no need to be concerned about blocking. + watcher + .watch(path, notify::RecursiveMode::NonRecursive) + .context("failed to watch license key file")?; + + let updater = { + let mgr = self.system_params_manager_impl_ref(); + let path = path.to_path_buf(); + async move { + // Let the watcher live until the end of the updater to prevent dropping (then stopping). + let _watcher = watcher; + + while changed_rx.changed().await.is_ok() { + let content = match tokio::fs::read_to_string(&path).await { + Ok(v) => v, + Err(e) => { + tracing::warn!( + path = %path.display(), + error = %e.as_report(), + "failed to read license key file" + ); + continue; + } + }; + let content = content.trim().to_owned(); + let value = (!content.is_empty()).then_some(content); + + let result = match &mgr { + SystemParamsManagerImpl::Kv(mgr) => { + mgr.set_param(LICENSE_KEY_KEY, value).await + } + SystemParamsManagerImpl::Sql(mgr) => { + mgr.set_param(LICENSE_KEY_KEY, value).await + } + }; + + if let Err(e) = result { + tracing::error!( + error = %e.as_report(), + "failed to set license key from file" + ); + } + } + } + }; + let _handle = tokio::spawn(updater); + + Ok(()) + } +} diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index eac1b15c8e69..63a564faa904 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -19,6 +19,7 @@ mod env; pub mod event_log; mod id; mod idle; +mod license; mod metadata; mod notification; mod notification_version; From 9ef87b718a99ff496cc0ae3161ec5ee5ba4d5256 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 30 Sep 2024 17:41:56 +0800 Subject: [PATCH 5/9] update comments Signed-off-by: Bugen Zhao --- src/meta/src/manager/license.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/meta/src/manager/license.rs b/src/meta/src/manager/license.rs index 74afde5783db..20368b031cea 100644 --- a/src/meta/src/manager/license.rs +++ b/src/meta/src/manager/license.rs @@ -40,6 +40,7 @@ impl MetaSrvEnv { ); return; } + // We don't check the event type but always notify the updater for simplicity. let _ = changed_tx.send(()); }) .context("failed to create license key file watcher")?; @@ -57,6 +58,8 @@ impl MetaSrvEnv { let _watcher = watcher; while changed_rx.changed().await.is_ok() { + tracing::info!(path = %path.display(), "license key file changed, reloading..."); + let content = match tokio::fs::read_to_string(&path).await { Ok(v) => v, Err(e) => { @@ -69,6 +72,9 @@ impl MetaSrvEnv { } }; let content = content.trim().to_owned(); + + // An empty license key file is considered as setting it to the default value, + // i.e., `None` when calling `set_param`. let value = (!content.is_empty()).then_some(content); let result = match &mgr { From ba5d4786f2e81d982bf1e3261966647836a784e5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Oct 2024 14:12:43 +0800 Subject: [PATCH 6/9] fix unit test Signed-off-by: Bugen Zhao --- src/cmd_all/src/standalone.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index fd3e950f34d6..6946930a8b70 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -468,6 +468,7 @@ mod test { dangerous_max_idle_secs: None, connector_rpc_endpoint: None, license_key: None, + license_key_file: None, temp_secret_file_dir: "./meta/secrets/", }, ), From f42d7e1a5f35f9623b42a284e72033619d095543 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Oct 2024 16:09:21 +0800 Subject: [PATCH 7/9] add unit tests Signed-off-by: Bugen Zhao --- Cargo.lock | 2 + src/license/src/key.rs | 11 +++++ src/license/src/manager.rs | 14 ++++-- src/meta/Cargo.toml | 2 + src/meta/node/src/server.rs | 2 +- src/meta/src/manager/license.rs | 75 +++++++++++++++++++++++++++++---- 6 files changed, 93 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e0c8f7271eb..a1c4da041028 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11524,6 +11524,7 @@ dependencies = [ "serde_json", "strum 0.26.3", "sync-point", + "tempfile", "thiserror", "thiserror-ext", "tokio-retry", @@ -11531,6 +11532,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "tracing-subscriber", "url", "uuid", "workspace-hack", diff --git a/src/license/src/key.rs b/src/license/src/key.rs index f27ad22719e8..2533c2a533c4 100644 --- a/src/license/src/key.rs +++ b/src/license/src/key.rs @@ -18,6 +18,17 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; /// A license key with the paid tier that only works in tests. +/// +/// The content is a JWT token with the following payload: +/// ```text +/// License { +/// sub: "rw-test", +/// iss: Test, +/// tier: Paid, +/// cpu_core_limit: None, +/// exp: 9999999999, +/// } +/// ``` pub(crate) const TEST_PAID_LICENSE_KEY_CONTENT: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\ eyJzdWIiOiJydy10ZXN0IiwidGllciI6InBhaWQiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\ diff --git a/src/license/src/manager.rs b/src/license/src/manager.rs index 5c1bc298388d..b6c1941ea170 100644 --- a/src/license/src/manager.rs +++ b/src/license/src/manager.rs @@ -43,7 +43,7 @@ pub enum Tier { /// /// The issuer must be `prod.risingwave.com` in production, and can be `test.risingwave.com` in /// development. This will be validated when refreshing the license key. -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub enum Issuer { #[serde(rename = "prod.risingwave.com")] Prod, @@ -58,10 +58,13 @@ pub enum Issuer { /// The content of a license. /// /// We use JSON Web Token (JWT) to represent the license. This struct is the payload. +/// +/// Prefer calling [`crate::Feature::check_available`] to check the availability of a feature, +/// other than directly checking the content of the license. // TODO(license): Shall we add a version field? -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "snake_case")] -pub(super) struct License { +pub struct License { /// Subject of the license. /// /// See . @@ -171,7 +174,10 @@ impl LicenseManager { /// Get the current license if it is valid. /// /// Since the license can expire, the returned license should not be cached by the caller. - pub(super) fn license(&self) -> Result { + /// + /// Prefer calling [`crate::Feature::check_available`] to check the availability of a feature, + /// other than directly calling this method and checking the content of the license. + pub fn license(&self) -> Result { let license = self.inner.read().unwrap().license.clone()?; // Check the expiration time additionally. diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 8f02399535ad..4b38734a70fa 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -106,6 +106,8 @@ expect-test = "1.5" rand = { workspace = true } risingwave_hummock_sdk = { workspace = true, features = ["test"] } risingwave_test_runner = { workspace = true } +tempfile = "3" +tracing-subscriber = "0.3" [features] test = [] diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index aa37b7c32d5b..1d690cb17646 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -398,7 +398,7 @@ pub async fn start_service_as_election_leader( meta_store_impl, ) .await?; - env.may_start_watch_license_key_file()?; + let _ = env.may_start_watch_license_key_file()?; let system_params_reader = env.system_params_reader().await; let data_directory = system_params_reader.data_directory(); diff --git a/src/meta/src/manager/license.rs b/src/meta/src/manager/license.rs index 20368b031cea..edd86eeb0c9b 100644 --- a/src/meta/src/manager/license.rs +++ b/src/meta/src/manager/license.rs @@ -17,6 +17,7 @@ use notify::Watcher; use risingwave_common::system_param::LICENSE_KEY_KEY; use thiserror_ext::AsReport; use tokio::sync::watch; +use tokio::task::JoinHandle; use super::{MetaSrvEnv, SystemParamsManagerImpl}; use crate::MetaResult; @@ -24,12 +25,14 @@ use crate::MetaResult; impl MetaSrvEnv { /// Spawn background tasks to watch the license key file and update the system parameter, /// if configured. - pub fn may_start_watch_license_key_file(&self) -> MetaResult<()> { + pub fn may_start_watch_license_key_file(&self) -> MetaResult>> { let Some(path) = self.opts.license_key_path.as_ref() else { - return Ok(()); + return Ok(None); }; let (changed_tx, mut changed_rx) = watch::channel(()); + // Send an initial event to trigger the initial load. + changed_tx.send(()).unwrap(); let mut watcher = notify::recommended_watcher(move |event: Result| { @@ -57,6 +60,9 @@ impl MetaSrvEnv { // Let the watcher live until the end of the updater to prevent dropping (then stopping). let _watcher = watcher; + // Read the file content and set the system parameter every time the file changes. + // Note that `changed()` will immediately resolves on the very first call, so we + // will do the initialization then. while changed_rx.changed().await.is_ok() { tracing::info!(path = %path.display(), "license key file changed, reloading..."); @@ -71,11 +77,14 @@ impl MetaSrvEnv { continue; } }; - let content = content.trim().to_owned(); - // An empty license key file is considered as setting it to the default value, - // i.e., `None` when calling `set_param`. - let value = (!content.is_empty()).then_some(content); + // Trim the content and use it as the new license key value. + // + // It's always a `Some`, meaning that an empty license key file here is equivalent to + // `ALTER SYSTEM SET license_key TO ''`, instead of `... TO DEFAULT`. Please note + // the slight difference in behavior of debug build, where the default value of the + // `license_key` system parameter is a test key but not an empty string. + let value = Some(content.trim().to_owned()); let result = match &mgr { SystemParamsManagerImpl::Kv(mgr) => { @@ -95,8 +104,58 @@ impl MetaSrvEnv { } } }; - let _handle = tokio::spawn(updater); - Ok(()) + let handle = tokio::spawn(updater); + Ok(Some(handle)) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use risingwave_license::{License, LicenseManager, Tier}; + + use super::*; + use crate::manager::MetaOpts; + + // License { + // sub: "rw-test", + // iss: Test, + // tier: Free, <- difference from the default license in debug build + // cpu_core_limit: None, + // exp: 9999999999, + // } + const INITIAL_KEY: &str = + "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\ + eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\ + ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A"; + + #[tokio::test] + async fn test_watch_license_key_file() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .init(); + + let key_file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(key_file.path(), INITIAL_KEY).unwrap(); + + let srv = MetaSrvEnv::for_test_opts(MetaOpts { + license_key_path: Some(key_file.path().to_path_buf()), + ..MetaOpts::test(false) + }) + .await; + let _updater_handle = srv.may_start_watch_license_key_file().unwrap().unwrap(); + + // Since we've filled the key file with the initial key, the license should be loaded. + tokio::time::sleep(Duration::from_secs(1)).await; + let license = LicenseManager::get().license().unwrap(); + assert_eq!(license.tier, Tier::Free); + + // Update the key file with an empty content, which should reset the license to the default. + std::fs::write(key_file.path(), "").unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + let license = LicenseManager::get().license().unwrap(); + assert_eq!(license, License::default()); } } From 0466821e274232ad5ccf2a30d1c9ca92d50dccb5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Oct 2024 16:19:27 +0800 Subject: [PATCH 8/9] add cfg debug assertions Signed-off-by: Bugen Zhao --- src/meta/src/manager/license.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/meta/src/manager/license.rs b/src/meta/src/manager/license.rs index edd86eeb0c9b..fa677ad07358 100644 --- a/src/meta/src/manager/license.rs +++ b/src/meta/src/manager/license.rs @@ -132,6 +132,7 @@ mod tests { ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A"; #[tokio::test] + #[cfg_attr(not(debug_assertions), ignore)] // skip in release build async fn test_watch_license_key_file() { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) @@ -150,6 +151,7 @@ mod tests { // Since we've filled the key file with the initial key, the license should be loaded. tokio::time::sleep(Duration::from_secs(1)).await; let license = LicenseManager::get().license().unwrap(); + assert_eq!(license.sub, "rw-test"); assert_eq!(license.tier, Tier::Free); // Update the key file with an empty content, which should reset the license to the default. From 64ab0e0c9363b2b316138e30851ba6da560d8667 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 9 Oct 2024 10:37:57 +0800 Subject: [PATCH 9/9] disable test in madsim Signed-off-by: Bugen Zhao --- src/meta/src/manager/license.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/meta/src/manager/license.rs b/src/meta/src/manager/license.rs index fa677ad07358..f71f167a0dc5 100644 --- a/src/meta/src/manager/license.rs +++ b/src/meta/src/manager/license.rs @@ -131,6 +131,7 @@ mod tests { eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\ ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A"; + #[cfg(not(madsim))] // `notify` will spawn system threads, which is not allowed in madsim #[tokio::test] #[cfg_attr(not(debug_assertions), ignore)] // skip in release build async fn test_watch_license_key_file() {