Skip to content

Commit

Permalink
capture the full raw response for Reply/Notificaiton
Browse files Browse the repository at this point in the history
  • Loading branch information
nemith committed Aug 31, 2023
1 parent becda36 commit 7d95be1
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 51 deletions.
78 changes: 70 additions & 8 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ import (
"golang.org/x/exp/slices"
)

var (
RPCReplyName = xml.Name{
Space: "urn:ietf:params:xml:ns:netconf:base:1.0",
Local: "rpc-reply",
}

NofificationName = xml.Name{
Space: "urn:ietf:params:xml:ns:netconf:notification:1.0",
Local: "notification",
}
)

// RawXML captures the raw xml for the given element. Used to process certain
// elements later.
type RawXML []byte
Expand Down Expand Up @@ -69,13 +81,38 @@ type Reply struct {
XMLName xml.Name `xml:"urn:ietf:params:xml:ns:netconf:base:1.0 rpc-reply"`
MessageID uint64 `xml:"message-id,attr"`
Errors RPCErrors `xml:"rpc-error,omitempty"`
Body []byte `xml:",innerxml"`

raw []byte `xml:"-"`
}

func ParseReply(data []byte) (*Reply, error) {
reply := Reply{
raw: data,
}
if err := xml.Unmarshal(data, &reply); err != nil {
return nil, fmt.Errorf("couldn't parse reply: %v", err)
}

return &reply, nil
}

// Decode will decode the body of a reply into a value pointed to by v. This is
// a simple wrapper around xml.Unmarshal.
// Decode will decode the entire `rpc-reply` into a value pointed to by v. This
// is a simple wrapper around xml.Unmarshal.
func (r Reply) Decode(v interface{}) error {
return xml.Unmarshal(r.Body, v)
if r.raw == nil {
return fmt.Errorf("empty reply")
}
return xml.Unmarshal(r.raw, v)
}

// Raw returns the native message as it came from the server
func (r Reply) Raw() []byte {
return r.raw
}

// String returns the message as string.
func (r Reply) String() string {
return string(r.raw)
}

