Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce thread usage #330

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/synapse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def initialize(opts={})
# configuration is initially enabled to configure on first loop
@config_updated = AtomicValue.new(true)

executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1, :max_threads => [2, @service_watchers.length / 4].max)
@task_scheduler = Concurrent::TimerSet.new(:executor => executor)

# Any exceptions in the watcher threads should wake the main thread so
# that we can fail fast.
Thread.abort_on_exception = true
Expand All @@ -59,7 +62,7 @@ def run
statsd_time('synapse.watchers.start.time') do
@service_watchers.map do |watcher|
begin
watcher.start
watcher.start(@task_scheduler)
statsd_increment("synapse.watcher.start", ['start_result:success', "watcher_name:#{watcher.name}"])
rescue Exception => e
statsd_increment("synapse.watcher.start", ['start_result:fail', "watcher_name:#{watcher.name}", "exception_name:#{e.class.name}", "exception_message:#{e.message}"])
Expand Down Expand Up @@ -119,6 +122,8 @@ def run
raise e
end
end

@task_scheduler.kill
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this kill async or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is async from the docs

Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.

statsd_increment('synapse.stop', ['stop_avenue:clean', 'stop_location:main_loop'])
end

Expand Down
7 changes: 5 additions & 2 deletions lib/synapse/service_watcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ require "synapse/service_watcher/base/base"

class Synapse::ServiceWatcher
class MyWatcher < BaseWatcher
def start
# write code which begins running service discovery
def start(scheduler)
# write code which begins running service discovery.
# Instead of running a background thread, you can use scheduler
# which is an instance of Concurrent::TimerSet to schedule tasks on a thread pool.
# (http://ruby-concurrency.github.io/concurrent-ruby/1.1.5/Concurrent/TimerSet.html)
end

def stop
Expand Down
2 changes: 1 addition & 1 deletion lib/synapse/service_watcher/base/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def haproxy
end

# this should be overridden to actually start your watcher
def start
def start(scheduler)
log.info "synapse: starting stub watcher; this means doing nothing at all!"
end

Expand Down
44 changes: 44 additions & 0 deletions lib/synapse/service_watcher/base/poll.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require 'synapse/service_watcher/base/base'

require 'concurrent'

class Synapse::ServiceWatcher
class PollWatcher < BaseWatcher
def initialize(opts={}, synapse, reconfigure_callback)
super(opts, synapse, reconfigure_callback)

@check_interval = @discovery['check_interval'] || 15.0
@should_exit = Concurrent::AtomicBoolean.new(false)
end

def start(scheduler)
reset_schedule = Proc.new {
discover

# Schedule the next task until we should exit
unless @should_exit.true?
scheduler.post(@check_interval, &reset_schedule)
end
}

# Execute the first discover immediately
scheduler.post(0, &reset_schedule)
end

def stop
@should_exit.make_true
end

private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method '#{@discovery['method']}' for poll watcher" \
unless @discovery['method'] == 'poll'

log.warn "synapse: warning: a stub watcher with no default servers is pretty useless" if @default_servers.empty?
end

def discover
log.info "base poll watcher discover"
end
end
end
46 changes: 9 additions & 37 deletions lib/synapse/service_watcher/dns/dns.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
require "synapse/service_watcher/base/base"
require "synapse/service_watcher/base/poll"

require 'thread'
require 'resolv'

class Synapse::ServiceWatcher
class DnsWatcher < BaseWatcher
def start
@check_interval = @discovery['check_interval'] || 30.0
@nameserver = @discovery['nameserver']
class DnsWatcher < PollWatcher
def initialize(opts={}, synapse, reconfigure_callback)
super(opts, synapse, reconfigure_callback)

@watcher = Thread.new do
watch
end
@nameserver = @discovery['nameserver']
@check_interval = @discovery['check_interval'] || 30.0
end

def ping?
@watcher.alive? && !(resolver.getaddresses('airbnb.com').empty?)
!(resolver.getaddresses('airbnb.com').empty?)
end

def discovery_servers
Expand All @@ -30,33 +27,8 @@ def validate_discovery_opts
if discovery_servers.empty?
end

def watch
last_resolution = resolve_servers
configure_backends(last_resolution)
until @should_exit
begin
start = Time.now
current_resolution = resolve_servers
unless last_resolution == current_resolution
last_resolution = current_resolution
configure_backends(last_resolution)
end

