From c73657231ba2cde8ec4748103bc1dc4f52dcdaee Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Mon, 24 Nov 2014 13:05:04 +0100 Subject: [PATCH 1/8] use non-blocking way of initializing sockets --- lib/poseidon/connection.rb | 44 +++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index a2d81f6..2ecd4c9 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -51,7 +51,7 @@ def produce(required_acks, timeout, messages_for_topics) req = ProduceRequest.new( request_common(:produce), required_acks, timeout, - messages_for_topics) + messages_for_topics) send_request(req) if required_acks != 0 read_response(ProduceResponse) @@ -71,7 +71,7 @@ def fetch(max_wait_time, min_bytes, topic_fetches) REPLICA_ID, max_wait_time, min_bytes, - topic_fetches) + topic_fetches) send_request(req) read_response(FetchResponse) end @@ -102,13 +102,51 @@ def topic_metadata(topic_names) def ensure_connected if @socket.nil? || @socket.closed? begin - @socket = TCPSocket.new(@host, @port) + @socket = connect_with_timeout(@host, @port, @socket_timeout_ms / 1000.0) rescue SystemCallError raise_connection_failed_error end end end + def connect_with_timeout(host, port, timeout = 5) + # Convert the passed host into structures the non-blocking calls can deal with + addr = Socket.getaddrinfo(host, nil) + sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) + + Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0).tap do |socket| + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + begin + # Initiate the socket connection in the background. If it doesn't fail + # immediatelyit will raise an IO::WaitWritable (Errno::EINPROGRESS) + # indicating the connection is in progress. + socket.connect_nonblock(sockaddr) + + rescue IO::WaitWritable + # IO.select will block until the socket is writable or the timeout + # is exceeded - whichever comes first. + if IO.select(nil, [socket], nil, timeout) + begin + # Verify there is now a good connection + socket.connect_nonblock(sockaddr) + rescue Errno::EISCONN + # Good news everybody, the socket is connected! + rescue + # An unexpected exception was raised - the connection is no good. + socket.close + raise + end + else + # IO.select returns nil when the socket is not ready before timeout + # seconds have elapsed + socket.close + raise "Connection timeout" + end + end + end + end + def read_response(response_class) r = ensure_read_or_timeout(4) if r.nil? From b06ce97ca997fb1be2777f8eec06e5caaf31eb79 Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Mon, 24 Nov 2014 13:41:29 +0100 Subject: [PATCH 2/8] handle timeouts nicely --- lib/poseidon/broker_pool.rb | 2 +- lib/poseidon/connection.rb | 2 +- lib/poseidon/sync_producer.rb | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/poseidon/broker_pool.rb b/lib/poseidon/broker_pool.rb index fd99b57..7c858d0 100644 --- a/lib/poseidon/broker_pool.rb +++ b/lib/poseidon/broker_pool.rb @@ -67,7 +67,7 @@ def fetch_metadata_from_broker(broker, topics) Connection.open(host, port, @client_id, @socket_timeout_ms) do |connection| connection.topic_metadata(topics) end - rescue Connection::ConnectionFailedError + rescue Connection::ConnectionFailedError, Connection::TimeoutException return nil end diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index 2ecd4c9..ce3ee3c 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -141,7 +141,7 @@ def connect_with_timeout(host, port, timeout = 5) # IO.select returns nil when the socket is not ready before timeout # seconds have elapsed socket.close - raise "Connection timeout" + raise TimeoutException end end end diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index e2d460c..0e0ba97 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -147,7 +147,7 @@ def send_to_broker(messages_for_broker) else messages_for_broker.successfully_sent(response) end - rescue Connection::ConnectionFailedError + rescue Connection::ConnectionFailedError, Connection::TimeoutException false end end From 08c85fb1c5789527d5c9ade8c12a163affdf6bdd Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Mon, 24 Nov 2014 16:35:27 +0100 Subject: [PATCH 3/8] remove verbose comments, add link to author --- lib/poseidon/connection.rb | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index ce3ee3c..2187626 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -109,8 +109,8 @@ def ensure_connected end end + # Explained on http://spin.atomicobject.com/2013/09/30/socket-connection-timeout-ruby/ def connect_with_timeout(host, port, timeout = 5) - # Convert the passed host into structures the non-blocking calls can deal with addr = Socket.getaddrinfo(host, nil) sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) @@ -118,28 +118,17 @@ def connect_with_timeout(host, port, timeout = 5) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) begin - # Initiate the socket connection in the background. If it doesn't fail - # immediatelyit will raise an IO::WaitWritable (Errno::EINPROGRESS) - # indicating the connection is in progress. socket.connect_nonblock(sockaddr) - rescue IO::WaitWritable - # IO.select will block until the socket is writable or the timeout - # is exceeded - whichever comes first. if IO.select(nil, [socket], nil, timeout) begin - # Verify there is now a good connection socket.connect_nonblock(sockaddr) rescue Errno::EISCONN - # Good news everybody, the socket is connected! rescue - # An unexpected exception was raised - the connection is no good. socket.close raise end else - # IO.select returns nil when the socket is not ready before timeout - # seconds have elapsed socket.close raise TimeoutException end From 04815e1ba7dabb43ae1a068ec87bba72641e13cf Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Tue, 27 Jan 2015 17:26:20 +0100 Subject: [PATCH 4/8] make integration test pass on osx, decrease number of timeouts Conflicts: spec/test_cluster.rb --- .../multiple_brokers/consumer_spec.rb | 2 +- .../multiple_brokers/rebalance_spec.rb | 6 ++-- .../multiple_brokers/round_robin_spec.rb | 4 +-- .../multiple_brokers/spec_helper.rb | 14 ++------ spec/spec_helper.rb | 6 ++++ spec/test_cluster.rb | 33 +++++++++++-------- 6 files changed, 33 insertions(+), 32 deletions(-) diff --git a/spec/integration/multiple_brokers/consumer_spec.rb b/spec/integration/multiple_brokers/consumer_spec.rb index 1fcb6e5..d7d91b3 100644 --- a/spec/integration/multiple_brokers/consumer_spec.rb +++ b/spec/integration/multiple_brokers/consumer_spec.rb @@ -5,7 +5,7 @@ # autocreate the topic by asking for information about it c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) md = c.topic_metadata(["test"]) - sleep 1 + spec_sleep 1, "sreating topic" end it "finds the lead broker for each partition" do diff --git a/spec/integration/multiple_brokers/rebalance_spec.rb b/spec/integration/multiple_brokers/rebalance_spec.rb index 157f4b2..2d47ca9 100644 --- a/spec/integration/multiple_brokers/rebalance_spec.rb +++ b/spec/integration/multiple_brokers/rebalance_spec.rb @@ -5,7 +5,7 @@ # autocreate the topic by asking for information about it @c = Connection.new("localhost", 9093, "metadata_fetcher", 10_000) @c.topic_metadata(["failure_spec"]) - sleep 1 + spec_sleep 1, "creating topic" end def current_leadership_mapping(c) @@ -34,14 +34,14 @@ def current_leadership_mapping(c) # We compare leadership before and after the message sending period # to make sure we were successful. $tc.stop_first_broker - sleep 30 + spec_sleep 30, "stopping first broker waiting for Kafka to move leader" SPEC_LOGGER.info "Pre start #{current_leadership_mapping(@c).inspect}" $tc.start_first_broker pre_send_leadership = current_leadership_mapping(@c) SPEC_LOGGER.info "Pre send #{pre_send_leadership.inspect}" 26.upto(50) do |n| - sleep 0.5 + spec_sleep 0.5, "between sending messages" @p.send_messages([MessageToSend.new("failure_spec", n.to_s)]) end post_send_leadership = current_leadership_mapping(@c) diff --git a/spec/integration/multiple_brokers/round_robin_spec.rb b/spec/integration/multiple_brokers/round_robin_spec.rb index 2e1b26c..0115e5a 100644 --- a/spec/integration/multiple_brokers/round_robin_spec.rb +++ b/spec/integration/multiple_brokers/round_robin_spec.rb @@ -5,7 +5,7 @@ it "evenly distributes messages across brokers" do c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) md = c.topic_metadata(["test"]) - sleep 1 + spec_sleep 1, "between sending messages" md = c.topic_metadata(["test"]) test_topic = md.topics.first @@ -28,7 +28,7 @@ @p.send_messages([MessageToSend.new("test", "hello")]) end - sleep 5 + spec_sleep 5, "after sending, but before reading the messages" consumers.each do |c| messages = c.fetch diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index bf07c52..eba4c39 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -14,24 +14,22 @@ def initialize(properties = {}) def start @zookeeper.start @brokers.each(&:start) - sleep 5 + SPEC_LOGGER.info "Waiting on cluster" + sleep 2 end def stop SPEC_LOGGER.info "Stopping three broker cluster" SPEC_LOGGER.info "Stopping brokers" @brokers.each(&:stop) - sleep 5 SPEC_LOGGER.info "Stopping ZK" @zookeeper.stop - sleep 5 end def stop_first_broker SPEC_LOGGER.info "Stopping first broker" @brokers.first.stop - sleep 5 end def start_first_broker @@ -44,13 +42,5 @@ def start_first_broker config.before(:each) do JavaRunner.remove_tmp JavaRunner.set_kafka_path! - $tc = ThreeBrokerCluster.new - $tc.start - SPEC_LOGGER.info "Waiting on cluster" - sleep 10 # wait for cluster to come up - end - - config.after(:each) do - $tc.stop if $tc end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5214d5f..bbb2bff 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -24,6 +24,12 @@ require 'poseidon' include Poseidon +Poseidon.logger = SPEC_LOGGER require 'coveralls' Coveralls.wear! + +def spec_sleep(time, reason) + SPEC_LOGGER.info("Sleeping #{time}, reason: #{reason}...") + sleep time +end diff --git a/spec/test_cluster.rb b/spec/test_cluster.rb index 8915629..7ff2c20 100644 --- a/spec/test_cluster.rb +++ b/spec/test_cluster.rb @@ -14,13 +14,17 @@ def stop # The broker will end up in a state where it ignores SIGTERM # if zookeeper is stopped before the broker. @broker.stop - sleep 5 + spec_sleep(5, "TestCluster::broker.stop") @zookeeper.stop end end class JavaRunner + KILL_SIGNALS = { + sigterm: 15, + sigkill: 9 + } def self.remove_tmp FileUtils.rm_rf("#{POSEIDON_PATH}/tmp") end @@ -47,7 +51,7 @@ def initialize(id, start_cmd, pid_cmd, kill_signal, properties = {}) @properties = properties @start_cmd = start_cmd @pid_cmd = pid_cmd - @kill_signal = kill_signal + @kill_signal = KILL_SIGNALS[kill_signal] || 9 @stopped = false end @@ -69,24 +73,24 @@ def stop raise "Failed to kill process!" end - SPEC_LOGGER.info "Sending #{@kill_signal} To #{pid}" + SPEC_LOGGER.info "Sending signal #{@kill_signal} to #{pid}" SPEC_LOGGER.info "(#{@start_cmd})" - `kill -#{@kill_signal} #{pid}` + system "kill -#{@kill_signal} #{pid} > /dev/null 2>&1" - sleep 5 + spec_sleep 0.5, "In loop with KILL" end + @stopped = true end end def without_process stop - sleep 5 begin yield ensure start - sleep 5 + spec_sleep 5, "JavaRunner::without_process, ensuring process has started" end end @@ -94,7 +98,7 @@ def without_process def run FileUtils.mkdir_p(log_dir) - `LOG_DIR=#{log_dir} #{@start_cmd} #{config_path}` + system "LOG_DIR=#{log_dir} #{@start_cmd} #{config_path} >> tmp/log/stdout.log 2>&1 &" @stopped = false end @@ -156,10 +160,11 @@ class BrokerRunner def initialize(id, port, partition_count = 1, replication_factor = 1, properties = {}) @id = id @port = port - @jr = JavaRunner.new("broker_#{id}", - "#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -daemon -name broker_#{id} kafka.Kafka", + @log_dir = DEFAULT_PROPERTIES["log.dir"] + "_broker_#{id}" + @jr = JavaRunner.new("broker_#{id}", + "#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -name broker_#{id} kafka.Kafka", "ps ax | grep -i 'kafka\.Kafka' | grep java | grep broker_#{id} | grep -v grep | awk '{print $1}'", - "SIGTERM", + :sigterm, DEFAULT_PROPERTIES.merge( "broker.id" => id, "port" => port, @@ -186,10 +191,10 @@ def without_process class ZookeeperRunner def initialize @jr = JavaRunner.new("zookeeper", - "#{ENV['KAFKA_PATH']}/bin/zookeeper-server-start.sh -daemon", + "#{ENV['KAFKA_PATH']}/bin/zookeeper-server-start.sh", "ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}'", - "SIGKILL", - :dataDir => "#{POSEIDON_PATH}/tmp/zookeeper", + :sigkill, + :dataDir => @data_dir, :clientPort => 2181, :maxClientCnxns => 0) end From 2d083104885c795b98b28679edeb5bb6b9fa0e4d Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Tue, 27 Jan 2015 17:27:56 +0100 Subject: [PATCH 5/8] add specs for blocked port --- .../multiple_brokers/consumer_spec.rb | 7 ++ .../metadata_failures_spec.rb | 7 ++ .../multiple_brokers/rebalance_spec.rb | 11 +++- .../multiple_brokers/round_robin_spec.rb | 9 +++ .../multiple_brokers/socket_timeout_spec.rb | 66 +++++++++++++++++++ .../multiple_brokers/spec_helper.rb | 59 +++++++++++++++++ spec/spec_helper.rb | 2 +- spec/test_cluster.rb | 8 ++- 8 files changed, 164 insertions(+), 5 deletions(-) create mode 100644 spec/integration/multiple_brokers/socket_timeout_spec.rb diff --git a/spec/integration/multiple_brokers/consumer_spec.rb b/spec/integration/multiple_brokers/consumer_spec.rb index d7d91b3..a61f22a 100644 --- a/spec/integration/multiple_brokers/consumer_spec.rb +++ b/spec/integration/multiple_brokers/consumer_spec.rb @@ -2,12 +2,19 @@ RSpec.describe "consuming with multiple brokers", :type => :request do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + # autocreate the topic by asking for information about it c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) md = c.topic_metadata(["test"]) spec_sleep 1, "sreating topic" end + after(:each) do + @tc.stop + end + it "finds the lead broker for each partition" do brokers = Set.new 0.upto(2) do |partition| diff --git a/spec/integration/multiple_brokers/metadata_failures_spec.rb b/spec/integration/multiple_brokers/metadata_failures_spec.rb index 2c79c16..50914d2 100644 --- a/spec/integration/multiple_brokers/metadata_failures_spec.rb +++ b/spec/integration/multiple_brokers/metadata_failures_spec.rb @@ -3,12 +3,19 @@ RSpec.describe "handling failures", :type => :request do describe "metadata failures" do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + @messages_to_send = [ MessageToSend.new("topic1", "hello"), MessageToSend.new("topic2", "hello") ] end + after(:each) do + @tc.stop + end + describe "unable to connect to brokers" do before(:each) do @p = Producer.new(["localhost:1092","localhost:1093","localhost:1094"], "producer") diff --git a/spec/integration/multiple_brokers/rebalance_spec.rb b/spec/integration/multiple_brokers/rebalance_spec.rb index 2d47ca9..a53789b 100644 --- a/spec/integration/multiple_brokers/rebalance_spec.rb +++ b/spec/integration/multiple_brokers/rebalance_spec.rb @@ -2,12 +2,19 @@ RSpec.describe "producer handles rebalancing", :type => :request do before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + # autocreate the topic by asking for information about it @c = Connection.new("localhost", 9093, "metadata_fetcher", 10_000) @c.topic_metadata(["failure_spec"]) spec_sleep 1, "creating topic" end + after(:each) do + @tc.stop + end + def current_leadership_mapping(c) metadata = c.topic_metadata(["failure_spec"]) topic_metadata = metadata.topics.find { |t| t.name == "failure_spec" } @@ -33,10 +40,10 @@ def current_leadership_mapping(c) # # We compare leadership before and after the message sending period # to make sure we were successful. - $tc.stop_first_broker + @tc.stop_first_broker spec_sleep 30, "stopping first broker waiting for Kafka to move leader" SPEC_LOGGER.info "Pre start #{current_leadership_mapping(@c).inspect}" - $tc.start_first_broker + @tc.start_first_broker pre_send_leadership = current_leadership_mapping(@c) SPEC_LOGGER.info "Pre send #{pre_send_leadership.inspect}" diff --git a/spec/integration/multiple_brokers/round_robin_spec.rb b/spec/integration/multiple_brokers/round_robin_spec.rb index 0115e5a..27142e9 100644 --- a/spec/integration/multiple_brokers/round_robin_spec.rb +++ b/spec/integration/multiple_brokers/round_robin_spec.rb @@ -1,6 +1,15 @@ require 'integration/multiple_brokers/spec_helper' RSpec.describe "round robin sending", :type => :request do + before(:each) do + @tc = ThreeBrokerCluster.new + @tc.start + end + + after(:each) do + @tc.stop + end + describe "with small message batches" do it "evenly distributes messages across brokers" do c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) diff --git a/spec/integration/multiple_brokers/socket_timeout_spec.rb b/spec/integration/multiple_brokers/socket_timeout_spec.rb new file mode 100644 index 0000000..a64dd7e --- /dev/null +++ b/spec/integration/multiple_brokers/socket_timeout_spec.rb @@ -0,0 +1,66 @@ +require 'integration/multiple_brokers/spec_helper' +require 'timeout' + +RSpec.describe "blocked port", :requires => :sudo_pfctl do + let(:topic) { "test" } + let(:cluster_metadata) { ClusterMetadata.new } + let(:message) { MessageToSend.new(topic, "dupa") } + let(:socket_timeout_ms) { 100 } + let(:producer_opts) {{ + :required_acks => 1, + :socket_timeout_ms => socket_timeout_ms, + }} + + before(:each) do + # enable & reset firewall + Pfctl.enable_pfctl + Pfctl.unblock_ports + + @zookeeper = ZookeeperRunner.new + @brokers = (9092..9093).map { |port| BrokerRunner.new(port - 9092, port, 1, 2) } + @zookeeper.start + @brokers.each(&:start) + sleep 2 + + # autocreate the topic by asking for information about it + c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) + md = c.topic_metadata([topic]) + spec_sleep 1, "creating topic" + end + + after(:each) do + Pfctl.unblock_ports + + @brokers.each(&:stop) + @zookeeper.stop + spec_sleep 2, "waiting for cluster and ZK to stop" + end + + it "socket_timeout effective when port is blocked" do + blocked_broker = block_leader("localhost:9092", cluster_metadata) + + expect { + Timeout::timeout(5*socket_timeout_ms / 1000.0) { + prod = Producer.new([broker_string(blocked_broker)], topic, producer_opts) + prod.send_messages([message]) + } + }.to raise_error(Poseidon::Errors::UnableToFetchMetadata) + end + + it "reconnects to slave within timeout when leader is blocked" do + blocked_broker = block_leader("localhost:9092", cluster_metadata) + not_blocked_broker = not_blocked(cluster_metadata.brokers, blocked_broker) + brokers = [broker_string(blocked_broker), broker_string(not_blocked_broker)] + + Timeout::timeout(5*socket_timeout_ms / 1000.0) { + prod = Producer.new(brokers, topic, producer_opts) + prod.send_messages([message]) + } + + pc = PartitionConsumer.new("test_client", not_blocked_broker.host, not_blocked_broker.port, + topic, 0, :earliest_offset, {:socket_timeout_ms => 100}) + res = pc.fetch + + expect(res.first.value).to eq message.value + end +end diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index eba4c39..771bd58 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -38,7 +38,66 @@ def start_first_broker end end +class Pfctl + class << self + def test_sudo_pfctl + res = `sudo -n pfctl -sa 2>&1` + return res.include? "INFO" + end + + def enable_pfctl + SPEC_LOGGER.info "Enabling pfctl." + system "sudo -n pfctl -e > /dev/null 2>&1" + end + + def unblock_ports + SPEC_LOGGER.info "Unblocking ports." + system "sudo -n pfctl -f /etc/pf.conf > /dev/null 2>&1" + end + + def block_port(port) + SPEC_LOGGER.info "Blocking TCP on port #{port}." + system "(sudo -n pfctl -sr 2>/dev/null; echo 'block drop quick on { en0 lo0 } proto tcp from any to any port = #{port}') | sudo -n pfctl -f - > /dev/null 2>&1" + end + end +end + +def broker_string(broker) + "#{broker.host}:#{broker.port}" +end + +def not_blocked(brokers, blocked) + all = brokers.keys + all.delete blocked.id + brokers[all.first] +end + +def block_leader(broker_str, cluster_metadata) + broker = "" + + # simulate graceful shutdown of the machine + BrokerPool.open("test_client", [broker_str], 100) do |broker_pool| + cluster_metadata.update(broker_pool.fetch_metadata([topic])) + broker_pool.update_known_brokers(cluster_metadata.brokers) + broker = cluster_metadata.lead_broker_for_partition(topic, 0) + # first stop the broker - Kafka will move the leader + @brokers[broker.id].stop + # block communication with the broker + Pfctl.block_port broker.port + end + + broker +end + RSpec.configure do |config| + if !Pfctl.test_sudo_pfctl then + puts "Unable to execute 'sudo pfctl'. Some tests disabled." + puts "Add the following to /etc/sudoers USING VISUDO!:" + puts "#{`whoami`.strip}\tALL=NOPASSWD:\t/sbin/pfctl *" + + config.filter_run_excluding :requires => :sudo_pfctl + end + config.before(:each) do JavaRunner.remove_tmp JavaRunner.set_kafka_path! diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index bbb2bff..554092e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -30,6 +30,6 @@ Coveralls.wear! def spec_sleep(time, reason) - SPEC_LOGGER.info("Sleeping #{time}, reason: #{reason}...") + SPEC_LOGGER.info("Sleeping #{time}s, reason: #{reason}...") sleep time end diff --git a/spec/test_cluster.rb b/spec/test_cluster.rb index 7ff2c20..b48dec7 100644 --- a/spec/test_cluster.rb +++ b/spec/test_cluster.rb @@ -168,13 +168,14 @@ def initialize(id, port, partition_count = 1, replication_factor = 1, properties DEFAULT_PROPERTIES.merge( "broker.id" => id, "port" => port, - "log.dir" => "#{POSEIDON_PATH}/tmp/kafka-logs_#{id}", + "log.dir" => @log_dir, "default.replication.factor" => replication_factor, "num.partitions" => partition_count ).merge(properties)) end def start + FileUtils.mkdir_p(@log_dir) @jr.start end @@ -190,6 +191,8 @@ def without_process class ZookeeperRunner def initialize + @data_dir = "#{POSEIDON_PATH}/tmp/zookeeper" + @jr = JavaRunner.new("zookeeper", "#{ENV['KAFKA_PATH']}/bin/zookeeper-server-start.sh", "ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}'", @@ -202,8 +205,9 @@ def initialize def pid @jr.pid end - + def start + FileUtils.mkdir_p(@data_dir) @jr.start end From cedb7d673df4d5e251e926048fdc1f78cee5636c Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Tue, 27 Jan 2015 17:43:01 +0100 Subject: [PATCH 6/8] change consumer's max_wait_ms to 100, according to docs --- lib/poseidon/partition_consumer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/poseidon/partition_consumer.rb b/lib/poseidon/partition_consumer.rb index d8abfd1..c933fd5 100644 --- a/lib/poseidon/partition_consumer.rb +++ b/lib/poseidon/partition_consumer.rb @@ -151,7 +151,7 @@ def close def handle_options(options) @max_bytes = options.delete(:max_bytes) || 1024*1024 @min_bytes = options.delete(:min_bytes) || 1 - @max_wait_ms = options.delete(:max_wait_ms) || 10_000 + @max_wait_ms = options.delete(:max_wait_ms) || 100 @socket_timeout_ms = options.delete(:socket_timeout_ms) || @max_wait_ms + 10_000 if @socket_timeout_ms < @max_wait_ms From b67a106499c5d2a61a977fb0b68be3ab8996f9ac Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Tue, 27 Jan 2015 17:45:30 +0100 Subject: [PATCH 7/8] temporarily disable pending spec --- spec/integration/multiple_brokers/metadata_failures_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/integration/multiple_brokers/metadata_failures_spec.rb b/spec/integration/multiple_brokers/metadata_failures_spec.rb index 50914d2..d9fc0f5 100644 --- a/spec/integration/multiple_brokers/metadata_failures_spec.rb +++ b/spec/integration/multiple_brokers/metadata_failures_spec.rb @@ -30,7 +30,7 @@ end describe "unknown topic" do - it "receives error callback" do + xit "receives error callback" do pending "need a way to turn off auto-topic creation just for this test" @p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer") From a6877d18a86c8aca86d92cbf3051101ee7b08017 Mon Sep 17 00:00:00 2001 From: szymonsobczak Date: Tue, 27 Jan 2015 18:15:50 +0100 Subject: [PATCH 8/8] missing changed spec --- spec/unit/partition_consumer_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/unit/partition_consumer_spec.rb b/spec/unit/partition_consumer_spec.rb index 8f4a877..02c025b 100644 --- a/spec/unit/partition_consumer_spec.rb +++ b/spec/unit/partition_consumer_spec.rb @@ -97,7 +97,7 @@ end it "uses object defaults" do - expect(@connection).to receive(:fetch).with(10_000, 1, anything) + expect(@connection).to receive(:fetch).with(100, 1, anything) @pc.fetch end