Skip to content

Commit

Permalink
fix: no need to wrap the Pact for a mock server in a mutex (mock serv…
Browse files Browse the repository at this point in the history
…er is already behind a mutex) as this can cause deadlocks #274
  • Loading branch information
rholshausen committed Jun 14, 2023
1 parent d677585 commit e58aa91
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 53 deletions.
6 changes: 3 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions rust/pact_consumer/src/mock_server/http_mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ValidatingHttpMockServer {

let (description, url_str) = {
let ms = mock_server.lock().unwrap();
let pact = ms.pact.lock().unwrap();
let pact = ms.pact.as_ref();
let description = format!(
"{}/{}", pact.consumer().name, pact.provider().name
);
Expand Down Expand Up @@ -163,7 +163,7 @@ impl ValidatingHttpMockServer {

let (description, url_str) = {
let ms = mock_server.lock().unwrap();
let pact = ms.pact.lock().unwrap();
let pact = ms.pact.as_ref();
let description = format!(
"{}/{}", pact.consumer().name, pact.provider().name
);
Expand Down Expand Up @@ -194,7 +194,7 @@ impl ValidatingHttpMockServer {

// Send any metrics in another thread as this thread could be panicking due to an assertion.
let interactions = {
let pact = ms.pact.lock().unwrap();
let pact = ms.pact.as_ref();
pact.interactions().len()
};
thread::spawn(move || {
Expand Down
2 changes: 1 addition & 1 deletion rust/pact_ffi/src/mock_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ pub extern fn pactffi_cleanup_mock_server(mock_server_port: i32) -> bool {
let id = pact_mock_server::find_mock_server_by_port(mock_server_port as u16, &|_, id, mock_server| {
let interactions = match mock_server {
Either::Left(ms) => {
let pact = ms.pact.lock().unwrap();
let pact = ms.pact.as_ref();
pact.interactions().len()
},
Either::Right(ms) => ms.pact.interactions.len()
Expand Down
19 changes: 9 additions & 10 deletions rust/pact_mock_server/src/hyper_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async fn match_result_to_hyper_response(

async fn handle_request(
req: hyper::Request<Body>,
pact: Arc<Mutex<dyn Pact + Send + Sync>>,
pact: Arc<dyn Pact + Send + Sync>,
matches: Arc<Mutex<Vec<MatchResult>>>,
mock_server: Arc<Mutex<MockServer>>
) -> Result<Response<Body>, InteractionError> {
Expand Down Expand Up @@ -303,11 +303,9 @@ async fn handle_request(
);
}

let pact = {
let inner = pact.lock().unwrap();
inner.as_v4_pact().unwrap()
};
let match_result = match_request(&pact_request, &pact).await;
// This unwrap is safe, as all pact models can be upgraded to V4 format
let v4_pact = pact.as_v4_pact().unwrap();
let match_result = match_request(&pact_request, &v4_pact).await;

matches.lock().unwrap().push(match_result.clone());

Expand Down Expand Up @@ -343,14 +341,14 @@ fn handle_mock_request_error(result: Result<Response<Body>, InteractionError>) -
// The reason that the function itself is still async (even if it performs
// no async operations) is that it needs a tokio context to be able to call try_bind.
pub(crate) async fn create_and_bind(
pact: Arc<Mutex<dyn Pact + Send + Sync>>,
pact: Box<dyn Pact + Send + Sync>,
addr: SocketAddr,
shutdown: impl std::future::Future<Output = ()>,
matches: Arc<Mutex<Vec<MatchResult>>>,
mock_server: Arc<Mutex<MockServer>>,
mock_server_id: &String
) -> Result<(impl std::future::Future<Output = ()>, SocketAddr), hyper::Error> {
let pact = pact.clone();
let pact = pact.arced();
let ms_id = Arc::new(mock_server_id.clone());

let server = Server::try_bind(&addr)?
Expand Down Expand Up @@ -409,7 +407,7 @@ impl hyper::server::accept::Accept for HyperAcceptor {
}

pub(crate) async fn create_and_bind_tls(
pact: Arc<Mutex<dyn Pact + Send + Sync>>,
pact: Box<dyn Pact + Send + Sync>,
addr: SocketAddr,
shutdown: impl std::future::Future<Output = ()>,
matches: Arc<Mutex<Vec<MatchResult>>>,
Expand All @@ -430,6 +428,7 @@ pub(crate) async fn create_and_bind_tls(
}
});

let pact = pact.arced();
let server = Server::builder(HyperAcceptor {
stream: tls_stream.boxed()
})
Expand Down Expand Up @@ -482,7 +481,7 @@ mod tests {
let matches = Arc::new(Mutex::new(vec![]));

let (future, _) = create_and_bind(
RequestResponsePact::default().thread_safe(),
RequestResponsePact::default().boxed(),
([0, 0, 0, 0], 0 as u16).into(),
async {
shutdown_rx.await.ok();
Expand Down
38 changes: 18 additions & 20 deletions rust/pact_mock_server/src/mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::CString;
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use pact_models::json_utils::json_to_string;
Expand Down Expand Up @@ -104,7 +104,7 @@ pub struct MockServer {
#[deprecated(since = "0.9.1", note = "Resources should be stored on the mock server manager entry")]
pub resources: Vec<CString>,
/// Pact that this mock server is based on
pub pact: Arc<Mutex<dyn Pact + Send + Sync>>,
pub pact: Box<dyn Pact + Send + Sync>,
/// Receiver of match results
matches: Arc<Mutex<Vec<MatchResult>>>,
/// Shutdown signal
Expand Down Expand Up @@ -135,7 +135,7 @@ impl MockServer {
address: None,
scheme: MockServerScheme::HTTP,
resources: vec![],
pact: pact.thread_safe(),
pact: pact.boxed(),
matches: matches.clone(),
shutdown_tx: RefCell::new(Some(shutdown_tx)),
config: config.clone(),
Expand All @@ -144,7 +144,7 @@ impl MockServer {
}));

let (future, socket_addr) = hyper_server::create_and_bind(
pact.thread_safe(),
pact,
addr,
async {
shutdown_rx.await.ok();
Expand Down Expand Up @@ -185,7 +185,7 @@ impl MockServer {
address: None,
scheme: MockServerScheme::HTTPS,
resources: vec![],
pact: pact.thread_safe(),
pact: pact.boxed(),
matches: matches.clone(),
shutdown_tx: RefCell::new(Some(shutdown_tx)),
config: config.clone(),
Expand All @@ -194,7 +194,7 @@ impl MockServer {
}));

let (future, socket_addr) = hyper_server::create_and_bind_tls(
pact.thread_safe(),
pact,
addr,
async {
shutdown_rx.await.ok();
Expand Down Expand Up @@ -234,13 +234,12 @@ impl MockServer {

/// Converts this mock server to a `Value` struct
pub fn to_json(&self) -> serde_json::Value {
let pact = self.pact.lock().unwrap();
json!({
"id" : self.id.clone(),
"port" : self.port.unwrap_or_default() as u64,
"address" : self.address.clone().unwrap_or_default(),
"scheme" : self.scheme.to_string(),
"provider" : pact.provider().name.clone(),
"provider" : self.pact.provider().name.clone(),
"status" : if self.mismatches().is_empty() { "ok" } else { "error" },
"metrics" : self.metrics
})
Expand All @@ -266,8 +265,7 @@ impl MockServer {
}
}).filter(|o| o.is_some()).map(|o| o.unwrap().clone()).collect();

let pact = self.pact.lock().unwrap();
let interactions = pact.interactions();
let interactions = self.pact.interactions();
let missing = interactions.iter()
.map(|i| i.as_v4_http().unwrap().request)
.filter(|req| !requests.contains(req))
Expand All @@ -278,17 +276,17 @@ impl MockServer {
/// Mock server writes its pact out to the provided directory
pub fn write_pact(&self, output_path: &Option<String>, overwrite: bool) -> anyhow::Result<()> {
trace!("write_pact: output_path = {:?}, overwrite = {}", output_path, overwrite);
let mut pact = self.pact.lock().unwrap();
pact.add_md_version("mockserver", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));

let mut v4_pact = pact.as_v4_pact().unwrap_or_default();
let pact = if pact.is_v4() {
let pact = if self.pact.is_v4() {
let mut v4_pact = self.pact.as_v4_pact().unwrap_or_default();
v4_pact.add_md_version("mockserver", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
for interaction in &mut v4_pact.interactions {
interaction.set_transport(Some("http".to_string()));
}
&v4_pact as &(dyn Pact + Send + Sync)
v4_pact.boxed()
} else {
pact.deref()
let mut pact = self.pact.boxed();
pact.add_md_version("mockserver", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
pact
};

let pact_file_name = pact.default_file_name();
Expand All @@ -306,7 +304,7 @@ impl MockServer {
PactSpecification::Unknown => PactSpecification::V3,
_ => self.spec_version
};
match write_pact(pact.boxed(), filename.as_path(), specification, overwrite) {
match write_pact(pact, filename.as_path(), specification, overwrite) {
Ok(_) => Ok(()),
Err(err) => {
warn!("Failed to write pact to file - {}", err);
Expand Down Expand Up @@ -344,7 +342,7 @@ impl Clone for MockServer {
address: self.address.clone(),
scheme: self.scheme.clone(),
resources: vec![],
pact: self.pact.clone(),
pact: self.pact.boxed(),
matches: self.matches.clone(),
shutdown_tx: RefCell::new(None),
config: self.config.clone(),
Expand All @@ -363,7 +361,7 @@ impl Default for MockServer {
port: None,
address: None,
resources: vec![],
pact: Arc::new(Mutex::new(RequestResponsePact::default())),
pact: Box::new(RequestResponsePact::default()),
matches: Arc::new(Mutex::new(vec![])),
shutdown_tx: RefCell::new(None),
config: Default::default(),
Expand Down
14 changes: 10 additions & 4 deletions rust/pact_mock_server/src/server_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pact_models::prelude::v4::V4Pact;
use pact_plugin_driver::catalogue_manager::{CatalogueEntry, CatalogueEntryProviderType};
use pact_plugin_driver::mock_server::MockServerDetails;
use rustls::ServerConfig;
use tracing::{debug, error};
use tracing::{debug, error, trace};
use url::Url;

use crate::mock_server::{MockServer, MockServerConfig};
Expand Down Expand Up @@ -337,13 +337,19 @@ impl ServerManager {

/// Map all the running mock servers This will only work for locally managed mock servers,
/// not mock servers provided by plugins.
pub fn map_mock_servers<R>(&self, f: &dyn Fn(&MockServer) -> R) -> Vec<R> {
pub fn map_mock_servers<R, F>(&self, f: F) -> Vec<R>
where F: Fn(&MockServer) -> R {
let mut results = vec![];
for (_id_, entry) in self.mock_servers.iter() {
for (id, entry) in self.mock_servers.iter() {
trace!(?id, "mock server entry");
if let Either::Left(mock_server) = &entry.mock_server {
results.push(f(&mock_server.lock().unwrap()));
trace!(?id, "Waiting on lock for mock server");
let guard = mock_server.lock().unwrap();
trace!(?id, "Got access to mock server, invoking callback");
results.push(f(&guard));
}
}
trace!("returning results");
return results;
}

Expand Down
2 changes: 1 addition & 1 deletion rust/pact_mock_server_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tracing-core = "0.1.30"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "local-time", "tracing-log"] }
url = "2.4.0"
uuid = { version = "1.3.3", features = ["v4"] }
webmachine-rust = "0.2.2"
webmachine-rust = "0.3.0"

[dev-dependencies]
quickcheck = "1"
Expand Down
Loading

0 comments on commit e58aa91

Please sign in to comment.