Skip to content

Commit

Permalink
execution: partial responses in distributed engine
Browse files Browse the repository at this point in the history
For very distributed setups we need to be able to deal with partial
failures. This commit adds an option to continue evaulation if we
encounter an error in a remote engine but dont want to fail the whole
query.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 12, 2024
1 parent acfee21 commit 2f5c05e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 16 deletions.
42 changes: 42 additions & 0 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func TestDistributedAggregations(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
for _, lookbackDelta := range lookbackDeltas {
localOpts := engine.Opts{
EnablePartialResponses: true,
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
Expand Down Expand Up @@ -373,3 +374,44 @@ func TestDistributedEngineWarnings(t *testing.T) {
res := q.Exec(context.Background())
testutil.Equals(t, 1, len(res.Warnings))
}

func TestDistributedEnginePartialResponses(t *testing.T) {
t.Parallel()
querierErr := &storage.MockQueryable{
MockQuerier: &storage.MockQuerier{
SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
return newErrorSeriesSet(errors.New("test error"))
},
},
}
querierOk := storageWithMockSeries(newMockSeries([]string{labels.MetricName, "foo", "zone", "west"}, []int64{0, 30, 60, 90}, []float64{0, 3, 4, 5}))

opts := engine.Opts{
EnablePartialResponses: true,
EngineOpts: promql.EngineOpts{
MaxSamples: math.MaxInt64,
Timeout: 1 * time.Minute,
},
}

remoteErr := engine.NewRemoteEngine(opts, querierErr, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "east")})
remoteOk := engine.NewRemoteEngine(opts, querierOk, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "west")})
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remoteErr, remoteOk}))
var (
start = time.UnixMilli(0)
end = time.UnixMilli(600 * 1000)
step = 30 * time.Second
)
q, err := ng.NewRangeQuery(context.Background(), nil, nil, "sum by (zone) (foo)", start, end, step)
testutil.Ok(t, err)

res := q.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
testutil.Equals(t, `error querying [{zone="east"}]: test error`, res.Warnings.AsErrors()[0].Error())

m, err := res.Matrix()
testutil.Ok(t, err)
testutil.Equals(t, 1, m.Len())
testutil.Equals(t, labels.FromStrings("zone", "west"), m[0].Metric)
}
23 changes: 15 additions & 8 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Opts struct {
// EnableAnalysis enables query analysis.
EnableAnalysis bool

// EnablePartialResponses enables partial responses in distributed mode.
EnablePartialResponses bool

// SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch.
SelectorBatchSize int64

Expand Down Expand Up @@ -184,14 +187,15 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
disableFallback: opts.DisableFallback,

logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
enablePartialResponses: opts.EnablePartialResponses,
noStepSubqueryIntervalFn: func(d time.Duration) time.Duration {
return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000)
},
Expand Down Expand Up @@ -225,6 +229,7 @@ type Engine struct {
extLookbackDelta time.Duration
decodingConcurrency int
enableAnalysis bool
enablePartialResponses bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

Expand Down Expand Up @@ -261,6 +266,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnablePartialResponses: e.enablePartialResponses,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
Expand Down Expand Up @@ -446,6 +452,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnablePartialResponses: e.enablePartialResponses,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
Expand Down
10 changes: 9 additions & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4909,6 +4909,7 @@ type testSeriesSet struct {
i int
series []storage.Series
warns annotations.Annotations
err error
}

func newTestSeriesSet(series ...storage.Series) storage.SeriesSet {
Expand All @@ -4925,9 +4926,16 @@ func newWarningsSeriesSet(warns annotations.Annotations) storage.SeriesSet {
}
}

func newErrorSeriesSet(err error) storage.SeriesSet {
return &testSeriesSet{
i: -1,
err: err,
}
}

func (s *testSeriesSet) Next() bool { s.i++; return s.i < len(s.series) }
func (s *testSeriesSet) At() storage.Series { return s.series[s.i] }
func (s *testSeriesSet) Err() error { return nil }
func (s *testSeriesSet) Err() error { return s.err }
func (s *testSeriesSet) Warnings() annotations.Annotations { return s.warns }

type slowSeries struct{}
Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
selectorOpts := *opts
selectorOpts.LookbackDelta = 0
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, &selectorOpts, hints)
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints)
return exchange.NewConcurrent(remoteExec, 2, opts), nil
}

Expand Down
14 changes: 10 additions & 4 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Execution struct {
model.OperatorTelemetry
}

func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, _ storage.SelectHints) *Execution {
storage := newStorageFromQuery(query, opts)
func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
storage := newStorageFromQuery(query, opts, engineLabels)
oper := &Execution{
storage: storage,
query: query,
Expand Down Expand Up @@ -90,16 +90,18 @@ func (e *Execution) Samples() *stats.QuerySamples {
type storageAdapter struct {
query promql.Query
opts *query.Options
lbls []labels.Labels

once sync.Once
err error
series []promstorage.SignedSeries
}

func newStorageFromQuery(query promql.Query, opts *query.Options) *storageAdapter {
func newStorageFromQuery(query promql.Query, opts *query.Options, lbls []labels.Labels) *storageAdapter {
return &storageAdapter{
query: query,
opts: opts,
lbls: lbls,
}
}

Expand All @@ -120,7 +122,11 @@ func (s *storageAdapter) executeQuery(ctx context.Context) {
warnings.AddToContext(w, ctx)
}
if result.Err != nil {
s.err = result.Err
if s.opts.EnablePartialResponses {
warnings.AddToContext(fmt.Errorf("error querying %s: %s", s.lbls, result.Err), ctx)

Check failure on line 126 in execution/remote/operator.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

declaration "Errorf" from package "fmt" shouldn't be used, suggested: "github.com/efficientgo/core/errors.{Wrap,Wrapf}"
} else {
s.err = result.Err
}
return
}
switch val := result.Value.(type) {
Expand Down
1 change: 0 additions & 1 deletion logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
*current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap)
return true
})

return plan, *warns
}

Expand Down
1 change: 1 addition & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Options struct {
ExtLookbackDelta time.Duration
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
EnableAnalysis bool
EnablePartialResponses bool
DecodingConcurrency int
}

Expand Down
1 change: 0 additions & 1 deletion storage/prometheus/vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,5 @@ func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset i
if value.IsStaleNaN(v) || (fh != nil && value.IsStaleNaN(fh.Sum)) {
return 0, 0, nil, false, nil
}

return t, v, fh, true, nil
}

0 comments on commit 2f5c05e

Please sign in to comment.