From f62e961af52d774118920d9f1e99892fe6727cb1 Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 24 May 2022 15:10:09 +0800 Subject: [PATCH] refactor: segment server --- .gitignore | 19 +- go.mod | 2 +- go.sum | 10 - internal/raft/log/compaction.go | 12 +- internal/raft/log/recovery.go | 4 +- internal/raft/log/wal.go | 19 +- internal/raft/transport/host.go | 13 +- internal/raft/transport/host_test.go | 15 + internal/raft/transport/loopback_test.go | 15 + internal/raft/transport/multiplexing.go | 6 +- internal/raft/transport/multiplexing_test.go | 15 + internal/raft/transport/peer_test.go | 15 + internal/raft/transport/resolver.go | 7 + internal/raft/transport/server.go | 13 +- internal/raft/transport/server_test.go | 15 + internal/raft/transport/transport_test.go | 15 + internal/store/block/block.go | 66 ++++ .../store/{segment => }/block/block_test.go | 0 internal/store/{segment => }/block/entry.go | 3 +- .../store/{segment => }/block/entry_test.go | 2 +- .../block/file.go => block/file/block.go} | 329 +++++++++--------- .../file_test.go => block/file/block_test.go} | 2 +- .../{segment/block => block/file}/index.go | 2 +- internal/store/block/file/index_test.go | 15 + internal/store/block/file/open.go | 90 +++++ internal/store/block/file/open_test.go | 15 + .../{segment/block => block/file}/recovery.go | 23 +- .../block => block/file}/recovery_test.go | 2 +- .../block => block/replica}/replica.go | 243 ++++++------- .../block => block/replica}/replica_test.go | 2 +- internal/store/meta/store_test.go | 15 + internal/store/segment/block/block.go | 129 ------- internal/store/segment/errors/errors.go | 15 +- internal/store/segment/recovery.go | 52 +-- .../store/segment/{segment.go => server.go} | 263 +++++++------- .../{segment_test.go => server_test.go} | 0 internal/store/wal/compaction_test.go | 15 + internal/store/wal/record/record.go | 2 +- internal/store/wal/record/record_test.go | 87 +++++ internal/store/wal/stream.go | 1 + internal/store/wal/wal.go | 7 +- 41 files changed, 942 insertions(+), 633 deletions(-) create mode 100644 internal/raft/transport/host_test.go create mode 100644 internal/raft/transport/loopback_test.go create mode 100644 internal/raft/transport/multiplexing_test.go create mode 100644 internal/raft/transport/peer_test.go create mode 100644 internal/raft/transport/server_test.go create mode 100644 internal/raft/transport/transport_test.go create mode 100644 internal/store/block/block.go rename internal/store/{segment => }/block/block_test.go (100%) rename internal/store/{segment => }/block/entry.go (97%) rename internal/store/{segment => }/block/entry_test.go (98%) rename internal/store/{segment/block/file.go => block/file/block.go} (58%) rename internal/store/{segment/block/file_test.go => block/file/block_test.go} (97%) rename internal/store/{segment/block => block/file}/index.go (98%) create mode 100644 internal/store/block/file/index_test.go create mode 100644 internal/store/block/file/open.go create mode 100644 internal/store/block/file/open_test.go rename internal/store/{segment/block => block/file}/recovery.go (82%) rename internal/store/{segment/block => block/file}/recovery_test.go (97%) rename internal/store/{segment/block => block/replica}/replica.go (67%) rename internal/store/{segment/block => block/replica}/replica_test.go (98%) create mode 100644 internal/store/meta/store_test.go delete mode 100644 internal/store/segment/block/block.go rename internal/store/segment/{segment.go => server.go} (72%) rename internal/store/segment/{segment_test.go => server_test.go} (100%) create mode 100644 internal/store/wal/compaction_test.go diff --git a/.gitignore b/.gitignore index f0a71a946..8e1c8ef9d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,3 @@ - -# configuration file -./config/*.json -config/*.yaml -config/cluster/*.yaml -./config/*.toml - # macOS .DS_Store @@ -15,16 +8,24 @@ config/cluster/*.yaml #!.vscode/launch.json #!.vscode/extensions.json #!.vscode/*.code-snippets +__debug_bin # JetBrains .idea/ +# configuration file +/config/*.json +/config/*.toml +/config/*.yaml +/config/cluster/*.yaml +/config/**/*.local.* + # data dir of etcd etcd bin -ø + *.xml *coverage* -.run \ No newline at end of file +.run diff --git a/go.mod b/go.mod index c7aa60ed9..00ce3f0ce 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/cel-go v0.11.2 + github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/huandu/skiplist v1.2.0 github.com/labstack/echo/v4 v4.7.2 @@ -56,7 +57,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/google/btree v1.0.1 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/go.sum b/go.sum index ee0180ae1..30877f820 100644 --- a/go.sum +++ b/go.sum @@ -77,7 +77,6 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 h1:uH66TXeswKn5PW5zdZ39xEwfS9an067BirqA+P4QaLI= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -106,7 +105,6 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -178,7 +176,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -230,7 +227,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= -github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -634,7 +630,6 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= @@ -721,18 +716,15 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -746,7 +738,6 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -885,7 +876,6 @@ google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6 h1:FglFEfyj61zP3c6LgjmVHxYxZWXYul9oiS1EZqD5gLc= google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/internal/raft/log/compaction.go b/internal/raft/log/compaction.go index 25b9844c5..bc72457d9 100644 --- a/internal/raft/log/compaction.go +++ b/internal/raft/log/compaction.go @@ -83,9 +83,9 @@ func (l *Log) Compact(i uint64) error { return nil } -func (w *WAL) suppressCompact(cb exeCallback) error { +func (w *WAL) suppressCompact(cb executeCallback) error { result := make(chan error, 1) - w.exec <- exeTask{ + w.executec <- executeTask{ cb: cb, result: result, } @@ -93,7 +93,7 @@ func (w *WAL) suppressCompact(cb exeCallback) error { } func (w *WAL) tryCompact(offset, last int64, nodeID vanus.ID, index, term uint64) { - w.exec <- exeTask{ + w.executec <- executeTask{ cb: func() (compactTask, error) { return compactTask{ offset: offset, @@ -147,7 +147,7 @@ func (m *compactMeta) Range(cb meta.RangeCallback) error { var emptyMark = struct{}{} func (w WAL) run() { - for task := range w.exec { + for task := range w.executec { ct, err := task.cb() if task.result != nil { if err != nil { @@ -163,8 +163,8 @@ func (w WAL) run() { } func (w *WAL) runCompact() { - peroid := 30 * time.Second - ticker := time.NewTicker(peroid) + period := 30 * time.Second + ticker := time.NewTicker(period) defer ticker.Stop() var compacted int64 diff --git a/internal/raft/log/recovery.go b/internal/raft/log/recovery.go index b2ec7e2c9..fad03f7b2 100644 --- a/internal/raft/log/recovery.go +++ b/internal/raft/log/recovery.go @@ -31,7 +31,9 @@ import ( "github.com/linkall-labs/vanus/observability/log" ) -func RecoverLogsAndWAL(walDir string, metaStore *meta.SyncStore, offsetStore *meta.AsyncStore) (map[vanus.ID]*Log, *WAL, error) { +func RecoverLogsAndWAL( + walDir string, metaStore *meta.SyncStore, offsetStore *meta.AsyncStore, +) (map[vanus.ID]*Log, *WAL, error) { var compacted int64 if v, exist := metaStore.Load(walCompactKey); exist { var ok bool diff --git a/internal/raft/log/wal.go b/internal/raft/log/wal.go index 24fe6fcde..5b78bb4f1 100644 --- a/internal/raft/log/wal.go +++ b/internal/raft/log/wal.go @@ -24,10 +24,13 @@ import ( walog "github.com/linkall-labs/vanus/internal/store/wal" ) -var ( - walCompactKey = []byte("wal/compact") +const ( + defaultExecuteTaskBufferSize = 256 + defaultCompactTaskBufferSize = 256 ) +var walCompactKey = []byte("wal/compact") + type compactInfo struct { index, term uint64 } @@ -38,10 +41,10 @@ type compactTask struct { info compactInfo } -type exeCallback func() (compactTask, error) +type executeCallback func() (compactTask, error) -type exeTask struct { - cb exeCallback +type executeTask struct { + cb executeCallback result chan error } @@ -51,7 +54,7 @@ type WAL struct { metaStore *meta.SyncStore barrier *skiplist.SkipList - exec chan exeTask + executec chan executeTask compactc chan compactTask } @@ -60,8 +63,8 @@ func newWAL(wal *walog.WAL, metaStore *meta.SyncStore) *WAL { WAL: wal, metaStore: metaStore, barrier: skiplist.New(skiplist.Int64), - exec: make(chan exeTask, 256), - compactc: make(chan compactTask, 256), + executec: make(chan executeTask, defaultExecuteTaskBufferSize), + compactc: make(chan compactTask, defaultCompactTaskBufferSize), } go w.run() diff --git a/internal/raft/transport/host.go b/internal/raft/transport/host.go index f48231298..66ffa7f3e 100644 --- a/internal/raft/transport/host.go +++ b/internal/raft/transport/host.go @@ -56,6 +56,7 @@ func NewHost(resolver Resolver, callback string) Host { func (h *host) Send(ctx context.Context, msg *raftpb.Message, to uint64, endpoint string) { mux := h.resolveMultiplexer(ctx, to, endpoint) if mux == nil { + // TODO(james.yin): report MsgUnreachable. return } mux.Send(msg) @@ -64,6 +65,7 @@ func (h *host) Send(ctx context.Context, msg *raftpb.Message, to uint64, endpoin func (h *host) Sendv(ctx context.Context, msgs []*raftpb.Message, to uint64, endpoint string) { mux := h.resolveMultiplexer(ctx, to, endpoint) if mux == nil { + // TODO(james.yin): report MsgUnreachable. return } mux.Sendv(msgs) @@ -81,21 +83,24 @@ func (h *host) resolveMultiplexer(ctx context.Context, to uint64, endpoint strin } if mux, ok := h.peers.Load(endpoint); ok { - return mux.(*peer) + p, _ := mux.(*peer) + return p } // TODO(james.yin): clean unused peer p := newPeer(context.TODO(), endpoint, h.callback) if mux, loaded := h.peers.LoadOrStore(endpoint, p); loaded { defer p.Close() - return mux.(*peer) + p2, _ := mux.(*peer) + return p2 } return p } -// Receive implements Demultiplexer +// Receive implements Demultiplexer. func (h *host) Receive(ctx context.Context, msg *raftpb.Message, endpoint string) error { if receiver, ok := h.receivers.Load(msg.To); ok { - receiver.(Receiver).Receive(ctx, msg, msg.From, endpoint) + r, _ := receiver.(Receiver) + r.Receive(ctx, msg, msg.From, endpoint) } return nil } diff --git a/internal/raft/transport/host_test.go b/internal/raft/transport/host_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/host_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/raft/transport/loopback_test.go b/internal/raft/transport/loopback_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/loopback_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/raft/transport/multiplexing.go b/internal/raft/transport/multiplexing.go index 072785e30..da61b024c 100644 --- a/internal/raft/transport/multiplexing.go +++ b/internal/raft/transport/multiplexing.go @@ -15,16 +15,16 @@ package transport import ( - // standard libraries + // standard libraries. "context" - // third-party libraries + // first-party libraries. "github.com/linkall-labs/raft/raftpb" ) type Multiplexer interface { Send(msg *raftpb.Message) - Sendv(msg []*raftpb.Message) + Sendv(msgs []*raftpb.Message) } type Demultiplexer interface { diff --git a/internal/raft/transport/multiplexing_test.go b/internal/raft/transport/multiplexing_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/multiplexing_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/raft/transport/peer_test.go b/internal/raft/transport/peer_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/peer_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/raft/transport/resolver.go b/internal/raft/transport/resolver.go index ab9099df1..67edfd22b 100644 --- a/internal/raft/transport/resolver.go +++ b/internal/raft/transport/resolver.go @@ -17,6 +17,8 @@ package transport import ( // standard libraries "sync" + + "github.com/linkall-labs/vanus/observability/log" ) type Resolver interface { @@ -41,6 +43,11 @@ func (r *SimpleResolver) Resolve(node uint64) string { } func (r *SimpleResolver) Register(node uint64, endpoint string) { + log.Info(nil, "Register raft node route.", map[string]interface{}{ + "nodeID": node, + "endpoint": endpoint, + }) + r.Lock() defer r.Unlock() r.nodes[node] = endpoint diff --git a/internal/raft/transport/server.go b/internal/raft/transport/server.go index 71a3daa61..fbb640cb5 100644 --- a/internal/raft/transport/server.go +++ b/internal/raft/transport/server.go @@ -15,14 +15,15 @@ package transport import ( - // standard libraries + // standard libraries. "context" + "errors" "io" - // third-party libraries + // third-party libraries. emptypb "google.golang.org/protobuf/types/known/emptypb" - // first-party libraries + // first-party libraries. raftpb "github.com/linkall-labs/vsproto/pkg/raft" ) @@ -41,7 +42,7 @@ func NewRaftServer(ctx context.Context, dmx Demultiplexer) raftpb.RaftServerServ } } -// SendMessage implements raftpb.RaftServerServer +// SendMessage implements raftpb.RaftServerServer. func (s *server) SendMsssage(stream raftpb.RaftServer_SendMsssageServer) error { preface, err := stream.Recv() if err != nil { @@ -54,7 +55,7 @@ func (s *server) SendMsssage(stream raftpb.RaftServer_SendMsssageServer) error { msg, err := stream.Recv() if err != nil { // close by client - if err == io.EOF { + if errors.Is(err, io.EOF) { return s.closeStream(stream) } return err @@ -63,7 +64,7 @@ func (s *server) SendMsssage(stream raftpb.RaftServer_SendMsssageServer) error { err = s.dmx.Receive(s.ctx, msg, callback) if err != nil { // server is closed - if err == context.Canceled { + if errors.Is(err, context.Canceled) { return s.closeStream(stream) } diff --git a/internal/raft/transport/server_test.go b/internal/raft/transport/server_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/server_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/raft/transport/transport_test.go b/internal/raft/transport/transport_test.go new file mode 100644 index 000000000..aab76cb1f --- /dev/null +++ b/internal/raft/transport/transport_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport diff --git a/internal/store/block/block.go b/internal/store/block/block.go new file mode 100644 index 000000000..d3aaa959f --- /dev/null +++ b/internal/store/block/block.go @@ -0,0 +1,66 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package block + +import ( + // standard libraries. + "context" + "errors" + + // first-party libraries. + metapb "github.com/linkall-labs/vsproto/pkg/meta" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" +) + +var ( + ErrNotEnoughSpace = errors.New("not enough space") + ErrFull = errors.New("full") + ErrNotLeader = errors.New("not leader") + ErrOffsetExceeded = errors.New("the offset exceeded") + ErrOffsetOnEnd = errors.New("the offset on end") +) + +type Appender interface { + Append(ctx context.Context, entries ...Entry) error +} + +type AppendContext interface { + Full() bool + MarkFull() + FullEntry() Entry +} + +type TwoPCAppender interface { + NewAppendContext(last *Entry) AppendContext + PrepareAppend(ctx context.Context, appendCtx AppendContext, entries ...Entry) error + CommitAppend(ctx context.Context, entries ...Entry) error + MarkFull(ctx context.Context) error +} + +type Reader interface { + Read(context.Context, int, int) ([]Entry, error) +} + +type Block interface { + ID() vanus.ID + Close(context.Context) error + HealthInfo() *metapb.SegmentHealthInfo +} + +type ClusterInfoSource interface { + FillClusterInfo(info *metapb.SegmentHealthInfo) +} diff --git a/internal/store/segment/block/block_test.go b/internal/store/block/block_test.go similarity index 100% rename from internal/store/segment/block/block_test.go rename to internal/store/block/block_test.go diff --git a/internal/store/segment/block/entry.go b/internal/store/block/entry.go similarity index 97% rename from internal/store/segment/block/entry.go rename to internal/store/block/entry.go index 9d38ceaa0..95027d5b5 100644 --- a/internal/store/segment/block/entry.go +++ b/internal/store/block/entry.go @@ -23,6 +23,7 @@ type Entry struct { Offset uint32 Index uint32 Payload []byte + // TODO: CRC } // Size returns the size of space used by the entry in storage. @@ -51,7 +52,7 @@ func (e Entry) doMarshalTo(data []byte, values ...uint32) (int, error) { binary.BigEndian.PutUint32(data[so:], value) so += 4 } - if len(e.Payload) > 0 { + if len(e.Payload) != 0 { so += copy(data[so:], e.Payload) } return so, nil diff --git a/internal/store/segment/block/entry_test.go b/internal/store/block/entry_test.go similarity index 98% rename from internal/store/segment/block/entry_test.go rename to internal/store/block/entry_test.go index d49997946..b924bd59c 100644 --- a/internal/store/segment/block/entry_test.go +++ b/internal/store/block/entry_test.go @@ -20,7 +20,7 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -func TestMarshal(t *testing.T) { +func TestEntry_MarshalTo(t *testing.T) { Convey("v0.0.1 serialization testing", t, func() { e := &Entry{ Payload: []byte{ diff --git a/internal/store/segment/block/file.go b/internal/store/block/file/block.go similarity index 58% rename from internal/store/segment/block/file.go rename to internal/store/block/file/block.go index 872e32af0..10744072a 100644 --- a/internal/store/segment/block/file.go +++ b/internal/store/block/file/block.go @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package file import ( // standard libraries. "context" "encoding/binary" - "fmt" "os" - "path/filepath" "sync" "time" @@ -28,135 +26,121 @@ import ( "go.uber.org/atomic" // first-party libraries. - "github.com/linkall-labs/vsproto/pkg/meta" + metapb "github.com/linkall-labs/vsproto/pkg/meta" // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/internal/store/block" "github.com/linkall-labs/vanus/internal/store/segment/errors" "github.com/linkall-labs/vanus/observability" "github.com/linkall-labs/vanus/observability/log" ) const ( - blockExt = ".block" - fileBlockHeaderSize = 4 * 1024 - - // version + capacity + size + number + full - v1FileBlockHeaderLength = 4 + 8 + 4 + 8 + 1 - entryLengthSize = 4 + headerSize = 4 * 1024 + // headerLength = version + capacity + size + number + full. + v1HeaderLength = 4 + 8 + 4 + 8 + 1 + entryLengthSize = 4 ) -func resolvePath(blockDir string, id vanus.ID) string { - return filepath.Join(blockDir, fmt.Sprintf("%020d%s", id.Uint64(), blockExt)) -} - -type FileBlock struct { +// Block +// +// The layout of Block is: +// ┌────────────────┬─────────────────────┬───────────────┐ +// │ Header Block │ Data Blocks ... │ Index Block │ +// └────────────────┴─────────────────────┴───────────────┘ +// An index Block contains one entry per data Block. +type Block struct { version int32 id vanus.ID path string cap int64 - size atomic.Int64 + wo int64 + fo atomic.Int64 num atomic.Int32 - wo atomic.Int64 + full atomic.Bool mux sync.Mutex f *os.File - cis ClusterInfoSource + cis block.ClusterInfoSource indexes []index - readable atomic.Bool - appendable atomic.Bool - full atomic.Bool - uncompletedReadRequestCount atomic.Int32 uncompletedAppendRequestCount atomic.Int32 } -func (b *FileBlock) Initialize(ctx context.Context) error { - if err := b.loadHeader(ctx); err != nil { - return err - } - b.wo.Store(fileBlockHeaderSize + b.size.Load()) - - if b.full.Load() { - b.appendable.Store(false) - } else { - if _, err := b.f.Seek(b.wo.Load(), 0); err != nil { - return err - } - } +// Make sure block implements Block, TwoPCAppender and Reader. +var ( + _ block.Block = (*Block)(nil) + _ block.TwoPCAppender = (*Block)(nil) + _ block.Reader = (*Block)(nil) +) - if err := b.loadIndex(ctx); err != nil { - return err - } +type appendContext struct { + offset int + num int + full bool +} - if err := b.validate(ctx); err != nil { - return err - } +// Make sure appendContext implements AppendContext. +var _ block.AppendContext = (*appendContext)(nil) - return nil +func (w *appendContext) Full() bool { + return w.full } -func (b *FileBlock) Append(ctx context.Context, entities ...Entry) error { - observability.EntryMark(ctx) - // TODO: optimize lock. - b.mux.Lock() - b.uncompletedAppendRequestCount.Add(1) - defer func() { - observability.LeaveMark(ctx) - b.mux.Unlock() - b.uncompletedAppendRequestCount.Sub(1) - }() +func (w *appendContext) MarkFull() { + w.full = true +} - if len(entities) == 0 { - return nil +func (w *appendContext) FullEntry() block.Entry { + return block.Entry{ + Offset: uint32(w.offset), + Index: uint32(w.num), } +} - length := 0 - for _, entry := range entities { - length += entry.Size() +func (b *Block) NewAppendContext(last *block.Entry) block.AppendContext { + if last != nil { + return &appendContext{ + offset: int(last.Offset) + len(last.Payload), + num: int(last.Index + 1), + full: len(last.Payload) == 0, + } } - if length+v1IndexLength*len(entities) > b.remaining() { - b.full.Store(true) - return ErrNoEnoughCapacity + return &appendContext{ + offset: int(b.wo), + num: int(b.num.Load()), + full: b.full.Load(), } +} - var so, wo int64 = 0, b.wo.Load() - buf := make([]byte, length) - indexs := make([]index, 0, len(entities)) - for _, entry := range entities { - n, _ := entry.MarshalTo(buf[so:]) - bi := index{ - offset: wo + so, - length: int32(n), - } - indexs = append(indexs, bi) - so += int64(n) - } +func (b *Block) PrepareAppend(ctx context.Context, appendCtx block.AppendContext, entries ...block.Entry) error { + actx, _ := appendCtx.(*appendContext) - n, err := b.f.Write(buf) - if err != nil { - return err + var size int + for i := range entries { + entry := &entries[i] + entry.Offset = uint32(actx.offset + size) + entry.Index = uint32(actx.num + i) + size += entry.Size() } - b.indexes = append(b.indexes, indexs...) - - b.num.Add(int32(len(indexs))) - b.wo.Add(int64(n)) - b.size.Add(int64(n)) + if int64(actx.offset+size+v1IndexLength*(actx.num+len(entries))) > b.cap { + return block.ErrNotEnoughSpace + } - //if err = b.physicalFile.Sync(); err != nil { - // return err - //} + actx.offset += size + actx.num += len(entries) return nil } -func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) error { +func (b *Block) CommitAppend(ctx context.Context, entries ...block.Entry) error { if len(entries) == 0 { return nil } @@ -166,14 +150,16 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro switch entry := &entries[i]; { case entry.Index < num: log.Warning(ctx, "block: entry index less than block num, skip this entry.", map[string]interface{}{ - "index": entry.Index, - "num": num, + "blockID": b.id, + "index": entry.Index, + "num": num, }) continue case entry.Index > num: log.Error(ctx, "block: entry index greater than block num.", map[string]interface{}{ - "index": entry.Index, - "num": num, + "blockID": b.id, + "index": entry.Index, + "num": num, }) return errors.ErrInternal } @@ -186,12 +172,12 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro return nil } - wo := uint32(b.wo.Load()) offset := entries[0].Offset - if offset != wo { + if int64(offset) != b.wo { log.Error(ctx, "block: entry offset is not equal than block wo.", map[string]interface{}{ - "offset": offset, - "wo": wo, + "blockID": b.id, + "offset": offset, + "wo": b.wo, }) return errors.ErrInternal } @@ -201,15 +187,17 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro prev := &entries[i-1] if prev.Index+1 != entry.Index { log.Error(ctx, "block: entry index is discontinuous.", map[string]interface{}{ - "index": entry.Index, - "prev": prev.Index, + "blockID": b.id, + "index": entry.Index, + "prev": prev.Index, }) return errors.ErrInternal } if prev.Offset+uint32(prev.Size()) != entry.Offset { log.Error(ctx, "block: entry offset is discontinuous.", map[string]interface{}{ - "offset": entry.Offset, - "prev": prev.Offset, + "blockID": b.id, + "offset": entry.Offset, + "prev": prev.Offset, }) return errors.ErrInternal } @@ -219,9 +207,14 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro length := int(last.Offset-offset) + last.Size() // Check free space. - if length+v1IndexLength*len(entries) > b.remaining() { - b.full.Store(true) - return ErrNoEnoughCapacity + if require := length + v1IndexLength*len(entries); require > b.remaining() { + log.Error(ctx, "block: not enough space.", map[string]interface{}{ + "blockID": b.id, + "length": length, + "require": require, + "remaining": b.remaining(), + }) + return block.ErrNotEnoughSpace } buf := make([]byte, length) @@ -242,8 +235,8 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro b.indexes = append(b.indexes, indexs...) b.num.Add(int32(len(entries))) - b.wo.Add(int64(n)) - b.size.Add(int64(n)) + b.wo += int64(n) + b.fo.Store(b.wo) //if err = b.physicalFile.Sync(); err != nil { // return err @@ -252,8 +245,17 @@ func (b *FileBlock) appendWithOffset(ctx context.Context, entries ...Entry) erro return nil } +func (b *Block) MarkFull(ctx context.Context) error { + b.full.Store(true) + if err := b.persistHeader(ctx); err != nil { + return err + } + go b.persistIndex(ctx) + return nil +} + // Read date from file. -func (b *FileBlock) Read(ctx context.Context, entityStartOffset, number int) ([]Entry, error) { +func (b *Block) Read(ctx context.Context, start, number int) ([]block.Entry, error) { observability.EntryMark(ctx) b.uncompletedReadRequestCount.Add(1) defer func() { @@ -261,7 +263,7 @@ func (b *FileBlock) Read(ctx context.Context, entityStartOffset, number int) ([] b.uncompletedReadRequestCount.Sub(1) }() - from, to, num, err := b.calculateRange(entityStartOffset, number) + from, to, num, err := b.entryRange(start, number) if err != nil { return nil, err } @@ -272,11 +274,11 @@ func (b *FileBlock) Read(ctx context.Context, entityStartOffset, number int) ([] return nil, err2 } - entries := make([]Entry, num) + entries := make([]block.Entry, num) so := uint32(0) from2 := uint32(from) for i := 0; i < num; i++ { - length := binary.BigEndian.Uint32(data[so : so+4]) + length := binary.BigEndian.Uint32(data[so : so+entryLengthSize]) eo := so + entryLengthSize + length if eo > size { // TODO @@ -289,11 +291,29 @@ func (b *FileBlock) Read(ctx context.Context, entityStartOffset, number int) ([] return entries, nil } -func (b *FileBlock) CloseWrite(ctx context.Context) error { +func (b *Block) entryRange(start, num int) (int64, int64, int, error) { + indexes := b.indexes + if start >= len(indexes) { + if !b.IsFull() && start == len(indexes) { + return -1, -1, 0, block.ErrOffsetOnEnd + } + return -1, -1, 0, block.ErrOffsetExceeded + } + + so := indexes[start].offset + end := start + num - 1 + if end >= len(indexes) { + end = len(indexes) - 1 + } + eo := indexes[end].offset + int64(indexes[end].length) + entryLengthSize + return so, eo, end - start + 1, nil +} + +func (b *Block) CloseWrite(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - b.appendable.Store(false) + // b.appendable.Store(false) for b.uncompletedAppendRequestCount.Load() != 0 { time.Sleep(time.Millisecond) } @@ -309,74 +329,76 @@ func (b *FileBlock) CloseWrite(ctx context.Context) error { return nil } -func (b *FileBlock) CloseRead(ctx context.Context) error { +func (b *Block) CloseRead(ctx context.Context) error { if err := b.f.Close(); err != nil { return err } observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - b.readable.Store(false) + // b.readable.Store(false) for b.uncompletedReadRequestCount.Load() != 0 { time.Sleep(time.Millisecond) } return nil } -func (b *FileBlock) Close(ctx context.Context) error { +func (b *Block) Close(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) return b.f.Close() } -func (b *FileBlock) IsAppendable() bool { - return b.appendable.Load() && !b.IsFull() +func (b *Block) IsAppendable() bool { + return !b.IsFull() } -func (b *FileBlock) IsReadable() bool { - return b.appendable.Load() && !b.IsEmpty() +func (b *Block) IsReadable() bool { + return true } -func (b *FileBlock) IsEmpty() bool { - return b.size.Load() == fileBlockHeaderSize +func (b *Block) IsEmpty() bool { + return b.wo == headerSize } -func (b *FileBlock) IsFull() bool { +func (b *Block) IsFull() bool { return b.full.Load() } -func (b *FileBlock) Path() string { +func (b *Block) Path() string { return b.path } -func (b *FileBlock) SegmentBlockID() vanus.ID { +func (b *Block) ID() vanus.ID { return b.id } -func (b *FileBlock) HealthInfo() *meta.SegmentHealthInfo { - info := &meta.SegmentHealthInfo{ +func (b *Block) HealthInfo() *metapb.SegmentHealthInfo { + info := &metapb.SegmentHealthInfo{ Id: b.id.Uint64(), - Size: b.size.Load(), + Size: b.size(), EventNumber: b.num.Load(), SerializationVersion: b.version, IsFull: b.IsFull(), } - // Fill cluster information. if cis := b.cis; cis != nil { cis.FillClusterInfo(info) } - return info } -func (b *FileBlock) remaining() int { - // capacity - headerCapacity - dataLength - indexDataLength - currentRequestDataLength - return int(b.cap - fileBlockHeaderSize - b.size.Load() - +func (b *Block) size() int64 { + return b.fo.Load() - headerSize +} + +func (b *Block) remaining() int { + // remaining = capacity - headerSize - dataLength - indexLength. + return int(b.cap - headerSize - b.size() - int64(b.num.Load()*v1IndexLength)) } -func (b *FileBlock) persistHeader(ctx context.Context) error { +func (b *Block) persistHeader(ctx context.Context) error { observability.EntryMark(ctx) b.mux.Lock() defer func() { @@ -384,10 +406,10 @@ func (b *FileBlock) persistHeader(ctx context.Context) error { observability.LeaveMark(ctx) }() - buf := make([]byte, v1FileBlockHeaderLength) + buf := make([]byte, v1HeaderLength) binary.BigEndian.PutUint32(buf[0:4], uint32(b.version)) binary.BigEndian.PutUint64(buf[4:12], uint64(b.cap)) - binary.BigEndian.PutUint64(buf[12:20], uint64(b.size.Load())) + binary.BigEndian.PutUint64(buf[12:20], uint64(b.size())) binary.BigEndian.PutUint32(buf[20:24], uint32(b.num.Load())) if b.full.Load() { buf[24] = 1 @@ -401,25 +423,27 @@ func (b *FileBlock) persistHeader(ctx context.Context) error { return nil } -func (b *FileBlock) loadHeader(ctx context.Context) error { +func (b *Block) loadHeader(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - buf := make([]byte, v1FileBlockHeaderLength) + buf := make([]byte, v1HeaderLength) if _, err := b.f.ReadAt(buf, 0); err != nil { return err } b.version = int32(binary.BigEndian.Uint32(buf[0:4])) b.cap = int64(binary.BigEndian.Uint64(buf[4:12])) - b.size.Store(int64(binary.BigEndian.Uint64(buf[12:20]))) + size := int64(binary.BigEndian.Uint64(buf[12:20])) + b.wo = size + headerSize + b.fo.Store(b.wo) b.num.Store(int32(binary.BigEndian.Uint32(buf[20:24]))) b.full.Store(buf[24] != 0) return nil } -func (b *FileBlock) persistIndex(ctx context.Context) error { +func (b *Block) persistIndex(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) @@ -429,7 +453,8 @@ func (b *FileBlock) persistIndex(ctx context.Context) error { length := v1IndexLength * len(b.indexes) buf := make([]byte, length) - for i, index := range b.indexes { + for i := range b.indexes { + index := &b.indexes[i] off := length - (i+1)*v1IndexLength _, _ = index.MarshalTo(buf[off : off+v1IndexLength]) } @@ -440,7 +465,7 @@ func (b *FileBlock) persistIndex(ctx context.Context) error { return nil } -func (b *FileBlock) loadIndex(ctx context.Context) error { +func (b *Block) loadIndex(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) @@ -453,7 +478,7 @@ func (b *FileBlock) loadIndex(ctx context.Context) error { return b.loadIndexFromFile() } -func (b *FileBlock) loadIndexFromFile() error { +func (b *Block) loadIndexFromFile() error { num := int(b.num.Load()) length := num * v1IndexLength @@ -473,13 +498,13 @@ func (b *FileBlock) loadIndexFromFile() error { return nil } -func (b *FileBlock) rebuildIndex() error { +func (b *Block) rebuildIndex() error { num := b.num.Load() b.indexes = make([]index, 0, num) num = 0 buf := make([]byte, entryLengthSize) - off := int64(fileBlockHeaderSize) + off := int64(headerSize) for { if _, err := b.f.ReadAt(buf, off); err != nil { return err @@ -497,38 +522,20 @@ func (b *FileBlock) rebuildIndex() error { } // Reset meta data. - b.size.Store(off - fileBlockHeaderSize) + b.wo = off + b.fo.Store(b.wo) b.num.Store(num) - b.wo.Store(off) return nil } -func (b *FileBlock) validate(ctx context.Context) error { +func (b *Block) validate(ctx context.Context) error { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) return nil } -func (b *FileBlock) calculateRange(start, num int) (int64, int64, int, error) { - indexes := b.indexes - if start >= len(indexes) { - if !b.IsFull() && start == len(indexes) { - return -1, -1, 0, ErrOffsetOnEnd - } - return -1, -1, 0, ErrOffsetExceeded - } - - so := indexes[start].offset - end := start + num - 1 - if end >= len(indexes) { - end = len(indexes) - 1 - } - eo := indexes[end].offset + int64(indexes[end].length) + entryLengthSize - return so, eo, end - start + 1, nil -} - -func (b *FileBlock) SetClusterInfoSource(cis ClusterInfoSource) { +func (b *Block) SetClusterInfoSource(cis block.ClusterInfoSource) { b.cis = cis } diff --git a/internal/store/segment/block/file_test.go b/internal/store/block/file/block_test.go similarity index 97% rename from internal/store/segment/block/file_test.go rename to internal/store/block/file/block_test.go index 0e0c4291d..082122806 100644 --- a/internal/store/segment/block/file_test.go +++ b/internal/store/block/file/block_test.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package file diff --git a/internal/store/segment/block/index.go b/internal/store/block/file/index.go similarity index 98% rename from internal/store/segment/block/index.go rename to internal/store/block/file/index.go index 2cf4dc383..9122c3dde 100644 --- a/internal/store/segment/block/index.go +++ b/internal/store/block/file/index.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package file import ( "bytes" diff --git a/internal/store/block/file/index_test.go b/internal/store/block/file/index_test.go new file mode 100644 index 000000000..082122806 --- /dev/null +++ b/internal/store/block/file/index_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file diff --git a/internal/store/block/file/open.go b/internal/store/block/file/open.go new file mode 100644 index 000000000..223001e45 --- /dev/null +++ b/internal/store/block/file/open.go @@ -0,0 +1,90 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + // standard libraries. + "context" + "fmt" + "os" + "path/filepath" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/observability" +) + +const ( + blockExt = ".block" + defaultFilePerm = 0o644 +) + +func resolvePath(blockDir string, id vanus.ID) string { + return filepath.Join(blockDir, fmt.Sprintf("%020d%s", id.Uint64(), blockExt)) +} + +func Create(ctx context.Context, blockDir string, id vanus.ID, capacity int64) (*Block, error) { + observability.EntryMark(ctx) + defer observability.LeaveMark(ctx) + + path := resolvePath(blockDir, id) + b := &Block{ + id: id, + path: path, + cap: capacity, + wo: headerSize, + } + + f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_RDWR|os.O_SYNC, defaultFilePerm) + if err != nil { + return nil, err + } + if err = f.Truncate(capacity); err != nil { + return nil, err + } + b.f = f + + b.fo.Store(b.wo) + if err = b.persistHeader(ctx); err != nil { + return nil, err + } + + return b, nil +} + +func Open(ctx context.Context, path string) (*Block, error) { + observability.EntryMark(ctx) + defer observability.LeaveMark(ctx) + + filename := filepath.Base(path) + id, err := vanus.NewIDFromString(filename[:len(filename)-len(blockExt)]) + if err != nil { + return nil, err + } + + b := &Block{ + id: id, + path: path, + } + + // TODO: use direct IO + f, err := os.OpenFile(path, os.O_RDWR|os.O_SYNC, defaultFilePerm) + if err != nil { + return nil, err + } + b.f = f + + return b, nil +} diff --git a/internal/store/block/file/open_test.go b/internal/store/block/file/open_test.go new file mode 100644 index 000000000..082122806 --- /dev/null +++ b/internal/store/block/file/open_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file diff --git a/internal/store/segment/block/recovery.go b/internal/store/block/file/recovery.go similarity index 82% rename from internal/store/segment/block/recovery.go rename to internal/store/block/file/recovery.go index 118da8369..d241feec6 100644 --- a/internal/store/segment/block/recovery.go +++ b/internal/store/block/file/recovery.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package file import ( // standard libraries. + "context" "os" "path/filepath" @@ -24,10 +25,10 @@ import ( ) const ( - defaultDirPerm = 0755 + defaultDirPerm = 0o755 ) -func RecoverBlocks(blockDir string) (map[vanus.ID]string, error) { +func Recover(blockDir string) (map[vanus.ID]string, error) { // Make sure the block directory exists. if err := os.MkdirAll(blockDir, defaultDirPerm); err != nil { return nil, err @@ -72,3 +73,19 @@ func filterRegularBlock(entries []os.DirEntry) []os.DirEntry { entries = entries[:n] return entries } + +func (b *Block) Recover(ctx context.Context) error { + if err := b.loadHeader(ctx); err != nil { + return err + } + + if err := b.loadIndex(ctx); err != nil { + return err + } + + if err := b.validate(ctx); err != nil { + return err + } + + return nil +} diff --git a/internal/store/segment/block/recovery_test.go b/internal/store/block/file/recovery_test.go similarity index 97% rename from internal/store/segment/block/recovery_test.go rename to internal/store/block/file/recovery_test.go index 0e0c4291d..082122806 100644 --- a/internal/store/segment/block/recovery_test.go +++ b/internal/store/block/file/recovery_test.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package file diff --git a/internal/store/segment/block/replica.go b/internal/store/block/replica/replica.go similarity index 67% rename from internal/store/segment/block/replica.go rename to internal/store/block/replica/replica.go index 9c64f3634..6210305ce 100644 --- a/internal/store/segment/block/replica.go +++ b/internal/store/block/replica/replica.go @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package replica import ( // standard libraries. "context" + stderr "errors" "sort" "sync" "time" @@ -30,15 +31,25 @@ import ( "github.com/linkall-labs/vanus/internal/primitive/vanus" raftlog "github.com/linkall-labs/vanus/internal/raft/log" "github.com/linkall-labs/vanus/internal/raft/transport" + "github.com/linkall-labs/vanus/internal/store/block" "github.com/linkall-labs/vanus/internal/store/segment/errors" ) -type IDAndEndpoint struct { +const ( + defaultHintCapactiy = 2 + defaultTickPeriodMs = 100 + defaultElectionTick = 10 + defaultHeartbeatTick = 3 + defaultMaxSizePerMsg = 4096 + defaultMaxInflightMsgs = 256 +) + +type Peer struct { ID vanus.ID Endpoint string } -type idAndEndpoint struct { +type peer struct { id uint64 endpoint string } @@ -46,12 +57,12 @@ type idAndEndpoint struct { type LeaderChangedListener func(block, leader vanus.ID, term uint64) type Replica struct { - num int - wo int - full bool - mu sync.RWMutex + blockID vanus.ID + + mu sync.RWMutex - block *FileBlock + appender block.TwoPCAppender + actx block.AppendContext leaderID vanus.ID listener LeaderChangedListener @@ -60,53 +71,53 @@ type Replica struct { log *raftlog.Log sender transport.Sender - endpoints []idAndEndpoint - epMu sync.RWMutex + hint []peer + hintMu sync.RWMutex ctx context.Context cancel context.CancelFunc donec chan struct{} } -// Make sure replica implements SegmentBlockWriter, ClusterInfoSource and transport.Receiver. +// Make sure replica implements block.Appender, block.ClusterInfoSource and transport.Receiver. var ( - _ SegmentBlockWriter = (*Replica)(nil) - _ ClusterInfoSource = (*Replica)(nil) - _ transport.Receiver = (*Replica)(nil) + _ block.Appender = (*Replica)(nil) + _ block.ClusterInfoSource = (*Replica)(nil) + _ transport.Receiver = (*Replica)(nil) ) -func NewReplica(ctx context.Context, block SegmentBlock, raftLog *raftlog.Log, +func New(ctx context.Context, blockID vanus.ID, appender block.TwoPCAppender, raftLog *raftlog.Log, sender transport.Sender, listener LeaderChangedListener, ) *Replica { - blockID := block.SegmentBlockID() - ctx, cancel := context.WithCancel(ctx) r := &Replica{ - block: block.(*FileBlock), - listener: listener, - log: raftLog, - sender: sender, - endpoints: make([]idAndEndpoint, 0, 2), - ctx: ctx, - cancel: cancel, - donec: make(chan struct{}), + blockID: blockID, + appender: appender, + listener: listener, + log: raftLog, + sender: sender, + hint: make([]peer, 0, defaultHintCapactiy), + ctx: ctx, + cancel: cancel, + donec: make(chan struct{}), } - r.resetByBlock() + r.actx = r.appender.NewAppendContext(nil) c := &raft.Config{ ID: blockID.Uint64(), - ElectionTick: 10, - HeartbeatTick: 3, + ElectionTick: defaultElectionTick, + HeartbeatTick: defaultHeartbeatTick, Storage: raftLog, Applied: raftLog.Applied(), Compacted: raftLog.Compacted(), - MaxSizePerMsg: 4096, - MaxInflightMsgs: 256, + MaxSizePerMsg: defaultMaxSizePerMsg, + MaxInflightMsgs: defaultMaxInflightMsgs, PreVote: true, DisableProposalForwarding: true, } r.node = raft.RestartNode(c) + go r.run() return r @@ -118,7 +129,7 @@ func (r *Replica) Stop() { <-r.donec } -func (r *Replica) Bootstrap(blocks []IDAndEndpoint) error { +func (r *Replica) Bootstrap(blocks []Peer) error { peers := make([]raft.Peer, 0, len(blocks)) for _, ep := range blocks { peers = append(peers, raft.Peer{ @@ -135,7 +146,8 @@ func (r *Replica) Bootstrap(blocks []IDAndEndpoint) error { func (r *Replica) run() { // TODO(james.yin): reduce Ticker - t := time.NewTicker(100 * time.Millisecond) + period := defaultTickPeriodMs * time.Millisecond + t := time.NewTicker(period) defer t.Stop() for { @@ -175,14 +187,14 @@ func (r *Replica) run() { if num := len(rd.CommittedEntries); num != 0 { var cs *raftpb.ConfState - entries := make([]Entry, 0, num) + entries := make([]block.Entry, 0, num) for i := range rd.CommittedEntries { entrypb := &rd.CommittedEntries[i] if entrypb.Type == raftpb.EntryNormal { // Skip empty entry(raft heartbeat). - if len(entrypb.Data) > 0 { - entry := UnmarshalWithOffsetAndIndex(entrypb.Data) + if len(entrypb.Data) != 0 { + entry := block.UnmarshalWithOffsetAndIndex(entrypb.Data) entries = append(entries, entry) } continue @@ -192,7 +204,7 @@ func (r *Replica) run() { cs = r.applyConfChange(entrypb) } - if len(entries) > 0 { + if len(entries) != 0 { r.doAppend(entries...) } @@ -207,6 +219,7 @@ func (r *Replica) run() { } if applied != 0 { + // FIXME(james.yin): persist applied after flush block. r.log.SetApplied(applied) } @@ -236,7 +249,7 @@ func (r *Replica) leaderChanged() { } leader, term := r.leaderInfo() - r.listener(r.block.SegmentBlockID(), leader, term) + r.listener(r.blockID, leader, term) } func (r *Replica) applyConfChange(entrypb *raftpb.Entry) *raftpb.ConfState { @@ -252,7 +265,7 @@ func (r *Replica) applyConfChange(entrypb *raftpb.Entry) *raftpb.ConfState { panic(err) } // TODO(james.yin): non-add - r.hintEndpoint(cc.NodeID, string(cc.Context)) + r.hintPeer(cc.NodeID, string(cc.Context)) cci = cc } else { var cc raftpb.ConfChangeV2 @@ -261,7 +274,7 @@ func (r *Replica) applyConfChange(entrypb *raftpb.Entry) *raftpb.ConfState { } // TODO(james.yin): non-add for _, ccs := range cc.Changes { - r.hintEndpoint(ccs.NodeID, string(cc.Context)) + r.hintPeer(ccs.NodeID, string(cc.Context)) } cci = cc } @@ -274,19 +287,22 @@ func (r *Replica) reset() { off = r.log.HardState().Commit } + r.mu.Lock() + defer r.mu.Unlock() + for off > 0 { entrypbs, err2 := r.log.Entries(off, off+1, 0) // Entry has been compacted. if err2 != nil { - r.resetByBlock() + r.actx = r.appender.NewAppendContext(nil) break } entrypb := entrypbs[0] if entrypb.Type == raftpb.EntryNormal && len(entrypb.Data) > 0 { - entry := UnmarshalWithOffsetAndIndex(entrypb.Data) - r.resetByEntry(entry) + entry := block.UnmarshalWithOffsetAndIndex(entrypb.Data) + r.actx = r.appender.NewAppendContext(&entry) break } @@ -295,43 +311,38 @@ func (r *Replica) reset() { // no normal entry if off == 0 { - r.resetByBlock() + r.actx = r.appender.NewAppendContext(nil) } } -func (r *Replica) resetByEntry(entry Entry) { - r.mu.Lock() - defer r.mu.Unlock() - - r.num = int(entry.Index + 1) - r.wo = int(entry.Offset) + len(entry.Payload) - r.full = len(entry.Payload) == 0 -} +// Append implements block.Appender. +func (r *Replica) Append(ctx context.Context, entries ...block.Entry) error { + // TODO(james.yin): support batch + if len(entries) != 1 { + return errors.ErrInvalidRequest + } -func (r *Replica) resetByBlock() { r.mu.Lock() defer r.mu.Unlock() - r.num = int(r.block.num.Load()) - r.wo = int(r.block.wo.Load()) - r.full = r.block.full.Load() -} - -// Append implements SegmentBlockWriter. -func (r *Replica) Append(ctx context.Context, entries ...Entry) error { if !r.isLeader() { - return ErrNotLeader + return block.ErrNotLeader } - // TODO(james.yin): support batch - if len(entries) != 1 { - return errors.ErrInvalidRequest + if r.actx.Full() { + return block.ErrFull } - r.mu.Lock() - defer r.mu.Unlock() - - if err := r.preAppend(ctx, entries); err != nil { + if err := r.appender.PrepareAppend(ctx, r.actx, entries...); err != nil { + // Full + if stderr.Is(err, block.ErrNotEnoughSpace) { + entry := r.actx.FullEntry() + data := entry.MarshalWithOffsetAndIndex() + if err2 := r.node.Propose(ctx, data); err2 != nil { + return err2 + } + r.actx.MarkFull() + } return err } @@ -346,40 +357,7 @@ func (r *Replica) Append(ctx context.Context, entries ...Entry) error { return nil } -func (r *Replica) preAppend(ctx context.Context, entries []Entry) error { - if r.full { - return ErrFull - } - - var size int - for i := range entries { - entry := &entries[i] - entry.Offset = uint32(r.wo + size) - entry.Index = uint32(r.num + i) - size += entry.Size() - } - - // TODO(james.yin): full - if int64(r.wo+size+v1IndexLength*(r.num+len(entries))) > r.block.cap { - fullEntry := Entry{ - Offset: uint32(r.wo), - Index: uint32(r.num), - } - data := fullEntry.MarshalWithOffsetAndIndex() - if err := r.node.Propose(ctx, data); err != nil { - return err - } - r.full = true - return ErrNoEnoughCapacity - } - - r.wo += size - r.num += len(entries) - - return nil -} - -func (r *Replica) doAppend(entries ...Entry) { +func (r *Replica) doAppend(entries ...block.Entry) { num := len(entries) if num == 0 { return @@ -392,13 +370,14 @@ func (r *Replica) doAppend(entries ...Entry) { last = nil } - // FIXME(james.yin): context - r.block.appendWithOffset(context.TODO(), entries...) + if len(entries) != 0 { + // FIXME(james.yin): context + _ = r.appender.CommitAppend(context.TODO(), entries...) + } // Mark full. if last != nil { - r.full = true - r.block.full.Store(true) + _ = r.appender.MarkFull(context.TODO()) } } @@ -409,7 +388,7 @@ func (r *Replica) send(msgs []raftpb.Message) { if len(msgs) == 1 { msg := &msgs[0] - endpoint := r.endpointHint(msg.To) + endpoint := r.peerHint(msg.To) r.sender.Send(r.ctx, msg, msg.To, endpoint) return } @@ -428,7 +407,7 @@ func (r *Replica) send(msgs []raftpb.Message) { for i := 0; i < len(msgs); i++ { ma[i] = &msgs[i] } - endpoint := r.endpointHint(to) + endpoint := r.peerHint(to) r.sender.Sendv(r.ctx, ma, to, endpoint) return } @@ -439,7 +418,7 @@ func (r *Replica) send(msgs []raftpb.Message) { mm[msg.To] = append(mm[msg.To], msg) } for to, msgs := range mm { - endpoint := r.endpointHint(to) + endpoint := r.peerHint(to) if len(msgs) == 1 { r.sender.Send(r.ctx, msgs[0], to, endpoint) } else { @@ -448,13 +427,13 @@ func (r *Replica) send(msgs []raftpb.Message) { } } -func (r *Replica) endpointHint(to uint64) string { - r.epMu.RLock() - defer r.epMu.RUnlock() - for i := range r.endpoints { - ep := &r.endpoints[i] - if ep.id == to { - return ep.endpoint +func (r *Replica) peerHint(to uint64) string { + r.hintMu.RLock() + defer r.hintMu.RUnlock() + for i := range r.hint { + p := &r.hint[i] + if p.id == to { + return p.endpoint } } return "" @@ -463,37 +442,37 @@ func (r *Replica) endpointHint(to uint64) string { // Receive implements transport.Receiver. func (r *Replica) Receive(ctx context.Context, msg *raftpb.Message, from uint64, endpoint string) { if endpoint != "" { - r.hintEndpoint(from, endpoint) + r.hintPeer(from, endpoint) } // TODO(james.yin): check ctx.Done(). _ = r.node.Step(r.ctx, *msg) } -func (r *Replica) hintEndpoint(from uint64, endpoint string) { +func (r *Replica) hintPeer(from uint64, endpoint string) { if endpoint == "" { return } // TODO(james.yin): optimize lock - r.epMu.Lock() - defer r.epMu.Unlock() - ep := func() *idAndEndpoint { - for i := range r.endpoints { - ep := &r.endpoints[i] + r.hintMu.Lock() + defer r.hintMu.Unlock() + p := func() *peer { + for i := range r.hint { + ep := &r.hint[i] if ep.id == from { return ep } } return nil }() - if ep == nil { - r.endpoints = append(r.endpoints, idAndEndpoint{ + if p == nil { + r.hint = append(r.hint, peer{ id: from, endpoint: endpoint, }) - } else if ep.endpoint != endpoint { - ep.endpoint = endpoint + } else if p.endpoint != endpoint { + p.endpoint = endpoint } } @@ -509,16 +488,10 @@ func (r *Replica) leaderInfo() (vanus.ID, uint64) { // CloseWrite implements SegmentBlockWriter. func (r *Replica) CloseWrite(ctx context.Context) error { - return r.block.CloseWrite(ctx) -} - -// IsAppendable implements SegmentBlockWriter. -func (r *Replica) IsAppendable() bool { - r.mu.RLock() - defer r.mu.RUnlock() - return !r.full + // return r.appender.CloseWrite(ctx) + return nil } func (r *Replica) isLeader() bool { - return r.leaderID == r.block.SegmentBlockID() + return r.leaderID == r.blockID } diff --git a/internal/store/segment/block/replica_test.go b/internal/store/block/replica/replica_test.go similarity index 98% rename from internal/store/segment/block/replica_test.go rename to internal/store/block/replica/replica_test.go index 6920ebcc1..e3dd1f76d 100644 --- a/internal/store/segment/block/replica_test.go +++ b/internal/store/block/replica/replica_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package block +package replica import ( // standard libraries. diff --git a/internal/store/meta/store_test.go b/internal/store/meta/store_test.go new file mode 100644 index 000000000..29d131b92 --- /dev/null +++ b/internal/store/meta/store_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta diff --git a/internal/store/segment/block/block.go b/internal/store/segment/block/block.go deleted file mode 100644 index d6b14ac82..000000000 --- a/internal/store/segment/block/block.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2022 Linkall Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package block - -import ( - // standard libraries. - "context" - "errors" - "io" - "os" - "path/filepath" - - // third-party libraries. - "go.uber.org/atomic" - - // first-party libraries. - metapb "github.com/linkall-labs/vsproto/pkg/meta" - - // this project. - "github.com/linkall-labs/vanus/internal/primitive/vanus" - "github.com/linkall-labs/vanus/observability" -) - -var ( - ErrNoEnoughCapacity = errors.New("no enough capacity") - ErrFull = errors.New("full") - ErrNotLeader = errors.New("not leader") - ErrOffsetExceeded = errors.New("the offset exceeded") - ErrOffsetOnEnd = errors.New("the offset on end") -) - -type SegmentBlockWriter interface { - Append(context.Context, ...Entry) error - CloseWrite(context.Context) error - IsAppendable() bool -} - -type SegmentBlockReader interface { - Read(context.Context, int, int) ([]Entry, error) - CloseRead(context.Context) error - IsReadable() bool -} - -type SegmentBlock interface { - SegmentBlockWriter - SegmentBlockReader - - Path() string - IsFull() bool - IsEmpty() bool - SegmentBlockID() vanus.ID - Close(context.Context) error - Initialize(context.Context) error - HealthInfo() *metapb.SegmentHealthInfo -} - -type ClusterInfoSource interface { - FillClusterInfo(info *metapb.SegmentHealthInfo) -} - -func CreateFileSegmentBlock(ctx context.Context, blockDir string, id vanus.ID, capacity int64) (*FileBlock, error) { - observability.EntryMark(ctx) - defer observability.LeaveMark(ctx) - - path := resolvePath(blockDir, id) - b := &FileBlock{ - id: id, - path: path, - cap: capacity, - wo: *atomic.NewInt64(fileBlockHeaderSize), - } - - f, err := os.Create(path) - if err != nil { - return nil, err - } - if err = f.Truncate(capacity); err != nil { - return nil, err - } - if _, err = f.Seek(fileBlockHeaderSize, io.SeekStart); err != nil { - return nil, err - } - b.appendable.Store(true) - b.readable.Store(true) - b.full.Store(false) - b.f = f - if err = b.persistHeader(ctx); err != nil { - return nil, err - } - - return b, nil -} - -func OpenFileSegmentBlock(ctx context.Context, path string) (*FileBlock, error) { - observability.EntryMark(ctx) - defer observability.LeaveMark(ctx) - - filename := filepath.Base(path) - id, err := vanus.NewIDFromString(filename[:len(filename)-len(blockExt)]) - if err != nil { - return nil, err - } - b := &FileBlock{ - id: id, - path: path, - } - b.appendable.Store(true) - b.readable.Store(true) - b.full.Store(false) - // TODO: use direct IO - f, err := os.OpenFile(path, os.O_RDWR|os.O_SYNC, 0o666) - if err != nil { - return nil, err - } - b.f = f - return b, nil -} diff --git a/internal/store/segment/errors/errors.go b/internal/store/segment/errors/errors.go index 881cc47d1..ee9b20bc6 100644 --- a/internal/store/segment/errors/errors.go +++ b/internal/store/segment/errors/errors.go @@ -17,11 +17,12 @@ package errors import rpcerr "github.com/linkall-labs/vsproto/pkg/errors" var ( - ErrInternal = rpcerr.New("internal error").WithGRPCCode(rpcerr.ErrorCode_INTERNAL) - ErrInvalidRequest = rpcerr.New("invalid request").WithGRPCCode(rpcerr.ErrorCode_INVALID_REQUEST) - ErrServiceState = rpcerr.New("service state error").WithGRPCCode(rpcerr.ErrorCode_SERVICE_NOT_RUNNING) - ErrSegmentNoEnoughCapacity = rpcerr.New("no enough capacity").WithGRPCCode(rpcerr.ErrorCode_SEGMENT_FULL) - ErrResourceNotFound = rpcerr.New("resource not found").WithGRPCCode(rpcerr.ErrorCode_RESOURCE_NOT_FOUND) - ErrResourceAlreadyExist = rpcerr.New("resource already exist").WithGRPCCode(rpcerr.ErrorCode_RESOURCE_EXIST) - ErrNoControllerLeader = rpcerr.New("no leader controller found").WithGRPCCode(rpcerr.ErrorCode_NOT_LEADER) + ErrInternal = rpcerr.New("internal error").WithGRPCCode(rpcerr.ErrorCode_INTERNAL) + ErrInvalidRequest = rpcerr.New("invalid request").WithGRPCCode(rpcerr.ErrorCode_INVALID_REQUEST) + ErrServiceState = rpcerr.New("service state error").WithGRPCCode(rpcerr.ErrorCode_SERVICE_NOT_RUNNING) + ErrSegmentNotEnoughSpace = rpcerr.New("not enough space").WithGRPCCode(rpcerr.ErrorCode_SEGMENT_FULL) + ErrSegmentFull = rpcerr.New("full").WithGRPCCode(rpcerr.ErrorCode_SEGMENT_FULL) + ErrResourceNotFound = rpcerr.New("resource not found").WithGRPCCode(rpcerr.ErrorCode_RESOURCE_NOT_FOUND) + ErrResourceAlreadyExist = rpcerr.New("resource already exist").WithGRPCCode(rpcerr.ErrorCode_RESOURCE_EXIST) + ErrNoControllerLeader = rpcerr.New("no leader controller found").WithGRPCCode(rpcerr.ErrorCode_NOT_LEADER) ) diff --git a/internal/store/segment/recovery.go b/internal/store/segment/recovery.go index 247d72289..036cbb77d 100644 --- a/internal/store/segment/recovery.go +++ b/internal/store/segment/recovery.go @@ -22,12 +22,12 @@ import ( // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" raftlog "github.com/linkall-labs/vanus/internal/raft/log" + "github.com/linkall-labs/vanus/internal/store/block/file" "github.com/linkall-labs/vanus/internal/store/meta" - "github.com/linkall-labs/vanus/internal/store/segment/block" "github.com/linkall-labs/vanus/observability/log" ) -func (s *segmentServer) recover(ctx context.Context) error { +func (s *server) recover(ctx context.Context) error { metaStore, err := meta.RecoverSyncStore(filepath.Join(s.volumeDir, "meta")) if err != nil { return err @@ -54,25 +54,24 @@ func (s *segmentServer) recover(ctx context.Context) error { return nil } -func (s *segmentServer) recoverBlocks(ctx context.Context, raftLogs map[vanus.ID]*raftlog.Log) error { - blocks, err := block.RecoverBlocks(filepath.Join(s.volumeDir, "block")) +func (s *server) recoverBlocks(ctx context.Context, raftLogs map[vanus.ID]*raftlog.Log) error { + blocks, err := file.Recover(filepath.Join(s.volumeDir, "block")) if err != nil { return err } // TODO: optimize this, because the implementation assumes under storage is linux file system for blockID, path := range blocks { - b, err := block.OpenFileSegmentBlock(ctx, path) - if err != nil { - return err + b, err2 := file.Open(ctx, path) + if err2 != nil { + return err2 } log.Info(ctx, "The block was loaded.", map[string]interface{}{ "blockID": blockID, }) - // TODO(james.yin): initialize block - if err = b.Initialize(ctx); err != nil { - return err + if err2 = b.Recover(ctx); err2 != nil { + return err2 } s.blocks.Store(blockID, b) @@ -80,34 +79,41 @@ func (s *segmentServer) recoverBlocks(ctx context.Context, raftLogs map[vanus.ID // recover replica if b.IsAppendable() { raftLog := raftLogs[blockID] - // raft log has been compacted + // Raft log has been compacted. if raftLog == nil { raftLog = raftlog.RecoverLog(blockID, s.wal, s.metaStore, s.offsetStore) } - replica := s.makeReplicaWithRaftLog(context.TODO(), b, raftLog) - b.SetClusterInfoSource(replica) - s.blockWriters.Store(b.SegmentBlockID(), replica) + r := s.makeReplicaWithRaftLog(context.TODO(), b.ID(), b, raftLog) + b.SetClusterInfoSource(r) + s.writers.Store(b.ID(), r) } - s.blockReaders.Store(b.SegmentBlockID(), b) + s.readers.Store(b.ID(), b) } for nodeID, raftLog := range raftLogs { - b, ok := s.blocks.Load(nodeID) - if !ok { - // TODO(james.yin): no block for raft log, compact + var b *file.Block + if v, ok := s.blocks.Load(nodeID); ok { + b, _ = v.(*file.Block) + } + + switch { + case b == nil: log.Debug(ctx, "Not found block, so discard the raft log.", map[string]interface{}{ "nodeID": nodeID, }) - continue - } - if _, ok = b.(*block.Replica); !ok { - // TODO(james.yin): block is not appendable, compact + case !b.IsAppendable(): log.Debug(ctx, "Block is not appendable, so discard the raft log.", map[string]interface{}{ "nodeID": nodeID, }) + default: continue } - _ = raftLog + + lastIndex, err2 := raftLog.LastIndex() + if err2 != nil { + return err2 + } + _ = raftLog.Compact(lastIndex) } return nil diff --git a/internal/store/segment/segment.go b/internal/store/segment/server.go similarity index 72% rename from internal/store/segment/segment.go rename to internal/store/segment/server.go index cc6953c1c..0b9213aa3 100644 --- a/internal/store/segment/segment.go +++ b/internal/store/segment/server.go @@ -34,6 +34,7 @@ import ( // first-party libraries. ctrlpb "github.com/linkall-labs/vsproto/pkg/controller" + rpcerr "github.com/linkall-labs/vsproto/pkg/errors" metapb "github.com/linkall-labs/vsproto/pkg/meta" raftpb "github.com/linkall-labs/vsproto/pkg/raft" segpb "github.com/linkall-labs/vsproto/pkg/segment" @@ -44,17 +45,19 @@ import ( raftlog "github.com/linkall-labs/vanus/internal/raft/log" "github.com/linkall-labs/vanus/internal/raft/transport" "github.com/linkall-labs/vanus/internal/store" + "github.com/linkall-labs/vanus/internal/store/block" + "github.com/linkall-labs/vanus/internal/store/block/file" + "github.com/linkall-labs/vanus/internal/store/block/replica" "github.com/linkall-labs/vanus/internal/store/meta" - "github.com/linkall-labs/vanus/internal/store/segment/block" "github.com/linkall-labs/vanus/internal/store/segment/errors" "github.com/linkall-labs/vanus/internal/util" - errutil "github.com/linkall-labs/vanus/internal/util/errors" "github.com/linkall-labs/vanus/observability" "github.com/linkall-labs/vanus/observability/log" ) const ( segmentServerDebugModeFlagENV = "SEGMENT_SERVER_DEBUG_MODE" + defaultLeaderInfoBufferSize = 256 ) type leaderInfo struct { @@ -62,13 +65,14 @@ type leaderInfo struct { term uint64 } -type segmentServer struct { - blocks sync.Map - blockWriters sync.Map - blockReaders sync.Map - wal *raftlog.WAL - metaStore *meta.SyncStore - offsetStore *meta.AsyncStore +type server struct { + blocks sync.Map // *file.Block + writers sync.Map // *replica.Replica + readers sync.Map + + wal *raftlog.WAL + metaStore *meta.SyncStore + offsetStore *meta.AsyncStore resolver *transport.SimpleResolver host transport.Host @@ -88,22 +92,24 @@ type segmentServer struct { leaderc chan leaderInfo stopCallback func() - closeCh chan struct{} + closec chan struct{} } // Make sure segmentServer implements segpb.SegmentServerServer and primitive.Initializer. -var _ segpb.SegmentServerServer = (*segmentServer)(nil) -var _ primitive.Initializer = (*segmentServer)(nil) +var ( + _ segpb.SegmentServerServer = (*server)(nil) + _ primitive.Initializer = (*server)(nil) +) func NewSegmentServer(cfg store.Config, stop func()) (segpb.SegmentServerServer, raftpb.RaftServerServer) { localAddress := fmt.Sprintf("%s:%d", cfg.IP, cfg.Port) - // setup raft + // Setup raft. resolver := transport.NewSimpleResolver() host := transport.NewHost(resolver, localAddress) raftSrv := transport.NewRaftServer(context.TODO(), host) - return &segmentServer{ + srv := &server{ state: primitive.ServerStateCreated, cfg: cfg, localAddress: localAddress, @@ -114,13 +120,15 @@ func NewSegmentServer(cfg store.Config, stop func()) (segpb.SegmentServerServer, ctrlAddress: cfg.ControllerAddresses, credentials: insecure.NewCredentials(), cc: NewClient(cfg.ControllerAddresses), - leaderc: make(chan leaderInfo, 256), + leaderc: make(chan leaderInfo, defaultLeaderInfoBufferSize), stopCallback: stop, - closeCh: make(chan struct{}), - }, raftSrv + closec: make(chan struct{}), + } + + return srv, raftSrv } -func (s *segmentServer) Initialize(ctx context.Context) error { +func (s *server) Initialize(ctx context.Context) error { if err := s.recover(ctx); err != nil { return err } @@ -131,10 +139,17 @@ func (s *segmentServer) Initialize(ctx context.Context) error { s.state = primitive.ServerStateStarted + if s.isDebugMode { + if err := s.start(ctx); err != nil { + return err + } + s.state = primitive.ServerStateRunning + } + return nil } -func (s *segmentServer) Start( +func (s *server) Start( ctx context.Context, req *segpb.StartSegmentServerRequest, ) (*segpb.StartSegmentServerResponse, error) { observability.EntryMark(ctx) @@ -151,7 +166,7 @@ func (s *segmentServer) Start( return &segpb.StartSegmentServerResponse{}, nil } -func (s *segmentServer) Stop( +func (s *server) Stop( ctx context.Context, req *segpb.StopSegmentServerRequest, ) (*segpb.StopSegmentServerResponse, error) { observability.EntryMark(ctx) @@ -171,11 +186,11 @@ func (s *segmentServer) Stop( return &segpb.StopSegmentServerResponse{}, nil } -func (s *segmentServer) Status(ctx context.Context, req *emptypb.Empty) (*segpb.StatusResponse, error) { +func (s *server) Status(ctx context.Context, req *emptypb.Empty) (*segpb.StatusResponse, error) { return &segpb.StatusResponse{Status: string(s.state)}, nil } -func (s *segmentServer) CreateBlock(ctx context.Context, req *segpb.CreateBlockRequest) (*emptypb.Empty, error) { +func (s *server) CreateBlock(ctx context.Context, req *segpb.CreateBlockRequest) (*emptypb.Empty, error) { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) @@ -195,29 +210,28 @@ func (s *segmentServer) CreateBlock(ctx context.Context, req *segpb.CreateBlockR blockID := vanus.NewIDFromUint64(req.Id) - _, exist := s.blocks.Load(blockID) - if exist { + if _, ok := s.blocks.Load(blockID); ok { return nil, errors.ErrResourceAlreadyExist.WithMessage("the segment has already exist.") } - // create block - b, err := block.CreateFileSegmentBlock(ctx, filepath.Join(s.volumeDir, "block"), blockID, req.Size) + // Create block. + b, err := file.Create(ctx, filepath.Join(s.volumeDir, "block"), blockID, req.Size) if err != nil { return nil, err } - // create replica - replica := s.makeReplica(context.TODO(), b) - b.SetClusterInfoSource(replica) + // Create replica. + r := s.makeReplica(context.TODO(), b.ID(), b) + b.SetClusterInfoSource(r) s.blocks.Store(blockID, b) - s.blockWriters.Store(blockID, replica) - s.blockReaders.Store(blockID, b) + s.writers.Store(blockID, r) + s.readers.Store(blockID, b) return &emptypb.Empty{}, nil } -func (s *segmentServer) RemoveBlock( +func (s *server) RemoveBlock( ctx context.Context, req *segpb.RemoveBlockRequest, ) (*emptypb.Empty, error) { observability.EntryMark(ctx) @@ -233,7 +247,7 @@ func (s *segmentServer) RemoveBlock( } // ActivateSegment mark a block ready to using and preparing to initializing a replica group. -func (s *segmentServer) ActivateSegment( +func (s *server) ActivateSegment( ctx context.Context, req *segpb.ActivateSegmentRequest, ) (*segpb.ActivateSegmentResponse, error) { observability.EntryMark(ctx) @@ -258,10 +272,10 @@ func (s *segmentServer) ActivateSegment( }) var myID vanus.ID - peers := make([]block.IDAndEndpoint, 0, len(req.Replicas)-1) + peers := make([]replica.Peer, 0, len(req.Replicas)-1) for blockID, endpoint := range req.Replicas { peer := vanus.NewIDFromUint64(blockID) - peers = append(peers, block.IDAndEndpoint{ + peers = append(peers, replica.Peer{ ID: peer, Endpoint: endpoint, }) @@ -273,8 +287,8 @@ func (s *segmentServer) ActivateSegment( if myID == 0 { return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") } - v, exist := s.blockWriters.Load(myID) - if !exist { + v, ok := s.writers.Load(myID) + if !ok { return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") } @@ -290,7 +304,7 @@ func (s *segmentServer) ActivateSegment( }) // Bootstrap replica. - replica, _ := v.(*block.Replica) + replica, _ := v.(*replica.Replica) if err := replica.Bootstrap(peers); err != nil { return nil, err } @@ -299,7 +313,7 @@ func (s *segmentServer) ActivateSegment( } // InactivateSegment mark a block ready to be removed. -func (s *segmentServer) InactivateSegment( +func (s *server) InactivateSegment( ctx context.Context, req *segpb.InactivateSegmentRequest, ) (*segpb.InactivateSegmentResponse, error) { observability.EntryMark(ctx) @@ -312,7 +326,7 @@ func (s *segmentServer) InactivateSegment( return &segpb.InactivateSegmentResponse{}, nil } -func (s *segmentServer) GetBlockInfo( +func (s *server) GetBlockInfo( ctx context.Context, req *segpb.GetBlockInfoRequest, ) (*segpb.GetBlockInfoResponse, error) { observability.EntryMark(ctx) @@ -326,7 +340,7 @@ func (s *segmentServer) GetBlockInfo( } // AppendToBlock implements segpb.SegmentServerServer. -func (s *segmentServer) AppendToBlock(ctx context.Context, req *segpb.AppendToBlockRequest) (*emptypb.Empty, error) { +func (s *server) AppendToBlock(ctx context.Context, req *segpb.AppendToBlockRequest) (*emptypb.Empty, error) { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) @@ -340,16 +354,13 @@ func (s *segmentServer) AppendToBlock(ctx context.Context, req *segpb.AppendToBl blockID := vanus.NewIDFromUint64(req.BlockId) - v, exist := s.blockWriters.Load(blockID) - if !exist { + var appender block.Appender + if v, ok := s.writers.Load(blockID); ok { + appender, _ = v.(block.Appender) + } else { return nil, errors.ErrResourceNotFound.WithMessage("the block doesn't exist") } - writer, _ := v.(block.SegmentBlockWriter) - if !writer.IsAppendable() { - return nil, errors.ErrSegmentNoEnoughCapacity - } - events := req.GetEvents().Events entries := make([]block.Entry, len(events)) for i, event := range events { @@ -362,21 +373,43 @@ func (s *segmentServer) AppendToBlock(ctx context.Context, req *segpb.AppendToBl } } - if err := writer.Append(ctx, entries...); err != nil { - if stderr.Is(err, block.ErrNoEnoughCapacity) { - // TODO optimize this to async from sync - if err = s.markSegmentIsFull(ctx, blockID); err != nil { - return nil, err - } - return nil, errors.ErrSegmentNoEnoughCapacity - } - return nil, errors.ErrInternal.WithMessage("write to storage failed").Wrap(err) + if err := appender.Append(ctx, entries...); err != nil { + return nil, s.processAppendError(ctx, blockID, err) } + return &emptypb.Empty{}, nil } +func (s *server) processAppendError(ctx context.Context, blockID vanus.ID, err error) error { + if stderr.As(err, &rpcerr.ErrorType{}) { + return err + } + + if stderr.Is(err, block.ErrNotEnoughSpace) { + log.Debug(ctx, "Append failed: not enough space. Mark segment is full.", map[string]interface{}{ + "blockID": blockID, + }) + // TODO: optimize this to async from sync + if err = s.markSegmentIsFull(ctx, blockID); err != nil { + return err + } + return errors.ErrSegmentNotEnoughSpace + } else if stderr.Is(err, block.ErrFull) { + log.Debug(ctx, "Append failed: block is full.", map[string]interface{}{ + "blockID": blockID, + }) + return errors.ErrSegmentFull + } + + log.Warning(ctx, "Append failed.", map[string]interface{}{ + "blockID": blockID, + log.KeyError: err, + }) + return errors.ErrInternal.WithMessage("write to storage failed").Wrap(err) +} + // ReadFromBlock implements segpb.SegmentServerServer. -func (s *segmentServer) ReadFromBlock( +func (s *server) ReadFromBlock( ctx context.Context, req *segpb.ReadFromBlockRequest, ) (*segpb.ReadFromBlockResponse, error) { observability.EntryMark(ctx) @@ -386,25 +419,15 @@ func (s *segmentServer) ReadFromBlock( } blockID := vanus.NewIDFromUint64(req.BlockId) - segV, exist := s.blocks.Load(blockID) - if !exist { + + var reader block.Reader + if v, ok := s.readers.Load(blockID); ok { + reader, _ = v.(block.Reader) + } else { return nil, errors.ErrResourceNotFound.WithMessage( "the segment doesn't exist on this server") } - segBlock, _ := segV.(block.SegmentBlock) - v, exist := s.blockReaders.Load(blockID) - var reader block.SegmentBlockReader - if !exist { - _reader, err := block.OpenFileSegmentBlock(ctx, segBlock.Path()) - if err != nil { - return nil, err - } - reader = _reader - s.blockReaders.Store(blockID, reader) - } else { - reader, _ = v.(block.SegmentBlockReader) - } entries, err := reader.Read(ctx, int(req.Offset), int(req.Number)) if err != nil { return nil, err @@ -425,7 +448,7 @@ func (s *segmentServer) ReadFromBlock( }, nil } -func (s *segmentServer) startHeartbeatTask(ctx context.Context) error { +func (s *server) startHeartbeatTask(ctx context.Context) error { if s.isDebugMode { return nil } @@ -434,18 +457,18 @@ func (s *segmentServer) startHeartbeatTask(ctx context.Context) error { return nil } -func (s *segmentServer) runHeartbeat(ctx context.Context) { +func (s *server) runHeartbeat(ctx context.Context) { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { - case <-s.closeCh: + case <-s.closec: return case <-ticker.C: infos := make([]*metapb.SegmentHealthInfo, 0) s.blocks.Range(func(key, value interface{}) bool { - b, _ := value.(block.SegmentBlock) + b, _ := value.(block.Block) infos = append(infos, b.HealthInfo()) return true }) @@ -465,6 +488,7 @@ func (s *segmentServer) runHeartbeat(ctx context.Context) { }) } case info := <-s.leaderc: + // TODO(james.yin): move to other goroutine. req := &ctrlpb.ReportSegmentLeaderRequest{ LeaderId: info.leader.Uint64(), Term: info.term, @@ -480,7 +504,7 @@ func (s *segmentServer) runHeartbeat(ctx context.Context) { } } -func (s *segmentServer) leaderChanged(blockID, leaderID vanus.ID, term uint64) { +func (s *server) leaderChanged(blockID, leaderID vanus.ID, term uint64) { if blockID == leaderID { info := leaderInfo{ leader: leaderID, @@ -494,7 +518,7 @@ func (s *segmentServer) leaderChanged(blockID, leaderID vanus.ID, term uint64) { } } -func (s *segmentServer) start(ctx context.Context) error { +func (s *server) start(ctx context.Context) error { log.Info(ctx, "Start SegmentServer.", nil) if err := s.startHeartbeatTask(ctx); err != nil { return err @@ -502,7 +526,9 @@ func (s *segmentServer) start(ctx context.Context) error { return nil } -func (s *segmentServer) stop(ctx context.Context) error { +func (s *server) stop(ctx context.Context) error { + // TODO(james.yin): + s.cc.Close(ctx) wg := sync.WaitGroup{} var err error @@ -510,13 +536,13 @@ func (s *segmentServer) stop(ctx context.Context) error { wg.Add(1) go func() { s.waitAllAppendRequestCompleted(ctx) - s.blockWriters.Range(func(key, value interface{}) bool { - writer, _ := value.(block.SegmentBlockWriter) - if err2 := writer.CloseWrite(ctx); err2 != nil { - err = errutil.Chain(err, err2) - } - return true - }) + // s.blockWriters.Range(func(key, value interface{}) bool { + // writer, _ := value.(block.Writer) + // if err2 := writer.CloseWrite(ctx); err2 != nil { + // err = errutil.Chain(err, err2) + // } + // return true + // }) wg.Done() }() } @@ -525,13 +551,13 @@ func (s *segmentServer) stop(ctx context.Context) error { wg.Add(1) go func() { s.waitAllReadRequestCompleted(ctx) - s.blockReaders.Range(func(key, value interface{}) bool { - reader, _ := value.(block.SegmentBlockReader) - if err2 := reader.CloseRead(ctx); err2 != nil { - err = errutil.Chain(err, err2) - } - return true - }) + // s.blockReaders.Range(func(key, value interface{}) bool { + // reader, _ := value.(block.Reader) + // if err2 := reader.CloseRead(ctx); err2 != nil { + // err = errutil.Chain(err, err2) + // } + // return true + // }) wg.Done() }() } @@ -540,34 +566,37 @@ func (s *segmentServer) stop(ctx context.Context) error { return err } -func (s *segmentServer) markSegmentIsFull(ctx context.Context, segID vanus.ID) error { - bl, exist := s.blocks.Load(segID) - if !exist { +func (s *server) markSegmentIsFull(ctx context.Context, blockID vanus.ID) error { + var b block.Block + if v, ok := s.blocks.Load(blockID); ok { + b, _ = v.(block.Block) + } else { return fmt.Errorf("the SegmentBlock does not exist") } - writer, _ := bl.(block.SegmentBlockWriter) - if err := writer.CloseWrite(ctx); err != nil { - return err - } + // TODO(james.yin): close Appender + // writer, _ := bl.(block.Writer) + // if err := writer.CloseWrite(ctx); err != nil { + // return err + // } // report to controller immediately _, err := s.cc.reportSegmentBlockIsFull(ctx, &ctrlpb.SegmentHeartbeatRequest{ ServerId: s.id.Uint64(), VolumeId: s.volumeID.Uint64(), HealthInfo: []*metapb.SegmentHealthInfo{ - bl.(block.SegmentBlock).HealthInfo(), + b.HealthInfo(), }, ReportTime: util.FormatTime(time.Now()), }) return err } -func (s *segmentServer) waitAllAppendRequestCompleted(ctx context.Context) {} +func (s *server) waitAllAppendRequestCompleted(ctx context.Context) {} -func (s *segmentServer) waitAllReadRequestCompleted(ctx context.Context) {} +func (s *server) waitAllReadRequestCompleted(ctx context.Context) {} -func (s *segmentServer) registerSelf(ctx context.Context) error { +func (s *server) registerSelf(ctx context.Context) error { if strings.ToLower(os.Getenv(segmentServerDebugModeFlagENV)) == "true" { return s.registerSelfInDebug(ctx) } @@ -624,7 +653,7 @@ func (s *segmentServer) registerSelf(ctx context.Context) error { return nil } -func (s *segmentServer) registerReplicas(ctx context.Context, segmentpb *metapb.Segment) { +func (s *server) registerReplicas(ctx context.Context, segmentpb *metapb.Segment) { for blockID, blockpb := range segmentpb.Replicas { if blockpb.Endpoint == "" { if blockpb.VolumeID == s.volumeID.Uint64() { @@ -643,7 +672,7 @@ func (s *segmentServer) registerReplicas(ctx context.Context, segmentpb *metapb. } } -func (s *segmentServer) registerSelfInDebug(ctx context.Context) error { +func (s *server) registerSelfInDebug(ctx context.Context) error { log.Info(ctx, "the segment server debug mode enabled", nil) s.id = vanus.NewID() @@ -652,20 +681,20 @@ func (s *segmentServer) registerSelfInDebug(ctx context.Context) error { return nil } -func (s *segmentServer) makeReplica(ctx context.Context, b block.SegmentBlock) *block.Replica { - raftLog := raftlog.NewLog(b.SegmentBlockID(), s.wal, s.metaStore, s.offsetStore) - return s.makeReplicaWithRaftLog(ctx, b, raftLog) +func (s *server) makeReplica(ctx context.Context, blockID vanus.ID, appender block.TwoPCAppender) *replica.Replica { + raftLog := raftlog.NewLog(blockID, s.wal, s.metaStore, s.offsetStore) + return s.makeReplicaWithRaftLog(ctx, blockID, appender, raftLog) } -func (s *segmentServer) makeReplicaWithRaftLog( - ctx context.Context, b block.SegmentBlock, raftLog *raftlog.Log, -) *block.Replica { - replica := block.NewReplica(ctx, b, raftLog, s.host, s.leaderChanged) - s.host.Register(b.SegmentBlockID().Uint64(), replica) - return replica +func (s *server) makeReplicaWithRaftLog( + ctx context.Context, blockID vanus.ID, appender block.TwoPCAppender, raftLog *raftlog.Log, +) *replica.Replica { + r := replica.New(ctx, blockID, appender, raftLog, s.host, s.leaderChanged) + s.host.Register(blockID.Uint64(), r) + return r } -func (s *segmentServer) checkState() error { +func (s *server) checkState() error { if s.state != primitive.ServerStateRunning { return errors.ErrServiceState.WithMessage(fmt.Sprintf( "the server isn't ready to work, current state:%s", s.state)) diff --git a/internal/store/segment/segment_test.go b/internal/store/segment/server_test.go similarity index 100% rename from internal/store/segment/segment_test.go rename to internal/store/segment/server_test.go diff --git a/internal/store/wal/compaction_test.go b/internal/store/wal/compaction_test.go new file mode 100644 index 000000000..536c71fc6 --- /dev/null +++ b/internal/store/wal/compaction_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal diff --git a/internal/store/wal/record/record.go b/internal/store/wal/record/record.go index 0aedca448..b86bfa5e3 100644 --- a/internal/store/wal/record/record.go +++ b/internal/store/wal/record/record.go @@ -71,7 +71,7 @@ func (r *Record) MarshalTo(data []byte) (int, error) { binary.BigEndian.PutUint16(data[4:6], r.Length) data[6] = byte(r.Type) ds := len(r.Data) - if ds > 0 { + if ds != 0 { copy(data[7:7+ds], r.Data) } // calculate CRC diff --git a/internal/store/wal/record/record_test.go b/internal/store/wal/record/record_test.go index a6ce98f05..625239962 100644 --- a/internal/store/wal/record/record_test.go +++ b/internal/store/wal/record/record_test.go @@ -13,3 +13,90 @@ // limitations under the License. package record + +import ( + // standard libraries. + "testing" + + // third-party libraries. + . "github.com/smartystreets/goconvey/convey" +) + +var ( + rawData = []byte{ + 0x01, 0x02, 0x03, + } + encodedData = []byte{ + 0x04, 0x76, 0xb0, 0x1b, 0x00, 0x03, 0x01, 0x01, 0x02, 0x03, + } + encodedData2 = []byte{ + 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x01, 0x01, 0x02, 0x03, + } +) + +func TestRecord_Size(t *testing.T) { + Convey("size", t, func() { + r := Record{ + Data: rawData, + } + So(r.Size(), ShouldEqual, 4+2+1+len(rawData)) + }) +} + +func TestRecord_MashalTo(t *testing.T) { + Convey("mashal record to buffer", t, func() { + r := Record{ + Length: uint16(len(rawData)), + Type: Full, + Data: rawData, + } + data := make([]byte, r.Size()) + n, err := r.MarshalTo(data) + So(err, ShouldBeNil) + So(n, ShouldEqual, r.Size()) + So(data, ShouldResemble, encodedData) + }) + Convey("mashal record to buffer that don't have enough space", t, func() { + r := Record{ + Length: uint16(len(rawData)), + Type: Full, + Data: rawData, + } + data := make([]byte, r.Size()-1) + _, err := r.MarshalTo(data) + So(err, ShouldNotBeNil) + }) +} + +func TestRecord_Mashal(t *testing.T) { + Convey("mashal record", t, func() { + r := Record{ + Length: uint16(len(rawData)), + Type: Full, + Data: rawData, + } + data := r.Marshal() + So(data, ShouldResemble, encodedData) + }) + Convey("mashal record with CRC", t, func() { + r := Record{ + CRC: 0x00000001, + Length: uint16(len(rawData)), + Type: Full, + Data: rawData, + } + data := r.Marshal() + So(data, ShouldResemble, encodedData2) + }) +} + +func TestRecord_Unmashal(t *testing.T) { + Convey("unmashal record", t, func() { + r, err := Unmashal(encodedData) + So(err, ShouldBeNil) + So(r.CRC, ShouldEqual, 0x0476b01b) + So(r.Length, ShouldEqual, 3) + So(r.Type, ShouldEqual, Full) + So(r.Data, ShouldResemble, rawData) + }) +} diff --git a/internal/store/wal/stream.go b/internal/store/wal/stream.go index af02d8b81..0e34e4b84 100644 --- a/internal/store/wal/stream.go +++ b/internal/store/wal/stream.go @@ -121,6 +121,7 @@ func (s *logStream) Visit(visitor WalkFunc, compacted int64) (int64, error) { if r.Type == record.Zero { // TODO(james.yin): has empty log file(s). if i != len(s.stream)-1 { + panic("has empty log file") } // TODO(james.yin): Has incomplete entry, truncate it. diff --git a/internal/store/wal/wal.go b/internal/store/wal/wal.go index 3f774c2c2..0cdd02a94 100644 --- a/internal/store/wal/wal.go +++ b/internal/store/wal/wal.go @@ -204,7 +204,8 @@ func (w *WAL) runAppend() { select { case er := <-w.appendc: full, goahead := w.doAppend(er.entries, er.callback) - if full || !er.batching { + switch { + case full || !er.batching: if !full { w.flushWritableBlock() } @@ -216,7 +217,7 @@ func (w *WAL) runAppend() { } waiting = false } - } else if goahead { + case goahead: // reset timer if waiting && !timer.Stop() { // drain channel @@ -224,7 +225,7 @@ func (w *WAL) runAppend() { } timer.Reset(period) waiting = true - } else if !waiting { + case !waiting: // start timer timer.Reset(period) waiting = true