Skip to content

Commit

Permalink
feat: return events with log offset
Browse files Browse the repository at this point in the history
  • Loading branch information
ZYunfeii committed Jul 25, 2022
1 parent aefea4d commit 0058b88
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
14 changes: 8 additions & 6 deletions client/internal/vanus/eventlog/log_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
package eventlog

import (
// standard libraries
// standard libraries.
"context"
"encoding/binary"
"math"
"sync"

// third-party libraries
// third-party libraries.
ce "github.com/cloudevents/sdk-go/v2"
"go.uber.org/atomic"

// this project
// first-party libraries.
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"

// this project.
vdr "github.com/linkall-labs/vanus/client/internal/vanus/discovery/record"
"github.com/linkall-labs/vanus/client/pkg/errors"
"github.com/linkall-labs/vanus/client/pkg/eventlog"
"github.com/linkall-labs/vanus/internal/store/segment"
)

func newLogSegment(r *vdr.LogSegment, towrite bool) (*logSegment, error) {
Expand Down Expand Up @@ -173,7 +175,7 @@ func (s *logSegment) Read(ctx context.Context, from int64, size int16) ([]*ce.Ev
}

for _, e := range events {
v, ok := e.Extensions()[segment.XVanusBlockOffset]
v, ok := e.Extensions()[segpb.XVanusBlockOffset]
if !ok {
continue
}
Expand All @@ -185,7 +187,7 @@ func (s *logSegment) Read(ctx context.Context, from int64, size int16) ([]*ce.Ev
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(offset))
e.SetExtension(eventlog.XVanusLogOffset, buf)
e.SetExtension(segment.XVanusBlockOffset, nil)
e.SetExtension(segpb.XVanusBlockOffset, nil)
}

return events, err
Expand Down
3 changes: 1 addition & 2 deletions internal/store/segment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ const (
debugModeENV = "SEGMENT_SERVER_DEBUG_MODE"
defaultLeaderInfoBufferSize = 256
defaultForceStopTimeout = 30 * time.Second
XVanusBlockOffset = "xvanusblockoffset"
)

type Server interface {
Expand Down Expand Up @@ -733,7 +732,7 @@ func (s *server) ReadFromBlock(ctx context.Context, id vanus.ID, off int, num in
if event.Attributes == nil {
event.Attributes = make(map[string]*cepb.CloudEventAttributeValue, 1)
}
event.Attributes[XVanusBlockOffset] = &cepb.CloudEventAttributeValue{
event.Attributes[segpb.XVanusBlockOffset] = &cepb.CloudEventAttributeValue{
Attr: &cepb.CloudEventAttributeValue_CeInteger{
CeInteger: int32(entry.Index),
},
Expand Down
3 changes: 2 additions & 1 deletion internal/store/segment/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/linkall-labs/vanus/internal/store/block/replica"
"github.com/linkall-labs/vanus/internal/util"
"github.com/linkall-labs/vanus/proto/pkg/errors"
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"

"github.com/linkall-labs/vanus/internal/store"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestServer_ReadFromBlock(t *testing.T) {
pbEvents, err := srv.ReadFromBlock(ctx, blockID, 0, 3)
So(err, ShouldBeNil)
for i, pbEvent := range pbEvents {
So(pbEvent.Attributes["xvanusblockoffset"].Attr.(*cepb.CloudEventAttributeValue_CeInteger).CeInteger, ShouldEqual, i)
So(pbEvent.Attributes[segpb.XVanusBlockOffset].Attr.(*cepb.CloudEventAttributeValue_CeInteger).CeInteger, ShouldEqual, i)
}
})
}
17 changes: 17 additions & 0 deletions proto/pkg/segment/attr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2022 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package segment

const XVanusBlockOffset = "xvanusblockoffset"

0 comments on commit 0058b88

Please sign in to comment.