Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution: partial responses in distributed engine #480

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,44 @@ func TestDistributedEngineWarnings(t *testing.T) {
res := q.Exec(context.Background())
testutil.Equals(t, 1, len(res.Warnings))
}

// TestDistributedEnginePartialResponses runs a distributed range query over two
// remote engines, one whose querier always yields an error series set and one
// that serves a single "foo" series. With EnablePartialResponses set, the query
// is expected to succeed, report the failing engine as a single warning, and
// return only the series of the healthy engine.
func TestDistributedEnginePartialResponses(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	// Select function that always hands back a series set carrying an error.
	failingSelect := func(_ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
		return newErrorSeriesSet(errors.New("test error"))
	}
	failingQueryable := &storage.MockQueryable{
		MockQuerier: &storage.MockQuerier{SelectMockFunction: failingSelect},
	}
	healthyQueryable := storageWithMockSeries(
		newMockSeries(
			[]string{labels.MetricName, "foo", "zone", "west"},
			[]int64{0, 30, 60, 90},
			[]float64{0, 3, 4, 5},
		),
	)

	engineOpts := engine.Opts{
		EnablePartialResponses: true,
		EngineOpts: promql.EngineOpts{
			MaxSamples: math.MaxInt64,
			Timeout:    time.Minute,
		},
	}

	eastLabels := []labels.Labels{labels.FromStrings("zone", "east")}
	westLabels := []labels.Labels{labels.FromStrings("zone", "west")}
	eastEngine := engine.NewRemoteEngine(engineOpts, failingQueryable, math.MinInt64, math.MaxInt64, eastLabels)
	westEngine := engine.NewRemoteEngine(engineOpts, healthyQueryable, math.MinInt64, math.MaxInt64, westLabels)
	endpoints := api.NewStaticEndpoints([]api.RemoteEngine{eastEngine, westEngine})
	distributed := engine.NewDistributedEngine(engineOpts, endpoints)

	rangeQuery, err := distributed.NewRangeQuery(
		ctx, nil, nil, "sum by (zone) (foo)",
		time.UnixMilli(0), time.UnixMilli(600*1000), 30*time.Second,
	)
	testutil.Ok(t, err)

	result := rangeQuery.Exec(ctx)
	testutil.Ok(t, result.Err)

	// The failing engine must be reported as exactly one warning.
	testutil.Equals(t, 1, len(result.Warnings))
	testutil.Equals(t, `remote exec error [[{zone="east"}]]: test error`, result.Warnings.AsErrors()[0].Error())

	// Only the healthy engine's series may be present in the result.
	matrix, err := result.Matrix()
	testutil.Ok(t, err)
	testutil.Equals(t, 1, matrix.Len())
	testutil.Equals(t, labels.FromStrings("zone", "west"), matrix[0].Metric)
}
23 changes: 15 additions & 8 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Opts struct {
// EnableAnalysis enables query analysis.
EnableAnalysis bool

// EnablePartialResponses enables partial responses in distributed mode.
EnablePartialResponses bool

// SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch.
SelectorBatchSize int64

Expand Down Expand Up @@ -184,14 +187,15 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
disableFallback: opts.DisableFallback,

logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
enablePartialResponses: opts.EnablePartialResponses,
noStepSubqueryIntervalFn: func(d time.Duration) time.Duration {
return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000)
},
Expand Down Expand Up @@ -225,6 +229,7 @@ type Engine struct {
extLookbackDelta time.Duration
decodingConcurrency int
enableAnalysis bool
enablePartialResponses bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

Expand Down Expand Up @@ -261,6 +266,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnablePartialResponses: e.enablePartialResponses,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
Expand Down Expand Up @@ -446,6 +452,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnablePartialResponses: e.enablePartialResponses,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
Expand Down
10 changes: 9 additions & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4909,6 +4909,7 @@ type testSeriesSet struct {
i int
series []storage.Series
warns annotations.Annotations
err error
}

func newTestSeriesSet(series ...storage.Series) storage.SeriesSet {
Expand All @@ -4925,9 +4926,16 @@ func newWarningsSeriesSet(warns annotations.Annotations) storage.SeriesSet {
}
}

// newErrorSeriesSet returns a testSeriesSet that holds no series and carries
// err in its err field. The cursor starts at -1.
func newErrorSeriesSet(err error) storage.SeriesSet {
	set := new(testSeriesSet)
	set.i = -1
	set.err = err
	return set
}

// Next advances the cursor by one and reports whether it still points inside s.series.
func (s *testSeriesSet) Next() bool { s.i++; return s.i < len(s.series) }
// At returns the series at the current cursor position.
func (s *testSeriesSet) At() storage.Series { return s.series[s.i] }
// Err always reports no error.
// NOTE(review): two Err definitions appear here; presumably this is the line deleted by the change and only the next one survives — confirm.
func (s *testSeriesSet) Err() error { return nil }
// Err returns the error stored in the set's err field.
func (s *testSeriesSet) Err() error { return s.err }
// Warnings returns the annotations stored in the set's warns field.
func (s *testSeriesSet) Warnings() annotations.Annotations { return s.warns }

type slowSeries struct{}
Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
selectorOpts := *opts
selectorOpts.LookbackDelta = 0
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, &selectorOpts, hints)
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints)
return exchange.NewConcurrent(remoteExec, 2, opts), nil
}

