Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] handle panics in query service #2375

Merged
merged 12 commits into from
Jun 19, 2024
36 changes: 31 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions idl/chromadb/proto/debug.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package chroma;

import "google/protobuf/empty.proto";

message GetInfoResponse {
string version = 1;
}

service Debug {
rpc GetInfo(google.protobuf.Empty) returns (GetInfoResponse) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It this just so when testing manually you can check its up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a handler that returns a successful response without panicking. Alternatively I could add a shoud_panic option to the request for TriggerPanic, or call a normal rpc route like QueryVectors

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thats fine makes sense

rpc TriggerPanic(google.protobuf.Empty) returns (google.protobuf.Empty) {}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestions welcome for a cleaner way to do this, I just want a way to trigger a panic within a service from a test

8 changes: 7 additions & 1 deletion rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ tracing = "0.1"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry = { version = "0.19.0", default-features = false, features = [
"trace",
"rt-tokio",
] }
opentelemetry-otlp = "0.12.0"
shuttle = "0.7.1"
tower = "0.4.13"
hyper = "0.14"


[dev-dependencies]
Expand All @@ -59,6 +64,7 @@ proptest-state-machine = "0.1.0"
"rand" = "0.8.5"
rayon = "1.8.0"
criterion = "0.3"
random-port = "0.1.1"

[build-dependencies]
tonic-build = "0.10"
Expand Down
18 changes: 10 additions & 8 deletions rust/worker/build.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Compile the protobuf files in the chromadb proto directory.
let mut proto_paths = vec![
"../../idl/chromadb/proto/chroma.proto",
"../../idl/chromadb/proto/coordinator.proto",
"../../idl/chromadb/proto/logservice.proto",
];

#[cfg(debug_assertions)]
codetheweb marked this conversation as resolved.
Show resolved Hide resolved
proto_paths.push("../../idl/chromadb/proto/debug.proto");

tonic_build::configure()
.emit_rerun_if_changed(true)
.compile(
&[
"../../idl/chromadb/proto/chroma.proto",
"../../idl/chromadb/proto/coordinator.proto",
"../../idl/chromadb/proto/logservice.proto",
],
&["../../idl/"],
)?;
.compile(&proto_paths, &["../../idl/"])?;

// Compile the hnswlib bindings.
cc::Build::new()
Expand Down
113 changes: 101 additions & 12 deletions rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::system::{Receiver, System};
use crate::tracing::util::wrap_span_with_parent_context;
use crate::types::MetadataValue;
use crate::types::ScalarEncoding;
use crate::utils::catch_panic_middleware::CatchPanicLayer;
use async_trait::async_trait;
use tokio::signal::unix::{signal, SignalKind};
use tonic::{transport::Server, Request, Response, Status};
Expand Down Expand Up @@ -89,23 +90,29 @@ impl WorkerServer {
let addr = format!("[::]:{}", worker.port).parse().unwrap();
println!("Worker listening on {}", addr);
let server = Server::builder()
.layer(CatchPanicLayer::default())
.add_service(chroma_proto::vector_reader_server::VectorReaderServer::new(
worker.clone(),
))
.add_service(
chroma_proto::metadata_reader_server::MetadataReaderServer::new(worker.clone()),
)
.serve_with_shutdown(addr, async {
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sigterm) => sigterm,
Err(e) => {
tracing::error!("Failed to create signal handler: {:?}", e);
return;
}
};
sigterm.recv().await;
tracing::info!("Received SIGTERM, shutting down");
});
);

#[cfg(debug_assertions)]
let server =
server.add_service(chroma_proto::debug_server::DebugServer::new(worker.clone()));

let server = server.serve_with_shutdown(addr, async {
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sigterm) => sigterm,
Err(e) => {
tracing::error!("Failed to create signal handler: {:?}", e);
return;
}
};
sigterm.recv().await;
tracing::info!("Received SIGTERM, shutting down");
});

server.await?;
Ok(())
Expand Down Expand Up @@ -522,3 +529,85 @@ impl chroma_proto::metadata_reader_server::MetadataReader for WorkerServer {
.await
}
}

