diff --git a/.gitignore b/.gitignore index 842eb0fc..e1fe6b67 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,6 @@ tmp .vagrant .*sw? vendor/ +/.idea synapse.jar diff --git a/config/synapse_zookeeper_recursive.yaml b/config/synapse_zookeeper_recursive.yaml new file mode 100644 index 00000000..ca081ce9 --- /dev/null +++ b/config/synapse_zookeeper_recursive.yaml @@ -0,0 +1,35 @@ +haproxy: + reload_command: "haproxy -p /tmp/haproxy.pid -f /etc/haproxy/haproxy.cfg -sf `cat /tmp/haproxy.pid`" + config_file_path: "/etc/haproxy/haproxy.cfg" + do_writes: true + do_reloads: true + global: + - "daemon" + - "user haproxy" + - "group haproxy" + - "maxconn 4096" + - "log 127.0.0.1 local2 notice" + - "stats socket /var/run/haproxy.pid" + defaults: + - "log global" + - "mode http" + - "balance roundrobin" + - "timeout connect 5000ms" + - "timeout client 50000ms" + - "timeout server 50000ms" + shared_frontend: + - "bind 127.0.0.1:80" +services: + zookeeper_recursive: + discovery: + method: "zookeeper_recursive" + path: "/path/to/synapse" + hosts: + - "localhost:2181" + empty_backend_pool: "true" + haproxy: + server_options: "check inter 2s rise 3 fall 2" + shared_frontend: + - "acl is#[service] path_beg #[servicePath]" + - "use_backend #[service] if is#[service]" + diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..24a80dc6 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -71,6 +71,21 @@ def reconfigure! @config_updated = true end + def append_service_watcher(service_name, service_config) + watcher = ServiceWatcher.create(service_name, service_config, self) + @service_watchers << watcher + watcher.start + end + + def remove_watcher_by_name(service_name) + @service_watchers.each do |watcher| + if watcher.name == service_name + watcher.stop + @service_watchers.delete(watcher) + end + end + end + private def create_service_watchers(services={}) service_watchers =[] diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index ee05e6c2..4b23eb1c 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -4,6 +4,7 @@ require "synapse/service_watcher/dns" require "synapse/service_watcher/docker" require "synapse/service_watcher/zookeeper_dns" +require "synapse/service_watcher/zookeeper_recursive" module Synapse class ServiceWatcher @@ -15,6 +16,7 @@ class ServiceWatcher 'dns' => DnsWatcher, 'docker' => DockerWatcher, 'zookeeper_dns' => ZookeeperDnsWatcher, + 'zookeeper_recursive' => ZookeeperRecursiveWatcher } # the method which actually dispatches watcher creation requests diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 4a88e077..0803e358 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -65,13 +65,19 @@ def discover numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} - end + new_backends << {'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end unless node[1].ephemeralOwner == 0 end if new_backends.empty? if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + log.info @discovery['empty_backend_pool'] + if @discovery['empty_backend_pool'].nil? or @discovery['empty_backend_pool'] == "false" + log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + else + log.warn "synapse: no backends and no default servers for service #{@name}; purging backends" + @backends=[] + end else log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" @backends = @default_servers diff --git a/lib/synapse/service_watcher/zookeeper_recursive.rb b/lib/synapse/service_watcher/zookeeper_recursive.rb new file mode 100644 index 00000000..c5cfd43d --- /dev/null +++ b/lib/synapse/service_watcher/zookeeper_recursive.rb @@ -0,0 +1,138 @@ +require "synapse/service_watcher/base" +require "zk" +require "thread" + +module Synapse + class ZookeeperRecursiveWatcher < BaseWatcher + + # Overriden methods start, stop, validate_discovery_opts, ping + def start + boot unless @already_started + end + + def boot + @already_started = true + log.info "#{@name}: Starting @ hosts: #{@discovery["hosts"]}, path: #{@discovery["path"]}, id #{self.object_id}" + setup_zk_connection + setup_haproxy_configuration + start_watching_services + end + + def stop + log.info "#{@name}: Stopping using default stop handler" + @subwatcher.each { |watcher| cleanup_service_watcher(watcher) } + @should_exit = true + end + + def validate_discovery_opts + raise ArgumentError, "invalid discovery method #{@discovery["method"]}" \ + unless @discovery["method"] == "zookeeper_recursive" + raise ArgumentError, "missing or invalid zookeeper host for service #{@name}" \ + unless @discovery["hosts"] + raise ArgumentError, "invalid zookeeper path for service #{@name}" \ + unless @discovery["path"] + end + + def ping? + @zk && @zk.connected? + end + + # Methods for Initializing + def setup_zk_connection + @zk_hosts = @discovery["hosts"].shuffle.join(",") + @zk = ZK.new(@zk_hosts) + end + + def setup_haproxy_configuration + @haproxy_template = @haproxy.dup + #Purge the own haproxy-conf to a minimum, in order to be no haproxy-instance + @haproxy = {"listen" => @haproxy["listen"]} + end + + def start_watching_services + @subwatcher = [] + create_if_not_exists(@discovery["path"]) + watch_services(@discovery["path"]) + end + + def create_if_not_exists(path) + log.debug "#{@name}: Creating ZK path: #{path}" + current = "" + path.split("/").drop(1).each { |node| + current += "/#{node}" + @zk.create(current) unless @zk.exists?(current) + } + end + + # Methods for running + def watch_services(path) + log.info("Watching path #{path}") + # Register each time a event is fired, since we"re getting only one event per register + @zk.register(path, [:deleted, :child]) do |event| + if event.node_deleted? + cleanup_service_watcher(path) + else + watch_services(path) + end + end + + children = @zk.children(path, :watch => true).map { |child| "#{path}/#{child}" } + + persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 } + persistent_children.each { |child| watch_services(child) } + + + unless (path == @discovery["path"]) + if (!@subwatcher.include?(path) && persistent_children.empty?) then + create_service_watcher(path) + end + if (@subwatcher.include?(path) && !persistent_children.empty?) then + cleanup_service_watcher(path) + end + end + end + + def create_service_watcher(service_path) + service_name = service_path.gsub(/[\/\.]/, "_") + service_config = { + "discovery" => { + "method" => "zookeeper", + "path" => "#{service_path}", + "hosts" => @discovery["hosts"], + "empty_backend_pool" => @discovery["empty_backend_pool"] + }, + "haproxy" => build_haproxy_section(service_name, service_path, @haproxy_template) + } + log.info "#{@name}: Creating new Service-Watcher for #{service_name}@ hosts: #{@zk_hosts}" + log.debug service_config + @subwatcher << service_path + @synapse.append_service_watcher(service_name, service_config) + end + + def build_haproxy_section(service_name, service_path, template) + new_haproxy = {} + template.each { |key, section| new_haproxy[key] = parse_section(section, service_name, service_path) } + return new_haproxy + end + + def parse_section(section, service_name, service_path) + service_url = service_path.sub(@discovery["path"], "") + service_url = "/" if service_url.empty? + if section.is_a?(String) + new_section = section.gsub(/#\[servicePath\]/, "#{service_url}").gsub(/#\[service\]/, "#{service_name}") + else + unless section.nil? || section == 0 + new_section = section.map { |subsection| parse_section(subsection, service_name, service_path) } + end + end + new_section + end + + def cleanup_service_watcher(service_path) + service_name = service_path.gsub(/\//, "_") + log.info("#{@name}: Removing Watcher: #{service_name}") + @synapse.remove_watcher_by_name(service_name) + @subwatcher.delete(service_path) + end + end +end diff --git a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb new file mode 100644 index 00000000..7863f315 --- /dev/null +++ b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb @@ -0,0 +1,193 @@ +require "spec_helper" + +class Synapse::ZookeeperRecursiveWatcher + attr_reader :should_exit, :default_servers + + def get_zk + @zk + end + + def get_synapse + @synapse + end + + def set_subwatcher(subwatcher) + @subwatcher = subwatcher + end +end + +describe Synapse::ZookeeperRecursiveWatcher do + let(:mocksynapse) { double } + subject { Synapse::ZookeeperRecursiveWatcher.new(args, mocksynapse) } + let(:testargs) { + {"name" => "foo", + "discovery" => { + "method" => "zookeeper_recursive", + "hosts" => ["localhost:2181"], + "path" => "/foo/synapse" + }, + "haproxy" => { + "option_with_param" => "has #[service] param" + } + } + } + + context "can construct normally" do + let(:args) { testargs } + it("can at least construct") { expect { subject }.not_to raise_error } + end + + def remove_discovery_arg(name) + args = testargs.clone + discovery = testargs["discovery"].clone + discovery.delete name + args["discovery"] = discovery + args + end + + context "without path argument" do + let(:args) { remove_discovery_arg "path" } + it("gots bang") { expect { subject }.to raise_error(ArgumentError, "invalid zookeeper path for service #{args["name"]}") } + end + + {"path" => "invalid zookeeper path for service foo", + "hosts" => "missing or invalid zookeeper host for service foo", + "method" => "invalid discovery method "}.each do |to_remove, message| + context "without path argument" do + let(:args) { remove_discovery_arg to_remove } + it("gots bang") { expect { subject }.to raise_error(ArgumentError, message) } + end + end + + context "when watcher gets started" do + let(:args) { testargs } + before(:each) do + ZK = ZKMock + end + it("sets up the zk-client and registers for the path") { + subject.start + expect(subject.get_zk.start_successful).to be true + } + context("when a registered event is fired") do + before(:each) do + ZK = ZKMock + subject.start + end + it("adds a new zookeeper service_watcher on child-events and discovers new services in the new directory") { + expect(subject.get_synapse).to receive(:append_service_watcher). + with("_foo_synapse_service1", + {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => {"option_with_param" => "has _foo_synapse_service1 param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + subject.get_zk.set_children("/foo/synapse", ["service1"]) + subject.get_zk.fire_event("/foo/synapse", false) + + expect(subject.get_synapse).to receive(:append_service_watcher). + with("_foo_synapse_service1_subservice", + {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1/subservice", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => {"option_with_param" => "has _foo_synapse_service1_subservice param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse_service1") + subject.get_zk.set_children("/foo/synapse/service1", ["subservice"]) + subject.get_zk.fire_event("/foo/synapse/service1", false) + } + it("removes a service_watcher on delete-events") { + expect(subject.get_synapse).to receive(:append_service_watcher) + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse_service1") + subject.get_zk.set_children("/foo/synapse", ["service1"]) + subject.get_zk.fire_event("/foo/synapse", false) + subject.get_zk.fire_event("/foo/synapse/service1", true) + } + end + end + + context("when watcher gets stopped") do + let(:args) { testargs } + before(:each) do + subject.set_subwatcher(["service1"]) + end + it("cleans up every subwatcher") { + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("service1") + subject.stop + } + end + + class ZKMock + ZKMock::ZKStat = Struct.new(:ephemeralOwner, :name) + class ZKMock::ZKEvent + def initialize(is_delete_event) + @is_delete_event = is_delete_event + end + def node_deleted? + return @is_delete_event + end + end + + def initialize(zk_connect_string) + @initialized = true + @root = Tree.new(to_zk_node("root"), []) + @registered_paths = {} + end + + def exists?(path) + @root.find(to_zk_path(path)).nil? + end + + def create(path, *args) + @root.add_path(to_zk_path(path)) + end + + def register(path, opts={}, &block) + blocks = @registered_paths[path] || [] + blocks.push(block) + @registered_paths[path] = blocks + end + + def children(path, opts={}) + node = @root.find(to_zk_path(path)) + if node.nil? then + return Array.new + else + return node.get_children.map { |child| extract_name(child.get_value) } + end + end + + def get(path) + node = @root.find(to_zk_path(path)) + unless node.nil? + node.get_value + else + raise Exception "Node does not exist!" + end + end + + # Helper + def set_children(path, children) + @root.add_path(to_zk_path(path)) + node = @root.find(to_zk_path(path)) + children.each { |child| node.add_child(Tree.new(to_zk_node(child), [])) } + end + + def fire_event(path, is_delete_event) + blocks = @registered_paths.delete(path) + blocks.each { |block| block.call(ZKEvent.new(is_delete_event)) } + end + + def start_successful + return @initialized && @registered_paths.size > 0 + end + + def to_zk_path(path) + path.split("/").drop(1).map { |node| to_zk_node(node) } + end + + def to_zk_node(name) + data = "" + node_stat = ZKStat.new(0, name) + return [data, node_stat] + end + + def extract_name(zk_node) + return zk_node[1]["name"] + end + end + +end diff --git a/spec/lib/synapse_spec.rb b/spec/lib/synapse_spec.rb new file mode 100644 index 00000000..3b7871ea --- /dev/null +++ b/spec/lib/synapse_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +class Synapse::Synapse + def initialize() end + + def set_service_watchers(watchers) + @service_watchers = watchers + end + + def get_service_watchers + @service_watchers + end + + class ServiceWatcher + def self.create(service_name, service_config, synapse) + return WatcherMock.new(service_name, false) + end + end +end + +class WatcherMock + def initialize(name, started) + @name=name + @started=started + end + def start + @started = true + end + def stop + @started = false + end + def started? + @started + end + def name + @name + end +end + +describe Synapse::Synapse do + subject { Synapse::Synapse.new() } + + context("when a watcher gets appended at runtime") do + before(:each) { + subject.set_service_watchers([]) + } + it("creates the watcher, appends it to the list and starts it") { + service_name = "serviceName" + service_config = { "foo" => "bar" } + subject.append_service_watcher(service_name, service_config) + expect(subject.get_service_watchers.length).to be(1) + expect(subject.get_service_watchers.first.started?).to be true + } + end + + context("when a watcher gets removed at runtime") do + before(:each) { + watcher1 = WatcherMock.new("watcher1", true) + watcher2 = WatcherMock.new("watcher2", true) + subject.set_service_watchers([watcher1, watcher2]) + } + it("stops the watcher and removes it from the list") { + service_name = "watcher1" + subject.remove_watcher_by_name(service_name) + expect(subject.get_service_watchers.length).to be(1) + expect(subject.get_service_watchers.first.name).to eq("watcher2") + } + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 3d187cad..1d92d656 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -7,6 +7,7 @@ require "#{File.dirname(__FILE__)}/../lib/synapse" require 'pry' require 'support/configuration' +require 'support/tree' RSpec.configure do |config| config.run_all_when_everything_filtered = true diff --git a/spec/support/tree.rb b/spec/support/tree.rb new file mode 100644 index 00000000..27bb68d6 --- /dev/null +++ b/spec/support/tree.rb @@ -0,0 +1,53 @@ +class Tree + include Comparable + def initialize(value, children) + @value = value + @children = children + end + def get_value + @value + end + def <=>(anOther) + @value <=> anOther.get_value + end + def get_children() + @children + end + def getChild(value) + selected = @children.select{ |child| + child.get_value == value + } + return selected[0] unless selected.nil? + end + def add_child(child) + existing = getChild(child.get_value) + if existing.nil? && child.is_a?(Tree) + @children = @children.push(child) + existing = child + end + existing + end + def add_path(path) + if path.is_a?(Array) && !path.empty? + child = add_child(Tree.new(path[0], [])) + child.add_path(path.drop(1)) + end + end + def find(path) + if path.is_a?(Array) + if path.empty? + return self + else + child = getChild(path[0]) + return child.find(path.drop(1)) unless child.nil? + end + end + return nil + end + def to_s + "<#{@value}>: #{@children}" + end + def inspect + to_s + end +end \ No newline at end of file