Skip to content

Commit

Permalink
Reorganize arrow-flight test code (apache#6065)
Browse files Browse the repository at this point in the history
* Reorganize test code

* asf header

* reuse TestFixture

* .await

* Create flight_sql_client.rs

* remove code

* remove unused import

* Fix clippy lints
  • Loading branch information
lewiszlw authored Jul 16, 2024
1 parent 6ab853d commit 62f9e72
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 268 deletions.
5 changes: 5 additions & 0 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ required-features = ["flight-sql-experimental", "tls"]
name = "flight_sql_client"
required-features = ["cli", "flight-sql-experimental", "tls"]

[[test]]
name = "flight_sql_client"
path = "tests/flight_sql_client.rs"
required-features = ["flight-sql-experimental", "tls"]

[[test]]
name = "flight_sql_client_cli"
path = "tests/flight_sql_client_cli.rs"
Expand Down
108 changes: 7 additions & 101 deletions arrow-flight/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

//! Integration test for "mid level" Client
mod common {
pub mod server;
pub mod trailers_layer;
}
mod common;

use crate::common::fixture::TestFixture;
use arrow_array::{RecordBatch, UInt64Array};
use arrow_flight::{
decode::FlightRecordBatchStream, encode::FlightDataEncoderBuilder, error::FlightError, Action,
Expand All @@ -30,18 +29,12 @@ use arrow_flight::{
};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use common::{server::TestFlightServer, trailers_layer::TrailersLayer};
use common::server::TestFlightServer;
use futures::{Future, StreamExt, TryStreamExt};
use prost::Message;
use tokio::{net::TcpListener, task::JoinHandle};
use tonic::{
transport::{Channel, Uri},
Status,
};

use std::{net::SocketAddr, sync::Arc, time::Duration};
use tonic::Status;

const DEFAULT_TIMEOUT_SECONDS: u64 = 30;
use std::sync::Arc;

#[tokio::test]
async fn test_handshake() {
Expand Down Expand Up @@ -1123,7 +1116,7 @@ where
Fut: Future<Output = ()>,
{
let test_server = TestFlightServer::new();
let fixture = TestFixture::new(&test_server).await;
let fixture = TestFixture::new(test_server.service()).await;
let client = FlightClient::new(fixture.channel().await);

// run the test function
Expand Down Expand Up @@ -1156,90 +1149,3 @@ fn expect_status(error: FlightError, expected: Status) {
"Got {status:?} want {expected:?}"
);
}

/// Creates and manages a running TestServer with a background task
struct TestFixture {
/// channel to send shutdown command
shutdown: Option<tokio::sync::oneshot::Sender<()>>,

/// Address the server is listening on
addr: SocketAddr,

// handle for the server task
handle: Option<JoinHandle<Result<(), tonic::transport::Error>>>,
}

impl TestFixture {
/// create a new test fixture from the server
pub async fn new(test_server: &TestFlightServer) -> Self {
// let OS choose a a free port
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

println!("Listening on {addr}");

// prepare the shutdown channel
let (tx, rx) = tokio::sync::oneshot::channel();

let server_timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECONDS);

let shutdown_future = async move {
rx.await.ok();
};

let serve_future = tonic::transport::Server::builder()
.timeout(server_timeout)
.layer(TrailersLayer)
.add_service(test_server.service())
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
shutdown_future,
);

// Run the server in its own background task
let handle = tokio::task::spawn(serve_future);

Self {
shutdown: Some(tx),
addr,
handle: Some(handle),
}
}

/// Return a [`Channel`] connected to the TestServer
pub async fn channel(&self) -> Channel {
let url = format!("http://{}", self.addr);
let uri: Uri = url.parse().expect("Valid URI");
Channel::builder(uri)
.timeout(Duration::from_secs(DEFAULT_TIMEOUT_SECONDS))
.connect()
.await
.expect("error connecting to server")
}

/// Stops the test server and waits for the server to shutdown
pub async fn shutdown_and_wait(mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown.send(()).expect("server quit early");
}
if let Some(handle) = self.handle.take() {
println!("Waiting on server to finish");
handle
.await
.expect("task join error (panic?)")
.expect("Server Error found at shutdown");
}
}
}

impl Drop for TestFixture {
fn drop(&mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown.send(()).ok();
}
if self.handle.is_some() {
// tests should properly clean up TestFixture
println!("TestFixture::Drop called prior to `shutdown_and_wait`");
}
}
}
117 changes: 117 additions & 0 deletions arrow-flight/tests/common/fixture.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::common::trailers_layer::TrailersLayer;
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use http::Uri;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tonic::transport::Channel;

/// All tests must complete within this many seconds or else the test server is shutdown
const DEFAULT_TIMEOUT_SECONDS: u64 = 30;

/// Creates and manages a running TestServer with a background task
pub struct TestFixture {
/// channel to send shutdown command
shutdown: Option<tokio::sync::oneshot::Sender<()>>,

/// Address the server is listening on
pub addr: SocketAddr,

/// handle for the server task
handle: Option<JoinHandle<Result<(), tonic::transport::Error>>>,
}

impl TestFixture {
/// create a new test fixture from the server
pub async fn new<T: FlightService>(test_server: FlightServiceServer<T>) -> Self {
// let OS choose a free port
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

println!("Listening on {addr}");

// prepare the shutdown channel
let (tx, rx) = tokio::sync::oneshot::channel();

let server_timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECONDS);

let shutdown_future = async move {
rx.await.ok();
};

let serve_future = tonic::transport::Server::builder()
.timeout(server_timeout)
.layer(TrailersLayer)
.add_service(test_server)
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
shutdown_future,
);

// Run the server in its own background task
let handle = tokio::task::spawn(serve_future);

Self {
shutdown: Some(tx),
addr,
handle: Some(handle),
}
}

/// Return a [`Channel`] connected to the TestServer
#[allow(dead_code)]
pub async fn channel(&self) -> Channel {
let url = format!("http://{}", self.addr);
let uri: Uri = url.parse().expect("Valid URI");
Channel::builder(uri)
.timeout(Duration::from_secs(DEFAULT_TIMEOUT_SECONDS))
.connect()
.await
.expect("error connecting to server")
}

/// Stops the test server and waits for the server to shutdown
#[allow(dead_code)]
pub async fn shutdown_and_wait(mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown.send(()).expect("server quit early");
}
if let Some(handle) = self.handle.take() {
println!("Waiting on server to finish");
handle
.await
.expect("task join error (panic?)")
.expect("Server Error found at shutdown");
}
}
}

impl Drop for TestFixture {
fn drop(&mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown.send(()).ok();
}
if self.handle.is_some() {
// tests should properly clean up TestFixture
println!("TestFixture::Drop called prior to `shutdown_and_wait`");
}
}
}
20 changes: 20 additions & 0 deletions arrow-flight/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod fixture;
pub mod server;
pub mod trailers_layer;
Loading

0 comments on commit 62f9e72

Please sign in to comment.