From fcdd9493d7fca7a7aa5b6ca842159de142abfc1d Mon Sep 17 00:00:00 2001 From: Michal Cichra Date: Fri, 11 Aug 2017 12:59:00 +0200 Subject: [PATCH] ability to stop the client with a timeout try to gracefully stop or kill the runner thread --- lib/message_bus_client/connection.rb | 41 +++++++++++++++++++++++----- spec/message_bus_client_spec.rb | 27 ++++++++++++++++++ 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/lib/message_bus_client/connection.rb b/lib/message_bus_client/connection.rb index ee0a968..f4f095e 100644 --- a/lib/message_bus_client/connection.rb +++ b/lib/message_bus_client/connection.rb @@ -29,11 +29,14 @@ def diagnostics; end def start return unless @state == INITIALISED || stopped? - @state = STARTED @connection = Excon.new(server_endpoint, persistent: true) - @runner = Thread.new { runner } + + @runner = Thread.new(&method(:runner)) + @runner.name = "MessageBusClient (#{@client_id})" @runner.abort_on_exception = true + + @state = STARTED end def pause @@ -53,12 +56,22 @@ def resume handle_messages end - def stop - return unless @state == STARTED || @state == PAUSED + def stop(timeout = nil) + raise ThreadError if Thread.current == @runner + + return if should_stop? || stopped? @state = STOPPING @connection.reset - @runner.join + + wakeup # break out of light sleep when polling + + unless @runner.join(timeout) + @runner.kill + @runner.join # just killing the thread is not enough to finish the work + end + + @runner.stop? end def stopped? @@ -67,9 +80,13 @@ def stopped? private + def should_stop? + @state == STOPPING + end + # The runner handling polling over the connection. def runner - poll until @state == STOPPING + poll until should_stop? rescue Excon::Errors::Error @statistics[:failed_calls] += 1 retry @@ -113,9 +130,19 @@ def server_endpoint endpoint.freeze end + def light_sleep(seconds) + return if should_stop? + @_sleep_check, @_sleep_interrupt = IO.pipe + IO.select([@_sleep_check], nil, nil, seconds) + end + + def wakeup + @_sleep_interrupt.close if defined?(@_sleep_interrupt) && !@_sleep_interrupt.closed? + end + # Handles the response from the connection. def handle_connection_response(response) handle_response(response.body) - sleep(self.class.poll_interval) + light_sleep(self.class.poll_interval) end end diff --git a/spec/message_bus_client_spec.rb b/spec/message_bus_client_spec.rb index bf879c2..e0f6f29 100644 --- a/spec/message_bus_client_spec.rb +++ b/spec/message_bus_client_spec.rb @@ -19,6 +19,20 @@ def write_message(message, user = 'message_bus_client') subject.stop end + it 'can stop client quickly' do + allow(MessageBusClient).to receive(:poll_interval).and_return(2) + timeout = MessageBusClient.poll_interval.to_f / 2 + + subject.start + sleep(timeout / 2) # let the background thread start the runner + + Timeout.timeout(timeout * 1.1) do + subject.stop(timeout) + end + + expect(subject).to be_stopped + end + context 'when the connection times out' do it 'continues to long poll' do count = 0 @@ -72,6 +86,19 @@ def write_message(message, user = 'message_bus_client') subject.stop end + it 'can stop client quickly before the background thread starts' do + timeout = MessageBusClient.poll_interval.to_f / 2 + + subject.start + sleep(timeout / 2) # let the background thread start the runner + + Timeout.timeout(timeout * 1.1) do + subject.stop(timeout) + end + + expect(subject).to be_stopped + end + it 'receives messages' do subject.start