From 50ebb7479eb034aa7223e1b43a904949344539e3 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 06:23:33 -0400 Subject: [PATCH 01/23] Remove extraneous sleep from ZookeeperWatcher thread --- lib/synapse/service_watcher/zookeeper/zookeeper.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index 67aa40c1..301eb4e8 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -63,15 +63,11 @@ def initialize(opts={}, synapse, reconfigure_callback) def start log.info "synapse: starting ZK watcher #{@name} @ cluster: #{@zk_cluster} path: #{@discovery['path']} retry policy: #{@retry_policy}" - # Zookeeper processing is run in a background thread so that any retries - # do not block the main thread. zk_connect do + # Start a thread just for the initial discovery/watch setup, which can take + # a while. The watches will be handled in their own thread. @thread = Thread.new { start_discovery - - until @should_exit.get - sleep 0.5 - end } end end From dc5adec00cbff145094af01b0d178aff423b886a Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 07:44:03 -0400 Subject: [PATCH 02/23] Add concurrent-ruby --- synapse.gemspec | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse.gemspec b/synapse.gemspec index 94f791f4..05fb62bf 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -27,6 +27,8 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "logging", "~> 1.8" gem.add_runtime_dependency "hashdiff", "~> 0.2.3" gem.add_runtime_dependency "dogstatsd-ruby", "~> 3.3.0" + gem.add_runtime_dependency "nokogiri", "~> 1.6.8.1" + gem.add_runtime_dependency "concurrent-ruby", "~> 1.1.6" gem.add_development_dependency "rake", "~> 11" gem.add_development_dependency "rspec", "~> 3.1.0" From f4ff20e94700d0a9a9e402f4928ee01a50f1bda9 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 07:44:35 -0400 Subject: [PATCH 03/23] Use Concurrent::TimerSet to schedule periodic events --- lib/synapse.rb | 7 ++++- .../service_watcher/zookeeper/zookeeper.rb | 7 ++--- .../zookeeper_poll/zookeeper_poll.rb | 30 +++++-------------- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index b35cefb8..83c6054a 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -40,6 +40,9 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = AtomicValue.new(true) + # TODO(rushy_panchal): minimum and maximum thread counts + @task_scheduler = Concurrent.TimerSet.new(:executor => Concurrent::ThreadPoolExecutor.new) + # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true @@ -59,7 +62,7 @@ def run statsd_time('synapse.watchers.start.time') do @service_watchers.map do |watcher| begin - watcher.start + watcher.start(@task_scheduler) statsd_increment("synapse.watcher.start", ['start_result:success', "watcher_name:#{watcher.name}"]) rescue Exception => e statsd_increment("synapse.watcher.start", ['start_result:fail', "watcher_name:#{watcher.name}", "exception_name:#{e.class.name}", "exception_message:#{e.message}"]) @@ -119,6 +122,8 @@ def run raise e end end + + @task_scheduler.kill statsd_increment('synapse.stop', ['stop_avenue:clean', 'stop_location:main_loop']) end diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index 301eb4e8..54ed4b8b 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -60,13 +60,12 @@ def initialize(opts={}, synapse, reconfigure_callback) @should_exit = Synapse::AtomicValue.new(false) end - def start + def start(scheduler) log.info "synapse: starting ZK watcher #{@name} @ cluster: #{@zk_cluster} path: #{@discovery['path']} retry policy: #{@retry_policy}" zk_connect do - # Start a thread just for the initial discovery/watch setup, which can take - # a while. The watches will be handled in their own thread. - @thread = Thread.new { + # Asynchronously start the discovery. + scheduler.post(0) { start_discovery } end diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index 40e2f6e3..98ddc63f 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -13,33 +13,18 @@ def initialize(opts, synapse, reconfigure_callback) @poll_interval = @discovery['polling_interval_sec'] || 60 @should_exit = Synapse::AtomicValue.new(false) - @thread = nil end - def start + def start(scheduler) log.info 'synapse: ZookeeperPollWatcher starting' zk_connect do - @thread = Thread.new { - log.info 'synapse: zookeeper polling thread started' - - # Ensure we poll on first start. - last_run = Time.now - @poll_interval - 1 - - until @should_exit.get - now = Time.now - elapsed = now - last_run - - if elapsed >= @poll_interval - last_run = now - discover - end - - sleep 0.5 - end - - log.info 'synapse: zookeeper polling thread exiting normally' + reset_schedule = Proc.new { + discover + scheduler.post(@polling_interval, reset_schedule) unless @should_exit.get } + + scheduler.post(0, reset_schedule) end end @@ -47,8 +32,7 @@ def stop log.warn 'synapse: ZookeeperPollWatcher stopping' zk_teardown do - # Signal to the thread that it should exit, and then wait for it to - # exit. + # Signal to the process that it should not reset. @should_exit.set(true) end end From ee5ac315ca8f974738cc872ef0ffde7eb1919685 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 10:34:34 -0400 Subject: [PATCH 04/23] Add PollWatcher base class --- lib/synapse/service_watcher/base/poll.rb | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 lib/synapse/service_watcher/base/poll.rb diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb new file mode 100644 index 00000000..a8a5d249 --- /dev/null +++ b/lib/synapse/service_watcher/base/poll.rb @@ -0,0 +1,31 @@ +require 'synapse/service_watcher/base/base' + +require 'concurrency' + +class Synapse::ServiceWatcher + class PollWatcher < BaseWatcher + def initialize(opts={}, synapse, reconfigure_callback) + super(opts, synapse, reconfigure_callback) + + @check_interval = @discovery['check_interval'] || 15.0 + @should_exit = Concurrency::AtomicBoolean.new(false) + end + + def start(scheduler) + reset_schedule = Proc.new { + discover + scheduler.post(@check_interval, reset_schedule) unless @should_exit.true? + } + + scheduler.post(0, reset_schedule) + end + + def stop + @should_exit.make_true + end + + def discover + log.info "base poll watcher discover" + end + end +end From 3b62a98305403fce8de4e294b3a35950aacaf398 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 10:34:43 -0400 Subject: [PATCH 05/23] Convert all watchers to using schedulers --- lib/synapse.rb | 2 +- lib/synapse/service_watcher/base/base.rb | 2 +- lib/synapse/service_watcher/dns/dns.rb | 46 ++++++------------- lib/synapse/service_watcher/docker/docker.rb | 33 ++----------- lib/synapse/service_watcher/ec2tag/ec2tag.rb | 39 ++++------------ .../service_watcher/marathon/marathon.rb | 2 +- lib/synapse/service_watcher/multi/multi.rb | 4 +- .../zookeeper_dns/zookeeper_dns.rb | 27 ++++------- .../zookeeper_poll/zookeeper_poll.rb | 11 ++--- 9 files changed, 44 insertions(+), 122 deletions(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index 83c6054a..4df23e2a 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -41,7 +41,7 @@ def initialize(opts={}) @config_updated = AtomicValue.new(true) # TODO(rushy_panchal): minimum and maximum thread counts - @task_scheduler = Concurrent.TimerSet.new(:executor => Concurrent::ThreadPoolExecutor.new) + @task_scheduler = Concurrent::TimerSet.new(:executor => Concurrent::ThreadPoolExecutor.new) # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. diff --git a/lib/synapse/service_watcher/base/base.rb b/lib/synapse/service_watcher/base/base.rb index 0a712ec8..3e380914 100644 --- a/lib/synapse/service_watcher/base/base.rb +++ b/lib/synapse/service_watcher/base/base.rb @@ -94,7 +94,7 @@ def haproxy end # this should be overridden to actually start your watcher - def start + def start(scheduler) log.info "synapse: starting stub watcher; this means doing nothing at all!" end diff --git a/lib/synapse/service_watcher/dns/dns.rb b/lib/synapse/service_watcher/dns/dns.rb index f8ec70d7..5ec8357f 100644 --- a/lib/synapse/service_watcher/dns/dns.rb +++ b/lib/synapse/service_watcher/dns/dns.rb @@ -1,17 +1,17 @@ -require "synapse/service_watcher/base/base" +require "synapse/service_watcher/base/poll" require 'thread' +require 'concurrent' require 'resolv' class Synapse::ServiceWatcher - class DnsWatcher < BaseWatcher - def start - @check_interval = @discovery['check_interval'] || 30.0 - @nameserver = @discovery['nameserver'] + class DnsWatcher < PollWatcher + def initialize(opts={}, synapse, reconfigure_callback) + super(opts, synapse, reconfigure_callback) - @watcher = Thread.new do - watch - end + @last_resolution = Concurrent::Atom.new(nil) + @nameserver = @discovery['nameserver'] + @check_interval = @discovery['check_interval'] || 30.0 end def ping? @@ -30,32 +30,12 @@ def validate_discovery_opts if discovery_servers.empty? end - def watch - last_resolution = resolve_servers - configure_backends(last_resolution) - until @should_exit - begin - start = Time.now - current_resolution = resolve_servers - unless last_resolution == current_resolution - last_resolution = current_resolution - configure_backends(last_resolution) - end - - sleep_until_next_check(start) - rescue => e - log.warn "Error in watcher thread: #{e.inspect}" - log.warn e.backtrace - end - end - - log.info "synapse: dns watcher exited successfully" - end + def discover + current_resolution = resolve_servers - def sleep_until_next_check(start_time) - sleep_time = @check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) + unless @last_resolution.value == current_resolution + @last_resolution.reset(current_resolution) + configure_backends(current_resolution) end end diff --git a/lib/synapse/service_watcher/docker/docker.rb b/lib/synapse/service_watcher/docker/docker.rb index a34a5ab1..0a5a6ada 100644 --- a/lib/synapse/service_watcher/docker/docker.rb +++ b/lib/synapse/service_watcher/docker/docker.rb @@ -1,15 +1,8 @@ -require "synapse/service_watcher/base/base" +require "synapse/service_watcher/base/poll" require 'docker' class Synapse::ServiceWatcher - class DockerWatcher < BaseWatcher - def start - @check_interval = @discovery['check_interval'] || 15.0 - @watcher = Thread.new do - watch - end - end - + class DockerWatcher < PollWatcher private def validate_discovery_opts raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ @@ -22,26 +15,8 @@ def validate_discovery_opts if @discovery['container_port'].nil? end - def watch - until @should_exit - begin - start = Time.now - set_backends(containers) - sleep_until_next_check(start) - rescue Exception => e - log.warn "synapse: error in watcher thread: #{e.inspect}" - log.warn e.backtrace - end - end - - log.info "synapse: docker watcher exited successfully" - end - - def sleep_until_next_check(start_time) - sleep_time = @check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) - end + def discover + set_backends(containers) end def rewrite_container_ports(ports) diff --git a/lib/synapse/service_watcher/ec2tag/ec2tag.rb b/lib/synapse/service_watcher/ec2tag/ec2tag.rb index 15889f6b..021fabc7 100644 --- a/lib/synapse/service_watcher/ec2tag/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag/ec2tag.rb @@ -1,12 +1,13 @@ -require 'synapse/service_watcher/base/base' +require 'synapse/service_watcher/base/poll' require 'aws-sdk' class Synapse::ServiceWatcher - class Ec2tagWatcher < BaseWatcher - + class Ec2tagWatcher < PollWatcher attr_reader :check_interval - def start + def initialize(opts={}, synapse, reconfigure_callback) + super(opts, synapse, reconfigure_callback) + region = @discovery['aws_region'] || ENV['AWS_REGION'] log.info "Connecting to EC2 region: #{region}" @@ -15,16 +16,11 @@ def start access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'], secret_access_key: @discovery['aws_secret_access_key'] || ENV['AWS_SECRET_ACCESS_KEY'] ) - @check_interval = @discovery['check_interval'] || 15.0 - log.info "synapse: ec2tag watcher looking for instances " + "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" - - @watcher = Thread.new { watch } end private - def validate_discovery_opts # Required, via options only. raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ @@ -52,28 +48,9 @@ def validate_discovery_opts end end - def watch - until @should_exit - begin - start = Time.now - if set_backends(discover_instances) - log.info "synapse: ec2tag watcher backends have changed." - end - rescue Exception => e - log.warn "synapse: error in ec2tag watcher thread: #{e.inspect}" - log.warn e.backtrace - ensure - sleep_until_next_check(start) - end - end - - log.info "synapse: ec2tag watcher exited successfully" - end - - def sleep_until_next_check(start_time) - sleep_time = check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) + def discover + if set_backends(discover_instances) + log.info "synapse: ec2tag watcher backends have changed." end end diff --git a/lib/synapse/service_watcher/marathon/marathon.rb b/lib/synapse/service_watcher/marathon/marathon.rb index 47565485..06131589 100644 --- a/lib/synapse/service_watcher/marathon/marathon.rb +++ b/lib/synapse/service_watcher/marathon/marathon.rb @@ -5,7 +5,7 @@ class Synapse::ServiceWatcher class MarathonWatcher < BaseWatcher - def start + def start(scheduler) @check_interval = @discovery['check_interval'] || 10.0 @connection = nil @watcher = Thread.new { sleep splay; watch } diff --git a/lib/synapse/service_watcher/multi/multi.rb b/lib/synapse/service_watcher/multi/multi.rb index 54ab7c98..1fc7a71e 100644 --- a/lib/synapse/service_watcher/multi/multi.rb +++ b/lib/synapse/service_watcher/multi/multi.rb @@ -58,12 +58,12 @@ def initialize(opts={}, synapse, reconfigure_callback) -> { resolver_notification }) end - def start + def start(scheduler) log.info "synapse: starting multi watcher" statsd_increment("synapse.watcher.multi.start") @watchers.values.each do |w| - w.start + w.start(scheduler) end @resolver.start diff --git a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb index d4fdafd3..ae290916 100644 --- a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb +++ b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb @@ -1,4 +1,4 @@ -require 'synapse/service_watcher/base/base' +require 'synapse/service_watcher/base/poll' require 'synapse/service_watcher/dns/dns' require 'synapse/service_watcher/zookeeper/zookeeper' @@ -20,7 +20,7 @@ # has passed (triggering a re-resolve), or that the watcher should shut down. # The DNS watcher is responsible for the actual reconfiguring of backends. class Synapse::ServiceWatcher - class ZookeeperDnsWatcher < BaseWatcher + class ZookeeperDnsWatcher < PollWatcher # Valid messages that can be passed through the internal message queue module Messages @@ -112,7 +112,7 @@ def validate_discovery_opts end end - def start + def start(scheduler) @check_interval = @discovery['check_interval'] || 30.0 @message_queue = Queue.new @@ -122,20 +122,7 @@ def start @zk.start @dns.start - @watcher = Thread.new do - until @should_exit - # Trigger a DNS resolve every @check_interval seconds - sleep @check_interval - - # Only trigger the resolve if the queue is empty, every other message - # on the queue would either cause a resolve or stop the watcher - if @message_queue.empty? - @message_queue.push(Messages::CHECK_INTERVAL_MESSAGE) - end - - end - log.info "synapse: zookeeper_dns watcher exited successfully" - end + super(scheduler) end def ping? @@ -162,6 +149,12 @@ def reconfigure! private + def discover + if @message_queue.empty? + @message_queue.push(Messages::CHECK_INTERVAL_MESSAGE) + end + end + def make_dns_watcher(queue) dns_discovery_opts = @discovery.select do |k,_| k == 'nameserver' || k == 'label_filter' diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index 98ddc63f..0848cba2 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -1,9 +1,7 @@ require 'synapse/service_watcher/base/base' require 'synapse/service_watcher/zookeeper/zookeeper' -require 'synapse/atomic' -require 'zk' -require 'thread' +require 'concurrency' class Synapse::ServiceWatcher class ZookeeperPollWatcher < ZookeeperWatcher @@ -11,8 +9,7 @@ def initialize(opts, synapse, reconfigure_callback) super(opts, synapse, reconfigure_callback) @poll_interval = @discovery['polling_interval_sec'] || 60 - - @should_exit = Synapse::AtomicValue.new(false) + @should_exit = Concurrency::AtomicBoolean.new(false) end def start(scheduler) @@ -21,7 +18,7 @@ def start(scheduler) zk_connect do reset_schedule = Proc.new { discover - scheduler.post(@polling_interval, reset_schedule) unless @should_exit.get + scheduler.post(@polling_interval, reset_schedule) unless @should_exit.true? } scheduler.post(0, reset_schedule) @@ -33,7 +30,7 @@ def stop zk_teardown do # Signal to the process that it should not reset. - @should_exit.set(true) + @should_exit.make_true end end From 54e0a5c4cd527290b92349094dbf50cafbbf4f74 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 10:37:05 -0400 Subject: [PATCH 06/23] Fix concurrent module naming --- lib/synapse/service_watcher/base/poll.rb | 4 ++-- lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb index a8a5d249..96a77df8 100644 --- a/lib/synapse/service_watcher/base/poll.rb +++ b/lib/synapse/service_watcher/base/poll.rb @@ -1,6 +1,6 @@ require 'synapse/service_watcher/base/base' -require 'concurrency' +require 'concurrent' class Synapse::ServiceWatcher class PollWatcher < BaseWatcher @@ -8,7 +8,7 @@ def initialize(opts={}, synapse, reconfigure_callback) super(opts, synapse, reconfigure_callback) @check_interval = @discovery['check_interval'] || 15.0 - @should_exit = Concurrency::AtomicBoolean.new(false) + @should_exit = Concurrent::AtomicBoolean.new(false) end def start(scheduler) diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index 0848cba2..08296495 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -1,7 +1,7 @@ require 'synapse/service_watcher/base/base' require 'synapse/service_watcher/zookeeper/zookeeper' -require 'concurrency' +require 'concurrent' class Synapse::ServiceWatcher class ZookeeperPollWatcher < ZookeeperWatcher @@ -9,7 +9,7 @@ def initialize(opts, synapse, reconfigure_callback) super(opts, synapse, reconfigure_callback) @poll_interval = @discovery['polling_interval_sec'] || 60 - @should_exit = Concurrency::AtomicBoolean.new(false) + @should_exit = Concurrent::AtomicBoolean.new(false) end def start(scheduler) From e5f68f6fbf66bc1294c3080e0a7fd0126633ad8c Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 11:09:11 -0400 Subject: [PATCH 07/23] Fix tests --- lib/synapse/service_watcher/base/poll.rb | 11 +++++- .../zookeeper_poll/zookeeper_poll.rb | 11 +++++- .../synapse/service_watcher_docker_spec.rb | 34 ++++++----------- .../synapse/service_watcher_ec2tags_spec.rb | 13 +++---- .../synapse/service_watcher_marathon_spec.rb | 23 +++++++----- .../lib/synapse/service_watcher_multi_spec.rb | 8 +++- .../synapse/service_watcher_zookeeper_spec.rb | 37 ++++++++----------- 7 files changed, 70 insertions(+), 67 deletions(-) diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb index 96a77df8..0442c692 100644 --- a/lib/synapse/service_watcher/base/poll.rb +++ b/lib/synapse/service_watcher/base/poll.rb @@ -14,10 +14,17 @@ def initialize(opts={}, synapse, reconfigure_callback) def start(scheduler) reset_schedule = Proc.new { discover - scheduler.post(@check_interval, reset_schedule) unless @should_exit.true? + + unless @should_exit.true? + scheduler.post(@check_interval) { + reset_schedule + } + end } - scheduler.post(0, reset_schedule) + scheduler.post(0) { + reset_schedule + } end def stop diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index 08296495..d561859e 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -18,10 +18,17 @@ def start(scheduler) zk_connect do reset_schedule = Proc.new { discover - scheduler.post(@polling_interval, reset_schedule) unless @should_exit.true? + + unless @should_exit.true? + scheduler.post(@polling_interval, reset_schedule) { + reset_schedule + } + end } - scheduler.post(0, reset_schedule) + scheduler.post(0) { + reset_schedule + } end end diff --git a/spec/lib/synapse/service_watcher_docker_spec.rb b/spec/lib/synapse/service_watcher_docker_spec.rb index 5f99ef1b..e7b9c58c 100644 --- a/spec/lib/synapse/service_watcher_docker_spec.rb +++ b/spec/lib/synapse/service_watcher_docker_spec.rb @@ -15,8 +15,15 @@ class Synapse::ServiceWatcher::DockerWatcher }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::SimpleExecutorService.new + end + subject { Synapse::ServiceWatcher::DockerWatcher.new(testargs, mocksynapse, -> {}) } + let(:testargs) { { 'name' => 'foo', 'discovery' => { 'method' => 'docker', 'servers' => [{'host' => 'server1.local', 'name' => 'mainserver'}], 'image_name' => 'mycool/image', 'container_port' => 6379 }, 'haproxy' => {} }} + before(:each) do allow(subject.log).to receive(:warn) allow(subject.log).to receive(:info) @@ -34,37 +41,20 @@ def add_arg(name, value) context "normal tests" do it('starts a watcher thread') do - watcher_mock = double() - expect(Thread).to receive(:new).and_return(watcher_mock) - subject.start - expect(subject.watcher).to equal(watcher_mock) + subject.start(mock_scheduler) end + it('sets default check interval') do - expect(Thread).to receive(:new).and_return(double) - subject.start + subject.start(mock_scheduler) expect(subject.check_interval).to eq(15.0) end end - context "watch tests" do - before(:each) do - expect(subject).to receive(:sleep_until_next_check) do |arg| - subject.instance_variable_set('@should_exit', true) - end - end + context "discover tests" do it('has a happy first run path, configuring backends') do expect(subject).to receive(:containers).and_return(['container1']) expect(subject).to receive(:set_backends).with(['container1']) - subject.send(:watch) - end - end - context "watch eats exceptions" do - it "blows up when finding containers" do - expect(subject).to receive(:containers) do |arg| - subject.instance_variable_set('@should_exit', true) - raise('throw exception inside watch') - end - expect { subject.send(:watch) }.not_to raise_error + subject.send(:discover) end end diff --git a/spec/lib/synapse/service_watcher_ec2tags_spec.rb b/spec/lib/synapse/service_watcher_ec2tags_spec.rb index b7faf157..9468a1e2 100644 --- a/spec/lib/synapse/service_watcher_ec2tags_spec.rb +++ b/spec/lib/synapse/service_watcher_ec2tags_spec.rb @@ -150,23 +150,20 @@ def munge_arg(name, new_value) let(:instance1) { FakeAWSInstance.new } let(:instance2) { FakeAWSInstance.new } - context 'watch' do - - it 'discovers instances, configures backends, then sleeps' do + context 'discover' do + it 'discovers instances and configures backends' do fake_backends = [1,2,3] expect(subject).to receive(:discover_instances).and_return(fake_backends) expect(subject).to receive(:set_backends).with(fake_backends) { subject.stop } - expect(subject).to receive(:sleep_until_next_check) - subject.send(:watch) + subject.send(:discover) end - it 'sleeps until next check if discover_instances fails' do + it 'throws error if discover_instances fails' do expect(subject).to receive(:discover_instances) do subject.stop raise "discover failed" end - expect(subject).to receive(:sleep_until_next_check) - subject.send(:watch) + expect { subject.send(:discover) }.to raise_error end end diff --git a/spec/lib/synapse/service_watcher_marathon_spec.rb b/spec/lib/synapse/service_watcher_marathon_spec.rb index 485225b6..e29d919b 100644 --- a/spec/lib/synapse/service_watcher_marathon_spec.rb +++ b/spec/lib/synapse/service_watcher_marathon_spec.rb @@ -10,6 +10,11 @@ }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::SimpleExecutorService.new + end + let(:marathon_host) { '127.0.0.1' } let(:marathon_port) { '8080' } let(:app_name) { 'foo' } @@ -60,12 +65,12 @@ end it 'does not crash' do - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end end it 'requests the proper API endpoint one time' do - subject.start + subject.start(mock_scheduler) expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1) end @@ -79,7 +84,7 @@ let(:marathon_request_uri) { "#{marathon_host}:#{marathon_port}/v3/tasks/#{app_name}" } it 'calls the customized path' do - subject.start + subject.start(mock_scheduler) expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1) end end @@ -110,7 +115,7 @@ it 'adds the task as a backend' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end context 'with a custom port_index' do @@ -128,7 +133,7 @@ it 'adds the task as a backend' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end context 'when that port_index does not exist' do @@ -138,7 +143,7 @@ it 'does not include the backend' do expect(subject).to receive(:set_backends).with([]) - subject.start + subject.start(mock_scheduler) end end end @@ -162,14 +167,14 @@ it 'filters tasks that have no startedAt value' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end end context 'when marathon returns invalid response' do let(:marathon_response) { [] } it 'does not blow up' do - expect { subject.start }.to_not raise_error + expect { subject.start(mock_scheduler) }.to_not raise_error end end @@ -189,7 +194,7 @@ it 'only sleeps for the difference' do expect(subject).to receive(:sleep).with(check_interval - job_duration) - subject.start + subject.start(mock_scheduler) end end end diff --git a/spec/lib/synapse/service_watcher_multi_spec.rb b/spec/lib/synapse/service_watcher_multi_spec.rb index 7ef1c341..a752c2ee 100644 --- a/spec/lib/synapse/service_watcher_multi_spec.rb +++ b/spec/lib/synapse/service_watcher_multi_spec.rb @@ -14,6 +14,10 @@ mock_synapse end + let(:mock_scheduler) do + Concurrent::SimpleExecutorService.new + end + subject { Synapse::ServiceWatcher::MultiWatcher.new(config, mock_synapse, reconfigure_callback) } @@ -222,7 +226,7 @@ expect(w).to receive(:start) end - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end it 'starts resolver' do @@ -233,7 +237,7 @@ end expect(resolver).to receive(:start) - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end end diff --git a/spec/lib/synapse/service_watcher_zookeeper_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_spec.rb index c90d09ca..861c3a74 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_spec.rb @@ -16,6 +16,11 @@ }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::SimpleExecutorService.new + end + let(:config) do { 'name' => 'test', @@ -266,7 +271,7 @@ it 'calls zk_connect' do expect(subject).to receive(:zk_connect).exactly(:once) - subject.start + subject.start(mock_scheduler) end end @@ -600,10 +605,10 @@ end describe '#start' do - it 'starts a thread' do - expect(Thread).to receive(:new) + it 'queues onto the scheduler' do + expect(mock_scheduler).to receive(:post).exactly(:once) allow(ZK).to receive(:new).and_return(mock_zk) - subject.start + subject.start(mock_scheduler) end it 'connects to zookeeper' do @@ -614,7 +619,7 @@ .with('somehost', :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) .and_return(mock_zk) - subject.start + subject.start(mock_scheduler) end it 'does not call create' do @@ -622,24 +627,22 @@ allow(ZK).to receive(:new).and_return(mock_zk) expect(mock_zk).not_to receive(:create) - subject.start + subject.start(mock_scheduler) end end describe '#stop' do context 'when connected to zookeeper' do before :each do - subject.instance_variable_set(:@thread, mock_thread.as_null_object) allow(mock_zk).to receive(:connecting?).and_return(false) allow(mock_zk).to receive(:connected?).and_return(true) end it 'disconnects' do allow(ZK).to receive(:new).and_return(mock_zk) - allow(Thread).to receive(:new) - expect(mock_zk).to receive(:close!).exactly(:once) - subject.start + + subject.start(mock_scheduler) subject.stop end end @@ -649,16 +652,6 @@ subject.stop end end - - context 'when thread is not running' do - before :each do - subject.instance_variable_set(:@zk, mock_zk.as_null_object) - end - - it 'continues silently' do - expect { subject.stop }.not_to raise_error - end - end end describe '#ping' do @@ -851,7 +844,7 @@ expect(mock_dns).to receive(:start).exactly(:once) expect(Thread).to receive(:new).exactly(:once) - subject.start + subject.start(mock_scheduler) end end @@ -899,7 +892,7 @@ expect(mock_dns).to receive(:start).exactly(:once) expect(Thread).to receive(:new).exactly(:once) - subject.start + subject.start(mock_scheduler) end end From 47bd3d733f4cee44acc7b0fc6a8ff25d275e0b4a Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 15:45:16 -0400 Subject: [PATCH 08/23] Add tests for PollWatcher --- lib/synapse/service_watcher/base/poll.rb | 14 +++- spec/lib/synapse/service_watcher_poll_spec.rb | 82 +++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 spec/lib/synapse/service_watcher_poll_spec.rb diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb index 0442c692..0416ed46 100644 --- a/lib/synapse/service_watcher/base/poll.rb +++ b/lib/synapse/service_watcher/base/poll.rb @@ -15,15 +15,17 @@ def start(scheduler) reset_schedule = Proc.new { discover + # Schedule the next task until we should exit unless @should_exit.true? scheduler.post(@check_interval) { - reset_schedule + reset_schedule.call } end } + # Execute the first discover immediately scheduler.post(0) { - reset_schedule + reset_schedule.call } end @@ -31,6 +33,14 @@ def stop @should_exit.make_true end + private + def validate_discovery_opts + raise ArgumentError, "invalid discovery method '#{@discovery['method']}' for poll watcher" \ + unless @discovery['method'] == 'poll' + + log.warn "synapse: warning: a stub watcher with no default servers is pretty useless" if @default_servers.empty? + end + def discover log.info "base poll watcher discover" end diff --git a/spec/lib/synapse/service_watcher_poll_spec.rb b/spec/lib/synapse/service_watcher_poll_spec.rb new file mode 100644 index 00000000..ed7185ac --- /dev/null +++ b/spec/lib/synapse/service_watcher_poll_spec.rb @@ -0,0 +1,82 @@ +require 'spec_helper' + +require 'synapse/service_watcher/base/poll' +require 'concurrent' + +describe Synapse::ServiceWatcher::PollWatcher do + let(:mock_synapse) do + mock_synapse = instance_double(Synapse::Synapse) + mockgenerator = Synapse::ConfigGenerator::BaseGenerator.new() + allow(mock_synapse).to receive(:available_generators).and_return({ + 'haproxy' => mockgenerator + }) + mock_synapse + end + + let(:mock_scheduler) do + # Concurrent::TimerSet.new(:executor => :immediate) + Concurrent::ImmediateExecutor.new + end + + let(:config) do + { + 'name' => 'test', + 'haproxy' => {}, + 'discovery' => discovery, + } + end + + let(:discovery) { { 'method' => 'poll' } } + + subject { Synapse::ServiceWatcher::PollWatcher.new(config, mock_synapse, -> {}) } + + describe '#initialize' do + it 'has a default check interval' do + expect(subject.instance_variable_get(:@check_interval)).to eq(15) + end + end + + describe '#start' do + it 'schedules a recurring task' do + expect(mock_scheduler).to receive(:post).exactly(:once).with(0).and_call_original + expect(mock_scheduler).to receive(:post).exactly(:once).with(15) + expect(subject).to receive(:discover).exactly(:once) + + subject.start(mock_scheduler) + end + + context 'when stopped' do + before :each do + subject.stop + end + + it 'does not reschedule' do + expect(mock_scheduler).to receive(:post).exactly(:once).with(0).and_call_original + expect(mock_scheduler).not_to receive(:post).with(15) + expect(subject).to receive(:discover).exactly(:once) + + subject.start(mock_scheduler) + end + end + + context 'with check_interval=0' do + let(:discovery) { { 'method' => 'poll', 'check_interval' => 0 } } + + it 'keeps calling discover until stop is called' do + count = 0 + expect(mock_scheduler).to receive(:post).with(0).exactly(15).times.and_wrap_original { |m, *args, &block| + count += 1 + subject.stop if count >= 15 + + m.call(*args) { + block.call + } + } + + expect(subject).to receive(:discover).exactly(15).times + + subject.start(mock_scheduler) + end + end + end +end From c1001afb56892e0df744ee3d23e7a58f231c8135 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Fri, 14 Aug 2020 16:02:05 -0400 Subject: [PATCH 09/23] Use right type of executor --- spec/lib/synapse/service_watcher_docker_spec.rb | 2 +- spec/lib/synapse/service_watcher_marathon_spec.rb | 2 +- spec/lib/synapse/service_watcher_multi_spec.rb | 2 +- spec/lib/synapse/service_watcher_poll_spec.rb | 1 - spec/lib/synapse/service_watcher_zookeeper_spec.rb | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/spec/lib/synapse/service_watcher_docker_spec.rb b/spec/lib/synapse/service_watcher_docker_spec.rb index e7b9c58c..b65099ad 100644 --- a/spec/lib/synapse/service_watcher_docker_spec.rb +++ b/spec/lib/synapse/service_watcher_docker_spec.rb @@ -17,7 +17,7 @@ class Synapse::ServiceWatcher::DockerWatcher end let(:mock_scheduler) do - Concurrent::SimpleExecutorService.new + Concurrent::TimerSet.new(:executor => :immediate) end subject { Synapse::ServiceWatcher::DockerWatcher.new(testargs, mocksynapse, -> {}) } diff --git a/spec/lib/synapse/service_watcher_marathon_spec.rb b/spec/lib/synapse/service_watcher_marathon_spec.rb index e29d919b..e05979a7 100644 --- a/spec/lib/synapse/service_watcher_marathon_spec.rb +++ b/spec/lib/synapse/service_watcher_marathon_spec.rb @@ -12,7 +12,7 @@ end let(:mock_scheduler) do - Concurrent::SimpleExecutorService.new + Concurrent::TimerSet.new(:executor => :immediate) end let(:marathon_host) { '127.0.0.1' } diff --git a/spec/lib/synapse/service_watcher_multi_spec.rb b/spec/lib/synapse/service_watcher_multi_spec.rb index a752c2ee..9a3e2a83 100644 --- a/spec/lib/synapse/service_watcher_multi_spec.rb +++ b/spec/lib/synapse/service_watcher_multi_spec.rb @@ -15,7 +15,7 @@ end let(:mock_scheduler) do - Concurrent::SimpleExecutorService.new + Concurrent::TimerSet.new(:executor => :immediate) end subject { diff --git a/spec/lib/synapse/service_watcher_poll_spec.rb b/spec/lib/synapse/service_watcher_poll_spec.rb index ed7185ac..1e9d10d6 100644 --- a/spec/lib/synapse/service_watcher_poll_spec.rb +++ b/spec/lib/synapse/service_watcher_poll_spec.rb @@ -14,7 +14,6 @@ end let(:mock_scheduler) do - # Concurrent::TimerSet.new(:executor => :immediate) Concurrent::ImmediateExecutor.new end diff --git a/spec/lib/synapse/service_watcher_zookeeper_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_spec.rb index 861c3a74..4a7bbded 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_spec.rb @@ -18,7 +18,7 @@ end let(:mock_scheduler) do - Concurrent::SimpleExecutorService.new + Concurrent::TimerSet.new(:executor => :immediate) end let(:config) do From c13dc2af35ff022da47058de312abf8b12ce26d2 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 10:06:11 -0400 Subject: [PATCH 10/23] Update ServiceWatcher README --- lib/synapse/service_watcher/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/synapse/service_watcher/README.md b/lib/synapse/service_watcher/README.md index bf138f4f..ec8dc6bc 100644 --- a/lib/synapse/service_watcher/README.md +++ b/lib/synapse/service_watcher/README.md @@ -14,8 +14,11 @@ require "synapse/service_watcher/base/base" class Synapse::ServiceWatcher class MyWatcher < BaseWatcher - def start - # write code which begins running service discovery + def start(scheduler) + # write code which begins running service discovery. + # Instead of running a background thread, you can use scheduler + # which is an instance of Concurrent::TimerSet to schedule tasks on a thread pool. + # (http://ruby-concurrency.github.io/concurrent-ruby/1.1.5/Concurrent/TimerSet.html) end def stop From 909daf6f1f7bee321e8b50fbbad6c4d85d46e2a9 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 10:10:40 -0400 Subject: [PATCH 11/23] Set min and max threads for task scheduler --- lib/synapse.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index 4df23e2a..7c0279ea 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -40,8 +40,8 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = AtomicValue.new(true) - # TODO(rushy_panchal): minimum and maximum thread counts - @task_scheduler = Concurrent::TimerSet.new(:executor => Concurrent::ThreadPoolExecutor.new) + executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1, :max_threads => @service_watchers.length) + @task_scheduler = Concurrent::TimerSet.new(:executor => executor) # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. From 766d020b1116234bc719050c4cba144e0353ad0a Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 12:53:15 -0400 Subject: [PATCH 12/23] Fix ZookeeperPollWatcher --- .../service_watcher/zookeeper_poll/zookeeper_poll.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index d561859e..182a511e 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -20,15 +20,11 @@ def start(scheduler) discover unless @should_exit.true? - scheduler.post(@polling_interval, reset_schedule) { - reset_schedule - } + scheduler.post(@poll_interval, &reset_schedule) end } - scheduler.post(0) { - reset_schedule - } + scheduler.post(0, &reset_schedule) end end From f9d65eb96c1970b8ce86462bdf9a5bfd65f40ce5 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 12:53:42 -0400 Subject: [PATCH 13/23] Pass Proc directly to scheduler.post --- lib/synapse/service_watcher/base/poll.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb index 0416ed46..3042bccd 100644 --- a/lib/synapse/service_watcher/base/poll.rb +++ b/lib/synapse/service_watcher/base/poll.rb @@ -17,16 +17,12 @@ def start(scheduler) # Schedule the next task until we should exit unless @should_exit.true? - scheduler.post(@check_interval) { - reset_schedule.call - } + scheduler.post(@check_interval, &reset_schedule) end } # Execute the first discover immediately - scheduler.post(0) { - reset_schedule.call - } + scheduler.post(0, &reset_schedule) end def stop From 516081801140d03459683e8c3cb9cb46f4aa39a3 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 15:14:28 -0400 Subject: [PATCH 14/23] Fix flaky Ec2tagWatcher tests --- lib/synapse/service_watcher/ec2tag/ec2tag.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/synapse/service_watcher/ec2tag/ec2tag.rb b/lib/synapse/service_watcher/ec2tag/ec2tag.rb index 021fabc7..31ec6fd9 100644 --- a/lib/synapse/service_watcher/ec2tag/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag/ec2tag.rb @@ -5,9 +5,7 @@ class Synapse::ServiceWatcher class Ec2tagWatcher < PollWatcher attr_reader :check_interval - def initialize(opts={}, synapse, reconfigure_callback) - super(opts, synapse, reconfigure_callback) - + def start(scheduler) region = @discovery['aws_region'] || ENV['AWS_REGION'] log.info "Connecting to EC2 region: #{region}" @@ -17,7 +15,9 @@ def initialize(opts={}, synapse, reconfigure_callback) secret_access_key: @discovery['aws_secret_access_key'] || ENV['AWS_SECRET_ACCESS_KEY'] ) log.info "synapse: ec2tag watcher looking for instances " + - "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" + "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" + + super(scheduler) end private From 2c4429ee40b6b176604ba9edae6324bd5d947d50 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 15:47:42 -0400 Subject: [PATCH 15/23] Migrate ZookeeperDnsWatcher::Dns to using scheduler --- .../zookeeper_dns/zookeeper_dns.rb | 87 ++++++++----------- 1 file changed, 37 insertions(+), 50 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb index ae290916..e35e7c2a 100644 --- a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb +++ b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb @@ -2,7 +2,7 @@ require 'synapse/service_watcher/dns/dns' require 'synapse/service_watcher/zookeeper/zookeeper' -require 'thread' +require 'concurrent' # Watcher for watching Zookeeper for entries containing DNS names that are # continuously resolved to IP Addresses. The use case for this watcher is to @@ -36,13 +36,8 @@ class NewServers < Struct.new(:servers); end # refresh of the IP addresses. class CheckInterval; end - # Indicates that the DNS watcher should shut down. This is sent when - # stop is called. - class StopWatcher; end - # Saved instances of message types with contents that cannot vary. This # reduces object allocation. - STOP_WATCHER_MESSAGE = StopWatcher.new CHECK_INTERVAL_MESSAGE = CheckInterval.new end @@ -54,53 +49,45 @@ class Dns < Synapse::ServiceWatcher::DnsWatcher def initialize(opts={}, parent=nil, synapse, reconfigure_callback, message_queue) @message_queue = message_queue @parent = parent + @last_resolution = Concurrent::Atom.new(nil) super(opts, synapse, reconfigure_callback) end - def stop - @message_queue.push(Messages::STOP_WATCHER_MESSAGE) - end - - def watch - last_resolution = nil - while true - # Blocks on message queue, the message will be a signal to stop - # watching, to check a new set of servers from ZK, or to re-resolve - # the DNS (triggered every check_interval seconds) - message = @message_queue.pop - - log.debug "synapse: received message #{message.inspect}" - - case message - when Messages::StopWatcher - break - when Messages::NewServers - self.discovery_servers = message.servers - when Messages::CheckInterval - # Proceed to re-resolve the DNS - else - raise Messages::InvalidMessageError, - "Received unrecognized message: #{message.inspect}" - end + def discover + # Blocks on message queue, the message will be a signal to stop + # watching, to check a new set of servers from ZK, or to re-resolve + # the DNS (triggered every check_interval seconds) + message = @message_queue.pop + + log.debug "synapse: received message #{message.inspect}" + + case message + when Messages::NewServers + self.discovery_servers = message.servers + when Messages::CheckInterval + # Proceed to re-resolve the DNS + else + raise Messages::InvalidMessageError, + "Received unrecognized message: #{message.inspect}" + end - # Empty servers means we haven't heard back from ZK yet or ZK is - # empty. This should only occur if we don't get results from ZK - # within check_interval seconds or if ZK is empty. - if self.discovery_servers.nil? || self.discovery_servers.empty? - log.warn "synapse: no backends for service #{@name}" - else - # Resolve DNS names with the nameserver - current_resolution = resolve_servers - unless last_resolution == current_resolution - last_resolution = current_resolution - configure_backends(last_resolution) - - # Propagate revision updates down to ZookeeperDnsWatcher, so - # that stanza cache can work properly. - @revision += 1 - @parent.reconfigure! unless @parent.nil? - end + # Empty servers means we haven't heard back from ZK yet or ZK is + # empty. This should only occur if we don't get results from ZK + # within check_interval seconds or if ZK is empty. + if self.discovery_servers.nil? || self.discovery_servers.empty? + log.warn "synapse: no backends for service #{@name}" + else + # Resolve DNS names with the nameserver + current_resolution = resolve_servers + unless @last_resolution.value == current_resolution + last_resolution.reset(current_resolution) + configure_backends(current_resolution) + + # Propagate revision updates down to ZookeeperDnsWatcher, so + # that stanza cache can work properly. + @revision += 1 + @parent.reconfigure! unless @parent.nil? end end end @@ -119,8 +106,8 @@ def start(scheduler) @dns = make_dns_watcher(@message_queue) @zk = make_zookeeper_watcher(@message_queue) - @zk.start - @dns.start + @zk.start(scheduler) + @dns.start(scheduler) super(scheduler) end From b21a41a255f0378aaf48bf4f9531a836e943b40e Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 17:17:46 -0400 Subject: [PATCH 16/23] Fix Dns watchers --- lib/synapse/service_watcher/dns/dns.rb | 12 ++--------- .../zookeeper_dns/zookeeper_dns.rb | 21 +++++++++---------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/lib/synapse/service_watcher/dns/dns.rb b/lib/synapse/service_watcher/dns/dns.rb index 5ec8357f..7a21e13a 100644 --- a/lib/synapse/service_watcher/dns/dns.rb +++ b/lib/synapse/service_watcher/dns/dns.rb @@ -1,7 +1,5 @@ require "synapse/service_watcher/base/poll" -require 'thread' -require 'concurrent' require 'resolv' class Synapse::ServiceWatcher @@ -9,13 +7,12 @@ class DnsWatcher < PollWatcher def initialize(opts={}, synapse, reconfigure_callback) super(opts, synapse, reconfigure_callback) - @last_resolution = Concurrent::Atom.new(nil) @nameserver = @discovery['nameserver'] @check_interval = @discovery['check_interval'] || 30.0 end def ping? - @watcher.alive? && !(resolver.getaddresses('airbnb.com').empty?) + !(resolver.getaddresses('airbnb.com').empty?) end def discovery_servers @@ -31,12 +28,7 @@ def validate_discovery_opts end def discover - current_resolution = resolve_servers - - unless @last_resolution.value == current_resolution - @last_resolution.reset(current_resolution) - configure_backends(current_resolution) - end + configure_backends(resolve_servers) end IP_REGEX = Regexp.union([Resolv::IPv4::Regex, Resolv::IPv6::Regex]) diff --git a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb index e35e7c2a..19a38f6d 100644 --- a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb +++ b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb @@ -2,7 +2,7 @@ require 'synapse/service_watcher/dns/dns' require 'synapse/service_watcher/zookeeper/zookeeper' -require 'concurrent' +require 'thread' # Watcher for watching Zookeeper for entries containing DNS names that are # continuously resolved to IP Addresses. The use case for this watcher is to @@ -49,16 +49,19 @@ class Dns < Synapse::ServiceWatcher::DnsWatcher def initialize(opts={}, parent=nil, synapse, reconfigure_callback, message_queue) @message_queue = message_queue @parent = parent - @last_resolution = Concurrent::Atom.new(nil) super(opts, synapse, reconfigure_callback) end def discover - # Blocks on message queue, the message will be a signal to stop - # watching, to check a new set of servers from ZK, or to re-resolve + # The message will be to check a new set of servers from ZK, or to re-resolve # the DNS (triggered every check_interval seconds) - message = @message_queue.pop + begin + message = @message_queue.pop(false) + rescue ThreadError + # no item from the queue + return + end log.debug "synapse: received message #{message.inspect}" @@ -79,11 +82,7 @@ def discover log.warn "synapse: no backends for service #{@name}" else # Resolve DNS names with the nameserver - current_resolution = resolve_servers - unless @last_resolution.value == current_resolution - last_resolution.reset(current_resolution) - configure_backends(current_resolution) - + if configure_backends(resolve_servers) # Propagate revision updates down to ZookeeperDnsWatcher, so # that stanza cache can work properly. @revision += 1 @@ -113,7 +112,7 @@ def start(scheduler) end def ping? - @watcher.alive? && @dns.ping? && @zk.ping? + @dns.ping? && @zk.ping? end def stop From d198e380d588592e4df8884f81a23075df3fcac3 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 17 Aug 2020 18:36:08 -0400 Subject: [PATCH 17/23] Cut down threads by 1/4 --- lib/synapse.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index 7c0279ea..8a8f105d 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -40,7 +40,7 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = AtomicValue.new(true) - executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1, :max_threads => @service_watchers.length) + executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1, :max_threads => [2, @service_watchers.length / 4].max) @task_scheduler = Concurrent::TimerSet.new(:executor => executor) # Any exceptions in the watcher threads should wake the main thread so From 241834628b8248c9ed25236810b4fa89a5b2f0a0 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Wed, 19 Aug 2020 11:15:11 -0400 Subject: [PATCH 18/23] reduce indentation in comment --- lib/synapse/service_watcher/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/synapse/service_watcher/README.md b/lib/synapse/service_watcher/README.md index ec8dc6bc..6686cb4d 100644 --- a/lib/synapse/service_watcher/README.md +++ b/lib/synapse/service_watcher/README.md @@ -16,9 +16,9 @@ class Synapse::ServiceWatcher class MyWatcher < BaseWatcher def start(scheduler) # write code which begins running service discovery. - # Instead of running a background thread, you can use scheduler - # which is an instance of Concurrent::TimerSet to schedule tasks on a thread pool. - # (http://ruby-concurrency.github.io/concurrent-ruby/1.1.5/Concurrent/TimerSet.html) + # Instead of running a background thread, you can use scheduler + # which is an instance of Concurrent::TimerSet to schedule tasks on a thread pool. + # (http://ruby-concurrency.github.io/concurrent-ruby/1.1.5/Concurrent/TimerSet.html) end def stop From 9e215fd221de7d327a0d44b25a629d2e54296530 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Wed, 19 Aug 2020 12:40:08 -0400 Subject: [PATCH 19/23] Remove extra code from ZookeeperWatcher --- lib/synapse/service_watcher/zookeeper/zookeeper.rb | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index 54ed4b8b..d1122050 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -1,7 +1,5 @@ require "synapse/service_watcher/base/base" -require 'synapse/atomic' -require 'thread' require 'zk' require 'zookeeper' require 'base64' @@ -57,7 +55,6 @@ def initialize(opts={}, synapse, reconfigure_callback) @zk = nil @watcher = nil @thread = nil - @should_exit = Synapse::AtomicValue.new(false) end def start(scheduler) @@ -74,8 +71,6 @@ def start(scheduler) def stop log.warn "synapse: zookeeper watcher exiting" - @should_exit.set(true) - zk_teardown do @watcher.unsubscribe unless @watcher.nil? @watcher = nil From 699be2cac5fca4ccedb5e0ffd18ea2f2890e5261 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Wed, 19 Aug 2020 17:35:59 -0400 Subject: [PATCH 20/23] Only register on_expired_session handler once --- .../service_watcher/zookeeper/zookeeper.rb | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index d1122050..6bf9b5c7 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -4,6 +4,7 @@ require 'zookeeper' require 'base64' require 'objspace' +require 'concurrent' class Synapse::ServiceWatcher class ZookeeperWatcher < BaseWatcher @@ -23,6 +24,7 @@ class ZookeeperWatcher < BaseWatcher @@zk_pool = {} @@zk_pool_count = {} + @@zk_should_exit = Concurrent::AtomicBoolean.new(false) @@zk_pool_lock = Mutex.new def initialize(opts={}, synapse, reconfigure_callback) @@ -54,7 +56,6 @@ def initialize(opts={}, synapse, reconfigure_callback) @zk = nil @watcher = nil - @thread = nil end def start(scheduler) @@ -78,6 +79,8 @@ def stop end def ping? + stop if @@zk_should_exit.true? + # @zk being nil implies no session *or* a lost session, do not remove # the check on @zk being truthy # if the client is in any of the three states: associating, connecting, connected @@ -381,9 +384,21 @@ def zk_connect(&bootstrap) # https://github.com/zk-ruby/zookeeper/blob/80a88e3179fd1d526f7e62a364ab5760f5f5da12/ext/zkrb.c @@zk_pool[@zk_hosts] = with_retry(@retry_policy.merge({'retriable_errors' => RuntimeError})) do |attempts| log.info "synapse: creating pooled connection to #{@zk_hosts} for #{attempts} times" - # zk session timeout is 2 * receive_timeout_msec (as of zookeeper-1.4.x) + # zk session timeout is 2 * receive_timeout_msec (as of zookeeper-1.4.x) # i.e. 18000 means 36 sec - ZK.new(@zk_hosts, :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) + zk = ZK.new(@zk_hosts, :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) + + # handle session expiry -- mark that all watchers should shutdown now. + # since this eventually causes Synapse to shutdown, we do not scope the + # flag to a single client (or watcher). + zk.on_expired_session do + statsd_increment('synapse.watcher.zk.session.expired', ["zk_cluster:#{@zk_cluster}", "service_name:#{@name}"]) + log.warn "synapse: ZK client session expired #{@name}" + + @@zk_should_exit.make_true + end + + zk end @@zk_pool_count[@zk_hosts] = 1 log.info "synapse: successfully created zk connection to #{@zk_hosts}" @@ -398,14 +413,6 @@ def zk_connect(&bootstrap) @zk = @@zk_pool[@zk_hosts] log.info "synapse: retrieved zk connection to #{@zk_hosts}" - # handle session expiry -- by cleaning up zk, this will make `ping?` - # fail and so synapse will exit - @zk.on_expired_session do - statsd_increment('synapse.watcher.zk.session.expired', ["zk_cluster:#{@zk_cluster}", "service_name:#{@name}"]) - log.warn "synapse: ZK client session expired #{@name}" - stop - end - bootstrap.call end end From 53b75764d4544c4731b8cf281be9cf41ab6d83a3 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 24 Aug 2020 13:11:06 -0400 Subject: [PATCH 21/23] Add unit tests for on_expired_session --- .../synapse/service_watcher_zookeeper_spec.rb | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/spec/lib/synapse/service_watcher_zookeeper_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_spec.rb index 4a7bbded..ebfae1b2 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_spec.rb @@ -238,6 +238,16 @@ expect(ZK).to receive(:new).exactly(2).and_raise(RuntimeError) expect { subject.send(:zk_connect) }.to raise_error(RuntimeError) end + + it 'only sets one callback for expired session' do + allow(ZK).to receive(:new).exactly(:once).and_return(mock_zk) + expect(mock_zk).to receive(:on_expired_session).exactly(:once) + + x = Synapse::ServiceWatcher::ZookeeperWatcher.new(config, mock_synapse, ->(*args) {}) + y = Synapse::ServiceWatcher::ZookeeperWatcher.new(config, mock_synapse, ->(*args) {}) + x.start(mock_scheduler) + y.start(mock_scheduler) + end end describe 'start_discovery' do @@ -454,6 +464,28 @@ end end + describe "#ping" do + before :each do + Synapse::ServiceWatcher::ZookeeperWatcher.class_variable_set(:@@zk_pool, {}) + end + + context 'after on_expired_session' do + it 'calls stop' do + allow(ZK).to receive(:new).and_return(mock_zk) + allow(mock_zk).to receive(:connecting?).and_return(true) + allow(mock_zk).to receive(:close!) + + expect(mock_zk).to receive(:on_expired_session) { |*args, &block| + block.call + } + expect(subject).to receive(:stop).exactly(:once).and_call_original + + expect { subject.start(mock_scheduler) }.not_to raise_error + expect(subject.ping?).to eq(false) + end + end + end + describe "discover" do let(:service_data) { { @@ -655,12 +687,17 @@ end describe '#ping' do - before :each do - subject.instance_variable_set(:@zk, mock_zk) - allow(mock_zk).to receive(:connecting?).and_return(false) - allow(mock_zk).to receive(:associating?).and_return(false) - allow(mock_zk).to receive(:connected?).and_return(false) - end + let(:mock_zk) { + zk = double(ZK) + Synapse::ServiceWatcher::ZookeeperPollWatcher.class_variable_set(:@@zk_pool, {}) + Synapse::ServiceWatcher::ZookeeperPollWatcher.class_variable_get(:@@zk_should_exit).make_false + + subject.instance_variable_set(:@zk, zk) + allow(zk).to receive(:connecting?).and_return(false) + allow(zk).to receive(:associating?).and_return(false) + allow(zk).to receive(:connected?).and_return(false) + zk + } it 'checks zookeeper' do expect(mock_zk).to receive(:connecting?) From 7ab5cc50bd35f4d1aa73e5e721f777ef4b2f0b51 Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Mon, 24 Aug 2020 13:11:20 -0400 Subject: [PATCH 22/23] Return false, not nil, if @zk is nil --- lib/synapse/service_watcher/zookeeper/zookeeper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index 6bf9b5c7..68d83d0d 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -85,7 +85,7 @@ def ping? # the check on @zk being truthy # if the client is in any of the three states: associating, connecting, connected # we consider it alive. this can avoid synapse restart on short network dis-connection - @zk && (@zk.associating? || @zk.connecting? || @zk.connected?) + !@zk.nil? && (@zk.associating? || @zk.connecting? || @zk.connected?) end def watching? From 3cdc1481d819b8a620b853cbb44bdb6dd0ba673a Mon Sep 17 00:00:00 2001 From: Rushy Panchal Date: Tue, 25 Aug 2020 14:55:42 -0400 Subject: [PATCH 23/23] Bump version to 0.18.5 --- lib/synapse/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse/version.rb b/lib/synapse/version.rb index 9f12066a..24c1becb 100644 --- a/lib/synapse/version.rb +++ b/lib/synapse/version.rb @@ -1,3 +1,3 @@ module Synapse - VERSION = "0.18.4" + VERSION = "0.18.5" end