#[tonic::async_trait]
impl chroma_proto::debug_server::Debug for WorkerServer {
async fn get_info(
&self,
_: Request<()>,
) -> Result<Response<chroma_proto::GetInfoResponse>, Status> {
let response = chroma_proto::GetInfoResponse {
version: option_env!("CARGO_PKG_VERSION")
.unwrap_or("unknown")
.to_string(),
};
Ok(Response::new(response))
}

async fn trigger_panic(&self, _: Request<()>) -> Result<Response<()>, Status> {
panic!("Intentional panic triggered");
}
}

#[cfg(test)]
mod tests {
use crate::execution::dispatcher;
use crate::log::log::InMemoryLog;
use crate::storage::local::LocalStorage;
use crate::storage::Storage;
use crate::sysdb::test_sysdb::TestSysDb;
use crate::system;

use super::*;
use chroma_proto::debug_client::DebugClient;
use tempfile::tempdir;

#[tokio::test]
async fn gracefully_handles_panics() {
let sysdb = TestSysDb::new();
let log = InMemoryLog::new();
let tmp_dir = tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));

let port = random_port::PortPicker::new().pick().unwrap();
let mut server = WorkerServer {
dispatcher: None,
system: None,
sysdb: Box::new(SysDb::Test(sysdb)),
log: Box::new(Log::InMemory(log)),
hnsw_index_provider: HnswIndexProvider::new(
storage.clone(),
tmp_dir.path().to_path_buf(),
),
blockfile_provider: BlockfileProvider::new_arrow(storage),
port,
};

let system: system::System = system::System::new();
let dispatcher = dispatcher::Dispatcher::new(4, 10, 10);
let dispatcher_handle = system.start_component(dispatcher);

server.set_system(system.clone());
server.set_dispatcher(dispatcher_handle.receiver());

tokio::spawn(async move {
let _ = crate::server::WorkerServer::run(server).await;
});
codetheweb marked this conversation as resolved.
Show resolved Hide resolved

let mut client = DebugClient::connect(format!("http://localhost:{}", port))
.await
.unwrap();

// Test response when handler panics
let err_response = client.trigger_panic(Request::new(())).await.unwrap_err();
assert_eq!(err_response.code(), tonic::Code::Internal);
assert_eq!(
err_response.message(),
"Service panicked: Intentional panic triggered"
);

// The server should still work, even after a panic was thrown
let response = client.get_info(Request::new(())).await;
assert!(response.is_ok());
}
}
11 changes: 2 additions & 9 deletions rust/worker/src/sysdb/test_sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,8 @@ impl TestSysDb {
if id.is_some() && id.unwrap() != segment.id {
return false;
}
if r#type.is_some() {
match r#type.unwrap().as_str() {
"hnsw" => {
if segment.r#type != SegmentType::HnswDistributed {
return false;
}
}
_ => return false,
}
if let Some(r#type) = r#type {
HammadB marked this conversation as resolved.
Show resolved Hide resolved
return segment.r#type == SegmentType::try_from(r#type.as_str()).unwrap();
}
if scope.is_some() && scope.unwrap() != segment.scope {
return false;
Expand Down
14 changes: 14 additions & 0 deletions rust/worker/src/types/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ impl From<SegmentType> for String {
}
}

impl TryFrom<&str> for SegmentType {
type Error = SegmentConversionError;

fn try_from(segment_type: &str) -> Result<Self, Self::Error> {
match segment_type {
"urn:chroma:segment/vector/hnsw-distributed" => Ok(SegmentType::HnswDistributed),
"urn:chroma:segment/record/blockfile" => Ok(SegmentType::BlockfileRecord),
"urn:chroma:segment/metadata/sqlite" => Ok(SegmentType::Sqlite),
"urn:chroma:segment/metadata/blockfile" => Ok(SegmentType::BlockfileMetadata),
_ => Err(SegmentConversionError::InvalidSegmentType),
}
}
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct Segment {
pub(crate) id: Uuid,
Expand Down
Loading
Loading