// Err will return go error(s) from a Reply that are of the given severities. If
Expand Down Expand Up @@ -121,13 +158,38 @@ func (r Reply) Err(severity ...ErrSeverity) error {
type Notification struct {
XMLName xml.Name `xml:"urn:ietf:params:xml:ns:netconf:notification:1.0 notification"`
EventTime time.Time `xml:"eventTime"`
Body []byte `xml:",innerxml"`

raw []byte `xml:"-"`
}

// Decode will decode the body of a noticiation into a value pointed to by v.
func ParseNotification(data []byte) (*Notification, error) {
notif := Notification{
raw: data,
}
if err := xml.Unmarshal(data, &notif); err != nil {
return nil, fmt.Errorf("couldn't parse reply: %v", err)
}

return &notif, nil
}

// Decode will decode the entire `noticiation` into a value pointed to by v.
// This is a simple wrapper around xml.Unmarshal.
func (r Notification) Decode(v interface{}) error {
return xml.Unmarshal(r.Body, v)
func (n Notification) Decode(v interface{}) error {
if n.raw == nil {
return fmt.Errorf("empty reply")
}
return xml.Unmarshal(n.raw, v)
}

// Raw returns the native message as it came from the server
func (n Notification) Raw() []byte {
return n.raw
}

// String returns the message as string.
func (n Notification) String() string {
return string(n.raw)
}

type ErrSeverity string
Expand Down
11 changes: 0 additions & 11 deletions msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,6 @@ func TestUnmarshalRPCReply(t *testing.T) {
`),
},
},
Body: []byte(`
<rpc-error>
<error-type>protocol</error-type>
<error-tag>operation-failed</error-tag>
<error-severity>error</error-severity>
<error-message>syntax error, expecting &lt;candidate/&gt; or &lt;running/&gt;</error-message>
<error-info>
<bad-element>non-exist</bad-element>
</error-info>
</rpc-error>
`),
},
},
}
Expand Down
27 changes: 13 additions & 14 deletions ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (b *ExtantBool) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error
return nil
}

type OKResp struct {
type OKReply struct {
OK ExtantBool `xml:"ok"`
}

Expand Down Expand Up @@ -95,8 +95,7 @@ type GetConfigReq struct {
}

type GetConfigReply struct {
XMLName xml.Name `xml:"data"`
Config []byte `xml:",innerxml"`
Data []byte `xml:"data"`
}

// GetConfig implements the <get-config> rpc operation defined in [RFC6241 7.1].
Expand All @@ -113,7 +112,7 @@ func (s *Session) GetConfig(ctx context.Context, source Datastore) ([]byte, erro
return nil, err
}

return resp.Config, nil
return resp.Data, nil
}

// MergeStrategy defines the strategies for merging configuration in a
Expand Down Expand Up @@ -272,7 +271,7 @@ func (s *Session) EditConfig(ctx context.Context, target Datastore, config any,
opt.apply(&req)
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -297,7 +296,7 @@ func (s *Session) CopyConfig(ctx context.Context, source, target any) error {
Target: target,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -311,7 +310,7 @@ func (s *Session) DeleteConfig(ctx context.Context, target Datastore) error {
Target: target,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -326,7 +325,7 @@ func (s *Session) Lock(ctx context.Context, target Datastore) error {
Target: target,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -336,7 +335,7 @@ func (s *Session) Unlock(ctx context.Context, target Datastore) error {
Target: target,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -356,7 +355,7 @@ func (s *Session) KillSession(ctx context.Context, sessionID uint32) error {
SessionID: sessionID,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -370,7 +369,7 @@ func (s *Session) Validate(ctx context.Context, source any) error {
Source: source,
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand Down Expand Up @@ -444,7 +443,7 @@ func (s *Session) Commit(ctx context.Context, opts ...CommitOption) error {
return fmt.Errorf("PersistID cannot be used with Confirmed/ConfirmedTimeout or Persist options")
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand All @@ -466,7 +465,7 @@ func (s *Session) CancelCommit(ctx context.Context, opts ...CancelCommitOption)
opt.applyCancelCommit(&req)
}

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}

Expand Down Expand Up @@ -509,6 +508,6 @@ func (s *Session) CreateSubscription(ctx context.Context, opts ...CreateSubscrip
}
// TODO: eventual custom notifications rpc logic, e.g. create subscription only if notification capability is present

var resp OKResp
var resp OKReply
return s.Call(ctx, &req, &resp)
}
48 changes: 30 additions & 18 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package netconf

import (
"bytes"
"context"
"encoding/xml"
"errors"
Expand Down Expand Up @@ -189,30 +190,26 @@ func (s *Session) recvMsg() error {
return err
}
defer r.Close()
dec := xml.NewDecoder(r)

root, err := startElement(dec)
msg, err := io.ReadAll(r)
if err != nil {
return err
}

const (
ncNamespace = "urn:ietf:params:xml:ns:netconf:base:1.0"
notifNamespace = "urn:ietf:params:xml:ns:netconf:notification:1.0"
)
return s.parseMsg(msg)
}

func (s *Session) parseMsg(msg []byte) error {
dec := xml.NewDecoder(bytes.NewReader(msg))

root, err := startElement(dec)
if err != nil {
return err
}

switch root.Name {
case xml.Name{Space: notifNamespace, Local: "notification"}:
if s.notificationHandler == nil {
return nil
}
var notif Notification
if err := dec.DecodeElement(&notif, root); err != nil {
return fmt.Errorf("failed to decode notification message: %w", err)
}
s.notificationHandler(notif)
case xml.Name{Space: ncNamespace, Local: "rpc-reply"}:
var reply Reply
case RPCReplyName:
reply := Reply{raw: msg}
if err := dec.DecodeElement(&reply, root); err != nil {
// What should we do here? Kill the connection?
return fmt.Errorf("failed to decode rpc-reply message: %w", err)
Expand All @@ -228,6 +225,17 @@ func (s *Session) recvMsg() error {
case <-req.ctx.Done():
return fmt.Errorf("message %d context canceled: %s", reply.MessageID, req.ctx.Err().Error())
}

case NofificationName:
if s.notificationHandler == nil {
return nil
}
notif := Notification{raw: msg}
if err := dec.DecodeElement(&notif, root); err != nil {
return fmt.Errorf("failed to decode notification message: %w", err)
}
s.notificationHandler(notif)

default:
return fmt.Errorf("unknown message type: %q", root.Name.Local)
}
Expand Down Expand Up @@ -342,6 +350,8 @@ func (s *Session) Call(ctx context.Context, req any, resp any) error {
return err
}

// Return any <rpc-error>. This defaults to a severity of `error` (warning
// are omitted).
if err := reply.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -377,7 +387,9 @@ func (s *Session) Close(ctx context.Context) error {
}
}

if callErr != io.EOF {
// it's ok if we are already closed
if !errors.Is(callErr, io.EOF) &&
!errors.Is(callErr, ErrClosed) {
return callErr
}

Expand Down

0 comments on commit 7d95be1

Please sign in to comment.