Skip to content
This repository was archived by the owner on Oct 18, 2021. It is now read-only.

Add appender support #18

Merged
merged 3 commits into from
Apr 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 143 additions & 7 deletions generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ name = "azblob"
required = ["credential", "endpoint"]
optional = ["default_service_pairs", "http_client_options"]

[namespace.storage]
implement = ["appender"]

[namespace.storage.new]
required = ["name"]
optional = ["default_storage_pairs", "pair_policy", "work_dir"]
Expand All @@ -19,6 +22,12 @@ optional = ["offset", "io_callback", "size", "encryption_key", "encryption_scope
[namespace.storage.op.write]
optional = ["content_md5", "content_type", "io_callback", "access_tier", "encryption_key", "encryption_scope"]

[namespace.storage.op.create_append]
optional = ["encryption_key", "encryption_scope"]

[namespace.storage.op.write_append]
optional = ["encryption_key", "encryption_scope"]

[namespace.storage.op.stat]
optional = ["encryption_key", "encryption_scope"]

Expand Down
67 changes: 67 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ func (s *Storage) create(path string, opt pairStorageCreate) (o *Object) {
return o
}

func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorageCreateAppend) (o *Object, err error) {
rp := s.getAbsPath(path)

var cpk azblob.ClientProvidedKeyOptions
if opt.HasEncryptionKey {
cpk, err = calculateEncryptionHeaders(opt.EncryptionKey, opt.EncryptionScope)
if err != nil {
return
}
}
_, err = s.bucket.NewAppendBlobURL(rp).Create(ctx, azblob.BlobHTTPHeaders{}, nil,
azblob.BlobAccessConditions{}, nil, cpk)
if err != nil {
return
}

o = s.newObject(true)
o.Mode = ModeRead | ModeAppend
o.ID = rp
o.Path = path
o.SetAppendOffset(0)
return o, nil
}

func (s *Storage) delete(ctx context.Context, path string, opt pairStorageDelete) (err error) {
rp := s.getAbsPath(path)

Expand Down Expand Up @@ -257,3 +281,46 @@ func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int6
}
return size, nil
}

func (s *Storage) writeAppend(ctx context.Context, o *Object, r io.Reader, size int64, opt pairStorageWriteAppend) (n int64, err error) {
rp := o.GetID()

offset, ok := o.GetAppendOffset()
if !ok {
err = fmt.Errorf("append offset is not set")
return
}

var cpk azblob.ClientProvidedKeyOptions
if opt.HasEncryptionKey {
cpk, err = calculateEncryptionHeaders(opt.EncryptionKey, opt.EncryptionScope)
if err != nil {
return
}
}

var accessConditions azblob.AppendBlobAccessConditions
if 0 == offset {
accessConditions.AppendPositionAccessConditions.IfAppendPositionEqual = -1
} else {
accessConditions.AppendPositionAccessConditions.IfAppendPositionEqual = offset
}

appendResp, err := s.bucket.NewAppendBlobURL(rp).AppendBlock(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to set x-ms-blob-condition-appendpos to prevent the object has been appended via other process.

Optional conditional header, used only for the Append Block operation. A number indicating the byte offset to compare. Append Block will succeed only if the append position is equal to this number. If it is not, the request will fail with the AppendPositionConditionNotMet error (HTTP status code 412 – Precondition Failed).

https://docs.microsoft.com/en-us/rest/api/storageservices/append-block

ctx, iowrap.SizedReadSeekCloser(r, size),
accessConditions, nil, cpk)
if err != nil {
return
}

// BlobAppendOffset() returns the offset at which the block was committed, in bytes, but seems not the next append position.
// ref: https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/zt_url_append_blob_test.go
offset, err = strconv.ParseInt(appendResp.BlobAppendOffset(), 10, 64)
if err != nil {
return
}
offset += size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.microsoft.com/en-us/rest/api/storageservices/append-block

Maybe we don't need to update offset again here?

Copy link
Contributor Author

@JinnyYi JinnyYi Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (s *aztestsSuite) TestAppendBlock(c *chk.C) {
	bsu := getBSU()
	container, _ := createNewContainer(c, bsu)
	defer delContainer(c, container)

	blob := container.NewAppendBlobURL(generateBlobName())

	resp, err := blob.Create(context.Background(), BlobHTTPHeaders{}, nil, BlobAccessConditions{}, nil, ClientProvidedKeyOptions{})
	c.Assert(err, chk.IsNil)
	c.Assert(resp.StatusCode(), chk.Equals, 201)

	appendResp, err := blob.AppendBlock(context.Background(), getReaderToRandomBytes(1024), AppendBlobAccessConditions{}, nil, ClientProvidedKeyOptions{})
	c.Assert(err, chk.IsNil)
	c.Assert(appendResp.Response().StatusCode, chk.Equals, 201)
	c.Assert(appendResp.BlobAppendOffset(), chk.Equals, "0")
	c.Assert(appendResp.BlobCommittedBlockCount(), chk.Equals, int32(1))
	c.Assert(appendResp.ETag(), chk.Not(chk.Equals), ETagNone)
	c.Assert(appendResp.LastModified().IsZero(), chk.Equals, false)
	c.Assert(appendResp.ContentMD5(), chk.Not(chk.Equals), "")
	c.Assert(appendResp.RequestID(), chk.Not(chk.Equals), "")
	c.Assert(appendResp.Version(), chk.Not(chk.Equals), "")
	c.Assert(appendResp.Date().IsZero(), chk.Equals, false)

	appendResp, err = blob.AppendBlock(context.Background(), getReaderToRandomBytes(1024), AppendBlobAccessConditions{}, nil, ClientProvidedKeyOptions{})
	c.Assert(err, chk.IsNil)
	c.Assert(appendResp.BlobAppendOffset(), chk.Equals, "1024")
	c.Assert(appendResp.BlobCommittedBlockCount(), chk.Equals, int32(2))
}

From the test file in https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/zt_url_append_blob_test.go, we can see that the appendResp.BlobAppendOffset() is not the next append position.

o.SetAppendOffset(offset)

return offset, nil
}
7 changes: 7 additions & 0 deletions tests/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ func TestStorage(t *testing.T) {
}
tests.TestStorager(t, setupTest(t))
}

func TestAppend(t *testing.T) {
if os.Getenv("STORAGE_AZBLOB_INTEGRATION_TEST") != "on" {
t.Skipf("STORAGE_AZBLOB_INTEGRATION_TEST is not 'on', skipped")
}
tests.TestAppender(t, setupTest(t))
}
1 change: 1 addition & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Storage struct {
pairPolicy typ.PairPolicy

typ.UnimplementedStorager
typ.UnimplementedAppender
}

// String implements Storager.String
Expand Down