Skip to content

Commit

Permalink
break: refreshable/v2 with Generic type handling (#252)
Browse files Browse the repository at this point in the history
* break: refreshable/v2 with Generic type handling

* wip

* more

* updates

* MapContext

* Mapped refreshables do not export Update

* call subscribe function on registration

* pass ctx to NewFromTickerFunc

* comments
  • Loading branch information
bmoylan authored Aug 10, 2023
1 parent d5ee3e8 commit 5a10d90
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 623 deletions.
103 changes: 103 additions & 0 deletions refreshable/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package refreshable

import (
"context"
"time"
)

// NewFromChannel populates an Updatable with the values channel.
// If an element is already available, the returned Value is guaranteed to be populated.
// The channel should be closed when no longer used to avoid leaking resources.
func NewFromChannel[T any](values <-chan T) Ready[T] {
out := newReady[T]()
select {
case initial, ok := <-values:
if !ok {
return out // channel already closed
}
out.Update(initial)
default:
}
go func() {
for value := range values {
out.Update(value)
}
}()
return out
}

// NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval.
// If the providers bool return is false, the value is ignored.
// The result's ReadyC channel is closed when a new value is populated.
// The refreshable will stop updating when the provided context is cancelled or the returned UnsubscribeFunc func is called.
func NewFromTickerFunc[T any](ctx context.Context, interval time.Duration, provider func(ctx context.Context) (T, bool)) (Ready[T], UnsubscribeFunc) {
out := newReady[T]()
ctx, cancel := context.WithCancel(ctx)
values := make(chan T)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer close(values)
for {
if value, ok := provider(ctx); ok {
out.Update(value)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}()
return out, UnsubscribeFunc(cancel)
}

// Wait waits until the Ready has a current value or the context expires.
func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) {
select {
case <-ready.ReadyC():
return ready.Current(), true
case <-ctx.Done():
var zero T
return zero, false
}
}

// ready is an Updatable which exposes a channel that is closed when a value is first available.
// Current returns the zero value before Update is called, marking the value ready.
type ready[T any] struct {
in Updatable[T]
readyC <-chan struct{}
cancel context.CancelFunc
}

func newReady[T any]() *ready[T] {
ctx, cancel := context.WithCancel(context.Background())
return &ready[T]{
in: newZero[T](),
readyC: ctx.Done(),
cancel: cancel,
}
}

func (r *ready[T]) Current() T {
return r.in.Current()
}

func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return r.in.Subscribe(consumer)
}

func (r *ready[T]) ReadyC() <-chan struct{} {
return r.readyC
}

func (r *ready[T]) Update(val T) {
r.in.Update(val)
r.cancel()
}
2 changes: 1 addition & 1 deletion refreshable/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/palantir/pkg/refreshable
module github.com/palantir/pkg/refreshable/v2

go 1.20

Expand Down
5 changes: 0 additions & 5 deletions refreshable/godel/config/check-plugin.yml
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
checks:
golint:
filters:
- value: "should have comment or be unexported"
- value: "or a comment on this block"
92 changes: 85 additions & 7 deletions refreshable/refreshable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,92 @@

package refreshable

