Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nonblocking socket initialisation #79

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion lib/poseidon/broker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def fetch_metadata_from_broker(broker, topics)
Connection.open(host, port, @client_id, @socket_timeout_ms) do |connection|
connection.topic_metadata(topics)
end
rescue Connection::ConnectionFailedError
rescue Connection::ConnectionFailedError, Connection::TimeoutException
return nil
end

Expand Down
33 changes: 30 additions & 3 deletions lib/poseidon/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def produce(required_acks, timeout, messages_for_topics)
req = ProduceRequest.new( request_common(:produce),
required_acks,
timeout,
messages_for_topics)
messages_for_topics)
send_request(req)
if required_acks != 0
read_response(ProduceResponse)
Expand All @@ -71,7 +71,7 @@ def fetch(max_wait_time, min_bytes, topic_fetches)
REPLICA_ID,
max_wait_time,
min_bytes,
topic_fetches)
topic_fetches)
send_request(req)
read_response(FetchResponse)
end
Expand Down Expand Up @@ -102,13 +102,40 @@ def topic_metadata(topic_names)
def ensure_connected
if @socket.nil? || @socket.closed?
begin
@socket = TCPSocket.new(@host, @port)
@socket = connect_with_timeout(@host, @port, @socket_timeout_ms / 1000.0)
rescue SystemCallError
raise_connection_failed_error
end
end
end

# Explained on http://spin.atomicobject.com/2013/09/30/socket-connection-timeout-ruby/
def connect_with_timeout(host, port, timeout = 5)
addr = Socket.getaddrinfo(host, nil)
sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0).tap do |socket|
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

begin
socket.connect_nonblock(sockaddr)
rescue IO::WaitWritable
if IO.select(nil, [socket], nil, timeout)
begin
socket.connect_nonblock(sockaddr)
rescue Errno::EISCONN
rescue
socket.close
raise
end
else
socket.close
raise TimeoutException
end
end
end
end

def read_response(response_class)
r = ensure_read_or_timeout(4)
if r.nil?
Expand Down
2 changes: 1 addition & 1 deletion lib/poseidon/partition_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def close
def handle_options(options)
@max_bytes = options.delete(:max_bytes) || 1024*1024
@min_bytes = options.delete(:min_bytes) || 1
@max_wait_ms = options.delete(:max_wait_ms) || 10_000
@max_wait_ms = options.delete(:max_wait_ms) || 100
@socket_timeout_ms = options.delete(:socket_timeout_ms) || @max_wait_ms + 10_000

if @socket_timeout_ms < @max_wait_ms
Expand Down
2 changes: 1 addition & 1 deletion lib/poseidon/sync_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def send_to_broker(messages_for_broker)
else
messages_for_broker.successfully_sent(response)
end
rescue Connection::ConnectionFailedError
rescue Connection::ConnectionFailedError, Connection::TimeoutException
false
end
end
Expand Down
9 changes: 8 additions & 1 deletion spec/integration/multiple_brokers/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

RSpec.describe "consuming with multiple brokers", :type => :request do
before(:each) do
@tc = ThreeBrokerCluster.new
@tc.start

# autocreate the topic by asking for information about it
c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000)
md = c.topic_metadata(["test"])
sleep 1
spec_sleep 1, "sreating topic"
end

after(:each) do
@tc.stop
end

it "finds the lead broker for each partition" do
Expand Down
9 changes: 8 additions & 1 deletion spec/integration/multiple_brokers/metadata_failures_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
RSpec.describe "handling failures", :type => :request do
describe "metadata failures" do
before(:each) do
@tc = ThreeBrokerCluster.new
@tc.start

@messages_to_send = [
MessageToSend.new("topic1", "hello"),
MessageToSend.new("topic2", "hello")
]
end

after(:each) do
@tc.stop
end

describe "unable to connect to brokers" do
before(:each) do
@p = Producer.new(["localhost:1092","localhost:1093","localhost:1094"], "producer")
Expand All @@ -23,7 +30,7 @@
end

