-
Notifications
You must be signed in to change notification settings - Fork 251
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Etcd service watcher #58
base: master
Are you sure you want to change the base?
Changes from all commits
9323300
5fc104f
35b1a25
479d393
16567de
9426ba7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
source 'https://rubygems.org' | ||
|
||
gem 'docker-api', :require => 'docker' | ||
|
||
# Specify your gem's dependencies in synapse.gemspec | ||
gemspec |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
require "synapse/service_watcher/base" | ||
|
||
require 'etcd' | ||
|
||
module Synapse | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be class Synapse::ServiceWatcher Also can you add this watcher to the auto creation test? |
||
class EtcdWatcher < BaseWatcher | ||
NUMBERS_RE = /^\d+$/ | ||
|
||
def start | ||
@etcd_hosts = @discovery['hosts'].shuffle | ||
|
||
log.info "synapse: starting etcd watcher #{@name} @ hosts: #{@discovery['hosts']}, path: #{@discovery['path']}" | ||
@should_exit = false | ||
|
||
@etcd_hosts.each do |h| | ||
host, port = h.split(':') | ||
port = port || 4003 | ||
@etcd = ::Etcd.client(:host => host, :port => port) | ||
|
||
connected = | ||
begin | ||
@etcd.leader | ||
rescue | ||
false | ||
end | ||
|
||
break if connected | ||
end | ||
|
||
# call the callback to bootstrap the process | ||
discover | ||
@synapse.reconfigure! | ||
@watcher = Thread.new do | ||
watch | ||
end | ||
end | ||
|
||
def stop | ||
log.warn "synapse: etcd watcher exiting" | ||
|
||
@should_exit = true | ||
@etcd = nil | ||
|
||
log.info "synapse: etcd watcher cleaned up successfully" | ||
end | ||
|
||
def ping? | ||
@etcd.leader | ||
end | ||
|
||
private | ||
def validate_discovery_opts | ||
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ | ||
unless @discovery['method'] == 'etcd' | ||
raise ArgumentError, "missing or invalid etcd hosts for service #{@name}" \ | ||
unless @discovery['hosts'] | ||
raise ArgumentError, "invalid etcd path for service #{@name}" \ | ||
unless @discovery['path'] | ||
end | ||
|
||
# helper method that ensures that the discovery path exists | ||
def create(path) | ||
log.debug "synapse: creating etcd path: #{path}" | ||
@etcd.create(path, dir: true) | ||
end | ||
|
||
def each_node(node) | ||
begin | ||
host, port, name = deserialize_service_instance(node.value) | ||
rescue StandardError => e | ||
log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}" | ||
nil | ||
else | ||
server_port = @server_port_override ? @server_port_override : port | ||
|
||
# find the numberic id in the node name; used for leader elections if enabled | ||
numeric_id = node.key.split('/').last | ||
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil | ||
|
||
log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" | ||
{ 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} | ||
end | ||
end | ||
|
||
def each_dir(d) | ||
new_backends = [] | ||
d.children.each do |node| | ||
if node.directory? | ||
new_backends << each_dir(@etcd.get(node.key)) | ||
else | ||
backend = each_node(node) | ||
if backend | ||
new_backends << backend | ||
end | ||
end | ||
end | ||
new_backends.flatten | ||
end | ||
|
||
# find the current backends at the discovery path; sets @backends | ||
def discover | ||
log.info "synapse: discovering backends for service #{@name}" | ||
|
||
d = nil | ||
begin | ||
d = @etcd.get(@discovery['path']) | ||
rescue Etcd::KeyNotFound | ||
create(@discovery['path']) | ||
d = @etcd.get(@discovery['path']) | ||
end | ||
|
||
new_backends = [] | ||
if d.directory? | ||
new_backends = each_dir(d) | ||
else | ||
log.warn "synapse: path #{@discovery['path']} is not a directory" | ||
end | ||
|
||
if new_backends.empty? | ||
if @default_servers.empty? | ||
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" | ||
false | ||
else | ||
log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" | ||
@backends = @default_servers | ||
true | ||
end | ||
else | ||
if @backends != new_backends | ||
log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" | ||
@backends = new_backends | ||
true | ||
else | ||
log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" | ||
false | ||
end | ||
end | ||
end | ||
|
||
def watch | ||
while !@should_exit | ||
begin | ||
@etcd.watch(@discovery['path'], :timeout => 60, :recursive => true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright so while testing I came across this gem. If my understanding is correct etcd makes us not only do our own heartbeats from nerve (hella writes), but we also end up basically constantly polling in this function because there is no capability to filter watch events? Basically without etcd-io/etcd#633 or etcd-io/etcd#174 closed SmartStack scalability on etcd will be limited to ... not very much. If that understanding is correct can we implement get children watches internally, where we do a lightweight "see if the list of children changed" operation and if that is different we actually go through loading all the data (aka run discover). This way we're at least not constantly pulling all of the etcd state? |
||
rescue Timeout::Error | ||
else | ||
if discover | ||
@synapse.reconfigure! | ||
end | ||
end | ||
end | ||
end | ||
|
||
# decode the data at an etcd endpoint | ||
def deserialize_service_instance(data) | ||
log.debug "synapse: deserializing process data" | ||
decoded = JSON.parse(data) | ||
|
||
host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') | ||
port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') | ||
name = decoded['name'] || nil | ||
|
||
return host, port, name | ||
end | ||
end | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So while testing this I came across quite a pickle, which is that the etcd ruby gem < 0.3.0 cannot handle etcd 2.0+ (it errors all over the place regarding 404 errors)
ranjib/etcd-ruby#51 has been merged and my local testing indicates that version 0.3.0 seems to fix things.
@bobtfish do you think we should depend on an old etcd or the new one? I sorta prefer the new one but I'm not sure what the community progress on moving to etcd 2.0 is like?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we recommend installing via gem, can we just do ~> 0.2 ?