-
Notifications
You must be signed in to change notification settings - Fork 232
/
lookup_optim.go
313 lines (258 loc) · 10.4 KB
/
lookup_optim.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
package dht
import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
ks "github.com/whyrusleeping/go-keyspace"
"gonum.org/v1/gonum/mathext"
)
const (
	// optProvIndividualThresholdCertainty describes how sure we want to be that an individual peer that
	// we find while walking the DHT actually belongs to the k-closest peers based on the current network size
	// estimation. It feeds into the individual distance threshold computed in newOptimisticState.
	optProvIndividualThresholdCertainty = 0.9
	// optProvSetThresholdStrictness describes the probability that the set of closest peers is actually further
	// away than the calculated set threshold. Put differently, it is the probability that we are too strict and
	// don't terminate the process early because we can't find any closer peers.
	optProvSetThresholdStrictness = 0.1
	// optProvReturnRatio corresponds to how many ADD_PROVIDER RPCs must have completed (regardless of success)
	// before we return to the user. The ratio is multiplied with the Kademlia bucket size and rounded up, so
	// 0.75 equals 15 RPCs for a bucket size of 20.
	optProvReturnRatio = 0.75
)
// addProviderRPCState describes how far the ADD_PROVIDER RPC to a single peer has progressed.
type addProviderRPCState int