describe "unknown topic" do
it "receives error callback" do
xit "receives error callback" do
pending "need a way to turn off auto-topic creation just for this test"
@p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer")

Expand Down
17 changes: 12 additions & 5 deletions spec/integration/multiple_brokers/rebalance_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

RSpec.describe "producer handles rebalancing", :type => :request do
before(:each) do
@tc = ThreeBrokerCluster.new
@tc.start

# autocreate the topic by asking for information about it
@c = Connection.new("localhost", 9093, "metadata_fetcher", 10_000)
@c.topic_metadata(["failure_spec"])
sleep 1
spec_sleep 1, "creating topic"
end

after(:each) do
@tc.stop
end

def current_leadership_mapping(c)
Expand Down Expand Up @@ -33,15 +40,15 @@ def current_leadership_mapping(c)
#
# We compare leadership before and after the message sending period
# to make sure we were successful.
$tc.stop_first_broker
sleep 30
@tc.stop_first_broker
spec_sleep 30, "stopping first broker waiting for Kafka to move leader"
SPEC_LOGGER.info "Pre start #{current_leadership_mapping(@c).inspect}"
$tc.start_first_broker
@tc.start_first_broker

pre_send_leadership = current_leadership_mapping(@c)
SPEC_LOGGER.info "Pre send #{pre_send_leadership.inspect}"
26.upto(50) do |n|
sleep 0.5
spec_sleep 0.5, "between sending messages"
@p.send_messages([MessageToSend.new("failure_spec", n.to_s)])
end
post_send_leadership = current_leadership_mapping(@c)
Expand Down
13 changes: 11 additions & 2 deletions spec/integration/multiple_brokers/round_robin_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
require 'integration/multiple_brokers/spec_helper'

RSpec.describe "round robin sending", :type => :request do
before(:each) do
@tc = ThreeBrokerCluster.new
@tc.start
end

after(:each) do
@tc.stop
end

describe "with small message batches" do
it "evenly distributes messages across brokers" do
c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000)
md = c.topic_metadata(["test"])
sleep 1
spec_sleep 1, "between sending messages"
md = c.topic_metadata(["test"])

test_topic = md.topics.first
Expand All @@ -28,7 +37,7 @@
@p.send_messages([MessageToSend.new("test", "hello")])
end

sleep 5
spec_sleep 5, "after sending, but before reading the messages"

consumers.each do |c|
messages = c.fetch
Expand Down
66 changes: 66 additions & 0 deletions spec/integration/multiple_brokers/socket_timeout_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
require 'integration/multiple_brokers/spec_helper'
require 'timeout'

RSpec.describe "blocked port", :requires => :sudo_pfctl do
let(:topic) { "test" }
let(:cluster_metadata) { ClusterMetadata.new }
let(:message) { MessageToSend.new(topic, "dupa") }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szymonsobczak hey, the content of MessageToSend could be better... ;)

let(:socket_timeout_ms) { 100 }
let(:producer_opts) {{
:required_acks => 1,
:socket_timeout_ms => socket_timeout_ms,
}}

before(:each) do
# enable & reset firewall
Pfctl.enable_pfctl
Pfctl.unblock_ports

@zookeeper = ZookeeperRunner.new
@brokers = (9092..9093).map { |port| BrokerRunner.new(port - 9092, port, 1, 2) }
@zookeeper.start
@brokers.each(&:start)
sleep 2

# autocreate the topic by asking for information about it
c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000)
md = c.topic_metadata([topic])
spec_sleep 1, "creating topic"
end

after(:each) do
Pfctl.unblock_ports

@brokers.each(&:stop)
@zookeeper.stop
spec_sleep 2, "waiting for cluster and ZK to stop"
end

it "socket_timeout effective when port is blocked" do
blocked_broker = block_leader("localhost:9092", cluster_metadata)

expect {
Timeout::timeout(5*socket_timeout_ms / 1000.0) {
prod = Producer.new([broker_string(blocked_broker)], topic, producer_opts)
prod.send_messages([message])
}
}.to raise_error(Poseidon::Errors::UnableToFetchMetadata)
end

