Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Standalone mode #201

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bin/synapse_standalone
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env ruby
require "json"
require "synapse/service_watcher"

service_data = JSON.parse(ARGV[0])
watcher = Synapse::ServiceWatcher.create('', {
"discovery" => service_data,
'haproxy' => {},
}, {})
watcher.start(initial_discover: false)
puts JSON.pretty_generate(watcher.read)
69 changes: 36 additions & 33 deletions lib/synapse/service_watcher/zookeeper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def initialize(opts={}, synapse)
end
end

def start
def start(initial_discover=true)
@zk_hosts = @discovery['hosts'].sort.join(',')

@watcher = nil
@zk = nil

log.info "synapse: starting ZK watcher #{@name} @ hosts: #{@zk_hosts}, path: #{@discovery['path']}"
zk_connect
zk_connect(initial_discover)
end

def stop
Expand All @@ -50,6 +50,34 @@ def ping?
@zk && @zk.connected?
end

def read
@zk.children(@discovery['path'], :watch => true).collect do |id|
node = @zk.get("#{@discovery['path']}/#{id}")

begin
# TODO: Do less munging, or refactor out this processing
host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first)
rescue StandardError => e
log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}"
nil
else
server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = id.split('_').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
{
'name' => name, 'host' => host, 'port' => server_port,
'id' => numeric_id, 'weight' => weight,
'haproxy_server_options' => haproxy_server_options,
'labels' => labels
}
end
end.compact
end

private

def validate_discovery_opts
Expand Down Expand Up @@ -124,33 +152,7 @@ def create(path)
# find the current backends at the discovery path
def discover
log.info "synapse: discovering backends for service #{@name}"

new_backends = []
@zk.children(@discovery['path'], :watch => true).each do |id|
node = @zk.get("#{@discovery['path']}/#{id}")

begin
# TODO: Do less munging, or refactor out this processing
host, port, name, weight, haproxy_server_options, labels = deserialize_service_instance(node.first)
rescue StandardError => e
log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}"
else
server_port = @haproxy['server_port_override'] ? @haproxy['server_port_override'] : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = id.split('_').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
new_backends << {
'name' => name, 'host' => host, 'port' => server_port,
'id' => numeric_id, 'weight' => weight,
'haproxy_server_options' => haproxy_server_options,
'labels' => labels
}
end
end

new_backends = read
set_backends(new_backends)
end

Expand Down Expand Up @@ -206,7 +208,7 @@ def zk_cleanup
log.info "synapse: zookeeper watcher cleaned up successfully"
end

def zk_connect
def zk_connect(initial_discover)
log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}"

# Ensure that all Zookeeper watcher re-use a single zookeeper
Expand Down Expand Up @@ -236,8 +238,10 @@ def zk_connect
# the path must exist, otherwise watch callbacks will not work
create(@discovery['path'])

# call the callback to bootstrap the process
watcher_callback.call
if initial_discover
# call the callback to bootstrap the process
watcher_callback.call
end
end

# decode the data at a zookeeper endpoint
Expand All @@ -256,4 +260,3 @@ def deserialize_service_instance(data)
end
end
end