Skip to content

Commit

Permalink
ability to stop the client with a timeout
Browse files Browse the repository at this point in the history
try to gracefully stop or kill the runner thread
  • Loading branch information
mikz committed Aug 11, 2017
1 parent 7bb0072 commit fcdd949
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 7 deletions.
41 changes: 34 additions & 7 deletions lib/message_bus_client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ def diagnostics; end

def start
return unless @state == INITIALISED || stopped?
@state = STARTED

@connection = Excon.new(server_endpoint, persistent: true)
@runner = Thread.new { runner }

@runner = Thread.new(&method(:runner))
@runner.name = "MessageBusClient (#{@client_id})"
@runner.abort_on_exception = true

@state = STARTED
end

def pause
Expand All @@ -53,12 +56,22 @@ def resume
handle_messages
end

def stop
return unless @state == STARTED || @state == PAUSED
def stop(timeout = nil)
raise ThreadError if Thread.current == @runner

return if should_stop? || stopped?

@state = STOPPING
@connection.reset
@runner.join

wakeup # break out of light sleep when polling

unless @runner.join(timeout)
@runner.kill
@runner.join # just killing the thread is not enough to finish the work
end

@runner.stop?
end

def stopped?
Expand All @@ -67,9 +80,13 @@ def stopped?

private

def should_stop?
@state == STOPPING
end

# The runner handling polling over the connection.
def runner
poll until @state == STOPPING
poll until should_stop?
rescue Excon::Errors::Error
@statistics[:failed_calls] += 1
retry
Expand Down Expand Up @@ -113,9 +130,19 @@ def server_endpoint
endpoint.freeze
end

def light_sleep(seconds)
return if should_stop?
@_sleep_check, @_sleep_interrupt = IO.pipe
IO.select([@_sleep_check], nil, nil, seconds)
end

def wakeup
@_sleep_interrupt.close if defined?(@_sleep_interrupt) && !@_sleep_interrupt.closed?
end

# Handles the response from the connection.
def handle_connection_response(response)
handle_response(response.body)
sleep(self.class.poll_interval)
light_sleep(self.class.poll_interval)
end
end
27 changes: 27 additions & 0 deletions spec/message_bus_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ def write_message(message, user = 'message_bus_client')
subject.stop
end

it 'can stop client quickly' do
allow(MessageBusClient).to receive(:poll_interval).and_return(2)
timeout = MessageBusClient.poll_interval.to_f / 2

subject.start
sleep(timeout / 2) # let the background thread start the runner

Timeout.timeout(timeout * 1.1) do
subject.stop(timeout)
end

expect(subject).to be_stopped
end

context 'when the connection times out' do
it 'continues to long poll' do
count = 0
Expand Down Expand Up @@ -72,6 +86,19 @@ def write_message(message, user = 'message_bus_client')
subject.stop
end

it 'can stop client quickly before the background thread starts' do
timeout = MessageBusClient.poll_interval.to_f / 2

subject.start
sleep(timeout / 2) # let the background thread start the runner

Timeout.timeout(timeout * 1.1) do
subject.stop(timeout)
end

expect(subject).to be_stopped
end

it 'receives messages' do
subject.start

Expand Down

0 comments on commit fcdd949

Please sign in to comment.