Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ec2tags selector #89

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion lib/synapse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def initialize(opts={})
# Any exceptions in the watcher threads should wake the main thread so
# that we can fail fast.
Thread.abort_on_exception = true
if opts['check_interval']
@check_interval = opts['check_interval'].to_i
else
@check_interval = 15
end
if opts['max_checks']
@max_checks = opts['max_checks'].to_i
else
@max_checks = 0
end

log.debug "synapse: completed init"
end
Expand All @@ -50,10 +60,14 @@ def run
log.info "synapse: regenerating haproxy config"
@haproxy.update_config(@service_watchers)
else
sleep 1
sleep @check_interval
end

loops += 1
if @max_checks != 0 and loops > @max_checks
log.info "synapse: exiting after #{loops} loops"
break
end
log.debug "synapse: still running at #{Time.now}" if (loops % 60) == 0
end

Expand Down
106 changes: 83 additions & 23 deletions lib/synapse/service_watcher/ec2tag.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
require 'synapse/service_watcher/base'
require 'aws-sdk'
require 'ostruct'

InstanceCache = Object.new
class << InstanceCache
def __init__
@i_time = Time.now
@instances = nil
@cacheTimeout = 60
@mutex = Mutex.new
end
def set(instances)
@instances = instances
@i_time = Time.now
end
def get()
if Time.now - @i_time > (@cacheTimeout + rand(10))
@instances = nil
end
@instances
end
def get_mutex()
@mutex
end
end

InstanceCache.__init__()
module Synapse
class EC2Watcher < BaseWatcher


attr_reader :check_interval

def start
region = @discovery['aws_region'] || ENV['AWS_REGION']
log.info "Connecting to EC2 region: #{region}"

AWS.config(:logger => log)
@ec2 = AWS::EC2.new(
region: region,
access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'],
Expand All @@ -18,9 +44,10 @@ def start
@check_interval = @discovery['check_interval'] || 15.0

log.info "synapse: ec2tag watcher looking for instances " +
"tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}"
"tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']} #{@discovery['selector']} "

@watcher = Thread.new { watch }
instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value'])
end

private
Expand Down Expand Up @@ -61,11 +88,11 @@ def watch
current_backends = discover_instances

if last_backends != current_backends
log.info "synapse: ec2tag watcher backends have changed."
log.info "synapse: ec2tag watcher #{@name} backends have changed."
last_backends = current_backends
configure_backends(current_backends)
else
log.info "synapse: ec2tag watcher backends are unchanged."
log.info "synapse: ec2tag watcher #{@name} backends are unchanged."
end

sleep_until_next_check(start)
Expand All @@ -86,30 +113,63 @@ def sleep_until_next_check(start_time)
end

def discover_instances
AWS.memoize do
instances = instances_with_tags(@discovery['tag_name'], @discovery['tag_value'])

new_backends = []

# choice of private_dns_name, dns_name, private_ip_address or
# ip_address, for now, just stick with the private fields.
instances.each do |instance|
new_backends << {
'name' => instance.private_dns_name,
'host' => instance.private_ip_address,
'port' => @haproxy['server_port_override'],
}
if @discovery['selector']
instances = eval("instances.select { |i| #{@discovery['selector']}}")
end

new_backends
end
# do not want to update the cached objects
inst = []
# add server port
instances.each { | i |
iclone = OpenStruct.new(i.to_h)
iclone['port'] = @haproxy['server_port_override']
inst << iclone
}
# sort so that the back end are generated in the same way
inst.sort_by! { |i| i['name'] }
inst
end

def instances_with_tags(tag_name, tag_value)
@ec2.instances
.tagged(tag_name)
.tagged_values(tag_value)
.select { |i| i.status == :running }
InstanceCache.get_mutex().synchronize do
inst = InstanceCache.get()
if inst.nil?
tries = 0
ntries = 4
AWS.memoize do
while tries < ntries do
log.info ("AWS API Call for #{tag_name}, #{tag_value}")
begin
instances = @ec2.instances
.tagged(tag_name)
.tagged_values(tag_value)
.select { |i| i.status == :running }
break
rescue Exception => e
if e.message.include?("Request limit exceeded")
sleeping = rand(6) + 4
log.warn ("#{e.message} retry #{tries} after #{sleeping} sec")
tries += 1
sleep(sleeping)
else
raise e
end
end
end
inst = []
instances.each { | i |
inst_info = OpenStruct.new({'tags' => i.tags.to_h,
'host' => i.private_ip_address,
'name' => i.tags["Name"]})
inst_info.freeze
inst << inst_info
}
inst.freeze
end
InstanceCache.set(inst)
end
return inst
end
end

def configure_backends(new_backends)
Expand Down