Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watcher for YARN application that was deploy with Apache Slider. #217

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,38 @@ It takes the following options:
* `check_interval`: How often to request the list of tasks from Marathon (default: 10 seconds)
* `port_index`: Index of the backend port in the task's "ports" array. (default: 0)

##### YARN Service Registry With Apache Slider #####

This watcher polls the [YARN API](https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API) and retrieves a list of yarn [tracking urls](lib/synapse/service_watcher/README.md)
for a given application by tag, only in RUNNING state.
Then it use [slier api](http://slider.incubator.apache.org/docs/slider_specs/specifying_exports.html#exporting-formatted-data-at-component-instance-level) for getting published config per container.
Slider exports must be configured per component and match pattern `host:port`.
Example of slider exports response
```json
{
"description":"ComponentInstanceData",
"updated":0,
"entries":{
"container_e02_1480417404363_0011_02_000002.server_port":"dn0.dev:50009",
"container_e02_1480417404363_0011_02_000003.server_port":"dn0.dev:50006"
},
"empty":false
}
```
where `server_port` is `parameter_sufix`
YARN container_id will be used as instance identifier.

Tested with YARN 2.7.2 and slider 0.91

It takes the following options:

* `yarn_api_url`: Address of the YARN Resource Manager API (e.g. `http://dn0.dev:8088`)
* `application_name`: [Name of the application](https://slider.incubator.apache.org/docs/slider_specs/application_definition.html) in slider, must present in yarn application description as tag. like`name: {application_name}`
* `yarn_apps_path`: optional YARN Resource Manager context path to apps
* `slider_componentinstance_path`: optional slider context path that will be concat with yarn application master tracking url
* `check_interval`: optional How often to request the list of tasks from Marathon (default: 10 seconds)
* `parameter_sufix`: optional String that will be looking in json published by slider. (default: 0)

#### Listing Default Servers ####

You may list a number of default servers providing a service.
Expand Down
58 changes: 58 additions & 0 deletions config/yarn_slider.example.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"services": {
"kafka_feeder": {
"default_servers": [],
"discovery": {
"method": "yarn_slider",
"yarn_api_url": "http://dn0.dev:8088/",
"application_name": "fee"
},
"haproxy": {
"port": 3213,
"server_options": "check inter 2s rise 3 fall 2",
"listen": [
"mode http",
"option httpchk /health",
"http-check expect string OK"
]
}

}
},
"haproxy": {
"reload_command": "sudo service haproxy reload",
"config_file_path": "/etc/haproxy/haproxy.cfg",
"socket_file_path": "/var/haproxy/stats.sock",
"do_writes": false,
"do_reloads": false,
"do_socket": false,
"global": [
"daemon",
"user haproxy",
"group haproxy",
"maxconn 4096",
"log 127.0.0.1 local0",
"log 127.0.0.1 local1 notice",
"stats socket /var/haproxy/stats.sock mode 666 level admin"
],
"defaults": [
"log global",
"option dontlognull",
"maxconn 2000",
"retries 3",
"timeout connect 5s",
"timeout client 1m",
"timeout server 1m",
"option redispatch",
"balance roundrobin"
],
"extra_sections": {
"listen stats :3212": [
"mode http",
"stats enable",
"stats uri /",
"stats refresh 5s"
]
}
}
}
127 changes: 127 additions & 0 deletions lib/synapse/service_watcher/yarn_slider.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
require 'synapse/service_watcher/base'
require 'json'
require 'net/http'
require 'resolv'

class Synapse::ServiceWatcher
class YarnSliderWatcher < BaseWatcher
def start
@check_interval = @discovery['check_interval'] || 10.0
@connection = nil
@watcher = Thread.new { sleep splay; watch }
end

def stop
@connection.finish
rescue
# pass
end

private

def validate_discovery_opts
required_opts = %w[yarn_api_url application_name]

required_opts.each do |opt|
if @discovery.fetch(opt, '').empty?
raise ArgumentError,
"a value for services.#{@name}.discovery.#{opt} must be specified"
end
end
end

def attempt_connection(url)
uri = URI(url)
log.debug "synapse: try connect to #{uri}"
begin
connection = Net::HTTP.new(uri.host, uri.port)
connection.open_timeout = 5
connection.start
return connection
rescue => ex
log.error "synapse: could not connect to YARN at #{url}: #{ex}"
raise ex
end
end

def try_find_yarn_app_master_traking_url(name)
begin
yarn_rm_connection = attempt_connection(@discovery['yarn_api_url'])
yarn_apps_path = @discovery.fetch('yarn_apps_path', '/ws/v1/cluster/apps?limit=2&state=RUNNING&applicationTypes=org-apache-slider&applicationTags=name:%20')
yarn_path_resolved = yarn_apps_path + name
log.debug "synapse resolved yarn path #{yarn_path_resolved}"
req = Net::HTTP::Get.new(yarn_path_resolved)
req['Content-Type'] = 'application/json'
req['Accept'] = 'application/json'
response = yarn_rm_connection.request(req)
log.debug "synapse yarn apps response\n#{response.body}"
apps = JSON.parse(response.body).fetch('apps', [])
if apps.nil?
raise 'No yarn application with name ' + name
end
if apps['app'].size > 1
raise 'More then 1 yarn application with name ' + name
end
return apps['app'].at(0)['trackingUrl']
rescue => ex
log.warn "synapse: error while watcher try find yarn application: #{ex.inspect}"
log.warn ex.backtrace.join("\n")
raise ex
end
end

def watch
until @should_exit
retry_count = 0
start = Time.now

begin
if @connection.nil?
app_am_url = try_find_yarn_app_master_traking_url(@discovery['application_name'])
log.debug "synapse: try connect to app traking url #{app_am_url}"
@slider_component_instance_url = URI(app_am_url + @discovery.fetch('slider_componentinstance_path', '/ws/v1/slider/publisher/slider/componentinstancedata'))
@connection = attempt_connection(@slider_component_instance_url)
end

req = Net::HTTP::Get.new(@slider_component_instance_url.request_uri)
req['Content-Type'] = 'application/json'
req['Accept'] = 'application/json'
response = @connection.request(req)

lookup_sufix = @discovery.fetch('parameter_sufix', '.server_port')
entries = JSON.parse(response.body).fetch('entries', [])
backends = entries.keep_if{ |entry| entry.include? lookup_sufix }.map do |key, value|
{ 'name' => key[/(.*)#{lookup_sufix}/,1],
'host' => value[/(.*):.*/,1],
'port' => value[/.*:(.*)/,1],
}
end.sort_by { |entry| entry['name'] }

set_backends(backends)
rescue EOFError
# If the persistent HTTP connection is severed, we can automatically
# retry
log.info "synapse: yarn_slider HTTP API at {@slider_component_instance_url} disappeared, reconnecting..."
retry if (retry_count += 1) == 1
rescue => e
log.warn "synapse: error in watcher thread: #{e.inspect}"
log.warn e.backtrace.join("\n")
@connection = nil
ensure
elapsed_time = Time.now - start
sleep (@check_interval - elapsed_time) if elapsed_time < @check_interval
end

@should_exit = true if only_run_once? # for testability
end
end

def splay
Random.rand(@check_interval)
end

def only_run_once?
false
end
end
end
Loading