diff --git a/lib/poseidon/broker_pool.rb b/lib/poseidon/broker_pool.rb index fd99b57..7c858d0 100644 --- a/lib/poseidon/broker_pool.rb +++ b/lib/poseidon/broker_pool.rb @@ -67,7 +67,7 @@ def fetch_metadata_from_broker(broker, topics) Connection.open(host, port, @client_id, @socket_timeout_ms) do |connection| connection.topic_metadata(topics) end - rescue Connection::ConnectionFailedError + rescue Connection::ConnectionFailedError, Connection::TimeoutException return nil end diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index a2d81f6..2187626 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -51,7 +51,7 @@ def produce(required_acks, timeout, messages_for_topics) req = ProduceRequest.new( request_common(:produce), required_acks, timeout, - messages_for_topics) + messages_for_topics) send_request(req) if required_acks != 0 read_response(ProduceResponse) @@ -71,7 +71,7 @@ def fetch(max_wait_time, min_bytes, topic_fetches) REPLICA_ID, max_wait_time, min_bytes, - topic_fetches) + topic_fetches) send_request(req) read_response(FetchResponse) end @@ -102,13 +102,40 @@ def topic_metadata(topic_names) def ensure_connected if @socket.nil? || @socket.closed? begin - @socket = TCPSocket.new(@host, @port) + @socket = connect_with_timeout(@host, @port, @socket_timeout_ms / 1000.0) rescue SystemCallError raise_connection_failed_error end end end + # Explained on http://spin.atomicobject.com/2013/09/30/socket-connection-timeout-ruby/ + def connect_with_timeout(host, port, timeout = 5) + addr = Socket.getaddrinfo(host, nil) + sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) + + Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0).tap do |socket| + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + begin + socket.connect_nonblock(sockaddr) + rescue IO::WaitWritable + if IO.select(nil, [socket], nil, timeout) + begin + socket.connect_nonblock(sockaddr) + rescue Errno::EISCONN + rescue + socket.close + raise + end + else + socket.close + raise TimeoutException + end + end + end + end + def read_response(response_class) r = ensure_read_or_timeout(4) if r.nil? diff --git a/lib/poseidon/partition_consumer.rb b/lib/poseidon/partition_consumer.rb index d8abfd1..c933fd5 100644 --- a/lib/poseidon/partition_consumer.rb +++ b/lib/poseidon/partition_consumer.rb @@ -151,7 +151,7 @@ def close def handle_options(options) @max_bytes = options.delete(:max_bytes) || 1024*1024 @min_bytes = options.delete(:min_bytes) || 1 - @max_wait_ms = options.delete(:max_wait_ms) || 10_000 + @max_wait_ms = options.delete(:max_wait_ms) || 100 @socket_timeout_ms = options.delete(:socket_timeout_ms) || @max_wait_ms + 10_000 if @socket_timeout_ms < @max_wait_ms diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index e2d460c..0e0ba97 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -147,7 +147,7 @@ def send_to_broker(messages_for_broker) else messages_for_broker.successfully_sent(response) end - rescue Connection::ConnectionFailedError + rescue Connection::ConnectionFailedError, Connection::TimeoutException false end end diff --git a/spec/integration/multiple_brokers/consumer_spec.rb b/spec/integration/multiple_brokers/consumer_spec.rb index 1fcb6e5..a61f22a 100644 --- a/spec/integration/multiple_brokers/consumer_spec.rb +++ b/spec/integration/multiple_brokers/consumer_spec.rb @@ -2,10 +2,17 @@ RSpec.describe "consuming with multiple brokers", :type => :request do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + # autocreate the topic by asking for information about it c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) md = c.topic_metadata(["test"]) - sleep 1 + spec_sleep 1, "sreating topic" + end + + after(:each) do + @tc.stop end it "finds the lead broker for each partition" do diff --git a/spec/integration/multiple_brokers/metadata_failures_spec.rb b/spec/integration/multiple_brokers/metadata_failures_spec.rb index 2c79c16..d9fc0f5 100644 --- a/spec/integration/multiple_brokers/metadata_failures_spec.rb +++ b/spec/integration/multiple_brokers/metadata_failures_spec.rb @@ -3,12 +3,19 @@ RSpec.describe "handling failures", :type => :request do describe "metadata failures" do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + @messages_to_send = [ MessageToSend.new("topic1", "hello"), MessageToSend.new("topic2", "hello") ] end + after(:each) do + @tc.stop + end + describe "unable to connect to brokers" do before(:each) do @p = Producer.new(["localhost:1092","localhost:1093","localhost:1094"], "producer") @@ -23,7 +30,7 @@ end describe "unknown topic" do - it "receives error callback" do + xit "receives error callback" do pending "need a way to turn off auto-topic creation just for this test" @p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer") diff --git a/spec/integration/multiple_brokers/rebalance_spec.rb b/spec/integration/multiple_brokers/rebalance_spec.rb index 157f4b2..a53789b 100644 --- a/spec/integration/multiple_brokers/rebalance_spec.rb +++ b/spec/integration/multiple_brokers/rebalance_spec.rb @@ -2,10 +2,17 @@ RSpec.describe "producer handles rebalancing", :type => :request do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + # autocreate the topic by asking for information about it @c = Connection.new("localhost", 9093, "metadata_fetcher", 10_000) @c.topic_metadata(["failure_spec"]) - sleep 1 + spec_sleep 1, "creating topic" + end + + after(:each) do + @tc.stop end def current_leadership_mapping(c) @@ -33,15 +40,15 @@ def current_leadership_mapping(c) # # We compare leadership before and after the message sending period # to make sure we were successful. - $tc.stop_first_broker - sleep 30 + @tc.stop_first_broker + spec_sleep 30, "stopping first broker waiting for Kafka to move leader" SPEC_LOGGER.info "Pre start #{current_leadership_mapping(@c).inspect}" - $tc.start_first_broker + @tc.start_first_broker pre_send_leadership = current_leadership_mapping(@c) SPEC_LOGGER.info "Pre send #{pre_send_leadership.inspect}" 26.upto(50) do |n| - sleep 0.5 + spec_sleep 0.5, "between sending messages" @p.send_messages([MessageToSend.new("failure_spec", n.to_s)]) end post_send_leadership = current_leadership_mapping(@c) diff --git a/spec/integration/multiple_brokers/round_robin_spec.rb b/spec/integration/multiple_brokers/round_robin_spec.rb index 2e1b26c..27142e9 100644 --- a/spec/integration/multiple_brokers/round_robin_spec.rb +++ b/spec/integration/multiple_brokers/round_robin_spec.rb @@ -1,11 +1,20 @@ require 'integration/multiple_brokers/spec_helper' RSpec.describe "round robin sending", :type => :request do + before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + end + + after(:each) do + @tc.stop + end + describe "with small message batches" do it "evenly distributes messages across brokers" do c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) md = c.topic_metadata(["test"]) - sleep 1 + spec_sleep 1, "between sending messages" md = c.topic_metadata(["test"]) test_topic = md.topics.first @@ -28,7 +37,7 @@ @p.send_messages([MessageToSend.new("test", "hello")]) end - sleep 5 + spec_sleep 5, "after sending, but before reading the messages" consumers.each do |c| messages = c.fetch diff --git a/spec/integration/multiple_brokers/socket_timeout_spec.rb b/spec/integration/multiple_brokers/socket_timeout_spec.rb new file mode 100644 index 0000000..a64dd7e --- /dev/null +++ b/spec/integration/multiple_brokers/socket_timeout_spec.rb @@ -0,0 +1,66 @@ +require 'integration/multiple_brokers/spec_helper' +require 'timeout' + +RSpec.describe "blocked port", :requires => :sudo_pfctl do + let(:topic) { "test" } + let(:cluster_metadata) { ClusterMetadata.new } + let(:message) { MessageToSend.new(topic, "dupa") } + let(:socket_timeout_ms) { 100 } + let(:producer_opts) {{ + :required_acks => 1, + :socket_timeout_ms => socket_timeout_ms, + }} + + before(:each) do + # enable & reset firewall + Pfctl.enable_pfctl + Pfctl.unblock_ports + + @zookeeper = ZookeeperRunner.new + @brokers = (9092..9093).map { |port| BrokerRunner.new(port - 9092, port, 1, 2) } + @zookeeper.start + @brokers.each(&:start) + sleep 2 + + # autocreate the topic by asking for information about it + c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) + md = c.topic_metadata([topic]) + spec_sleep 1, "creating topic" + end + + after(:each) do + Pfctl.unblock_ports + + @brokers.each(&:stop) + @zookeeper.stop + spec_sleep 2, "waiting for cluster and ZK to stop" + end + + it "socket_timeout effective when port is blocked" do + blocked_broker = block_leader("localhost:9092", cluster_metadata) + + expect { + Timeout::timeout(5*socket_timeout_ms / 1000.0) { + prod = Producer.new([broker_string(blocked_broker)], topic, producer_opts) + prod.send_messages([message]) + } + }.to raise_error(Poseidon::Errors::UnableToFetchMetadata) + end + + it "reconnects to slave within timeout when leader is blocked" do + blocked_broker = block_leader("localhost:9092", cluster_metadata) + not_blocked_broker = not_blocked(cluster_metadata.brokers, blocked_broker) + brokers = [broker_string(blocked_broker), broker_string(not_blocked_broker)] + + Timeout::timeout(5*socket_timeout_ms / 1000.0) { + prod = Producer.new(brokers, topic, producer_opts) + prod.send_messages([message]) + } + + pc = PartitionConsumer.new("test_client", not_blocked_broker.host, not_blocked_broker.port, + topic, 0, :earliest_offset, {:socket_timeout_ms => 100}) + res = pc.fetch + + expect(res.first.value).to eq message.value + end +end diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index bf07c52..771bd58 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -14,24 +14,22 @@ def initialize(properties = {}) def start @zookeeper.start @brokers.each(&:start) - sleep 5 + SPEC_LOGGER.info "Waiting on cluster" + sleep 2 end def stop SPEC_LOGGER.info "Stopping three broker cluster" SPEC_LOGGER.info "Stopping brokers" @brokers.each(&:stop) - sleep 5 SPEC_LOGGER.info "Stopping ZK" @zookeeper.stop - sleep 5 end def stop_first_broker SPEC_LOGGER.info "Stopping first broker" @brokers.first.stop - sleep 5 end def start_first_broker @@ -40,17 +38,68 @@ def start_first_broker end end +class Pfctl + class << self + def test_sudo_pfctl + res = `sudo -n pfctl -sa 2>&1` + return res.include? "INFO" + end + + def enable_pfctl + SPEC_LOGGER.info "Enabling pfctl." + system "sudo -n pfctl -e > /dev/null 2>&1" + end + + def unblock_ports + SPEC_LOGGER.info "Unblocking ports." + system "sudo -n pfctl -f /etc/pf.conf > /dev/null 2>&1" + end + + def block_port(port) + SPEC_LOGGER.info "Blocking TCP on port #{port}." + system "(sudo -n pfctl -sr 2>/dev/null; echo 'block drop quick on { en0 lo0 } proto tcp from any to any port = #{port}') | sudo -n pfctl -f - > /dev/null 2>&1" + end + end +end + +def broker_string(broker) + "#{broker.host}:#{broker.port}" +end + +def not_blocked(brokers, blocked) + all = brokers.keys + all.delete blocked.id + brokers[all.first] +end + +def block_leader(broker_str, cluster_metadata) + broker = "" + + # simulate graceful shutdown of the machine + BrokerPool.open("test_client", [broker_str], 100) do |broker_pool| + cluster_metadata.update(broker_pool.fetch_metadata([topic])) + broker_pool.update_known_brokers(cluster_metadata.brokers) + broker = cluster_metadata.lead_broker_for_partition(topic, 0) + # first stop the broker - Kafka will move the leader + @brokers[broker.id].stop + # block communication with the broker + Pfctl.block_port broker.port + end + + broker +end + RSpec.configure do |config| + if !Pfctl.test_sudo_pfctl then + puts "Unable to execute 'sudo pfctl'. Some tests disabled." + puts "Add the following to /etc/sudoers USING VISUDO!:" + puts "#{`whoami`.strip}\tALL=NOPASSWD:\t/sbin/pfctl *" + + config.filter_run_excluding :requires => :sudo_pfctl + end + config.before(:each) do JavaRunner.remove_tmp JavaRunner.set_kafka_path! - $tc = ThreeBrokerCluster.new - $tc.start - SPEC_LOGGER.info "Waiting on cluster" - sleep 10 # wait for cluster to come up - end - - config.after(:each) do - $tc.stop if $tc end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5214d5f..554092e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -24,6 +24,12 @@ require 'poseidon' include Poseidon +Poseidon.logger = SPEC_LOGGER require 'coveralls' Coveralls.wear! + +def spec_sleep(time, reason) + SPEC_LOGGER.info("Sleeping #{time}s, reason: #{reason}...") + sleep time +end diff --git a/spec/test_cluster.rb b/spec/test_cluster.rb index 8915629..b48dec7 100644 --- a/spec/test_cluster.rb +++ b/spec/test_cluster.rb @@ -14,13 +14,17 @@ def stop # The broker will end up in a state where it ignores SIGTERM # if zookeeper is stopped before the broker. @broker.stop - sleep 5 + spec_sleep(5, "TestCluster::broker.stop") @zookeeper.stop end end class JavaRunner + KILL_SIGNALS = { + sigterm: 15, + sigkill: 9 + } def self.remove_tmp FileUtils.rm_rf("#{POSEIDON_PATH}/tmp") end @@ -47,7 +51,7 @@ def initialize(id, start_cmd, pid_cmd, kill_signal, properties = {}) @properties = properties @start_cmd = start_cmd @pid_cmd = pid_cmd - @kill_signal = kill_signal + @kill_signal = KILL_SIGNALS[kill_signal] || 9 @stopped = false end @@ -69,24 +73,24 @@ def stop raise "Failed to kill process!" end - SPEC_LOGGER.info "Sending #{@kill_signal} To #{pid}" + SPEC_LOGGER.info "Sending signal #{@kill_signal} to #{pid}" SPEC_LOGGER.info "(#{@start_cmd})" - `kill -#{@kill_signal} #{pid}` + system "kill -#{@kill_signal} #{pid} > /dev/null 2>&1" - sleep 5 + spec_sleep 0.5, "In loop with KILL" end + @stopped = true end end def without_process stop - sleep 5 begin yield ensure start - sleep 5 + spec_sleep 5, "JavaRunner::without_process, ensuring process has started" end end @@ -94,7 +98,7 @@ def without_process def run FileUtils.mkdir_p(log_dir) - `LOG_DIR=#{log_dir} #{@start_cmd} #{config_path}` + system "LOG_DIR=#{log_dir} #{@start_cmd} #{config_path} >> tmp/log/stdout.log 2>&1 &" @stopped = false end @@ -156,20 +160,22 @@ class BrokerRunner def initialize(id, port, partition_count = 1, replication_factor = 1, properties = {}) @id = id @port = port - @jr = JavaRunner.new("broker_#{id}", - "#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -daemon -name broker_#{id} kafka.Kafka", + @log_dir = DEFAULT_PROPERTIES["log.dir"] + "_broker_#{id}" + @jr = JavaRunner.new("broker_#{id}", + "#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -name broker_#{id} kafka.Kafka", "ps ax | grep -i 'kafka\.Kafka' | grep java | grep broker_#{id} | grep -v grep | awk '{print $1}'", - "SIGTERM", + :sigterm, DEFAULT_PROPERTIES.merge( "broker.id" => id, "port" => port, - "log.dir" => "#{POSEIDON_PATH}/tmp/kafka-logs_#{id}", + "log.dir" => @log_dir, "default.replication.factor" => replication_factor, "num.partitions" => partition_count ).merge(properties)) end def start + FileUtils.mkdir_p(@log_dir) @jr.start end @@ -185,11 +191,13 @@ def without_process class ZookeeperRunner def initialize + @data_dir = "#{POSEIDON_PATH}/tmp/zookeeper" + @jr = JavaRunner.new("zookeeper", - "#{ENV['KAFKA_PATH']}/bin/zookeeper-server-start.sh -daemon", + "#{ENV['KAFKA_PATH']}/bin/zookeeper-server-start.sh", "ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}'", - "SIGKILL", - :dataDir => "#{POSEIDON_PATH}/tmp/zookeeper", + :sigkill, + :dataDir => @data_dir, :clientPort => 2181, :maxClientCnxns => 0) end @@ -197,8 +205,9 @@ def initialize def pid @jr.pid end - + def start + FileUtils.mkdir_p(@data_dir) @jr.start end diff --git a/spec/unit/partition_consumer_spec.rb b/spec/unit/partition_consumer_spec.rb index 8f4a877..02c025b 100644 --- a/spec/unit/partition_consumer_spec.rb +++ b/spec/unit/partition_consumer_spec.rb @@ -97,7 +97,7 @@ end it "uses object defaults" do - expect(@connection).to receive(:fetch).with(10_000, 1, anything) + expect(@connection).to receive(:fetch).with(100, 1, anything) @pc.fetch end