Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first commit #1

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 29 additions & 107 deletions bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,134 +21,64 @@ package bus
import (
"context"
"fmt"
"os"
"reflect"
"sort"
"sync"
"time"
)

type bus struct {
sync.RWMutex
sub map[string]*subscribers
topics map[string]*Topic
}

// NewBus ...
func NewBus() Bus {
return &bus{
sub: make(map[string]*subscribers),
topics: make(map[string]*Topic),
}
}

// Publish ...
func (b *bus) Publish(topic string, args ...interface{}) {
go b.publish(topic, args...)
}

// publish ...
func (b *bus) publish(topic string, args ...interface{}) {
rArgs := buildHandlerArgs(append([]interface{}{topic}, args...))

b.RLock()
defer b.RUnlock()

for t, sub := range b.sub {
for t, sub := range b.topics {
if !TopicMatch([]byte(topic), []byte(t)) {
continue
}
sub.lastMsg = rArgs
for _, h := range sub.handlers {
h.queue <- rArgs
}
go sub.Publish(args...)
}
}

// Subscribe ...
func (b *bus) Subscribe(topic string, fn interface{}, options ...interface{}) (err error) {
if err = b.subscribe(topic, fn, options...); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
return err
}

// subscribe ...
func (b *bus) subscribe(topic string, fn interface{}, options ...interface{}) error {
if reflect.TypeOf(fn).Kind() != reflect.Func {
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
}

const queueSize = 1024

h := &handler{
callback: reflect.ValueOf(fn),
queue: make(chan []reflect.Value, queueSize),
}

b.Lock()
defer b.Unlock()

go func() {
for args := range h.queue {
go func(args []reflect.Value) {
startTime := time.Now()
h.callback.Call(args)
t := time.Since(startTime).Milliseconds()
if t > 5 {
fmt.Printf("long call! topic %s, fn: %s, Milliseconds: %d\n\r", topic, reflect.ValueOf(fn).String(), t)
}
}(args)
}
}()

if _, ok := b.sub[topic]; ok {
b.sub[topic].handlers = append(b.sub[topic].handlers, h)
if sub, ok := b.topics[topic]; ok {
return sub.Subscribe(fn, options...)
} else {
b.sub[topic] = &subscribers{
handlers: []*handler{h},
}
}

if len(options) > 0 {
if retain, ok := options[0].(bool); ok && !retain {
return nil
}
}

// sand last message value
if b.sub[topic].lastMsg != nil {
go h.callback.Call(b.sub[topic].lastMsg)
subs := NewTopic(topic)
b.topics[topic] = subs
return subs.Subscribe(fn, options...)
}

return nil
}

// Unsubscribe ...
func (b *bus) Unsubscribe(topic string, fn interface{}) (err error) {
if err = b.unsubscribe(topic, fn); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
return
}

// unsubscribe ...
func (b *bus) unsubscribe(topic string, fn interface{}) error {
b.Lock()
defer b.Unlock()

rv := reflect.ValueOf(fn)

if _, ok := b.sub[topic]; ok {
for i, h := range b.sub[topic].handlers {
if h.callback == rv || h.callback.Pointer() == rv.Pointer() {
close(h.queue)
b.sub[topic].handlers = append(b.sub[topic].handlers[:i], b.sub[topic].handlers[i+1:]...)
if len(b.sub[topic].handlers) == 0 {
delete(b.sub, topic)
}
return nil
}
var empty bool
if sub, ok := b.topics[topic]; ok {
empty, err = sub.Unsubscribe(fn)
if err != nil {
return err
}
if empty {
delete(b.topics, topic)
}

return nil
}

Expand All @@ -160,12 +90,9 @@ func (b *bus) CloseTopic(topic string) {
b.Lock()
defer b.Unlock()

if _, ok := b.sub[topic]; ok {
for _, h := range b.sub[topic].handlers {
close(h.queue)
}
delete(b.sub, topic)
return
if sub, ok := b.topics[topic]; ok {
sub.Close()
delete(b.topics, topic)
}
}

Expand All @@ -174,29 +101,24 @@ func (b *bus) Purge() {
b.Lock()
defer b.Unlock()

for topic, s := range b.sub {
for _, h := range s.handlers {
close(h.queue)
}
delete(b.sub, topic)
for topic, sub := range b.topics {
sub.Close()
delete(b.topics, topic)
}
}

func (b *bus) Stat(ctx context.Context, limit, offset int64, _, _ string) (result Stats, total int64, err error) {
b.RLock()
defer b.RUnlock()

var stats Stats
for topic, subs := range b.sub {
stats = append(stats, Stat{
Topic: topic,
Subscribers: len(subs.handlers),
})
b.RLock()
var stats = make(Stats, 0, len(b.topics))
for _, subs := range b.topics {
stats = append(stats, subs.Stat())
}
b.RUnlock()

sort.Sort(stats)

total = int64(len(stats))
total = int64(len(b.topics))

if offset > total {
offset = total
Expand Down
4 changes: 2 additions & 2 deletions bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestBus(t *testing.T) {

time.Sleep(time.Second)

require.Equal(t, counter, 1)
require.Equal(t, 1, counter)

// Test Stat
stats, total, err = b.Stat(context.Background(), 999, 0, "", "")
Expand Down Expand Up @@ -423,5 +423,5 @@ func BenchmarkBus(b *testing.B) {

time.Sleep(time.Second)

require.Equal(b, int32(b.N), counter)
require.Equal(b, int32(b.N), counter.Load())
}
115 changes: 115 additions & 0 deletions statistic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// This file is part of the Smart Home
// Program complex distribution https://github.com/e154/smart-home
// Copyright (C) 2024, Filippov Alex
//
// This library is free software: you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library. If not, see
// <https://www.gnu.org/licenses/>.

package bus

import (
"strings"
"time"

"go.uber.org/atomic"
)

type Statistic struct {
min *atomic.Duration
max *atomic.Duration
avg *atomic.Duration
rps *RPSCounter
}

func NewStatistic() *Statistic {
return &Statistic{
min: atomic.NewDuration(0),
max: atomic.NewDuration(0),
avg: atomic.NewDuration(0),
rps: startRPSCounter(),
}
}

func (s *Statistic) setTime(t time.Duration) {
if s.min.Load() == 0 {
s.min.Store(t)
}
if s.min.Load() > t {
s.min.Store(t)
}
if s.max.Load() == 0 {
s.max.Store(t)
}
if t > s.max.Load() {
s.max.Store(t)
}
s.avg.Store((s.max.Load() + s.min.Load()) / 2)
}

// StatItem ...
type StatItem struct {
Topic string
Subscribers int
Min time.Duration
Max time.Duration
Avg time.Duration
Rps float64
}

// Stats ...
type Stats []*StatItem

func (s Stats) Len() int { return len(s) }
func (s Stats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s Stats) Less(i, j int) bool { return strings.Compare(s[i].Topic, s[j].Topic) == -1 }

type RPSCounter struct {
count *atomic.Int64
value *atomic.Float64
isRunning *atomic.Bool
}

func startRPSCounter() *RPSCounter {
counter := &RPSCounter{
count: atomic.NewInt64(0),
value: atomic.NewFloat64(0),
isRunning: atomic.NewBool(true),
}

go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for counter.isRunning.Load() {
select {
case <-ticker.C:
counter.value.Store(float64(counter.count.Load()) / 5)
counter.count.Store(0)
}
}
}()

return counter
}

func (c *RPSCounter) Inc() {
c.count.Inc()
}

func (c *RPSCounter) Value() float64 {
return c.value.Load()
}

func (c *RPSCounter) Stop() {
c.isRunning.Store(false)
}
Loading
Loading