Skip to content

Commit

Permalink
GH-44361: [C#][Integration] Include .NET in Flight integration tests (#…
Browse files Browse the repository at this point in the history
…44377)

### Rationale for this change

See #44361. This allows testing compatibility of the .NET Flight implementation with other Flight implementations.

### What changes are included in this PR?

* Adds a new `Apache.Arrow.Flight.IntegrationTest` project that can run in server or client mode for Flight integration tests.
* Includes the integration tests that send then retrieve data defined in JSON files, but doesn't add any of the named scenarios
* Configures archery to include C# in the Flight integration tests, but skip all the named scenarios
* Also skips tests that use dictionary data due to #38045, and the empty data test due to #44363

### Are these changes tested?

These changes are tests.

### Are there any user-facing changes?

No
* GitHub Issue: #44361

Authored-by: Adam Reeve <[email protected]>
Signed-off-by: Curt Hagenlocher <[email protected]>
  • Loading branch information
adamreeve authored and amoeba committed Nov 11, 2024
1 parent 47ba570 commit f42c5b0
Show file tree
Hide file tree
Showing 14 changed files with 502 additions and 20 deletions.
6 changes: 6 additions & 0 deletions csharp/Apache.Arrow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -81,6 +83,10 @@ Global
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_flightDataStream.Dispose();
_flightDataStream?.Dispose();
_disposed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<ProjectReference Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" />
<ProjectReference Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" />
<ProjectReference Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading.Tasks;

namespace Apache.Arrow.Flight.IntegrationTest;

public class FlightClientCommand
{
private readonly int _port;
private readonly string _scenario;
private readonly FileInfo _jsonFileInfo;

public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo)
{
_port = port;
_scenario = scenario;
_jsonFileInfo = jsonFileInfo;
}

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}

if (!(_jsonFileInfo?.Exists ?? false))
{
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
}

var scenario = new JsonTestScenario(_port, _jsonFileInfo);
await scenario.RunClient().ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Net;
using System.Threading.Tasks;
using Apache.Arrow.Flight.TestWeb;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Apache.Arrow.Flight.IntegrationTest;

public class FlightServerCommand
{
private readonly string _scenario;

public FlightServerCommand(string scenario)
{
_scenario = scenario;
}

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}

var host = Host.CreateDefaultBuilder()
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder
.ConfigureKestrel(options =>
{
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2);
})
.UseStartup<Startup>();
})
.Build();

await host.StartAsync().ConfigureAwait(false);

var addresses = host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses;
foreach (var address in addresses)
{
Console.WriteLine($"Server listening on {address}");
}

await host.WaitForShutdownAsync().ConfigureAwait(false);
}
}
34 changes: 34 additions & 0 deletions csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Grpc.Net.Client.Balancer;

namespace Apache.Arrow.Flight.IntegrationTest;

/// <summary>
/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight.
/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme.
/// </summary>
public class GrpcTcpResolverFactory : ResolverFactory
{
public override string Name => "grpc+tcp";

public override Resolver Create(ResolverOptions options)
{
return new StaticResolverFactory(
uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) })
.Create(options);
}
}
167 changes: 167 additions & 0 deletions csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Client;
using Apache.Arrow.IntegrationTest;
using Apache.Arrow.Tests;
using Apache.Arrow.Types;
using Google.Protobuf;
using Grpc.Net.Client;
using Grpc.Core;
using Grpc.Net.Client.Balancer;
using Microsoft.Extensions.DependencyInjection;

namespace Apache.Arrow.Flight.IntegrationTest;

/// <summary>
/// A test scenario defined using a JSON data file
/// </summary>
internal class JsonTestScenario
{
private readonly int _serverPort;
private readonly FileInfo _jsonFile;
private readonly ServiceProvider _serviceProvider;

public JsonTestScenario(int serverPort, FileInfo jsonFile)
{
_serverPort = serverPort;
_jsonFile = jsonFile;

var services = new ServiceCollection();
services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory());
_serviceProvider = services.BuildServiceProvider();
}

public async Task RunClient()
{
var address = $"grpc+tcp://localhost:{_serverPort}";
using var channel = GrpcChannel.ForAddress(
address,
new GrpcChannelOptions
{
ServiceProvider = _serviceProvider,
Credentials = ChannelCredentials.Insecure
});
var client = new FlightClient(channel);

var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName);

var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false);
var schema = jsonFile.GetSchemaAndDictionaries(out Func<DictionaryType, IArrowArray> dictionaries);
var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray();

// 1. Put the data to the server.
await UploadBatches(client, descriptor, batches).ConfigureAwait(false);

// 2. Get the ticket for the data.
var info = await client.GetInfo(descriptor).ConfigureAwait(false);
if (info.Endpoints.Count == 0)
{
throw new Exception("No endpoints received");
}

// 3. Stream data from the server, comparing individual batches.
foreach (var endpoint in info.Endpoints)
{
var locations = endpoint.Locations.ToArray();
if (locations.Length == 0)
{
// Can read with existing client
await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false);
}
else
{
foreach (var location in locations)
{
using var readChannel = GrpcChannel.ForAddress(
location.Uri,
new GrpcChannelOptions
{
ServiceProvider = _serviceProvider,
Credentials = ChannelCredentials.Insecure
});
var readClient = new FlightClient(readChannel);
await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false);
}
}
}
}

private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches)
{
using var putCall = client.StartPut(descriptor);
using var writer = putCall.RequestStream;

try
{
var counter = 0;
foreach (var batch in batches)
{
var metadata = $"{counter}";

await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false);

// Verify server has acknowledged the write request
await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8();

if (responseString != metadata)
{
throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'");
}

counter++;
}
}
finally
{
await writer.CompleteAsync().ConfigureAwait(false);
}

// Drain the response stream to ensure the server has stored the data
var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
if (hasMore)
{
throw new Exception("Expected to have reached the end of the response stream");
}
}

private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches)
{
using var readStream = client.GetStream(ticket);
var counter = 0;
foreach (var originalBatch in batches)
{
if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
{
throw new Exception($"Expected {batches.Length} batches but received {counter}");
}

var batch = readStream.ResponseStream.Current;
ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false);

counter++;
}

if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
{
throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches");
}
}
}
Loading

0 comments on commit f42c5b0

Please sign in to comment.