it "reconnects to slave within timeout when leader is blocked" do
blocked_broker = block_leader("localhost:9092", cluster_metadata)
not_blocked_broker = not_blocked(cluster_metadata.brokers, blocked_broker)
brokers = [broker_string(blocked_broker), broker_string(not_blocked_broker)]

Timeout::timeout(5*socket_timeout_ms / 1000.0) {
prod = Producer.new(brokers, topic, producer_opts)
prod.send_messages([message])
}

pc = PartitionConsumer.new("test_client", not_blocked_broker.host, not_blocked_broker.port,
topic, 0, :earliest_offset, {:socket_timeout_ms => 100})
res = pc.fetch

expect(res.first.value).to eq message.value
end
end
73 changes: 61 additions & 12 deletions spec/integration/multiple_brokers/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,22 @@ def initialize(properties = {})
def start
@zookeeper.start
@brokers.each(&:start)
sleep 5
SPEC_LOGGER.info "Waiting on cluster"
sleep 2
end

def stop
SPEC_LOGGER.info "Stopping three broker cluster"
SPEC_LOGGER.info "Stopping brokers"
@brokers.each(&:stop)
sleep 5

SPEC_LOGGER.info "Stopping ZK"
@zookeeper.stop
sleep 5
end

def stop_first_broker
SPEC_LOGGER.info "Stopping first broker"
@brokers.first.stop
sleep 5
end

def start_first_broker
Expand All @@ -40,17 +38,68 @@ def start_first_broker
end
end

class Pfctl
class << self
def test_sudo_pfctl
res = `sudo -n pfctl -sa 2>&1`
return res.include? "INFO"
end

def enable_pfctl
SPEC_LOGGER.info "Enabling pfctl."
system "sudo -n pfctl -e > /dev/null 2>&1"
end

def unblock_ports
SPEC_LOGGER.info "Unblocking ports."
system "sudo -n pfctl -f /etc/pf.conf > /dev/null 2>&1"
end

def block_port(port)
SPEC_LOGGER.info "Blocking TCP on port #{port}."
system "(sudo -n pfctl -sr 2>/dev/null; echo 'block drop quick on { en0 lo0 } proto tcp from any to any port = #{port}') | sudo -n pfctl -f - > /dev/null 2>&1"
end
end
end

def broker_string(broker)
"#{broker.host}:#{broker.port}"
end

def not_blocked(brokers, blocked)
all = brokers.keys
all.delete blocked.id
brokers[all.first]
end

def block_leader(broker_str, cluster_metadata)
broker = ""

# simulate graceful shutdown of the machine
BrokerPool.open("test_client", [broker_str], 100) do |broker_pool|
cluster_metadata.update(broker_pool.fetch_metadata([topic]))
broker_pool.update_known_brokers(cluster_metadata.brokers)
broker = cluster_metadata.lead_broker_for_partition(topic, 0)
# first stop the broker - Kafka will move the leader
@brokers[broker.id].stop
# block communication with the broker
Pfctl.block_port broker.port
end

broker
end

RSpec.configure do |config|
if !Pfctl.test_sudo_pfctl then
puts "Unable to execute 'sudo pfctl'. Some tests disabled."
puts "Add the following to /etc/sudoers USING VISUDO!:"
puts "#{`whoami`.strip}\tALL=NOPASSWD:\t/sbin/pfctl *"

config.filter_run_excluding :requires => :sudo_pfctl
end

config.before(:each) do
JavaRunner.remove_tmp
JavaRunner.set_kafka_path!
$tc = ThreeBrokerCluster.new
$tc.start
SPEC_LOGGER.info "Waiting on cluster"
sleep 10 # wait for cluster to come up
end

config.after(:each) do
$tc.stop if $tc
end
end
6 changes: 6 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

require 'poseidon'
include Poseidon
Poseidon.logger = SPEC_LOGGER

require 'coveralls'
Coveralls.wear!

def spec_sleep(time, reason)
SPEC_LOGGER.info("Sleeping #{time}s, reason: #{reason}...")
sleep time
end
Loading