const (
	// scheduled means a goroutine performing the RPC has been started but has not recorded a result yet.
	// The enumeration starts at 1, so the zero value of the type corresponds to none of these states.
	scheduled addProviderRPCState = iota + 1
	// success means the RPC returned without an error.
	success
	// failure means the RPC returned an error.
	failure
)
// optimisticState holds everything a single optimistic provide operation needs: the distance thresholds
// that decide when to store provider records early or stop the DHT walk, and the bookkeeping for the
// ADD_PROVIDER RPCs that have been issued so far.
type optimisticState struct {
	// context for all ADD_PROVIDER RPCs
	putCtx context.Context
	// reference to the DHT
	dht *IpfsDHT
	// the network size estimation that was current when this state was created
	networkSize int32
	// a channel indicating when an ADD_PROVIDER RPC completed (successful or not).
	// Every finished RPC sends exactly one element.
	doneChan chan struct{}
	// peerStatesLk guards peerStates.
	peerStatesLk sync.RWMutex
	// tracks, per peer, the state of the ADD_PROVIDER RPC we issued to it. A peer only has an entry
	// once an RPC to it has been scheduled.
	peerStates map[peer.ID]addProviderRPCState
	// the key to provide
	key string
	// the key to provide transformed into the Kademlia key space
	ksKey ks.Key
	// distance threshold for individual peers. If peers are closer than this number we store
	// the provider records right away.
	individualThreshold float64
	// distance threshold for the set of bucketSize closest peers. If the average distance of the bucketSize
	// closest peers is below this number we stop the DHT walk and store the remaining provider records.
	// "remaining" because we have likely already stored some on peers that were below the individualThreshold.
	setThreshold float64
	// number of completed (regardless of success) ADD_PROVIDER RPCs before we return control back to the user.
	// waitForRPCs lowers it if fewer RPCs than this were issued.
	returnThreshold int
	// putProvDone counts the ADD_PROVIDER RPCs that have completed (successful and unsuccessful)
	putProvDone atomic.Int32
}
// newOptimisticState builds the bookkeeping structure for one optimistic provide of key.
//
// ctx is stored and later used for every ADD_PROVIDER RPC. The individual and set distance thresholds
// are derived from the inverse regularized incomplete gamma function, scaled by the current network
// size estimate. An error is returned if the network size estimator cannot provide an estimate.
func (dht *IpfsDHT) newOptimisticState(ctx context.Context, key string) (*optimisticState, error) {
	// Without a network size estimate the thresholds cannot be derived.
	estimate, err := dht.nsEstimator.NetworkSize()
	if err != nil {
		return nil, err
	}

	var (
		k = float64(dht.bucketSize)
		n = float64(estimate)
	)

	state := &optimisticState{
		putCtx:              ctx,
		dht:                 dht,
		networkSize:         estimate,
		peerStates:          make(map[peer.ID]addProviderRPCState),
		key:                 key,
		ksKey:               ks.XORKeySpace.Key([]byte(key)),
		individualThreshold: mathext.GammaIncRegInv(k, 1-optProvIndividualThresholdCertainty) / n,
		setThreshold:        mathext.GammaIncRegInv(k/2.0+1, 1-optProvSetThresholdStrictness) / n,
		returnThreshold:     int(math.Ceil(k * optProvReturnRatio)),
		// putProvDone is left at its zero value, i.e. a counter starting at 0.
	}

	// The channel is buffered so that completion events are not missed.
	state.doneChan = make(chan struct{}, state.returnThreshold)

	return state, nil
}
// optimisticProvide walks the DHT towards keyMH and stores provider records for it. Records are
// stored with peers as soon as they are deemed close enough (see stopFn) and, after the walk, with
// all remaining closest peers. The function returns once a threshold number of ADD_PROVIDER RPCs
// have completed; the rest keep running in the background.
//
// It returns an error if the key is empty, if no network size estimate is available, if the lookup
// fails, or if outerCtx has been cancelled by the time the threshold number of RPCs completed.
func (dht *IpfsDHT) optimisticProvide(outerCtx context.Context, keyMH multihash.Multihash) error {
	key := string(keyMH)
	if len(key) == 0 {
		return fmt.Errorf("can't lookup empty key")
	}

	// The put operations get their own context rather than the caller's one. We return before all
	// puts have finished (to cut off the long tail of the latency distribution), so whatever the
	// caller does with its context afterwards must not cancel the puts still in flight.
	rpcCtx, cancelRPCs := context.WithTimeout(context.Background(), time.Minute)

	state, err := dht.newOptimisticState(rpcCtx, key)
	if err != nil {
		cancelRPCs()
		return err
	}

	// returned is done as soon as this function has returned.
	returned, markReturned := context.WithCancel(outerCtx)
	defer markReturned()

	go func() {
		select {
		case <-outerCtx.Done():
			// The caller gave up while we were still inside this function: abort all pending
			// put operations.
			cancelRPCs()
		case <-returned.Done():
			// This function has returned. From now on cancellations of the outer context are
			// ignored and the remaining put operations are left to finish.
		}
	}()

	lookupRes, err := dht.runLookupWithFollowup(outerCtx, key, dht.pmGetClosestPeers(key), state.stopFn)
	if err != nil {
		return err
	}

	// Store the provider record with every closest peer that has no RPC scheduled or completed yet.
	state.peerStatesLk.Lock()
	for _, p := range lookupRes.peers {
		if _, known := state.peerStates[p]; !known {
			state.peerStates[p] = scheduled
			go state.putProviderRecord(p)
		}
	}
	state.peerStatesLk.Unlock()

	// Block until a threshold number of RPCs have completed.
	state.waitForRPCs()

	if ctxErr := outerCtx.Err(); ctxErr != nil {
		return ctxErr
	}
	// "completed" is likely false here, but that's not a given.
	if !lookupRes.completed {
		return nil
	}

	// The lookup ran to completion, so its result can feed the network size estimator.
	if trackErr := dht.nsEstimator.Track(key, lookupRes.closest); trackErr != nil {
		logger.Warnf("network size estimator track peers: %s", trackErr)
	}

	// NOTE(review): the value returned by M is discarded here - confirm whether the measurement
	// still needs to be recorded for the metric to be updated.
	if ns, nsErr := dht.nsEstimator.NetworkSize(); nsErr == nil {
		metrics.NetworkSize.M(int64(ns))
	}

	// The query was successful, so refresh the cpl for this key.
	dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

	return nil
}
// stopFn is the termination check handed to the DHT lookup. Given the current query peer set it
//
//  1. immediately schedules an ADD_PROVIDER RPC for every known closest peer that is nearer to the
//     key than individualThreshold and has no RPC scheduled or completed yet, and
//  2. reports whether the lookup can stop early.
//
// It returns true if at least bucketSize RPCs are scheduled or have succeeded, or if the average
// normed distance of the currently known closest peers is below setThreshold.
//
// The average is taken over the peers that are actually known. If fewer than bucketSize peers are
// known the missing ones are NOT counted as distance 0, as that would pull the average towards zero
// and stop the walk prematurely. With no known peers the set threshold never triggers.
func (os *optimisticState) stopFn(qps *qpeerset.QueryPeerset) bool {
	os.peerStatesLk.Lock()
	defer os.peerStatesLk.Unlock()

	// get currently known closest peers and check if any of them is already very close.
	// If so -> store provider records straight away.
	closest := qps.GetClosestNInStates(os.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)

	distanceSum := 0.0
	for _, p := range closest {
		// calculate distance of peer p to the target key
		dist := netsize.NormedDistance(p, os.ksKey)
		distanceSum += dist

		// Check if we have already scheduled interaction or have actually interacted with that peer
		if _, found := os.peerStates[p]; found {
			continue
		}

		// Check if peer is close enough to store the provider record with
		if dist > os.individualThreshold {
			continue
		}

		// peer is indeed very close already -> store the provider record directly with it!
		// The goroutine needs peerStatesLk to record its result, so it cannot overtake the
		// bookkeeping below.
		go os.putProviderRecord(p)

		// keep track that we've scheduled storing a provider record with that peer
		os.peerStates[p] = scheduled
	}

	// count number of peers we have scheduled to contact or have already successfully contacted via the above method
	scheduledAndSuccessCount := 0
	for _, s := range os.peerStates {
		if s == scheduled || s == success {
			scheduledAndSuccessCount++
		}
	}

	// if we have already contacted/scheduled the RPC for at least bucketSize peers stop the procedure
	if scheduledAndSuccessCount >= os.dht.bucketSize {
		return true
	}

	// Without any known peer there is no evidence that we are close to the key.
	if len(closest) == 0 {
		return false
	}

	// if the average distance of the known closest peers is below the set threshold stop the procedure
	avg := distanceSum / float64(len(closest))
	return avg < os.setThreshold
}
// putProviderRecord sends one ADD_PROVIDER RPC for the key to pid, announcing this node with its
// filtered host addresses. The outcome is recorded in peerStates as success or failure, and
// afterwards exactly one element is sent on doneChan to signal that the RPC has completed.
func (os *optimisticState) putProviderRecord(pid peer.ID) {
	self := peer.AddrInfo{
		ID:    os.dht.self,
		Addrs: os.dht.filterAddrs(os.dht.host.Addrs()),
	}

	outcome := success
	if err := os.dht.protoMessenger.PutProviderAddrs(os.putCtx, pid, []byte(os.key), self); err != nil {
		outcome = failure
	}

	os.peerStatesLk.Lock()
	os.peerStates[pid] = outcome
	os.peerStatesLk.Unlock()

	// Signal completion only after the lock has been released: the send may block until a
	// receiver picks it up.
	os.doneChan <- struct{}{}
}
// waitForRPCs waits for a subset of the ADD_PROVIDER RPCs to complete and then tries to acquire
// leases on a bounded channel, so that it can return early to the user without allowing an
// unbounded number of RPCs to keep running in the background. If there are already too many
// requests in flight it simply waits for the current set to finish.
//
// If no RPC was scheduled at all, it returns immediately.
func (os *optimisticState) waitForRPCs() {
	os.peerStatesLk.RLock()
	rpcCount := len(os.peerStates)
	os.peerStatesLk.RUnlock()

	// Nothing was scheduled, so nothing will ever be sent on doneChan. Waiting on it would
	// block forever.
	if rpcCount == 0 {
		return
	}

	// returnThreshold can't be larger than the total number of issued RPCs
	if os.returnThreshold > rpcCount {
		os.returnThreshold = rpcCount
	}

	// Wait until returnThreshold ADD_PROVIDER RPCs have returned
	for range os.doneChan {
		if int(os.putProvDone.Add(1)) == os.returnThreshold {
			break
		}
	}

	// At this point only a subset of all ADD_PROVIDER RPCs have completed.
	// We want to give control back to the user as soon as possible because
	// it is highly likely that at least one of the remaining RPCs will time
	// out and thus slow down the whole process. The provider records will
	// already be available with less than the total number of RPCs having
	// finished. This has been investigated here:
	// https://github.com/protocol/network-measurements/blob/master/results/rfm17-provider-record-liveness.md

	// For the remaining ADD_PROVIDER RPCs try to acquire a lease on the optProvJobsPool channel.
	// If that worked we need to consume the doneChan and release the acquired lease on the
	// optProvJobsPool channel.
	remaining := rpcCount - int(os.putProvDone.Load())
	for i := 0; i < remaining; i++ {
		select {
		case os.dht.optProvJobsPool <- struct{}{}:
			// We were able to acquire a lease on the optProvJobsPool channel.
			// Consume doneChan in the background to release the acquired lease again.
			go os.consumeDoneChan(rpcCount)
		case <-os.doneChan:
			// We were not able to acquire a lease but an ADD_PROVIDER RPC resolved.
			// Whoever accounts for the last outstanding RPC closes the channel.
			if int(os.putProvDone.Add(1)) == rpcCount {
				close(os.doneChan)
			}
		}
	}
}
// consumeDoneChan waits for a single ADD_PROVIDER RPC to complete and then gives back one lease on
// the optProvJobsPool channel. until is the total number of issued RPCs: if this completion brings
// the counter of finished RPCs up to that number, doneChan is closed.
func (os *optimisticState) consumeDoneChan(until int) {
	// Block until one outstanding RPC reports that it has finished.
	<-os.doneChan

	// Free the lease so that others can get a spot.
	<-os.dht.optProvJobsPool

	finished := int(os.putProvDone.Add(1))
	if finished != until {
		return
	}

	// That was the last outstanding RPC, nobody will send on the channel anymore.
	close(os.doneChan)
}