diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c7cd7cfb34e71..1b4f5e86539e3 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -64,6 +64,9 @@ etcd: metastore: type: etcd # Default value: etcd, Valid values: [etcd, tikv] + snapshot: + ttl: 86400 # snapshot ttl in seconds + reserveTime: 3600 # snapshot reserve time in seconds # Related configuration of tikv, used to store Milvus metadata. # Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery. diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 37fad9d598f2c..9b211e0867cf2 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -41,10 +42,8 @@ import ( var ( // SuffixSnapshotTombstone special value for tombstone mark - SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} - PaginationSize = 5000 - DefaultSnapshotReserveTime = 1 * time.Hour - DefaultSnapshotTTL = 24 * time.Hour + SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} + PaginationSize = 5000 ) // IsTombstone used in migration tool also. @@ -308,9 +307,9 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) } func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { - // if ts == 0, load latest by definition + // if ts == 0 or typeutil.MaxTimestamp, load latest by definition // and with acceleration logic, just do load key will do - if ts == 0 { + if ts == 0 || ts == typeutil.MaxTimestamp { value, err := ss.MetaKv.Load(key) if ss.isTombstone(value) { return "", errors.New("no value found") @@ -434,7 +433,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil // LoadWithPrefix load keys with provided prefix and returns value in the ts func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { // ts 0 case shall be treated as fetch latest/current value - if ts == 0 { + if ts == 0 || ts == typeutil.MaxTimestamp { keys, values, err := ss.MetaKv.LoadWithPrefix(key) fks := keys[:0] // make([]string, 0, len(keys)) fvs := values[:0] // make([]string, 0, len(values)) @@ -605,6 +604,9 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s // It walks through all keys with the snapshot prefix, groups them by original key, // and removes expired versions or all versions if the original key has been deleted func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error { + ttlTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotTTLSeconds.GetAsDuration(time.Second) + reserveTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotReserveTimeSeconds.GetAsDuration(time.Second) + candidateExpiredKeys := make([]string, 0) latestOriginalKey := "" latestOriginValue := "" @@ -625,7 +627,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error { for _, key := range candidateExpiredKeys { ts, _ := ss.isTSKey(key) expireTime, _ := tsoutil.ParseTS(ts) - if expireTime.Add(DefaultSnapshotTTL).Before(now) { + if expireTime.Add(ttlTime).Before(now) { expiredKeys = append(expiredKeys, key) } } @@ -666,7 +668,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error { // Record versions that are already expired but not removed time, _ := tsoutil.ParseTS(ts) - if time.Add(DefaultSnapshotReserveTime).Before(now) { + if time.Add(reserveTime).Before(now) { candidateExpiredKeys = append(candidateExpiredKeys, key) } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index e9c419da2b054..893e27be85fac 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -455,7 +455,9 @@ It is recommended to change this parameter before starting Milvus for the first } type MetaStoreConfig struct { - MetaStoreType ParamItem `refreshable:"false"` + MetaStoreType ParamItem `refreshable:"false"` + SnapshotTTLSeconds ParamItem `refreshable:"true"` + SnapshotReserveTimeSeconds ParamItem `refreshable:"true"` } func (p *MetaStoreConfig) Init(base *BaseTable) { @@ -468,6 +470,24 @@ func (p *MetaStoreConfig) Init(base *BaseTable) { } p.MetaStoreType.Init(base.mgr) + p.SnapshotTTLSeconds = ParamItem{ + Key: "metastore.snapshot.ttl", + Version: "2.4.14", + DefaultValue: "86400", + Doc: `snapshot ttl in seconds`, + Export: true, + } + p.SnapshotTTLSeconds.Init(base.mgr) + + p.SnapshotReserveTimeSeconds = ParamItem{ + Key: "metastore.snapshot.reserveTime", + Version: "2.4.14", + DefaultValue: "3600", + Doc: `snapshot reserve time in seconds`, + Export: true, + } + p.SnapshotReserveTimeSeconds.Init(base.mgr) + // TODO: The initialization operation of metadata storage is called in the initialization phase of every node. // There should be a single initialization operation for meta store, then move the metrics registration to there. metrics.RegisterMetaType(p.MetaStoreType.GetValue())