forked from milvus-io/milvus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathglobal_allocator.go
168 lines (147 loc) · 5.61 KB
/
global_allocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tso
import (
"sync/atomic"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// Allocator describes a Timestamp Oracle (TSO) allocator.
//
//go:generate mockery --name=Allocator --outpkg=mocktso
type Allocator interface {
	// Initialize prepares the TSO allocator for use: it synchronizes the TSO
	// with etcd and sets up the in-memory state needed by later allocations.
	Initialize() error

	// UpdateTSO refreshes the TSO held in memory and the time window stored
	// in etcd.
	UpdateTSO() error

	// SetTSO sets the physical part from the given tso. It is mainly used for
	// BR restore and cannot force the TSO to a value smaller than now.
	SetTSO(tso uint64) error

	// GenerateTSO generates the requested number of TSOs. The allocator must
	// have been initialized before this is called.
	GenerateTSO(count uint32) (uint64, error)

	// Reset resets the TSO allocator.
	Reset()

	// GetLastSavedTime reports the time that was last saved for the TSO.
	GetLastSavedTime() time.Time
}
// GlobalTSOAllocator is the global, single-point TSO allocator.
type GlobalTSOAllocator struct {
	// tso is the timestamp oracle that the allocator's methods delegate to.
	tso *timestampOracle

	// LimitMaxLogic controls whether GenerateTSO enforces the upper bound on
	// the logical part of the hybrid timestamp.
	LimitMaxLogic bool
}
// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(key string, txnKV kv.TxnKV) *GlobalTSOAllocator {
return &GlobalTSOAllocator{
tso: ×tampOracle{
txnKV: txnKV,
saveInterval: 3 * time.Second,
maxResetTSGap: func() time.Duration { return 3 * time.Second },
key: key,
},
LimitMaxLogic: true,
}
}
// Initialize prepares a freshly created global TSO allocator by initializing
// the timestamp of its underlying oracle. Any error from the oracle is
// returned unchanged.
func (gta *GlobalTSOAllocator) Initialize() error {
	err := gta.tso.InitTimestamp()
	return err
}
// SetLimitMaxLogic turns the maximum limit on the logical part of the hybrid
// timestamp on (true) or off (false).
//
// While the limit is on, a request whose logical part exceeds the maximum
// makes GlobalTSOAllocator sleep for a period and then try the allocation
// again.
//
// The flag is stored with a plain assignment; no synchronization is applied
// here.
func (gta *GlobalTSOAllocator) SetLimitMaxLogic(flag bool) {
	enabled := flag
	gta.LimitMaxLogic = enabled
}
// UpdateTSO refreshes the TSO held in memory and the time window stored in
// etcd by asking the underlying oracle to update its timestamp. Any error
// from the oracle is returned unchanged.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
	err := gta.tso.UpdateTimestamp()
	return err
}
// SetTSO sets the physical part from the given tso by forwarding it to the
// oracle's ResetUserTimestamp. Any error from the oracle is returned
// unchanged.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
	err := gta.tso.ResetUserTimestamp(tso)
	return err
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
var physical, logical int64
if count == 0 {
return 0, errors.New("tso count should be positive")
}
maxRetryCount := 10
for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO))
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
// If it's leader, maybe SyncTimestamp hasn't completed yet
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
physical = current.physical.UnixMilli()
logical = atomic.AddInt64(¤t.logical, int64(count))
if logical >= maxLogical && gta.LimitMaxLogic {
log.Info("logical part outside of max logical interval, please check ntp time",
zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)
continue
}
return tsoutil.ComposeTS(physical, logical), nil
}
return 0, errors.New("can not get timestamp")
}
// Alloc allocates a batch of count timestamps by delegating to GenerateTSO.
//
// On success the value returned is GenerateTSO's result, unchanged: the
// timestamp composed from the logical counter after it has been advanced by
// count. It is therefore not the first timestamp of the batch; callers that
// need the start of the range must compute it from the returned value and
// count.
//
// On failure it returns typeutil.ZeroTimestamp together with the error from
// GenerateTSO.
func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) {
	ts, err := gta.GenerateTSO(count)
	if err != nil {
		return typeutil.ZeroTimestamp, err
	}
	return ts, nil
}
// AllocOne allocates exactly one timestamp. It is a convenience wrapper
// around GenerateTSO with a count of 1 and returns that call's result and
// error unchanged.
func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) {
	const single uint32 = 1
	return gta.GenerateTSO(single)
}
// Reset resets the TSO allocator by resetting the timestamp of its
// underlying oracle.
func (gta *GlobalTSOAllocator) Reset() {
	oracle := gta.tso
	oracle.ResetTimestamp()
}
// GetLastSavedTime gets the last saved time for the TSO.
//
// The value is loaded from the oracle's lastSavedTime holder (presumably a
// sync/atomic.Value; confirm against timestampOracle). If nothing has been
// stored yet, or the stored value is not a time.Time, the zero time.Time is
// returned instead of panicking on the type assertion; callers can detect
// that case with IsZero.
func (gta *GlobalTSOAllocator) GetLastSavedTime() time.Time {
	saved, ok := gta.tso.lastSavedTime.Load().(time.Time)
	if !ok {
		return time.Time{}
	}
	return saved
}