Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds SubscribeAsyncWithNewContext #37

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions event_bus.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package EventBus

import (
"context"
"fmt"
"reflect"
"sync"
Expand All @@ -10,6 +11,7 @@ import (
type BusSubscriber interface {
Subscribe(topic string, fn interface{}) error
SubscribeAsync(topic string, fn interface{}, transactional bool) error
SubscribeAsyncWithNewContext(topic string, fn interface{}, transactional bool) error
SubscribeOnce(topic string, fn interface{}) error
SubscribeOnceAsync(topic string, fn interface{}) error
Unsubscribe(topic string, handler interface{}) error
Expand Down Expand Up @@ -46,6 +48,7 @@ type eventHandler struct {
async bool
transactional bool
sync.Mutex // lock for an event handler - useful for running async callbacks serially
withNewCtx bool
}

// New returns new EventBus with empty handlers.
Expand Down Expand Up @@ -73,7 +76,7 @@ func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHan
// Returns error if `fn` is not a function.
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, false, false, sync.Mutex{},
reflect.ValueOf(fn), false, false, false, sync.Mutex{}, false,
})
}

Expand All @@ -83,15 +86,21 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{},
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, false,
})
}

func (bus *EventBus) SubscribeAsyncWithNewContext(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, true,
})
}

// SubscribeOnce subscribes to a topic once. Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, false, false, sync.Mutex{},
reflect.ValueOf(fn), true, false, false, sync.Mutex{}, false,
})
}

Expand All @@ -100,7 +109,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, true, false, sync.Mutex{},
reflect.ValueOf(fn), true, true, false, sync.Mutex{}, false,
})
}

Expand Down Expand Up @@ -147,7 +156,17 @@ func (bus *EventBus) Publish(topic string, args ...interface{}) {
if handler.transactional {
handler.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
var asyncArgs []interface{}
if handler.withNewCtx {
if len(args) == 0 {
panic("context expected, got no args")
}
asyncArgs = append(asyncArgs, context.Background())
asyncArgs = append(asyncArgs, args[1:]...)
} else {
asyncArgs = args
}
go bus.doPublishAsync(handler, topic, asyncArgs...)
}
}
}
Expand Down
83 changes: 83 additions & 0 deletions event_bus_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package EventBus

import (
"context"
"testing"
"time"
)
Expand Down Expand Up @@ -154,3 +155,85 @@ func TestSubscribeAsync(t *testing.T) {
t.Fail()
}
}

func TestSubscribeAsyncContextCancelled(t *testing.T) {
var isCancelledDone bool
isNotCancelledDone := true

bus := New()
bus.SubscribeAsync("topic", func(ctx context.Context, done *bool) {
select {
case <-time.NewTimer(10*time.Millisecond).C:
*done = false
case <-ctx.Done():
*done = true
}
}, false)

notCancelled, notCancel := context.WithCancel(context.Background())
cancelled, cancel := context.WithCancel(context.Background())
bus.Publish("topic", notCancelled, &isNotCancelledDone)
bus.Publish("topic", cancelled, &isCancelledDone)
defer notCancel()
cancel()

bus.WaitAsync()

if isNotCancelledDone {
t.Fail()
}
if !isCancelledDone {
t.Fail()
}
}

func TestSubscribeAsyncWithNewContextNoArgs(t *testing.T) {
defer func() {
if r := recover(); r != nil {
if r != "context expected, got no args" {
t.Fail()
}
}
}()
bus := New()
bus.SubscribeAsyncWithNewContext("topic", func(ctx context.Context) {}, false)
bus.Publish("topic")
}

func TestSubscribeAsyncWithNewContext(t *testing.T) {
isCancelledDone := true
isNotCancelledDone := true

bus := New()
bus.SubscribeAsyncWithNewContext("topic", func(ctx context.Context, done *bool) {
select {
case <-time.NewTimer(10*time.Millisecond).C:
*done = false
case <-ctx.Done():
*done = true
}
}, false)

notCancelled, notCancel := context.WithCancel(context.Background())
cancelled, cancel := context.WithCancel(context.Background())
bus.Publish("topic", notCancelled, &isNotCancelledDone)
bus.Publish("topic", cancelled, &isCancelledDone)
defer notCancel()
cancel()

bus.WaitAsync()

if isCancelledDone {
t.Fail()
}
if isNotCancelledDone {
t.Fail()
}
}

//func TestSubscribeAsyncNewContext(t *testing.T) {
// bus := New()
// bus.SubscribeAsyncWithNewContext("topic", func(context.Context, i int, k string) {
//
// }, false)
//}