Skip to content

Commit

Permalink
Plug new envoy to record import endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
tjerman committed Oct 9, 2023
1 parent 38580e5 commit 399f9a7
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 226 deletions.
38 changes: 24 additions & 14 deletions server/compose/envoy/record_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package envoy

import (
"context"
"strconv"
"strings"

"github.com/cortezaproject/corteza/server/pkg/dal"
Expand All @@ -13,8 +14,10 @@ type (
// RecordDatasource provides a mechanism for you to access large
// record datasets optimally
RecordDatasource struct {
mapping datasourceMapping
provider envoyx.Provider
Mapping envoyx.DatasourceMapping
Provider envoyx.Provider

currentIndex int

// Reusable buffer for reading records
rowCache map[string]string
Expand All @@ -36,11 +39,11 @@ type (
)

func (rd *RecordDatasource) SetProvider(s envoyx.Provider) bool {
if rd.mapping.SourceIdent != s.Ident() {
if rd.Mapping.SourceIdent != s.Ident() {
return false
}

rd.provider = s
rd.Provider = s
return true
}

Expand All @@ -49,27 +52,34 @@ func (rd *RecordDatasource) Next(ctx context.Context, out map[string]string) (id
rd.rowCache = make(map[string]string)
}

more, err = rd.provider.Next(ctx, rd.rowCache)
more, err = rd.Provider.Next(ctx, rd.rowCache)
if err != nil || !more {
return
}

rd.applyMapping(rd.rowCache, out)

for _, k := range rd.mapping.KeyField {
ident = append(ident, rd.rowCache[k])
if len(rd.Mapping.KeyField) == 0 {
ident = append(ident, strconv.FormatInt(int64(rd.currentIndex), 10))
} else {
for _, k := range rd.Mapping.KeyField {
ident = append(ident, rd.rowCache[k])
}
}

rd.currentIndex++

return
}

func (rd *RecordDatasource) Reset(ctx context.Context) (err error) {
return rd.provider.Reset(ctx)
rd.currentIndex = 0
return rd.Provider.Reset(ctx)
}

func (rd *RecordDatasource) applyMapping(in, out map[string]string) {
if len(rd.mapping.Mapping.m) == 0 {
if !rd.mapping.Defaultable {
if len(rd.Mapping.Mapping.Map) == 0 {
if !rd.Mapping.Defaultable {
return
}

Expand All @@ -79,16 +89,16 @@ func (rd *RecordDatasource) applyMapping(in, out map[string]string) {
return
}

if rd.mapping.Defaultable {
if rd.Mapping.Defaultable {
rd.applyMappingWithDefaults(in, out)
} else {
rd.applyMappingWoDefaults(in, out)
}
}

func (rd *RecordDatasource) applyMappingWithDefaults(in, out map[string]string) {
maps := make(map[string]mapEntry)
for k, v := range rd.mapping.Mapping.m {
maps := make(map[string]envoyx.MapEntry)
for k, v := range rd.Mapping.Mapping.Map {
maps[k] = v
}

Expand All @@ -105,7 +115,7 @@ func (rd *RecordDatasource) applyMappingWithDefaults(in, out map[string]string)
}

func (rd *RecordDatasource) applyMappingWoDefaults(in, out map[string]string) {
for _, m := range rd.mapping.Mapping.m {
for _, m := range rd.Mapping.Mapping.Map {
if m.Skip {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions server/compose/envoy/store_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ func (d StoreDecoder) decodeRecordDatasource(ctx context.Context, s store.Storer
}

ou := &RecordDatasource{
provider: &iteratorProvider{iter: iter},
Provider: &iteratorProvider{iter: iter},
refToID: make(map[string]uint64),
// @todo consider providing defaults from the outside
mapping: datasourceMapping{
Mapping: envoyx.DatasourceMapping{
KeyField: []string{"id"},
Defaultable: true,
},
Expand Down
12 changes: 12 additions & 0 deletions server/compose/envoy/store_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ import (
)

func (e StoreEncoder) encode(ctx context.Context, p envoyx.EncodeParams, s store.Storer, rt string, nn envoyx.NodeSet, tree envoyx.Traverser) (err error) {
dl, err := e.grabDal(p)
if err != nil {
return
}

switch rt {
case ComposeRecordDatasourceAuxType:
err = e.encodeRecordDatasources(ctx, p, s, dl, nn, tree)
if err != nil {
return
}
}
return
}

Expand Down
89 changes: 2 additions & 87 deletions server/compose/envoy/yaml_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,6 @@ import (
"gopkg.in/yaml.v3"
)

type (
mapEntry struct {
Column string
Field string
Skip bool
}

fieldMapping struct {
m map[string]mapEntry
}

datasourceMapping struct {
SourceIdent string `yaml:"source"`
KeyField []string `yaml:"key"`
References map[string]string
Scope map[string]string

// Defaultable indicates wether the mapping should keep the values where
// the ident is not explicitly mapped.
//
// When true, the value is assigned to the given identifier.
Defaultable bool `yaml:"defaultable"`
Mapping fieldMapping
}
)

const (
ComposeRecordDatasourceAuxType = "corteza::compose:record-datasource"
)
Expand Down Expand Up @@ -432,7 +406,7 @@ func (d *auxYamlDoc) unmarshalPagesExtendedNode(dctx documentContext, n *yaml.No
}

func (d *auxYamlDoc) unmarshalSourceExtendedNode(dctx documentContext, n *yaml.Node, meta ...*yaml.Node) (out envoyx.NodeSet, err error) {
var r datasourceMapping
var r envoyx.DatasourceMapping

// @todo we're omitting errors because there will be a bunch due to invalid
// resource field types. This might be a bit unstable as other errors may
Expand Down Expand Up @@ -474,7 +448,7 @@ func (d *auxYamlDoc) unmarshalSourceExtendedNode(dctx documentContext, n *yaml.N
// @todo for now we only support record datasources; extend when needed
auxN := &envoyx.Node{
Datasource: &RecordDatasource{
mapping: r,
Mapping: r,
},

ResourceType: ComposeRecordDatasourceAuxType,
Expand All @@ -486,65 +460,6 @@ func (d *auxYamlDoc) unmarshalSourceExtendedNode(dctx documentContext, n *yaml.N
return
}

// UnmarshalYAML is used to get the yaml parsed into a series of nodes so
// we can easily pass it down
func (d *fieldMapping) UnmarshalYAML(n *yaml.Node) (err error) {
d.m = make(map[string]mapEntry)
if y7s.IsSeq(n) {
err = y7s.EachSeq(n, func(n *yaml.Node) error {
a, err := d.unmarshalMappingNode(n)
d.m[a.Column] = a
return err
})
} else {
err = y7s.EachMap(n, func(k, n *yaml.Node) error {
a, err := d.unmarshalMappingNode(n)
if a.Column == "" {
err = y7s.DecodeScalar(k, "fieldMapping column", &a.Column)
if err != nil {
return err
}
}

d.m[a.Column] = a
return err
})
}

return
}

func (d *fieldMapping) unmarshalMappingNode(n *yaml.Node) (out mapEntry, err error) {
if y7s.IsKind(n, yaml.ScalarNode) {
err = y7s.DecodeScalar(n, "Column", &out.Column)
if err != nil {
return
}
err = y7s.DecodeScalar(n, "Field", &out.Field)
return
}

// @todo we're omitting errors because there will be a bunch due to invalid
// resource field types. This might be a bit unstable as other errors may
// also get ignored.
//
// A potential fix would be to firstly unmarshal into an any, check errors
// and then unmarshal into the resource while omitting errors.
n.Decode(&out)

err = y7s.EachMap(n, func(k, v *yaml.Node) error {
switch strings.ToLower(k.Value) {
case "skip":
if v.Value == "/" {
out.Skip = true
}
}
return nil
})

return
}

func (d *auxYamlDoc) procMappingRefs(in map[string]string) (out map[string]envoyx.Ref, scope envoyx.Scope) {
out = make(map[string]envoyx.Ref)

Expand Down
Loading

0 comments on commit 399f9a7

Please sign in to comment.