diff --git a/README.md b/README.md index 5a017b9e..1b80035c 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,38 @@ It takes the following options: * `check_interval`: How often to request the list of tasks from Marathon (default: 10 seconds) * `port_index`: Index of the backend port in the task's "ports" array. (default: 0) +##### YARN Service Registry With Apache Slider ##### + +This watcher polls the [YARN API](https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API) and retrieves a list of yarn [tracking urls](lib/synapse/service_watcher/README.md) +for a given application by tag, only in RUNNING state. +Then it use [slier api](http://slider.incubator.apache.org/docs/slider_specs/specifying_exports.html#exporting-formatted-data-at-component-instance-level) for getting published config per container. +Slider exports must be configured per component and match pattern `host:port`. +Example of slider exports response +```json +{ +"description":"ComponentInstanceData", +"updated":0, +"entries":{ + "container_e02_1480417404363_0011_02_000002.server_port":"dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.server_port":"dn0.dev:50006" +}, +"empty":false +} +``` +where `server_port` is `parameter_sufix` +YARN container_id will be used as instance identifier. + +Tested with YARN 2.7.2 and slider 0.91 + +It takes the following options: + +* `yarn_api_url`: Address of the YARN Resource Manager API (e.g. `http://dn0.dev:8088`) +* `application_name`: [Name of the application](https://slider.incubator.apache.org/docs/slider_specs/application_definition.html) in slider, must present in yarn application description as tag. like`name: {application_name}` +* `yarn_apps_path`: optional YARN Resource Manager context path to apps +* `slider_componentinstance_path`: optional slider context path that will be concat with yarn application master tracking url +* `check_interval`: optional How often to request the list of tasks from Marathon (default: 10 seconds) +* `parameter_sufix`: optional String that will be looking in json published by slider. (default: 0) + #### Listing Default Servers #### You may list a number of default servers providing a service. diff --git a/config/yarn_slider.example.conf b/config/yarn_slider.example.conf new file mode 100644 index 00000000..89d3df67 --- /dev/null +++ b/config/yarn_slider.example.conf @@ -0,0 +1,58 @@ +{ + "services": { + "kafka_feeder": { + "default_servers": [], + "discovery": { + "method": "yarn_slider", + "yarn_api_url": "http://dn0.dev:8088/", + "application_name": "fee" + }, + "haproxy": { + "port": 3213, + "server_options": "check inter 2s rise 3 fall 2", + "listen": [ + "mode http", + "option httpchk /health", + "http-check expect string OK" + ] + } + + } + }, + "haproxy": { + "reload_command": "sudo service haproxy reload", + "config_file_path": "/etc/haproxy/haproxy.cfg", + "socket_file_path": "/var/haproxy/stats.sock", + "do_writes": false, + "do_reloads": false, + "do_socket": false, + "global": [ + "daemon", + "user haproxy", + "group haproxy", + "maxconn 4096", + "log 127.0.0.1 local0", + "log 127.0.0.1 local1 notice", + "stats socket /var/haproxy/stats.sock mode 666 level admin" + ], + "defaults": [ + "log global", + "option dontlognull", + "maxconn 2000", + "retries 3", + "timeout connect 5s", + "timeout client 1m", + "timeout server 1m", + "option redispatch", + "balance roundrobin" + ], + "extra_sections": { + "listen stats :3212": [ + "mode http", + "stats enable", + "stats uri /", + "stats refresh 5s" + ] + } + } +} diff --git a/lib/synapse/service_watcher/yarn_slider.rb b/lib/synapse/service_watcher/yarn_slider.rb new file mode 100644 index 00000000..d39f22f2 --- /dev/null +++ b/lib/synapse/service_watcher/yarn_slider.rb @@ -0,0 +1,127 @@ +require 'synapse/service_watcher/base' +require 'json' +require 'net/http' +require 'resolv' + +class Synapse::ServiceWatcher + class YarnSliderWatcher < BaseWatcher + def start + @check_interval = @discovery['check_interval'] || 10.0 + @connection = nil + @watcher = Thread.new { sleep splay; watch } + end + + def stop + @connection.finish + rescue + # pass + end + + private + + def validate_discovery_opts + required_opts = %w[yarn_api_url application_name] + + required_opts.each do |opt| + if @discovery.fetch(opt, '').empty? + raise ArgumentError, + "a value for services.#{@name}.discovery.#{opt} must be specified" + end + end + end + + def attempt_connection(url) + uri = URI(url) + log.debug "synapse: try connect to #{uri}" + begin + connection = Net::HTTP.new(uri.host, uri.port) + connection.open_timeout = 5 + connection.start + return connection + rescue => ex + log.error "synapse: could not connect to YARN at #{url}: #{ex}" + raise ex + end + end + + def try_find_yarn_app_master_traking_url(name) + begin + yarn_rm_connection = attempt_connection(@discovery['yarn_api_url']) + yarn_apps_path = @discovery.fetch('yarn_apps_path', '/ws/v1/cluster/apps?limit=2&state=RUNNING&applicationTypes=org-apache-slider&applicationTags=name:%20') + yarn_path_resolved = yarn_apps_path + name + log.debug "synapse resolved yarn path #{yarn_path_resolved}" + req = Net::HTTP::Get.new(yarn_path_resolved) + req['Content-Type'] = 'application/json' + req['Accept'] = 'application/json' + response = yarn_rm_connection.request(req) + log.debug "synapse yarn apps response\n#{response.body}" + apps = JSON.parse(response.body).fetch('apps', []) + if apps.nil? + raise 'No yarn application with name ' + name + end + if apps['app'].size > 1 + raise 'More then 1 yarn application with name ' + name + end + return apps['app'].at(0)['trackingUrl'] + rescue => ex + log.warn "synapse: error while watcher try find yarn application: #{ex.inspect}" + log.warn ex.backtrace.join("\n") + raise ex + end + end + + def watch + until @should_exit + retry_count = 0 + start = Time.now + + begin + if @connection.nil? + app_am_url = try_find_yarn_app_master_traking_url(@discovery['application_name']) + log.debug "synapse: try connect to app traking url #{app_am_url}" + @slider_component_instance_url = URI(app_am_url + @discovery.fetch('slider_componentinstance_path', '/ws/v1/slider/publisher/slider/componentinstancedata')) + @connection = attempt_connection(@slider_component_instance_url) + end + + req = Net::HTTP::Get.new(@slider_component_instance_url.request_uri) + req['Content-Type'] = 'application/json' + req['Accept'] = 'application/json' + response = @connection.request(req) + + lookup_sufix = @discovery.fetch('parameter_sufix', '.server_port') + entries = JSON.parse(response.body).fetch('entries', []) + backends = entries.keep_if{ |entry| entry.include? lookup_sufix }.map do |key, value| + { 'name' => key[/(.*)#{lookup_sufix}/,1], + 'host' => value[/(.*):.*/,1], + 'port' => value[/.*:(.*)/,1], + } + end.sort_by { |entry| entry['name'] } + + set_backends(backends) + rescue EOFError + # If the persistent HTTP connection is severed, we can automatically + # retry + log.info "synapse: yarn_slider HTTP API at {@slider_component_instance_url} disappeared, reconnecting..." + retry if (retry_count += 1) == 1 + rescue => e + log.warn "synapse: error in watcher thread: #{e.inspect}" + log.warn e.backtrace.join("\n") + @connection = nil + ensure + elapsed_time = Time.now - start + sleep (@check_interval - elapsed_time) if elapsed_time < @check_interval + end + + @should_exit = true if only_run_once? # for testability + end + end + + def splay + Random.rand(@check_interval) + end + + def only_run_once? + false + end + end +end \ No newline at end of file diff --git a/spec/lib/synapse/service_watcher_yarn_slider_spec.rb b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb new file mode 100644 index 00000000..2ca5fd77 --- /dev/null +++ b/spec/lib/synapse/service_watcher_yarn_slider_spec.rb @@ -0,0 +1,226 @@ +require 'spec_helper' +require 'synapse/service_watcher/yarn_slider' + +describe Synapse::ServiceWatcher::YarnSliderWatcher do + let(:mocksynapse) { double() } + let(:yarn_host) { '127.0.0.1' } + let(:yarn_port) { '8088' } + let(:app_name) { 'feeder' } + let(:check_interval) { 11 } + let(:yarn_request_uri) { "#{yarn_host}:#{yarn_port}/ws/v1/cluster/apps?limit=2&state=RUNNING&applicationTypes=org-apache-slider&applicationTags=name:%20feeder" } + let(:slider_request_uri) { "#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011/ws/v1/slider/publisher/slider/componentinstancedata" } + let(:config) do + { + 'name' => 'foo', + 'discovery' => { + 'method' => 'yarn', + 'yarn_api_url' => "http://#{yarn_host}:#{yarn_port}", + 'application_name' => app_name, + 'check_interval' => check_interval, + }, + 'haproxy' => {}, + } + end + let(:slider_response) { {'description' => 'ComponentInstanceData', 'updated' => 0, 'entries' => nil, 'empty' => false} } + let(:yarn_response) { { 'apps' => nil } } + + subject { described_class.new(config, mocksynapse) } + + before do + allow(subject.log).to receive(:warn) + allow(subject.log).to receive(:info) + allow(subject.log).to receive(:debug) + + allow(Thread).to receive(:new).and_yield + allow(subject).to receive(:sleep) + allow(subject).to receive(:only_run_once?).and_return(true) + allow(subject).to receive(:splay).and_return(0) + + stub_request(:get, yarn_request_uri). + with(:headers => { + 'Accept'=>'application/json', + 'Content-Type'=>'application/json', + 'User-Agent'=>'Ruby' + }).to_return(:body => JSON.generate(yarn_response)) + + stub_request(:get, slider_request_uri). + with(:headers => { + 'Accept'=>'application/json', + 'Content-Type'=>'application/json', + 'User-Agent'=>'Ruby' + }).to_return(:body => JSON.generate(slider_response)) + end + + context 'with a valid argument hash' do + it 'instantiates' do + expect(subject).to be_a(Synapse::ServiceWatcher::YarnSliderWatcher) + end + end + + describe '#watch' do + context 'when synapse cannot connect to yarn' do + before do + allow(Net::HTTP).to receive(:new). + with(yarn_host, yarn_port.to_i). + and_raise(Errno::ECONNREFUSED) + end + + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + end + + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + + context 'when yarn return empty apps' do + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + end + + context 'when slider return empty result' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + it 'does not crash' do + expect { subject.start }.not_to raise_error + end + it 'requests the proper API endpoint one time' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + expect(a_request(:get, slider_request_uri)).to have_been_made.times(1) + end + end + + context 'when the API path (yarn_api_path) is customized' do + let(:config) do + super().tap do |c| + c['discovery']['yarn_apps_path'] = '/v3/tasks/' + end + end + + let(:yarn_request_uri) { "#{yarn_host}:#{yarn_port}/v3/tasks/#{app_name}" } + + it 'calls the customized path' do + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + end + end + + context 'with entries returned from slider' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + let(:slider_response) do + { + "description" => "ComponentInstanceData", + "updated" => 0, + "entries" => { + "container_e02_1480417404363_0011_02_000002.server_port" => "dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.server_port" => "dn0.dev:50006" + }, + "empty" => false + } + end + let(:expected_backend_hash1) do + { + 'name' => 'container_e02_1480417404363_0011_02_000002', 'host' => 'dn0.dev', 'port' => "50009" + } + end + let(:expected_backend_hash2) do + { + 'name' => 'container_e02_1480417404363_0011_02_000003', 'host' => 'dn0.dev', 'port' => "50006" + } + end + + it 'adds the task as a backend' do + expect(subject).to receive(:set_backends).with([expected_backend_hash1,expected_backend_hash2]) + subject.start + expect(a_request(:get, yarn_request_uri)).to have_been_made.times(1) + expect(a_request(:get, slider_request_uri)).to have_been_made.times(1) + end + + context 'with a entries with out right sufix' do + let(:yarn_response) do + {"apps"=> + {"app" => + [ + { "name" => "kafka_feeder", + "trackingUrl" => "http://#{yarn_host}:#{yarn_port}/proxy/application_1480417404363_0011", + "applicationTags" => "name: feeder,description: bit kafka feeder,version: 0.0.1" + } + ] + } + } + end + let(:slider_response) do + { + "description" => "ComponentInstanceData", + "updated" => 0, + "entries" => { + "container_e02_1480417404363_0011_02_000002.host_port" => "dn0.dev:50009", + "container_e02_1480417404363_0011_02_000003.host_port" => "dn0.dev:50006" + }, + "empty" => false + } + end + it 'filters tasks that have no startedAt value' do + expect(subject).to receive(:set_backends).with([]) + subject.start + end + end + + context 'when yarn returns invalid response' do + let(:yarn_response) { [] } + it 'does not blow up' do + expect { subject.start }.to_not raise_error + end + end + + context 'when the job takes a long time for some reason' do + let(:job_duration) { 10 } # seconds + + before do + actual_time = Time.now + time_offset = -1 * job_duration + allow(Time).to receive(:now) do + # on first run, return the right time + # subsequently, add in our job_duration offset + actual_time + (time_offset += job_duration) + end + allow(subject).to receive(:set_backends) + end + + it 'only sleeps for the difference' do + expect(subject).to receive(:sleep).with(check_interval - job_duration) + subject.start + end + end + end + end +end \ No newline at end of file