Skip to content

Commit

Permalink
fix: [hotfix] Load original key if ts is MaxTimestamp (#36935)
Browse files Browse the repository at this point in the history
Cherry pick from master
pr: #36934

Related to #36933

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Oct 16, 2024
1 parent 1d9c746 commit dde7979
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 10 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ etcd:

metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds

# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.
Expand Down
20 changes: 11 additions & 9 deletions internal/metastore/kv/rootcoord/suffix_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var (
// SuffixSnapshotTombstone special value for tombstone mark
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
DefaultSnapshotReserveTime = 1 * time.Hour
DefaultSnapshotTTL = 24 * time.Hour
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
)

// IsTombstone used in migration tool also.
Expand Down Expand Up @@ -308,9 +307,9 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
}

func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
// if ts == 0, load latest by definition
// if ts == 0 or typeutil.MaxTimestamp, load latest by definition
// and with acceleration logic, just do load key will do
if ts == 0 {
if ts == 0 || ts == typeutil.MaxTimestamp {
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
Expand Down Expand Up @@ -434,7 +433,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil
// LoadWithPrefix load keys with provided prefix and returns value in the ts
func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
// ts 0 case shall be treated as fetch latest/current value
if ts == 0 {
if ts == 0 || ts == typeutil.MaxTimestamp {
keys, values, err := ss.MetaKv.LoadWithPrefix(key)
fks := keys[:0] // make([]string, 0, len(keys))
fvs := values[:0] // make([]string, 0, len(values))
Expand Down Expand Up @@ -605,6 +604,9 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
// It walks through all keys with the snapshot prefix, groups them by original key,
// and removes expired versions or all versions if the original key has been deleted
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
ttlTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotTTLSeconds.GetAsDuration(time.Second)
reserveTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotReserveTimeSeconds.GetAsDuration(time.Second)

candidateExpiredKeys := make([]string, 0)
latestOriginalKey := ""
latestOriginValue := ""
Expand All @@ -625,7 +627,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
for _, key := range candidateExpiredKeys {
ts, _ := ss.isTSKey(key)
expireTime, _ := tsoutil.ParseTS(ts)
if expireTime.Add(DefaultSnapshotTTL).Before(now) {
if expireTime.Add(ttlTime).Before(now) {
expiredKeys = append(expiredKeys, key)
}
}
Expand Down Expand Up @@ -666,7 +668,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {

// Record versions that are already expired but not removed
time, _ := tsoutil.ParseTS(ts)
if time.Add(DefaultSnapshotReserveTime).Before(now) {
if time.Add(reserveTime).Before(now) {
candidateExpiredKeys = append(candidateExpiredKeys, key)
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/util/paramtable/service_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,9 @@ It is recommended to change this parameter before starting Milvus for the first
}

type MetaStoreConfig struct {
MetaStoreType ParamItem `refreshable:"false"`
MetaStoreType ParamItem `refreshable:"false"`
SnapshotTTLSeconds ParamItem `refreshable:"true"`
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
}

func (p *MetaStoreConfig) Init(base *BaseTable) {
Expand All @@ -468,6 +470,24 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
}
p.MetaStoreType.Init(base.mgr)

p.SnapshotTTLSeconds = ParamItem{
Key: "metastore.snapshot.ttl",
Version: "2.4.14",
DefaultValue: "86400",
Doc: `snapshot ttl in seconds`,
Export: true,
}
p.SnapshotTTLSeconds.Init(base.mgr)

p.SnapshotReserveTimeSeconds = ParamItem{
Key: "metastore.snapshot.reserveTime",
Version: "2.4.14",
DefaultValue: "3600",
Doc: `snapshot reserve time in seconds`,
Export: true,
}
p.SnapshotReserveTimeSeconds.Init(base.mgr)

// TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
// There should be a single initialization operation for meta store, then move the metrics registration to there.
metrics.RegisterMetaType(p.MetaStoreType.GetValue())
Expand Down

0 comments on commit dde7979

Please sign in to comment.