allocate.go
package ipfscluster

import (
	"context"
	"errors"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
	"go.opencensus.io/trace"

	"github.com/ipfs/ipfs-cluster/api"
)

// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.
// The allocation process has several steps:
//
// * Find which peers are pinning a CID
// * Obtain the last values for the configured informer metrics from the
//   monitor component
// * Divide the metrics between "current" (peers already pinning the CID)
//   and "candidates" (peers that could pin the CID), as long as their metrics
//   are valid.
// * Given the candidates:
//   * Check if we are overpinning an item
//   * Check if there are not enough candidates for the "needed" replication
//     factor.
// * If there are enough candidates:
//   * Call the configured allocator, which sorts the candidates (and
//     may veto some depending on the allocation strategy).
//   * The allocator returns a list of final candidate peers sorted by
//     order of preference.
//   * Take as many final candidates from the list as we can, until
//     ReplicationFactorMax is reached. Error if there are fewer than
//     ReplicationFactorMin.
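
// As a small illustrative example of the steps above (hypothetical peers and
// numbers, not taken from any real cluster): for a CID pinned with
// ReplicationFactorMin=2 and ReplicationFactorMax=3 that peer A already
// holds, with peer E blacklisted and peers B, C and D reporting valid
// metrics, A ends up in "current", E is discarded, and B, C and D become
// candidates. Since only one peer currently holds the CID, the allocator is
// asked to rank B, C and D, and at least one (and at most two) of them will
// be selected as new allocations.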

// allocate finds peers to allocate a hash using the informer and the monitor.
// It should only be used with valid replicationFactors (if rplMin and rplMax
// are > 0, then rplMin <= rplMax).
// It always returns allocations, but if no new allocations are needed,
// it will return the current ones. Note that allocate() does not take
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/allocate")
	defer span.End()

	if (rplMin + rplMax) == 0 {
		return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
	}

	if rplMin < 0 && rplMax < 0 { // allocate everywhere
		return []peer.ID{}, nil
	}

	// Figure out who is holding the CID
	var currentAllocs []peer.ID
	if currentPin != nil {
		currentAllocs = currentPin.Allocations
	}

	metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())

	currentMetrics := make(map[peer.ID]*api.Metric)
	candidatesMetrics := make(map[peer.ID]*api.Metric)
	priorityMetrics := make(map[peer.ID]*api.Metric)

	// Divide metrics between current and candidates.
	// All metrics in the list are valid (at least at the
	// moment they were compiled by the monitor).
	for _, m := range metrics {
		switch {
		case containsPeer(blacklist, m.Peer):
			// discard blacklisted peers
			continue
		case containsPeer(currentAllocs, m.Peer):
			currentMetrics[m.Peer] = m
		case containsPeer(prioritylist, m.Peer):
			priorityMetrics[m.Peer] = m
		default:
			candidatesMetrics[m.Peer] = m
		}
	}

	newAllocs, err := c.obtainAllocations(
		ctx,
		hash,
		rplMin,
		rplMax,
		currentMetrics,
		candidatesMetrics,
		priorityMetrics,
	)
	if err != nil {
		return newAllocs, err
	}
	if newAllocs == nil {
		newAllocs = currentAllocs
	}

	return newAllocs, nil
}
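
// A minimal sketch of how allocate might be invoked from the pinning path
// (hypothetical call site shown purely for illustration; the real callers
// live elsewhere in this package and may pass different blacklists and
// priority lists):
//
//	allocs, err := c.allocate(ctx, pin.Cid, existingPin, pin.ReplicationFactorMin, pin.ReplicationFactorMax, nil, nil)
//
// An empty, non-nil result with a nil error means "allocate everywhere"
// (negative replication factors); otherwise the result lists the peers that
// should hold the CID, reusing the current allocations when no new ones are
// needed.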

// allocationError logs an allocation error
func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
	logger.Errorf("Not enough candidates to allocate %s:", hash)
	logger.Errorf("  Needed: %d", needed)
	logger.Errorf("  Wanted: %d", wanted)
	logger.Errorf("  Valid candidates: %d:", len(candidatesValid))
	for _, c := range candidatesValid {
		logger.Errorf("    - %s", c.Pretty())
	}
	errorMsg := "not enough peers to allocate CID. "
	errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
	errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
	errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
	errorMsg += "See logs for more info."
	return errors.New(errorMsg)
}

func (c *Cluster) obtainAllocations(
	ctx context.Context,
	hash cid.Cid,
	rplMin, rplMax int,
	currentValidMetrics map[peer.ID]*api.Metric,
	candidatesMetrics map[peer.ID]*api.Metric,
	priorityMetrics map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
	defer span.End()

	// The list of peers in current
	validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
	for k := range currentValidMetrics {
		validAllocations = append(validAllocations, k)
	}

	nCurrentValid := len(validAllocations)
	nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
	needed := rplMin - nCurrentValid // The minimum we need
	wanted := rplMax - nCurrentValid // The maximum we want

	logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
	logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
	logger.Debugf("obtainAllocations: Needed: %d", needed)
	logger.Debugf("obtainAllocations: Wanted: %d", wanted)

	// Reminder: rplMin <= rplMax AND both > 0 at this point.

	if wanted < 0 { // allocations above maximum threshold: drop some
		// This could be done more intelligently by dropping them
		// according to the allocator order (i.e. freeing peers
		// with the most used space first).
		return validAllocations[0 : len(validAllocations)+wanted], nil
	}

	if needed <= 0 { // allocations are above the minimal threshold
		// We don't provide any new allocations
		return nil, nil
	}

	if nCandidatesValid < needed { // not enough candidates
		candidatesValid := []peer.ID{}
		for k := range priorityMetrics {
			candidatesValid = append(candidatesValid, k)
		}
		for k := range candidatesMetrics {
			candidatesValid = append(candidatesValid, k)
		}
		return nil, allocationError(hash, needed, wanted, candidatesValid)
	}

	// We can allocate from this point. Use the allocator to decide on the
	// priority of the candidates and grab as many as "wanted".

	// The allocator returns a list of peers ordered by priority.
	finalAllocs, err := c.allocator.Allocate(
		ctx,
		hash,
		currentValidMetrics,
		candidatesMetrics,
		priorityMetrics,
	)
	if err != nil {
		return nil, logError(err.Error())
	}

	logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)

	// Check that we have enough, as the allocator may have returned
	// fewer candidates than provided.
	if got := len(finalAllocs); got < needed {
		return nil, allocationError(hash, needed, wanted, finalAllocs)
	}

	allocationsToUse := minInt(wanted, len(finalAllocs))

	// The final result is the currently valid allocations
	// along with the ones provided by the allocator.
	return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
}
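
// Worked example of the arithmetic in obtainAllocations (illustrative
// numbers only): with rplMin=2, rplMax=3 and 5 currently valid allocations,
// wanted = 3-5 = -2, so validAllocations[0 : 5+(-2)] keeps the first three
// peers and drops the other two. With 1 valid allocation and 4 candidates,
// needed = 2-1 = 1 and wanted = 3-1 = 2; if the allocator returns three
// ranked peers, allocationsToUse = minInt(2, 3) = 2 and the final result
// contains 1 + 2 = 3 allocations.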