Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discard not ready connections from HTTP/2 pool #227

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ defmodule Finch.HTTP2.Pool do
end

@impl true
def init({{scheme, host, port} = shp, registry, _pool_size, pool_opts}) do
{:ok, _} = Registry.register(registry, shp, __MODULE__)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove the registering code here since we have another one after we enter connected state. IMO, registration happens after we establish the connection makes more sense


def init({{scheme, host, port}, registry, _pool_size, pool_opts}) do
data = %{
conn: nil,
scheme: scheme,
Expand All @@ -171,7 +169,8 @@ defmodule Finch.HTTP2.Pool do
requests: %{},
backoff_base: 500,
backoff_max: 10_000,
connect_opts: pool_opts[:conn_opts] || []
connect_opts: pool_opts[:conn_opts] || [],
registry: registry
}

{:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}}
Expand All @@ -187,6 +186,8 @@ defmodule Finch.HTTP2.Pool do
# When entering a disconnected state we need to fail all of the pending
# requests
def disconnected(:enter, _, data) do
Registry.unregister(data.registry, {data.scheme, data.host, data.port})

:ok =
Enum.each(data.requests, fn {ref, request} ->
send(request.from_pid, {:error, ref, Error.exception(:connection_closed)})
Expand Down Expand Up @@ -276,7 +277,8 @@ defmodule Finch.HTTP2.Pool do
@doc false
def connected(event, content, data)

def connected(:enter, _old_state, _data) do
def connected(:enter, _old_state, data) do
{:ok, _} = Registry.register(data.registry, {data.scheme, data.host, data.port}, __MODULE__)
:keep_state_and_data
end

Expand Down Expand Up @@ -398,6 +400,8 @@ defmodule Finch.HTTP2.Pool do
def connected_read_only(event, content, data)

def connected_read_only(:enter, _old_state, data) do
Registry.unregister(data.registry, {data.scheme, data.host, data.port})

{actions, data} =
Enum.flat_map_reduce(data.requests, data, fn
# request is awaiting a response and should stay in state
Expand Down
20 changes: 12 additions & 8 deletions test/finch/http2/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,24 @@ defmodule Finch.HTTP2.IntegrationTest do
# they shouldn't block each other which we check with a rough time estimates
request = Finch.build(:get, url <> "/wait/1000")

# Warm the connection
{:ok, _} = Finch.request(request, TestFinch)

results =
1..50
|> Enum.map(fn _ ->
Task.async(fn ->
Task.async_stream(
1..50,
fn _ ->
start = System.monotonic_time()
{:ok, _} = Finch.request(request, TestFinch)
System.monotonic_time() - start
end)
end)
|> Enum.map(&Task.await/1)
end,
ordered: false
)
|> Enum.into([])

for result <- results do
for {:ok, result} <- results do
time = System.convert_time_unit(result, :native, :millisecond)
assert time <= 1200
assert time <= 1_050
end
end

Expand Down
97 changes: 97 additions & 0 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Finch.HTTP2.PoolTest do

import Mint.HTTP2.Frame

alias Finch.PoolManager
alias Finch.HTTP2.Pool
alias Finch.MockHTTP2Server

Expand Down Expand Up @@ -162,6 +163,97 @@ defmodule Finch.HTTP2.PoolTest do
request(pool, req, [])
end

test "if connections are in connected_read_only state, don't let clients check them out from the pool",
%{
request: req
} do
us = self()

port =
start_server_and_connect_with(fn port ->
start_supervised!(
{Finch,
name: TestFinch,
pools: %{
"https://localhost:#{port}" => [
protocol: :http2,
count: 1,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}}
)

port
end)

{pool, _} = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port})

spawn(fn ->
result = request(pool, req, [])
send(us, {:resp, result})
end)

assert_recv_frames([headers(stream_id: stream_id)])

# Force the connection to enter read only mode
server_send_frames([
goaway(last_stream_id: stream_id, error_code: :no_error, debug_data: "all good")
])

:timer.sleep(50)
# The connection should be discarded from the pool
assert :none = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port})
end

test "if connections are in disconnected state, don't let clients check them out from the pool",
%{
request: req
} do
us = self()

port =
start_server_and_connect_with(fn port ->
start_supervised!(
{Finch,
name: TestFinch,
pools: %{
"https://localhost:#{port}" => [
protocol: :http2,
count: 1,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}}
)

port
end)

{pool, _} = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port})

spawn(fn ->
result = request(pool, req, [])
send(us, {:resp, result})
end)

# If the server closes the socket, the connection should be discarded from the pool
:ok = :ssl.close(server_socket())
:timer.sleep(50)
assert :none = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port})

# But after the client reconnects, it should be added back in
server_accept_socket()
:timer.sleep(50)
assert PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) != :none
end

test "request timeout with timeout of 0", %{request: req} do
us = self()

Expand Down Expand Up @@ -288,6 +380,11 @@ defmodule Finch.HTTP2.PoolTest do
result
end

defp server_accept_socket() do
server = Process.get(@pdict_key)
MockHTTP2Server.accept_socket(server)
end

defp recv_next_frames(n) do
server = Process.get(@pdict_key)
MockHTTP2Server.recv_next_frames(server, n)
Expand Down
10 changes: 8 additions & 2 deletions test/support/mock_http2_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Finch.MockHTTP2Server do

alias Mint.{HTTP2.Frame, HTTP2.HPACK}

defstruct [:socket, :encode_table, :decode_table]
defstruct [:socket, :encode_table, :decode_table, :listen_socket, :server_settings]

@fixtures_dir Path.expand("../fixtures", __DIR__)

Expand Down Expand Up @@ -36,7 +36,9 @@ defmodule Finch.MockHTTP2Server do
server = %__MODULE__{
socket: server_socket,
encode_table: HPACK.new(4096),
decode_table: HPACK.new(4096)
decode_table: HPACK.new(4096),
listen_socket: listen_socket,
server_settings: server_settings
}

{result, server}
Expand Down Expand Up @@ -107,6 +109,10 @@ defmodule Finch.MockHTTP2Server do
server.socket
end

def accept_socket(%__MODULE__{listen_socket: listen_socket, server_settings: server_settings}) do
accept(listen_socket, self(), server_settings)
end

defp accept(listen_socket, parent, server_settings) do
{:ok, socket} = :ssl.transport_accept(listen_socket)
{:ok, socket} = :ssl.handshake(socket)
Expand Down