sleep_until_next_check(start)
rescue => e
log.warn "Error in watcher thread: #{e.inspect}"
log.warn e.backtrace
end
end

log.info "synapse: dns watcher exited successfully"
end

def sleep_until_next_check(start_time)
sleep_time = @check_interval - (Time.now - start_time)
if sleep_time > 0.0
sleep(sleep_time)
end
def discover
configure_backends(resolve_servers)
end

IP_REGEX = Regexp.union([Resolv::IPv4::Regex, Resolv::IPv6::Regex])
Expand Down
33 changes: 4 additions & 29 deletions lib/synapse/service_watcher/docker/docker.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
require "synapse/service_watcher/base/base"
require "synapse/service_watcher/base/poll"
require 'docker'

class Synapse::ServiceWatcher
class DockerWatcher < BaseWatcher
def start
@check_interval = @discovery['check_interval'] || 15.0
@watcher = Thread.new do
watch
end
end

class DockerWatcher < PollWatcher
private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
Expand All @@ -22,26 +15,8 @@ def validate_discovery_opts
if @discovery['container_port'].nil?
end

def watch
until @should_exit
begin
start = Time.now
set_backends(containers)
sleep_until_next_check(start)
rescue Exception => e
log.warn "synapse: error in watcher thread: #{e.inspect}"
log.warn e.backtrace
end
end

log.info "synapse: docker watcher exited successfully"
end

def sleep_until_next_check(start_time)
sleep_time = @check_interval - (Time.now - start_time)
if sleep_time > 0.0
sleep(sleep_time)
end
def discover
set_backends(containers)
end

def rewrite_container_ports(ports)
Expand Down
39 changes: 8 additions & 31 deletions lib/synapse/service_watcher/ec2tag/ec2tag.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
require 'synapse/service_watcher/base/base'
require 'synapse/service_watcher/base/poll'
require 'aws-sdk'

class Synapse::ServiceWatcher
class Ec2tagWatcher < BaseWatcher

class Ec2tagWatcher < PollWatcher
attr_reader :check_interval

def start
def start(scheduler)
region = @discovery['aws_region'] || ENV['AWS_REGION']
log.info "Connecting to EC2 region: #{region}"

Expand All @@ -15,16 +14,13 @@ def start
access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'],
secret_access_key: @discovery['aws_secret_access_key'] || ENV['AWS_SECRET_ACCESS_KEY'] )

@check_interval = @discovery['check_interval'] || 15.0

log.info "synapse: ec2tag watcher looking for instances " +
"tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}"
"tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}"

@watcher = Thread.new { watch }
super(scheduler)
end

private

def validate_discovery_opts
# Required, via options only.
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
Expand Down Expand Up @@ -52,28 +48,9 @@ def validate_discovery_opts
end
end

def watch
until @should_exit
begin
start = Time.now
if set_backends(discover_instances)
log.info "synapse: ec2tag watcher backends have changed."
end
rescue Exception => e
log.warn "synapse: error in ec2tag watcher thread: #{e.inspect}"
log.warn e.backtrace
ensure
sleep_until_next_check(start)
end
end

log.info "synapse: ec2tag watcher exited successfully"
end

def sleep_until_next_check(start_time)
sleep_time = check_interval - (Time.now - start_time)
if sleep_time > 0.0
sleep(sleep_time)
def discover
if set_backends(discover_instances)
log.info "synapse: ec2tag watcher backends have changed."
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/synapse/service_watcher/marathon/marathon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class Synapse::ServiceWatcher
class MarathonWatcher < BaseWatcher
def start
def start(scheduler)
@check_interval = @discovery['check_interval'] || 10.0
@connection = nil
@watcher = Thread.new { sleep splay; watch }
Expand Down
4 changes: 2 additions & 2 deletions lib/synapse/service_watcher/multi/multi.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ def initialize(opts={}, synapse, reconfigure_callback)
-> { resolver_notification })
end

def start
def start(scheduler)
log.info "synapse: starting multi watcher"
statsd_increment("synapse.watcher.multi.start")

@watchers.values.each do |w|
w.start
w.start(scheduler)
end

@resolver.start
Expand Down
Loading