Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handleBootstrap: avoid a use of syncutil.Without() #397

Merged
merged 2 commits into from
Dec 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 70 additions & 75 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,49 +612,48 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error {
rl := &releaseList{}
defer rl.Release()

c.lk.Lock()
defer c.lk.Unlock()

if c.lk.answers[id] != nil {
return rpcerr.Failedf("incoming bootstrap: answer ID %d reused", id)
}

var (
err error
ans = answer{c: c, id: id}
)

syncutil.Without(&c.lk, func() {
ans.ret, ans.sendMsg, ans.msgReleaser, err = c.newReturn()
if err == nil {
ans.ret.SetAnswerId(uint32(id))
ans.ret.SetReleaseParamCaps(false)
ans.ret, ans.sendMsg, ans.msgReleaser, err = c.newReturn()
if err == nil {
ans.ret.SetAnswerId(uint32(id))
ans.ret.SetReleaseParamCaps(false)
}

syncutil.With(&c.lk, func() {
if c.lk.answers[id] != nil {
rl.Add(ans.msgReleaser.Decr)
err = rpcerr.Failedf("incoming bootstrap: answer ID %d reused", id)
return
}
})

if err != nil {
err = rpcerr.Annotate(err, "incoming bootstrap")
c.lk.answers[id] = errorAnswer(c, id, err)
c.er.ReportError(err)
return nil
}
if err != nil {
err = rpcerr.Annotate(err, "incoming bootstrap")
c.lk.answers[id] = errorAnswer(c, id, err)
c.er.ReportError(err)
return
}

c.lk.answers[id] = &ans
if !c.bootstrap.IsValid() {
ans.sendException(rl, exc.New(exc.Failed, "", "vat does not expose a public/bootstrap interface"))
return nil
}
if err := ans.setBootstrap(c.bootstrap.AddRef()); err != nil {
ans.sendException(rl, err)
return nil
}
err = ans.sendReturn(rl)
if err != nil {
// Answer cannot possibly encounter a Finish, since we still
// haven't returned to receive().
panic(err)
}
return nil
c.lk.answers[id] = &ans
if !c.bootstrap.IsValid() {
ans.sendException(rl, exc.New(exc.Failed, "", "vat does not expose a public/bootstrap interface"))
return
}
if err := ans.setBootstrap(c.bootstrap.AddRef()); err != nil {
ans.sendException(rl, err)
return
}
err = ans.sendReturn(rl)
if err != nil {
// Answer cannot possibly encounter a Finish, since we still
// haven't returned to receive().
panic(err)
}
})
return err
}

func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capnp.ReleaseFunc) error {
Expand Down Expand Up @@ -1389,64 +1388,60 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, release
return
}

client := iface.Client()

var ok bool
syncutil.Without(&c.lk, func() {
imp, ok = client.State().Brand.Value.(*importClient)
})

if !ok || imp.c != c {
client.Release()
err = rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
return
}

// TODO(maybe): check generation?
client = iface.Client()
})

if err != nil {
release()
return err
}

imp, ok := client.State().Brand.Value.(*importClient)
if !ok || imp.c != c {
client.Release()
return rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
}
// TODO(maybe): check generation?

// Since this Cap'n Proto RPC implementation does not send imports
// unless they are fully dequeued, we can just immediately loop back.
id := d.Context().SenderLoopback()
c.sendMessage(ctx, func(m rpccp.Message) error {
defer release()
defer client.Release()

d, err := m.NewDisembargo()
if err != nil {
return err
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) error {
d, err := m.NewDisembargo()
if err != nil {
return err
}

tgt, err := d.NewTarget()
if err != nil {
return err
}
tgt, err := d.NewTarget()
if err != nil {
return err
}

tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil
tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil

}, func(err error) {
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
}, func(err error) {
defer release()
defer client.Release()
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
})
})

default:
c.er.ReportError(fmt.Errorf("incoming disembargo: context %v not implemented", d.Context().Which()))
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
defer release()

if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}

return
}, func(err error) {
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
return
}, func(err error) {
defer release()
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
})
})
}

Expand Down