Skip to content

The library implements worker pool pattern by channels.

License

Notifications You must be signed in to change notification settings

dipdup-io/workerpool

Repository files navigation

Worker Pool

The library implements the worker pool pattern using channels. Two kinds of worker pool are currently implemented: Pool and TimedPool. Pool is the generic implementation of the worker pool pattern, in which tasks are received from outside. TimedPool is a worker pool implementation in which tasks are received by a dispatcher on each ticker event.

Install

go get github.com/dipdup.net/workerpool

Examples

Usage of Pool

package main

import (
	"context"
	"log"
	"time"
)

// main starts a Pool with two workers, hands it the example tasks, lets it
// run for one minute, then cancels the context and closes the pool.
func main() {
	const workersCount = 2

	ctx, stop := context.WithCancel(context.Background())

	p := NewPool(worker, workersCount)
	p.Start(ctx)
	dispatcher(ctx, p)

	time.Sleep(time.Minute)
	stop()

	// A failure while closing the pool is treated as fatal for the example.
	err := p.Close()
	if err != nil {
		log.Panic(err)
	}
}

// worker handles a single task: it simulates one second of work and then
// logs a greeting for name.
//
// The wait is interruptible: if ctx is cancelled before the second elapses,
// worker returns immediately without logging, so the pool can shut down
// without waiting for in-flight sleeps.
func worker(ctx context.Context, name string) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// Cancelled before the simulated work finished: drop the task.
		return
	case <-timer.C:
		log.Printf("hello, %s", name)
	}
}

// dispatcher adds a fixed list of names to pool as tasks, one by one.
// It checks ctx before every task and stops early once ctx is cancelled.
func dispatcher(ctx context.Context, pool *Pool[string]) {
	names := []string{"John", "Mark", "Peter", "Mike"}

	for i := 0; i < len(names); i++ {
		// Err is non-nil exactly when Done is closed.
		if ctx.Err() != nil {
			return
		}
		pool.AddTask(names[i])
	}
}

Usage of TimedPool

package main

import (
	"context"
	"log"
	"time"
)

// main starts a TimedPool with two workers and no error handler, lets it run
// for one minute, then cancels the context and closes the pool.
func main() {
	const (
		workersCount = 2
		periodMs     = 60 * 1000 // time between dispatcher calls, in milliseconds (60 seconds)
	)

	ctx, stop := context.WithCancel(context.Background())

	p := NewTimedPool(dispatcher, worker, nil, workersCount, periodMs)
	p.Start(ctx)

	time.Sleep(time.Minute)
	stop()

	// A failure while closing the pool is treated as fatal for the example.
	err := p.Close()
	if err != nil {
		log.Panic(err)
	}
}

// worker handles a single task: it simulates one second of work and then
// logs a greeting for name.
//
// The wait is interruptible: if ctx is cancelled before the second elapses,
// worker returns immediately without logging, so the pool can shut down
// without waiting for in-flight sleeps.
func worker(ctx context.Context, name string) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// Cancelled before the simulated work finished: drop the task.
		return
	case <-timer.C:
		log.Printf("hello, %s", name)
	}
}

// dispatcher returns the same fixed list of names as tasks on every call.
// It never fails and does not inspect ctx.
func dispatcher(ctx context.Context) ([]string, error) {
	names := []string{
		"John",
		"Mark",
		"Peter",
		"Mike",
	}
	return names, nil
}

NewTimedPool takes three handlers as arguments: Dispatcher, Worker and ErrorHandler.

// Handler signatures accepted by NewTimedPool.
type (
	// Worker is the task handler. It is called when a task arrives.
	Worker[Task any] func(ctx context.Context, task Task)

	// Dispatcher is called on timer events to receive the next tasks.
	Dispatcher[Task any] func(ctx context.Context) ([]Task, error)

	// ErrorHandler is called when an error occurs in the dispatcher.
	ErrorHandler[Task any] func(ctx context.Context, err error)
)

It also takes two integers: the number of workers and the time between dispatcher calls, in milliseconds.

Group

Group can be used to run functions under a wait group. For example:

// main hands both counting tickers to a Group and then waits on the group
// until they have finished.
func main() {
	g := NewGroup()
	for _, run := range []func(){ticker3, ticker4} {
		g.Go(run)
	}
	g.Wait()
}

// ticker3 logs "second" once per second and returns after the fifth tick.
func ticker3() {
	const ticks = 5

	t := time.NewTicker(time.Second)
	defer t.Stop()

	for n := 0; n < ticks; n++ {
		<-t.C
		log.Print("second")
	}
}

// ticker4 logs "2 second" once every two seconds and returns after the fifth tick.
func ticker4() {
	const ticks = 5

	t := time.NewTicker(2 * time.Second)
	defer t.Stop()

	for n := 0; n < ticks; n++ {
		<-t.C
		log.Print("2 second")
	}
}

Using a context:

// main hands two context-aware tickers to a Group, cancels their shared
// context after 10 seconds and then waits on the group.
func main() {
	ctx, stop := context.WithCancel(context.Background())

	g := NewGroup()
	for _, run := range []func(context.Context){ticker1, ticker2} {
		g.GoCtx(ctx, run)
	}

	time.Sleep(10 * time.Second)
	stop()

	g.Wait()
}

// ticker1 logs "second" once per second until ctx is cancelled.
func ticker1(ctx context.Context) {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	done := ctx.Done()
	for {
		select {
		case <-t.C:
			log.Print("second")
		case <-done:
			return
		}
	}
}

// ticker2 logs "2 second" once every two seconds until ctx is cancelled.
func ticker2(ctx context.Context) {
	t := time.NewTicker(2 * time.Second)
	defer t.Stop()

	done := ctx.Done()
	for {
		select {
		case <-t.C:
			log.Print("2 second")
		case <-done:
			return
		}
	}
}