diff --git a/go.mod b/go.mod index 5a4c84a..a349216 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jacobbrewer1/patcher v0.1.10 github.com/jacobbrewer1/uhttp v0.0.4 github.com/jacobbrewer1/vaulty v0.1.4 - github.com/jacobbrewer1/workerpool v0.0.2 + github.com/jacobbrewer1/workerpool v0.0.3 github.com/jmoiron/sqlx v1.4.0 github.com/oapi-codegen/runtime v1.1.1 github.com/prometheus/client_golang v1.20.5 diff --git a/go.sum b/go.sum index c59cc6e..2ab615a 100644 --- a/go.sum +++ b/go.sum @@ -141,8 +141,8 @@ github.com/jacobbrewer1/uhttp v0.0.4 h1:3RrRmz0fjsnLKuAs5fkflOcsjuBFoL0XvC6Xct/T github.com/jacobbrewer1/uhttp v0.0.4/go.mod h1:qwLAdGw4SLYJY2MS0sE1m/w1ILTr61bjwRtlbGwqo5c= github.com/jacobbrewer1/vaulty v0.1.4 h1:JLcSDX5f1UMBxNNzCxQ0neqV4Mvw6030OgxE44B/pqI= github.com/jacobbrewer1/vaulty v0.1.4/go.mod h1:BDrZxaM1mMWSqJ3UXHGOQyy6+2l5NxjAjva2V7ijwNI= -github.com/jacobbrewer1/workerpool v0.0.2 h1:H5+PRi1JLtOvPXHiER6+MCEfqIMn8d0MyZ0Mi4emYqM= -github.com/jacobbrewer1/workerpool v0.0.2/go.mod h1:PKV+PF+dbELi8p+abwzVDRy2FtgpdLJCFKlsz5thaYY= +github.com/jacobbrewer1/workerpool v0.0.3 h1:MnRiYACaCO9+M2ogM6gxQcySdbER/T6mehTvBQLRg6I= +github.com/jacobbrewer1/workerpool v0.0.3/go.mod h1:vZnQ5Ynp2rnihrFubKfyf7S+hnxC2TgQt755PFQ7cBQ= github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= diff --git a/pkg/services/importer/import.go b/pkg/services/importer/import.go index 550e2b4..3b782b9 100644 --- a/pkg/services/importer/import.go +++ b/pkg/services/importer/import.go @@ -12,7 +12,7 @@ import ( ) func (s *service) Import(from, to int) error { - workers := workerpool.NewWorkerPool( + workers := workerpool.New( workerpool.WithDelayedStart(), ) diff --git a/vendor/github.com/jacobbrewer1/workerpool/mock_Pool.go b/vendor/github.com/jacobbrewer1/workerpool/mock_Pool.go new file mode 100644 index 0000000..ccb12bf --- /dev/null +++ b/vendor/github.com/jacobbrewer1/workerpool/mock_Pool.go @@ -0,0 +1,75 @@ +// Code generated by mockery. DO NOT EDIT. + +package workerpool + +import mock "github.com/stretchr/testify/mock" + +// MockPool is an autogenerated mock type for the Pool type +type MockPool struct { + mock.Mock +} + +// BlockingSchedule provides a mock function with given fields: task +func (_m *MockPool) BlockingSchedule(task Runnable) error { + ret := _m.Called(task) + + if len(ret) == 0 { + panic("no return value specified for BlockingSchedule") + } + + var r0 error + if rf, ok := ret.Get(0).(func(Runnable) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MustSchedule provides a mock function with given fields: task +func (_m *MockPool) MustSchedule(task Runnable) { + _m.Called(task) +} + +// Schedule provides a mock function with given fields: task +func (_m *MockPool) Schedule(task Runnable) error { + ret := _m.Called(task) + + if len(ret) == 0 { + panic("no return value specified for Schedule") + } + + var r0 error + if rf, ok := ret.Get(0).(func(Runnable) error); ok { + r0 = rf(task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *MockPool) Stop() { + _m.Called() +} + +// StopAsync provides a mock function with given fields: +func (_m *MockPool) StopAsync() { + _m.Called() +} + +// NewMockPool creates a new instance of MockPool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockPool(t interface { + mock.TestingT + Cleanup(func()) +}) *MockPool { + mock := &MockPool{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/vendor/github.com/jacobbrewer1/workerpool/mock_WorkerOption.go b/vendor/github.com/jacobbrewer1/workerpool/mock_WorkerOption.go index 32c143e..add7c5d 100644 --- a/vendor/github.com/jacobbrewer1/workerpool/mock_WorkerOption.go +++ b/vendor/github.com/jacobbrewer1/workerpool/mock_WorkerOption.go @@ -10,7 +10,7 @@ type MockWorkerOption struct { } // Execute provides a mock function with given fields: pool -func (_m *MockWorkerOption) Execute(pool *WorkerPool) { +func (_m *MockWorkerOption) Execute(pool *pool) { _m.Called(pool) } diff --git a/vendor/github.com/jacobbrewer1/workerpool/pool.go b/vendor/github.com/jacobbrewer1/workerpool/pool.go new file mode 100644 index 0000000..cd0d497 --- /dev/null +++ b/vendor/github.com/jacobbrewer1/workerpool/pool.go @@ -0,0 +1,95 @@ +package workerpool + +import ( + "runtime" + "sync" +) + +const ( + minWorkers = 1 + minQueueLength = 0 +) + +type Pool interface { + // MustSchedule schedules a task to be executed + // by the worker pool. It panics if the worker pool is stopped. + MustSchedule(task Runnable) + + // Schedule schedules a task to be executed by the worker pool. + // It returns an error if the worker pool is stopped. + Schedule(task Runnable) error + + // BlockingSchedule schedules a task to be executed by the + // worker pool. It blocks until the task is scheduled. + BlockingSchedule(task Runnable) error + + // Stop stops the worker pool. + Stop() + + // StopAsync stops the worker pool asynchronously. + StopAsync() +} + +type pool struct { + // wg is the wait group for the worker pool. + wg *sync.WaitGroup + + // done is the channel to signal the worker pool to stop. + done chan struct{} + + // tasks is the channel to send tasks to the worker pool. + tasks chan Runnable + + // delayedStart is the flag to indicate if the worker pool should start immediately. + // + // If delayedStart is set to true, the worker pool will start only when the first task is scheduled. + // This is useful when you want to start the worker pool only when you have tasks to schedule. + delayedStart bool + + // started is the flag to indicate if the worker pool has started. + started bool + + // totalWorkers is the total number of workers to deploy to the pool. + totalWorkers int + + // maxQueueLength is the maximum number of tasks that can be scheduled. + maxQueueLength int +} + +// New creates a new worker pool with the given options. +// +// By default, the worker pool will have the same number of workers as the number of CPUs. +// The maximum queue length is set to the total number of workers multiplied by 1000. +// +// You can customize the worker pool by providing the options. +func New(opts ...WorkerOption) Pool { + p := &pool{ + totalWorkers: runtime.NumCPU(), + maxQueueLength: runtime.NumCPU() * 1000, + wg: new(sync.WaitGroup), + done: make(chan struct{}), + delayedStart: false, + started: false, + } + + for _, opt := range opts { + opt(p) + } + + if p.totalWorkers < minWorkers { + p.totalWorkers = runtime.NumCPU() + } + + taskChan := make(chan Runnable, p.maxQueueLength) + if p.maxQueueLength <= minQueueLength { + // The user has set the max queue length to the minQueueLength (0), which means we should use a blocking channel (non-buffered). + taskChan = make(chan Runnable) + } + p.tasks = taskChan + + if !p.delayedStart { + p.start() + } + + return p +} diff --git a/vendor/github.com/jacobbrewer1/workerpool/worker.go b/vendor/github.com/jacobbrewer1/workerpool/worker.go index 12bf8d2..b9f4a12 100644 --- a/vendor/github.com/jacobbrewer1/workerpool/worker.go +++ b/vendor/github.com/jacobbrewer1/workerpool/worker.go @@ -2,8 +2,6 @@ package workerpool import ( "errors" - "runtime" - "sync" "github.com/prometheus/client_golang/prometheus" ) @@ -16,87 +14,8 @@ var ( ErrWorkerPoolStopped = errors.New("worker pool stopped") ) -const ( - minWorkers = 1 - minQueueLength = 0 -) - -type WorkerPool struct { - // wg is the wait group for the worker pool. - wg *sync.WaitGroup - - // done is the channel to signal the worker pool to stop. - done chan struct{} - - // tasks is the channel to send tasks to the worker pool. - tasks chan Runnable - - // delayedStart is the flag to indicate if the worker pool should start immediately. - // - // If delayedStart is set to true, the worker pool will start only when the first task is scheduled. - // This is useful when you want to start the worker pool only when you have tasks to schedule. - delayedStart bool - - // started is the flag to indicate if the worker pool has started. - started bool - - // totalWorkers is the total number of workers to deploy to the pool. - totalWorkers int - - // maxQueueLength is the maximum number of tasks that can be scheduled. - maxQueueLength int -} - -// NewWorkerPool creates a new worker pool with the given options. -// -// By default, the worker pool will have the same number of workers as the number of CPUs. -// The maximum queue length is set to the total number of workers multiplied by 1000. -// -// You can customize the worker pool by providing the options. -func NewWorkerPool(opts ...WorkerOption) *WorkerPool { - pool := &WorkerPool{ - totalWorkers: runtime.NumCPU(), - maxQueueLength: runtime.NumCPU() * 1000, - wg: new(sync.WaitGroup), - done: make(chan struct{}), - delayedStart: false, - started: false, - } - - for _, opt := range opts { - opt(pool) - } - - if pool.totalWorkers < minWorkers { - pool.totalWorkers = runtime.NumCPU() - } - - taskChan := make(chan Runnable, pool.maxQueueLength) - if pool.maxQueueLength <= minQueueLength { - // The user has set the max queue length to the minQueueLength (0), which means we should use a blocking channel (non-buffered). - taskChan = make(chan Runnable) - } - pool.tasks = taskChan - - if !pool.delayedStart { - pool.start() - } - - return pool -} - -// TotalWorkers gets the total number of workers in the worker pool. -func (p *WorkerPool) TotalWorkers() int { - return p.totalWorkers -} - -// PendingTasks gets the number of pending tasks in the worker pool. -func (p *WorkerPool) PendingTasks() int { - return len(p.tasks) -} - // start starts the worker pool by deploying the workers to the pool. -func (p *WorkerPool) start() { +func (p *pool) start() { if p.started { return } @@ -111,7 +30,7 @@ func (p *WorkerPool) start() { // deployWorker deploys a worker to the worker pool. It listens for tasks to execute on the receiving tasks channel. // It decrements the wait group when the worker is done executing the tasks. -func (p *WorkerPool) deployWorker() { +func (p *pool) deployWorker() { defer p.wg.Done() for { select { @@ -129,7 +48,7 @@ func (p *WorkerPool) deployWorker() { // runTask runs the task. It increments the active workers and decrements the idle workers when the task is running. // It also updates the pending tasks metric. -func (p *WorkerPool) runTask(task Runnable) { +func (p *pool) runTask(task Runnable) { idleWorkers.Dec() activeWorkers.Inc() defer func() { @@ -149,7 +68,7 @@ func (p *WorkerPool) runTask(task Runnable) { // Stop stops the worker pool. // // Note: This is a blocking operation. It will wait for all the workers to finish executing the tasks. -func (p *WorkerPool) Stop() { +func (p *pool) Stop() { defer func() { idleWorkers.Set(0) activeWorkers.Set(0) @@ -172,12 +91,12 @@ func (p *WorkerPool) Stop() { // StopAsync stops the worker pool asynchronously. It will not wait for all the workers to finish executing the tasks. // // Note: This is a non-blocking operation. It will return immediately and stop the worker pool in the background. -func (p *WorkerPool) StopAsync() { +func (p *pool) StopAsync() { go p.Stop() } // isDone returns true if the worker pool is stopped. -func (p *WorkerPool) isDone() bool { +func (p *pool) isDone() bool { select { case <-p.done: return true @@ -189,7 +108,7 @@ func (p *WorkerPool) isDone() bool { // MustSchedule schedules a task to be executed by the worker pool. // // Note: This is a non-blocking operation. If the worker pool is stopped, it will panic. -func (p *WorkerPool) MustSchedule(task Runnable) { +func (p *pool) MustSchedule(task Runnable) { if err := p.Schedule(task); err != nil { panic(err) } @@ -198,7 +117,7 @@ func (p *WorkerPool) MustSchedule(task Runnable) { // Schedule schedules a task to be executed by the worker pool. // // Note: This is a non-blocking operation. If the worker pool is stopped, it will return an error. -func (p *WorkerPool) Schedule(task Runnable) error { +func (p *pool) Schedule(task Runnable) error { if p.isDone() { return ErrWorkerPoolStopped } @@ -220,7 +139,7 @@ func (p *WorkerPool) Schedule(task Runnable) error { // BlockingSchedule schedules a task to be executed by the worker pool. // // Note: This is a blocking operation. If the worker pool is stopped, it will return an error. -func (p *WorkerPool) BlockingSchedule(task Runnable) error { +func (p *pool) BlockingSchedule(task Runnable) error { if p.isDone() { return ErrWorkerPoolStopped } diff --git a/vendor/github.com/jacobbrewer1/workerpool/worker_opts.go b/vendor/github.com/jacobbrewer1/workerpool/worker_opts.go index 7587173..ac0bc62 100644 --- a/vendor/github.com/jacobbrewer1/workerpool/worker_opts.go +++ b/vendor/github.com/jacobbrewer1/workerpool/worker_opts.go @@ -1,10 +1,10 @@ package workerpool -type WorkerOption func(pool *WorkerPool) +type WorkerOption func(pool *pool) // WithTotalWorkers sets the total number of workers in the pool. func WithTotalWorkers(workers int) WorkerOption { - return func(pool *WorkerPool) { + return func(pool *pool) { // Cannot have less than 1 worker. Having 1 working is the same as running the task in the main goroutine but // on a separate goroutine. if workers < minWorkers { @@ -21,7 +21,7 @@ func WithTotalWorkers(workers int) WorkerOption { // // Setting the maximum queue length will set the worker pool to use a non-buffered channel (blocking channel). func WithMaxQueueLength(length int) WorkerOption { - return func(pool *WorkerPool) { + return func(pool *pool) { if length < minQueueLength { return } @@ -41,7 +41,7 @@ func WithBlockingChannel() WorkerOption { // // This is useful when you want to start the worker pool only when you have tasks to schedule. func WithDelayedStart() WorkerOption { - return func(pool *WorkerPool) { + return func(pool *pool) { pool.delayedStart = true } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7929b8e..cf23fa9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -161,7 +161,7 @@ github.com/jacobbrewer1/uhttp/common ## explicit; go 1.23 github.com/jacobbrewer1/vaulty github.com/jacobbrewer1/vaulty/repositories -# github.com/jacobbrewer1/workerpool v0.0.2 +# github.com/jacobbrewer1/workerpool v0.0.3 ## explicit; go 1.23 github.com/jacobbrewer1/workerpool # github.com/jinzhu/copier v0.3.5