Skip to content

Commit

Permalink
key transform wrapper for ds.Txn
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Aug 7, 2023
1 parent 9417800 commit da281c3
Showing 1 changed file with 165 additions and 21 deletions.
186 changes: 165 additions & 21 deletions keytransform/txndatastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,6 @@ func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
}, nil
}

type transformBatch struct {
dst ds.Batch

f KeyMapping
}

var _ ds.Batch = (*transformBatch)(nil)

func (t *transformBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
return t.dst.Put(ctx, t.f(key), val)
}

func (t *transformBatch) Delete(ctx context.Context, key ds.Key) error {
return t.dst.Delete(ctx, t.f(key))
}

func (t *transformBatch) Commit(ctx context.Context) error {
return t.dst.Commit(ctx)
}

func (d *TxnDatastore) Check(ctx context.Context) error {
if c, ok := d.child.(ds.CheckedDatastore); ok {
return c.Check(ctx)
Expand All @@ -267,5 +247,169 @@ func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
}

func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
panic("implement me")
childTxn, err := d.child.NewTransaction(ctx, readOnly)
if err != nil {
return nil, err
}
return &txnWrapper{child: childTxn, KeyTransform: d.KeyTransform}, nil
}

type txnWrapper struct {
child ds.Txn

KeyTransform
}

var _ ds.Txn = (*txnWrapper)(nil)

func (t *txnWrapper) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
return t.child.Get(ctx, t.ConvertKey(key))
}

func (t *txnWrapper) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
return t.child.Has(ctx, t.ConvertKey(key))
}

func (t *txnWrapper) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
return t.child.GetSize(ctx, t.ConvertKey(key))
}

func (t *txnWrapper) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
nq, cq := t.prepareQuery(q)

cqr, err := t.child.Query(ctx, cq)
if err != nil {
return nil, err
}

qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := cqr.NextSync()
if !ok {
return r, false
}
if r.Error == nil {
r.Entry.Key = t.InvertKey(ds.RawKey(r.Entry.Key)).String()
}
return r, true
},
Close: func() error {
return cqr.Close()
},
})
return dsq.NaiveQueryApply(nq, qr), nil
}

// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (t *txnWrapper) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

// First, put everything in the child query. Then, start taking things
// out.
child = q

// Always let the child handle the key prefix.
child.Prefix = t.ConvertKey(ds.NewKey(child.Prefix)).String()

// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch t.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}

// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// if the key transform preserves order, we can delegate
// to the child datastore.
if orderPreserving {
// When sorting, we compare with the first
// Order, then, if equal, we compare with the
// second Order, etc. However, keys are _unique_
// so we'll never apply any additional orders
// after ordering by key.
child.Orders = child.Orders[:i+1]
break orders
}
}

// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}

// Try to let the child handle the filters.

// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)

for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: t.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: t.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}

// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}

func (t txnWrapper) Put(ctx context.Context, key ds.Key, value []byte) error {
return t.child.Put(ctx, t.ConvertKey(key), value)
}

func (t txnWrapper) Delete(ctx context.Context, key ds.Key) error {
return t.child.Delete(ctx, t.ConvertKey(key))
}

func (t txnWrapper) Commit(ctx context.Context) error {
return t.child.Commit(ctx)
}

func (t txnWrapper) Discard(ctx context.Context) {
t.child.Discard(ctx)
}

0 comments on commit da281c3

Please sign in to comment.