Skip to content

Commit

Permalink
Merge pull request #1 from e154/develop
Browse files Browse the repository at this point in the history
technical debt
  • Loading branch information
e154 authored Jan 29, 2024
2 parents bde7c24 + 2752d3f commit 6638848
Show file tree
Hide file tree
Showing 6 changed files with 568 additions and 128 deletions.
136 changes: 29 additions & 107 deletions bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,134 +21,64 @@ package bus
import (
"context"
"fmt"
"os"
"reflect"
"sort"
"sync"
"time"
)

type bus struct {
sync.RWMutex
sub map[string]*subscribers
topics map[string]*Topic
}

// NewBus ...
func NewBus() Bus {
return &bus{
sub: make(map[string]*subscribers),
topics: make(map[string]*Topic),
}
}

// Publish ...
func (b *bus) Publish(topic string, args ...interface{}) {
go b.publish(topic, args...)
}

// publish ...
func (b *bus) publish(topic string, args ...interface{}) {
rArgs := buildHandlerArgs(append([]interface{}{topic}, args...))

b.RLock()
defer b.RUnlock()

for t, sub := range b.sub {
for t, sub := range b.topics {
if !TopicMatch([]byte(topic), []byte(t)) {
continue
}
sub.lastMsg = rArgs
for _, h := range sub.handlers {
h.queue <- rArgs
}
go sub.Publish(args...)
}
}

// Subscribe ...
func (b *bus) Subscribe(topic string, fn interface{}, options ...interface{}) (err error) {
if err = b.subscribe(topic, fn, options...); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
return err
}

// subscribe ...
func (b *bus) subscribe(topic string, fn interface{}, options ...interface{}) error {
if reflect.TypeOf(fn).Kind() != reflect.Func {
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
}

const queueSize = 1024

h := &handler{
callback: reflect.ValueOf(fn),
queue: make(chan []reflect.Value, queueSize),
}

b.Lock()
defer b.Unlock()

go func() {
for args := range h.queue {
go func(args []reflect.Value) {
startTime := time.Now()
h.callback.Call(args)
t := time.Since(startTime).Milliseconds()
if t > 5 {
fmt.Printf("long call! topic %s, fn: %s, Milliseconds: %d\n\r", topic, reflect.ValueOf(fn).String(), t)
}
}(args)
}
}()

if _, ok := b.sub[topic]; ok {
b.sub[topic].handlers = append(b.sub[topic].handlers, h)
if sub, ok := b.topics[topic]; ok {
return sub.Subscribe(fn, options...)
} else {
b.sub[topic] = &subscribers{
handlers: []*handler{h},
}
}

if len(options) > 0 {
if retain, ok := options[0].(bool); ok && !retain {
return nil
}
}

// sand last message value
if b.sub[topic].lastMsg != nil {
go h.callback.Call(b.sub[topic].lastMsg)
subs := NewTopic(topic)
b.topics[topic] = subs
return subs.Subscribe(fn, options...)
}

return nil
}

// Unsubscribe ...
func (b *bus) Unsubscribe(topic string, fn interface{}) (err error) {
if err = b.unsubscribe(topic, fn); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
return
}

// unsubscribe ...
func (b *bus) unsubscribe(topic string, fn interface{}) error {
b.Lock()
defer b.Unlock()

rv := reflect.ValueOf(fn)

if _, ok := b.sub[topic]; ok {
for i, h := range b.sub[topic].handlers {
if h.callback == rv || h.callback.Pointer() == rv.Pointer() {
close(h.queue)
b.sub[topic].handlers = append(b.sub[topic].handlers[:i], b.sub[topic].handlers[i+1:]...)
if len(b.sub[topic].handlers) == 0 {
delete(b.sub, topic)
}
return nil
}
var empty bool
if sub, ok := b.topics[topic]; ok {
empty, err = sub.Unsubscribe(fn)
if err != nil {
return err
}
if empty {
delete(b.topics, topic)
}

return nil
}

Expand All @@ -160,12 +90,9 @@ func (b *bus) CloseTopic(topic string) {
b.Lock()
defer b.Unlock()

if _, ok := b.sub[topic]; ok {
for _, h := range b.sub[topic].handlers {
close(h.queue)
}
delete(b.sub, topic)
return
if sub, ok := b.topics[topic]; ok {
sub.Close()
delete(b.topics, topic)
}
}

Expand All @@ -174,29 +101,24 @@ func (b *bus) Purge() {
b.Lock()
defer b.Unlock()

for topic, s := range b.sub {
for _, h := range s.handlers {
close(h.queue)
}
delete(b.sub, topic)
for topic, sub := range b.topics {
sub.Close()
delete(b.topics, topic)
}
}

func (b *bus) Stat(ctx context.Context, limit, offset int64, _, _ string) (result Stats, total int64, err error) {
b.RLock()
defer b.RUnlock()

var stats Stats
for topic, subs := range b.sub {
stats = append(stats, Stat{
Topic: topic,
Subscribers: len(subs.handlers),
})
b.RLock()
var stats = make(Stats, 0, len(b.topics))
for _, subs := range b.topics {
stats = append(stats, subs.Stat())
}
b.RUnlock()

sort.Sort(stats)

total = int64(len(stats))
total = int64(len(b.topics))

if offset > total {
offset = total
Expand Down
4 changes: 2 additions & 2 deletions bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestBus(t *testing.T) {

time.Sleep(time.Second)

require.Equal(t, counter, 1)
require.Equal(t, 1, counter)

// Test Stat
stats, total, err = b.Stat(context.Background(), 999, 0, "", "")
Expand Down Expand Up @@ -423,5 +423,5 @@ func BenchmarkBus(b *testing.B) {

time.Sleep(time.Second)

require.Equal(b, int32(b.N), counter)
require.Equal(b, int32(b.N), counter.Load())
}
115 changes: 115 additions & 0 deletions statistic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// This file is part of the Smart Home
// Program complex distribution https://github.com/e154/smart-home
// Copyright (C) 2024, Filippov Alex
//
// This library is free software: you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library. If not, see
// <https://www.gnu.org/licenses/>.

package bus

import (
"strings"
"time"

"go.uber.org/atomic"
)

type Statistic struct {
min *atomic.Duration
max *atomic.Duration
avg *atomic.Duration
rps *RPSCounter
}

func NewStatistic() *Statistic {
return &Statistic{
min: atomic.NewDuration(0),
max: atomic.NewDuration(0),
avg: atomic.NewDuration(0),
rps: startRPSCounter(),
}
}

func (s *Statistic) setTime(t time.Duration) {
if s.min.Load() == 0 {
s.min.Store(t)
}
if s.min.Load() > t {
s.min.Store(t)
}
if s.max.Load() == 0 {
s.max.Store(t)
}
if t > s.max.Load() {
s.max.Store(t)
}
s.avg.Store((s.max.Load() + s.min.Load()) / 2)
}

// StatItem ...
type StatItem struct {
Topic string
Subscribers int
Min time.Duration
Max time.Duration
Avg time.Duration
Rps float64
}

// Stats ...
type Stats []*StatItem

func (s Stats) Len() int { return len(s) }
func (s Stats) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s Stats) Less(i, j int) bool { return strings.Compare(s[i].Topic, s[j].Topic) == -1 }

type RPSCounter struct {
count *atomic.Int64
value *atomic.Float64
isRunning *atomic.Bool
}

func startRPSCounter() *RPSCounter {
counter := &RPSCounter{
count: atomic.NewInt64(0),
value: atomic.NewFloat64(0),
isRunning: atomic.NewBool(true),
}

go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for counter.isRunning.Load() {
select {
case <-ticker.C:
counter.value.Store(float64(counter.count.Load()) / 5)
counter.count.Store(0)
}
}
}()

return counter
}

func (c *RPSCounter) Inc() {
c.count.Inc()
}

func (c *RPSCounter) Value() float64 {
return c.value.Load()
}

func (c *RPSCounter) Stop() {
c.isRunning.Store(false)
}
Loading

0 comments on commit 6638848

Please sign in to comment.