Expand Down
16 changes: 12 additions & 4 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
Expand All @@ -29,8 +30,8 @@ type Execution struct {
model.OperatorTelemetry
}

func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, _ storage.SelectHints) *Execution {
storage := newStorageFromQuery(query, opts)
func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
storage := newStorageFromQuery(query, opts, engineLabels)
oper := &Execution{
storage: storage,
query: query,
Expand Down Expand Up @@ -90,16 +91,18 @@ func (e *Execution) Samples() *stats.QuerySamples {
// storageAdapter bundles a promql.Query with its options and the state
// (error and series) recorded when the query is executed.
type storageAdapter struct {
query promql.Query
opts *query.Options
// lbls is the label-set list supplied at construction; it is formatted into
// the "remote exec error" message when the query result carries an error.
lbls []labels.Labels

// NOTE(review): once presumably ensures executeQuery runs a single time — confirm against the code outside this hunk.
once sync.Once
// err holds the wrapped query error when partial responses are not enabled.
err error
series []promstorage.SignedSeries
}

// newStorageFromQuery builds a storageAdapter around the given query and options.
// NOTE(review): two signatures appear below; presumably the first is the line
// removed by the change and the second (which also accepts lbls) replaces it — confirm.
func newStorageFromQuery(query promql.Query, opts *query.Options) *storageAdapter {
func newStorageFromQuery(query promql.Query, opts *query.Options, lbls []labels.Labels) *storageAdapter {
return &storageAdapter{
query: query,
opts: opts,
// lbls is kept so that execution errors can be labelled with it.
lbls: lbls,
}
}

Expand All @@ -120,7 +123,12 @@ func (s *storageAdapter) executeQuery(ctx context.Context) {
warnings.AddToContext(w, ctx)
}
if result.Err != nil {
s.err = result.Err
err := errors.Wrapf(result.Err, "remote exec error [%s]", s.lbls)
if s.opts.EnablePartialResponses {
warnings.AddToContext(err, ctx)
} else {
s.err = err
}
return
}
switch val := result.Value.(type) {
Expand Down
1 change: 0 additions & 1 deletion logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
*current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap)
return true
})

return plan, *warns
}

Expand Down
1 change: 1 addition & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Options struct {
ExtLookbackDelta time.Duration
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
EnableAnalysis bool
EnablePartialResponses bool
DecodingConcurrency int
}

Expand Down
1 change: 0 additions & 1 deletion storage/prometheus/vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,5 @@ func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset i
if value.IsStaleNaN(v) || (fh != nil && value.IsStaleNaN(fh.Sum)) {
return 0, 0, nil, false, nil
}

return t, v, fh, true, nil
}
Loading