diff --git a/bus.go b/bus.go
index ff3d9c4..b30aa3e 100644
--- a/bus.go
+++ b/bus.go
@@ -21,134 +21,64 @@ package bus
import (
"context"
"fmt"
- "os"
"reflect"
"sort"
"sync"
- "time"
)
type bus struct {
sync.RWMutex
- sub map[string]*subscribers
+ topics map[string]*Topic
}
// NewBus ...
func NewBus() Bus {
return &bus{
- sub: make(map[string]*subscribers),
+ topics: make(map[string]*Topic),
}
}
// Publish ...
func (b *bus) Publish(topic string, args ...interface{}) {
- go b.publish(topic, args...)
-}
-
-// publish ...
-func (b *bus) publish(topic string, args ...interface{}) {
- rArgs := buildHandlerArgs(append([]interface{}{topic}, args...))
-
b.RLock()
defer b.RUnlock()
- for t, sub := range b.sub {
+ for t, sub := range b.topics {
if !TopicMatch([]byte(topic), []byte(t)) {
continue
}
- sub.lastMsg = rArgs
- for _, h := range sub.handlers {
- h.queue <- rArgs
- }
+ go sub.Publish(args...)
}
}
// Subscribe ...
func (b *bus) Subscribe(topic string, fn interface{}, options ...interface{}) (err error) {
- if err = b.subscribe(topic, fn, options...); err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- }
- return err
-}
-
-// subscribe ...
-func (b *bus) subscribe(topic string, fn interface{}, options ...interface{}) error {
- if reflect.TypeOf(fn).Kind() != reflect.Func {
- return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
- }
-
- const queueSize = 1024
-
- h := &handler{
- callback: reflect.ValueOf(fn),
- queue: make(chan []reflect.Value, queueSize),
- }
-
b.Lock()
defer b.Unlock()
- go func() {
- for args := range h.queue {
- go func(args []reflect.Value) {
- startTime := time.Now()
- h.callback.Call(args)
- t := time.Since(startTime).Milliseconds()
- if t > 5 {
- fmt.Printf("long call! topic %s, fn: %s, Milliseconds: %d\n\r", topic, reflect.ValueOf(fn).String(), t)
- }
- }(args)
- }
- }()
-
- if _, ok := b.sub[topic]; ok {
- b.sub[topic].handlers = append(b.sub[topic].handlers, h)
+ if sub, ok := b.topics[topic]; ok {
+ return sub.Subscribe(fn, options...)
} else {
- b.sub[topic] = &subscribers{
- handlers: []*handler{h},
- }
- }
-
- if len(options) > 0 {
- if retain, ok := options[0].(bool); ok && !retain {
- return nil
- }
- }
-
- // sand last message value
- if b.sub[topic].lastMsg != nil {
- go h.callback.Call(b.sub[topic].lastMsg)
+ subs := NewTopic(topic)
+ b.topics[topic] = subs
+ return subs.Subscribe(fn, options...)
}
-
- return nil
}
// Unsubscribe ...
func (b *bus) Unsubscribe(topic string, fn interface{}) (err error) {
- if err = b.unsubscribe(topic, fn); err != nil {
- fmt.Fprintln(os.Stderr, err.Error())
- }
- return
-}
-
-// unsubscribe ...
-func (b *bus) unsubscribe(topic string, fn interface{}) error {
b.Lock()
defer b.Unlock()
- rv := reflect.ValueOf(fn)
-
- if _, ok := b.sub[topic]; ok {
- for i, h := range b.sub[topic].handlers {
- if h.callback == rv || h.callback.Pointer() == rv.Pointer() {
- close(h.queue)
- b.sub[topic].handlers = append(b.sub[topic].handlers[:i], b.sub[topic].handlers[i+1:]...)
- if len(b.sub[topic].handlers) == 0 {
- delete(b.sub, topic)
- }
- return nil
- }
+ var empty bool
+ if sub, ok := b.topics[topic]; ok {
+ empty, err = sub.Unsubscribe(fn)
+ if err != nil {
+ return err
+ }
+ if empty {
+ delete(b.topics, topic)
}
-
return nil
}
@@ -160,12 +90,9 @@ func (b *bus) CloseTopic(topic string) {
b.Lock()
defer b.Unlock()
- if _, ok := b.sub[topic]; ok {
- for _, h := range b.sub[topic].handlers {
- close(h.queue)
- }
- delete(b.sub, topic)
- return
+ if sub, ok := b.topics[topic]; ok {
+ sub.Close()
+ delete(b.topics, topic)
}
}
@@ -174,29 +101,24 @@ func (b *bus) Purge() {
b.Lock()
defer b.Unlock()
- for topic, s := range b.sub {
- for _, h := range s.handlers {
- close(h.queue)
- }
- delete(b.sub, topic)
+ for topic, sub := range b.topics {
+ sub.Close()
+ delete(b.topics, topic)
}
}
func (b *bus) Stat(ctx context.Context, limit, offset int64, _, _ string) (result Stats, total int64, err error) {
- b.RLock()
- defer b.RUnlock()
- var stats Stats
- for topic, subs := range b.sub {
- stats = append(stats, Stat{
- Topic: topic,
- Subscribers: len(subs.handlers),
- })
+ b.RLock()
+ var stats = make(Stats, 0, len(b.topics))
+ for _, subs := range b.topics {
+ stats = append(stats, subs.Stat())
}
+ b.RUnlock()
sort.Sort(stats)
- total = int64(len(stats))
+ total = int64(len(b.topics))
if offset > total {
offset = total
diff --git a/bus_test.go b/bus_test.go
index 43cee74..2b5c143 100644
--- a/bus_test.go
+++ b/bus_test.go
@@ -122,7 +122,7 @@ func TestBus(t *testing.T) {
time.Sleep(time.Second)
- require.Equal(t, counter, 1)
+ require.Equal(t, 1, counter)
// Test Stat
stats, total, err = b.Stat(context.Background(), 999, 0, "", "")
@@ -423,5 +423,5 @@ func BenchmarkBus(b *testing.B) {
time.Sleep(time.Second)
- require.Equal(b, int32(b.N), counter)
+ require.Equal(b, int32(b.N), counter.Load())
}
diff --git a/statistic.go b/statistic.go
new file mode 100644
index 0000000..52a91b6
--- /dev/null
+++ b/statistic.go
@@ -0,0 +1,115 @@
+// This file is part of the Smart Home
+// Program complex distribution https://github.com/e154/smart-home
+// Copyright (C) 2024, Filippov Alex
+//
+// This library is free software: you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 3 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library. If not, see
+// .
+
+package bus
+
+import (
+ "strings"
+ "time"
+
+ "go.uber.org/atomic"
+)
+
+type Statistic struct {
+ min *atomic.Duration
+ max *atomic.Duration
+ avg *atomic.Duration
+ rps *RPSCounter
+}
+
+func NewStatistic() *Statistic {
+ return &Statistic{
+ min: atomic.NewDuration(0),
+ max: atomic.NewDuration(0),
+ avg: atomic.NewDuration(0),
+ rps: startRPSCounter(),
+ }
+}
+
+func (s *Statistic) setTime(t time.Duration) {
+ if s.min.Load() == 0 {
+ s.min.Store(t)
+ }
+ if s.min.Load() > t {
+ s.min.Store(t)
+ }
+ if s.max.Load() == 0 {
+ s.max.Store(t)
+ }
+ if t > s.max.Load() {
+ s.max.Store(t)
+ }
+ s.avg.Store((s.max.Load() + s.min.Load()) / 2)
+}
+
+// StatItem ...
+type StatItem struct {
+ Topic string
+ Subscribers int
+ Min time.Duration
+ Max time.Duration
+ Avg time.Duration
+ Rps float64
+}
+
+// Stats ...
+type Stats []*StatItem
+
+func (s Stats) Len() int { return len(s) }
+func (s Stats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s Stats) Less(i, j int) bool { return strings.Compare(s[i].Topic, s[j].Topic) == -1 }
+
+type RPSCounter struct {
+ count *atomic.Int64
+ value *atomic.Float64
+ isRunning *atomic.Bool
+}
+
+func startRPSCounter() *RPSCounter {
+ counter := &RPSCounter{
+ count: atomic.NewInt64(0),
+ value: atomic.NewFloat64(0),
+ isRunning: atomic.NewBool(true),
+ }
+
+ go func() {
+ ticker := time.NewTicker(time.Second * 5)
+ defer ticker.Stop()
+ for counter.isRunning.Load() {
+ select {
+ case <-ticker.C:
+ counter.value.Store(float64(counter.count.Load()) / 5)
+ counter.count.Store(0)
+ }
+ }
+ }()
+
+ return counter
+}
+
+func (c *RPSCounter) Inc() {
+ c.count.Inc()
+}
+
+func (c *RPSCounter) Value() float64 {
+ return c.value.Load()
+}
+
+func (c *RPSCounter) Stop() {
+ c.isRunning.Store(false)
+}
diff --git a/topic.go b/topic.go
new file mode 100644
index 0000000..84f6c22
--- /dev/null
+++ b/topic.go
@@ -0,0 +1,151 @@
+// This file is part of the Smart Home
+// Program complex distribution https://github.com/e154/smart-home
+// Copyright (C) 2024, Filippov Alex
+//
+// This library is free software: you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 3 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library. If not, see
+// .
+
+package bus
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+)
+
+const queueSize = 1024
+
+type Topic struct {
+ name string
+ *Statistic
+ sync.RWMutex
+ handlers []*handler
+ lastMsg []reflect.Value
+}
+
+func NewTopic(name string) *Topic {
+ return &Topic{
+ name: name,
+ handlers: make([]*handler, 0),
+ Statistic: NewStatistic(),
+ }
+}
+
+func (t *Topic) Publish(args ...interface{}) {
+ t.RLock()
+ defer t.RUnlock()
+
+ if len(t.handlers) == 0 {
+ return
+ }
+
+ rArgs := buildHandlerArgs(append([]interface{}{t.name}, args...))
+
+ t.lastMsg = rArgs
+ for _, h := range t.handlers {
+ t.rps.Inc()
+ h.queue <- rArgs
+ }
+}
+
+func (t *Topic) Subscribe(fn interface{}, options ...interface{}) error {
+ if reflect.TypeOf(fn).Kind() != reflect.Func {
+ return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
+ }
+
+ h := &handler{
+ callback: reflect.ValueOf(fn),
+ queue: make(chan []reflect.Value, queueSize),
+ }
+
+ t.Lock()
+ t.handlers = append(t.handlers, h)
+ t.Unlock()
+
+ go func(h *handler) {
+ var startTime time.Time
+ for args := range h.queue {
+ go func(args []reflect.Value) {
+ startTime = time.Now()
+ h.callback.Call(args)
+ t.setTime(time.Since(startTime))
+ }(args)
+ }
+ }(h)
+
+ if len(options) > 0 {
+ if retain, ok := options[0].(bool); ok && !retain {
+ return nil
+ }
+ }
+
+ t.RLock()
+ // sand last message value
+ if t.lastMsg != nil {
+ go h.callback.Call(t.lastMsg)
+ }
+ t.RUnlock()
+
+ return nil
+}
+
+func (t *Topic) Unsubscribe(fn interface{}) (empty bool, err error) {
+ t.Lock()
+ defer t.Unlock()
+
+ rv := reflect.ValueOf(fn)
+
+ var indexesToDelete []int
+
+ for i, h := range t.handlers {
+ if h.callback == rv || h.callback.Pointer() == rv.Pointer() {
+ indexesToDelete = append(indexesToDelete, i)
+ }
+ }
+
+ for i := len(indexesToDelete) - 1; i >= 0; i-- {
+ index := indexesToDelete[i]
+ close(t.handlers[index].queue)
+ t.handlers = append(t.handlers[:index], t.handlers[index+1:]...)
+ }
+
+ empty = len(t.handlers) == 0
+
+ return
+}
+
+func (t *Topic) Close() {
+ t.Lock()
+ defer t.Unlock()
+
+ for _, h := range t.handlers {
+ close(h.queue)
+ }
+ t.handlers = make([]*handler, 0)
+ t.rps.Stop()
+}
+
+func (t *Topic) Stat() *StatItem {
+ t.RLock()
+ defer t.RUnlock()
+ return &StatItem{
+ Topic: t.name,
+ Subscribers: len(t.handlers),
+ Min: t.min.Load(),
+ Max: t.max.Load(),
+ Avg: t.avg.Load(),
+ Rps: t.rps.Value(),
+ }
+}
diff --git a/topic_test.go b/topic_test.go
new file mode 100644
index 0000000..2e97273
--- /dev/null
+++ b/topic_test.go
@@ -0,0 +1,271 @@
+// This file is part of the Smart Home
+// Program complex distribution https://github.com/e154/smart-home
+// Copyright (C) 2023, Filippov Alex
+//
+// This library is free software: you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 3 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library. If not, see
+// .
+
+package bus
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "go.uber.org/atomic"
+)
+
+func TestTopic(t *testing.T) {
+
+ const topic = "test/topic"
+
+ b := NewTopic(topic)
+
+ var counter = 0
+ var wg = sync.WaitGroup{}
+
+ // Test Subscribe
+ fn := func(topic string, arg1 string, arg2 string) {
+ counter++
+ wg.Done()
+ }
+ wg.Add(1)
+ err := b.Subscribe(fn)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+
+ // Test Publish
+ b.Publish("hello", "world")
+
+ wg.Wait()
+
+ require.Equal(t, counter, 1)
+
+ // ------------------------------------------------------------
+ // Test Stat
+ stat := b.Stat()
+ require.Equal(t, stat.Topic, topic)
+ require.Equal(t, stat.Subscribers, 1)
+
+ // ------------------------------------------------------------
+
+ // Test Unsubscribe
+ empty, err := b.Unsubscribe(fn)
+ if err != nil {
+ t.Errorf("Unsubscribe returned an error: %v", err)
+ }
+ require.Equal(t, true, empty)
+
+ // Test Publish
+ b.Publish("hello", "world")
+
+ time.Sleep(time.Second)
+
+ require.Equal(t, 1, counter)
+
+ // ------------------------------------------------------------
+
+ stat = b.Stat()
+ require.Equal(t, stat.Topic, topic)
+ require.Equal(t, stat.Subscribers, 0)
+ // ------------------------------------------------------------
+
+ // Test Subscribe
+ fn = func(topic string, arg1 string, arg2 string) {
+ counter++
+ }
+ err = b.Subscribe(fn, false)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+
+ stat = b.Stat()
+ require.Equal(t, stat.Subscribers, 1)
+
+ // Test Close
+ b.Close()
+
+ stat = b.Stat()
+ require.Equal(t, stat.Subscribers, 0)
+
+ // Test Publish
+ b.Publish("foo", "bar")
+
+ time.Sleep(time.Second)
+
+ require.Equal(t, 1, counter)
+
+ // ------------------------------------------------------------
+ // Test Stat
+ stat = b.Stat()
+ require.Equal(t, stat.Subscribers, 0)
+ // ------------------------------------------------------------
+
+ fn = func(topic string, arg1 string, arg2 string) {
+ counter++
+ }
+ err = b.Subscribe(fn, false)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+
+ // Test Close
+ b.Close()
+
+ // Test Publish
+ b.Publish("hello", "world")
+
+ time.Sleep(time.Second)
+
+ require.Equal(t, 1, counter)
+
+ /// Test Stat
+ stat = b.Stat()
+ require.Equal(t, stat.Subscribers, 0)
+
+ // ------------------------------------------------------------
+
+ // Test buildHandlerArgs
+ args := buildHandlerArgs([]interface{}{topic, "hello", "world"})
+ if len(args) != 3 {
+ t.Errorf("buildHandlerArgs returned the wrong number of arguments: %v", args)
+ }
+ if args[0].String() != topic {
+ t.Errorf("buildHandlerArgs returned the wrong topic: %v", args[0])
+ }
+ if args[1].String() != "hello" {
+ t.Errorf("buildHandlerArgs returned the wrong arg1: %v", args[1])
+ }
+ if args[2].String() != "world" {
+ t.Errorf("buildHandlerArgs returned the wrong arg2: %v", args[2])
+ }
+
+ // Test reflection of buildHandlerArgs
+ if reflect.TypeOf(buildHandlerArgs).Kind() != reflect.Func {
+ t.Errorf("buildHandlerArgs is not a function")
+ }
+}
+
+func TestTopic2(t *testing.T) {
+
+ const topic = "test/topic"
+
+ b := NewTopic(topic)
+
+ var counter atomic.Int32
+ var wg = sync.WaitGroup{}
+
+ // Test Subscribe
+ fn := func(topic string, arg1 string, arg2 string) {
+ fmt.Println("fn1")
+ counter.Inc()
+ wg.Done()
+ }
+
+ fn2 := func(topic string, arg1 string, arg2 string) {
+ fmt.Println("fn2")
+ counter.Inc()
+ wg.Done()
+ }
+
+ fn3 := func(topic string, arg1 string, arg2 string) {
+ fmt.Println("fn3")
+ counter.Inc()
+ wg.Done()
+ }
+
+ wg.Add(3)
+
+ err := b.Subscribe(fn)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+ err = b.Subscribe(fn2)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+ err = b.Subscribe(fn3)
+ if err != nil {
+ t.Errorf("Subscribe returned an error: %v", err)
+ }
+
+ // Test Stat
+ stat := b.Stat()
+ require.Equal(t, 3, stat.Subscribers)
+
+ // Test Publish
+ b.Publish("hello", "world")
+
+ wg.Wait()
+
+ require.Equal(t, int32(3), counter.Load())
+}
+
+func TestTopic3(t *testing.T) {
+
+ const topic = "test/topic"
+
+ b := NewTopic(topic)
+
+ // Test Subscribe
+ fn := func(topic string, arg1 string, arg2 string) {}
+
+ empty, err := b.Unsubscribe(fn)
+ require.Equal(t, nil, err)
+ require.Equal(t, true, empty)
+
+ err = b.Subscribe(fn)
+ require.Equal(t, nil, err)
+
+ // Test Stat
+ stat := b.Stat()
+ require.Equal(t, 1, stat.Subscribers)
+
+ empty, err = b.Unsubscribe(fn)
+ require.Equal(t, nil, err)
+ require.Equal(t, true, empty)
+
+ empty, err = b.Unsubscribe(fn)
+ require.Equal(t, nil, err)
+ require.Equal(t, true, empty)
+}
+
+func BenchmarkTopic(b *testing.B) {
+
+ const topic = "test/topic"
+
+ bus := NewTopic(topic)
+
+ var counter atomic.Int32
+
+ // Test Subscribe
+ fn := func(topic string, arg1 string, arg2 string) {
+ counter.Inc()
+ }
+ err := bus.Subscribe(fn)
+ require.NoError(b, err)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bus.Publish("hello", "world")
+ }
+
+ time.Sleep(time.Second)
+
+ require.Equal(b, int32(b.N), counter.Load())
+}
diff --git a/types.go b/types.go
index dbb1567..c5fe246 100644
--- a/types.go
+++ b/types.go
@@ -21,7 +21,6 @@ package bus
import (
"context"
"reflect"
- "strings"
)
// Bus implements publish/subscribe messaging paradigm
@@ -45,21 +44,3 @@ type handler struct {
callback reflect.Value
queue chan []reflect.Value
}
-
-type subscribers struct {
- handlers []*handler
- lastMsg []reflect.Value
-}
-
-// Stat ...
-type Stat struct {
- Topic string
- Subscribers int
-}
-
-// Stats ...
-type Stats []Stat
-
-func (s Stats) Len() int { return len(s) }
-func (s Stats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func (s Stats) Less(i, j int) bool { return strings.Compare(s[i].Topic, s[j].Topic) == -1 }