diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..4584f7a8 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -27,6 +27,16 @@ def initialize(opts={}) # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true + if opts['check_interval'] + @check_interval = opts['check_interval'].to_i + else + @check_interval = 15 + end + if opts['max_checks'] + @max_checks = opts['max_checks'].to_i + else + @max_checks = 0 + end log.debug "synapse: completed init" end @@ -50,10 +60,14 @@ def run log.info "synapse: regenerating haproxy config" @haproxy.update_config(@service_watchers) else - sleep 1 + sleep @check_interval end loops += 1 + if @max_checks != 0 and loops > @max_checks + log.info "synapse: exiting after #{loops} loops" + break + end log.debug "synapse: still running at #{Time.now}" if (loops % 60) == 0 end diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a3319c26..0d309f7b 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -1,15 +1,41 @@ require 'synapse/service_watcher/base' require 'aws-sdk' +require 'ostruct' + +InstanceCache = Object.new +class << InstanceCache + def __init__ + @i_time = Time.now + @instances = nil + @cacheTimeout = 60 + @mutex = Mutex.new + end + def set(instances) + @instances = instances + @i_time = Time.now + end + def get() + if Time.now - @i_time > (@cacheTimeout + rand(10)) + @instances = nil + end + @instances + end + def get_mutex() + @mutex + end +end +InstanceCache.__init__() module Synapse class EC2Watcher < BaseWatcher + attr_reader :check_interval def start region = @discovery['aws_region'] || ENV['AWS_REGION'] log.info "Connecting to EC2 region: #{region}" - + AWS.config(:logger => log) @ec2 = AWS::EC2.new( region: region, access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'], @@ -18,9 +44,10 @@ def start @check_interval = @discovery['check_interval'] || 15.0 log.info "synapse: ec2tag watcher looking for instances " + - "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" + "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']} #{@discovery['selector']} " @watcher = Thread.new { watch } + instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value']) end private @@ -61,11 +88,11 @@ def watch current_backends = discover_instances if last_backends != current_backends - log.info "synapse: ec2tag watcher backends have changed." + log.info "synapse: ec2tag watcher #{@name} backends have changed." last_backends = current_backends configure_backends(current_backends) else - log.info "synapse: ec2tag watcher backends are unchanged." + log.info "synapse: ec2tag watcher #{@name} backends are unchanged." end sleep_until_next_check(start) @@ -86,30 +113,63 @@ def sleep_until_next_check(start_time) end def discover_instances - AWS.memoize do instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value']) - - new_backends = [] - - # choice of private_dns_name, dns_name, private_ip_address or - # ip_address, for now, just stick with the private fields. - instances.each do |instance| - new_backends << { - 'name' => instance.private_dns_name, - 'host' => instance.private_ip_address, - 'port' => @haproxy['server_port_override'], - } + if @discovery['selector'] + instances = eval("instances.select { |i| #{@discovery['selector']}}") end - - new_backends - end + # do not want to update the cached objects + inst = [] + # add server port + instances.each { | i | + iclone = OpenStruct.new(i.to_h) + iclone['port'] = @haproxy['server_port_override'] + inst << iclone + } + # sort so that the back end are generated in the same way + inst.sort_by! { |i| i['name'] } + inst end def instances_with_tags(tag_name, tag_value) - @ec2.instances - .tagged(tag_name) - .tagged_values(tag_value) - .select { |i| i.status == :running } + InstanceCache.get_mutex().synchronize do + inst = InstanceCache.get() + if inst.nil? + tries = 0 + ntries = 4 + AWS.memoize do + while tries < ntries do + log.info ("AWS API Call for #{tag_name}, #{tag_value}") + begin + instances = @ec2.instances + .tagged(tag_name) + .tagged_values(tag_value) + .select { |i| i.status == :running } + break + rescue Exception => e + if e.message.include?("Request limit exceeded") + sleeping = rand(6) + 4 + log.warn ("#{e.message} retry #{tries} after #{sleeping} sec") + tries += 1 + sleep(sleeping) + else + raise e + end + end + end + inst = [] + instances.each { | i | + inst_info = OpenStruct.new({'tags' => i.tags.to_h, + 'host' => i.private_ip_address, + 'name' => i.tags["Name"]}) + inst_info.freeze + inst << inst_info + } + inst.freeze + end + InstanceCache.set(inst) + end + return inst + end end def configure_backends(new_backends)