type Refreshable interface {
import (
"context"
)

// A Refreshable is a generic container type for a volatile underlying value.
// It supports atomic access and user-provided callback "subscriptions" on updates.
type Refreshable[T any] interface {
// Current returns the most recent value of this Refreshable.
Current() interface{}
// If the value has not been initialized, returns T's zero value.
Current() T

// Subscribe calls the consumer function when Value updates until stop is closed.
// The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned.
// Expensive or error-prone responses to refreshed values should be asynchronous.
// Updates considered no-ops by reflect.DeepEqual may be skipped.
// When called, consumer is executed with the Current value.
Subscribe(consumer func(T)) UnsubscribeFunc
}

// A Updatable is a Refreshable which supports setting the value with a user-provided value.
// When a utility returns a (non-Updatable) Refreshable, it implies that value updates are handled internally.
type Updatable[T any] interface {
Refreshable[T]
// Update updates the Refreshable with a new T.
// It blocks until all subscribers have completed.
Update(T)
}

// A Validated is a Refreshable capable of rejecting updates according to validation logic.
// Its Current method returns the most recent value to pass validation.
type Validated[T any] interface {
Refreshable[T]
// Validation returns the result of the most recent validation.
// If the last value was valid, Validation returns the same value as Current and a nil error.
// If the last value was invalid, it and the error are returned. Current returns the most recent valid value.
Validation() (T, error)
}

// Ready extends Refreshable for asynchronous implementations which may not have a value when they are constructed.
// Callers should check that the Ready channel is closed before using the Current value.
type Ready[T any] interface {
Refreshable[T]
// ReadyC returns a channel which is closed after a value is successfully populated.
ReadyC() <-chan struct{}
}

// Subscribe subscribes to changes of this Refreshable. The provided function is called with the value of Current()
// whenever the value changes.
Subscribe(consumer func(interface{})) (unsubscribe func())
// UnsubscribeFunc removes a subscription from a refreshable's internal tracking and/or stops its update routine.
// It is safe to call multiple times.
type UnsubscribeFunc func()

// New returns a new Updatable that begins with the given value.
func New[T any](val T) Updatable[T] {
return newDefault(val)
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) {
out := newDefault(mapFn(original.Current()))
stop := original.Subscribe(func(v T) {
out.Update(mapFn(v))
})
return (*readOnlyRefreshable[M])(out), stop
}

// MapContext is like Map but unsubscribes when the context is cancelled.
func MapContext[T any, M any](ctx context.Context, original Refreshable[T], mapFn func(T) M) Refreshable[M] {
out, stop := Map(original, mapFn)
go func() {
<-ctx.Done()
stop()
}()
return out
}

// MapWithError is similar to Validate but allows for the function to return a mapping/mutation
// of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value.
// An error is returned if the current original value fails to map.
func MapWithError[T any, M any](original Refreshable[T], mapFn func(T) (M, error)) (Validated[M], UnsubscribeFunc, error) {
v, stop := newValidRefreshable(original, mapFn)
_, err := v.Validation()
return v, stop, err
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
Map(func(interface{}) interface{}) Refreshable
// Validate returns a new Refreshable that returns the latest original value accepted by the validatingFn.
// If the upstream value results in an error, it is reported by Validation().
// An error is returned if the current original value is invalid.
func Validate[T any](original Refreshable[T], validatingFn func(T) error) (Validated[T], UnsubscribeFunc, error) {
return MapWithError(original, identity(validatingFn))
}
104 changes: 52 additions & 52 deletions refreshable/refreshable_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,84 @@
package refreshable

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)

type DefaultRefreshable struct {
typ reflect.Type
current *atomic.Value

sync.Mutex // protects subscribers
subscribers []*func(interface{})
type defaultRefreshable[T any] struct {
mux sync.Mutex
current atomic.Value
subscribers []*func(T)
}

func NewDefaultRefreshable(val interface{}) *DefaultRefreshable {
current := atomic.Value{}
current.Store(val)

return &DefaultRefreshable{
current: &current,
typ: reflect.TypeOf(val),
}
func newDefault[T any](val T) *defaultRefreshable[T] {
d := new(defaultRefreshable[T])
d.current.Store(&val)
return d
}

func (d *DefaultRefreshable) Update(val interface{}) error {
d.Lock()
defer d.Unlock()

if valType := reflect.TypeOf(val); valType != d.typ {
return fmt.Errorf("new refreshable value must be type %s: got %s", d.typ, valType)
}
func newZero[T any]() *defaultRefreshable[T] {
d := new(defaultRefreshable[T])
var zero T
d.current.Store(&zero)
return d
}

if reflect.DeepEqual(d.current.Load(), val) {
return nil
// Update changes the value of the Refreshable, then blocks while subscribers are executed.
func (d *defaultRefreshable[T]) Update(val T) {
d.mux.Lock()
defer d.mux.Unlock()
old := d.current.Swap(&val)
if reflect.DeepEqual(*(old.(*T)), val) {
return
}
d.current.Store(val)

for _, sub := range d.subscribers {
(*sub)(val)
}
return nil
}

func (d *DefaultRefreshable) Current() interface{} {
return d.current.Load()
func (d *defaultRefreshable[T]) Current() T {
return *(d.current.Load().(*T))
}

func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe func()) {
d.Lock()
defer d.Unlock()
func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
d.mux.Lock()
defer d.mux.Unlock()

consumerFnPtr := &consumer
d.subscribers = append(d.subscribers, consumerFnPtr)
consumer(d.Current())
return d.unsubscribe(consumerFnPtr)
}

func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc {
return func() {
d.unsubscribe(consumerFnPtr)
d.mux.Lock()
defer d.mux.Unlock()

matchIdx := -1
for idx, currSub := range d.subscribers {
if currSub == consumerFnPtr {
matchIdx = idx
break
}
}
if matchIdx != -1 {
d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...)
}
}

}

func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) {
d.Lock()
defer d.Unlock()
// readOnlyRefreshable aliases defaultRefreshable but hides the Update method so the type
// does not implement Updatable.
type readOnlyRefreshable[T any] defaultRefreshable[T]

matchIdx := -1
for idx, currSub := range d.subscribers {
if currSub == consumerFnPtr {
matchIdx = idx
break
}
}
if matchIdx != -1 {
d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...)
}
func (d *readOnlyRefreshable[T]) Current() T {
return (*defaultRefreshable[T])(d).Current()
}

func (d *DefaultRefreshable) Map(mapFn func(interface{}) interface{}) Refreshable {
newRefreshable := NewDefaultRefreshable(mapFn(d.Current()))
d.Subscribe(func(updatedVal interface{}) {
_ = newRefreshable.Update(mapFn(updatedVal))
})
return newRefreshable
func (d *readOnlyRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return (*defaultRefreshable[T])(d).Subscribe(consumer)
}
Loading

0 comments on commit 5a10d90

Please sign in to comment.