Skip to content

Commit e29dac4

Browse files
committed
feat: add S3 storage backend
1 parent 91874fb commit e29dac4

File tree

2 files changed

+203
-143
lines changed

2 files changed

+203
-143
lines changed

storage/azblob.go

Lines changed: 142 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1,143 +1,142 @@
1-
package storage
2-
3-
import (
4-
"context"
5-
"fmt"
6-
"io"
7-
"net/url"
8-
"strings"
9-
10-
"github.com/Azure/azure-storage-blob-go/azblob"
11-
)
12-
13-
type AzBlobStorage struct {
14-
containerURL azblob.ContainerURL
15-
accountName string // Store for potential use/logging
16-
}
17-
18-
// NewAzBlobStorage creates a new Azure Blob Storage client.
19-
func NewAzBlobStorage(accountName, accountKey, containerName string) (*AzBlobStorage, error) {
20-
if accountName == "" || accountKey == "" || containerName == "" {
21-
return nil, fmt.Errorf("azure storage account name, key, and container name cannot be empty")
22-
}
23-
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
24-
if err != nil {
25-
return nil, fmt.Errorf("failed to create azure shared key credential: %w", err)
26-
}
27-
28-
// Use default pipeline options
29-
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
30-
31-
// Construct the container URL
32-
// Ensure the container name is URL-encoded if necessary, though usually not needed for valid names.
33-
u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
34-
if err != nil {
35-
return nil, fmt.Errorf("failed to parse azure container URL: %w", err)
36-
}
37-
containerURL := azblob.NewContainerURL(*u, p)
38-
39-
// Optional: Verify container exists? Could add a check here.
40-
41-
return &AzBlobStorage{
42-
containerURL: containerURL,
43-
accountName: accountName,
44-
}, nil
45-
}
46-
47-
// Upload compresses and uploads data to the specified blob name (filename + compression extension).
48-
func (a *AzBlobStorage) Upload(filename string, reader io.Reader, format string, level int) error {
49-
ctx := context.Background()
50-
compressedReader, ext := compressStream(reader, format, level)
51-
blobName := filename + ext
52-
blobURL := a.containerURL.NewBlockBlobURL(blobName)
53-
54-
// Use UploadStreamToBlockBlob for efficient streaming upload
55-
_, err := azblob.UploadStreamToBlockBlob(ctx, compressedReader, blobURL, azblob.UploadStreamToBlockBlobOptions{
56-
// Can configure parallelism, buffer size, metadata, tags etc. here if needed
57-
})
58-
if err != nil {
59-
return fmt.Errorf("failed to upload %s to azure container %s: %w", blobName, a.containerURL.String(), err)
60-
}
61-
return nil
62-
}
63-
64-
// Download retrieves a blob from Azure Blob Storage and returns a reader for its decompressed content.
65-
// It tries common compression extensions (.gz, .zstd) if the base filename doesn't exist.
66-
func (a *AzBlobStorage) Download(filename string) (io.ReadCloser, error) {
67-
ctx := context.Background()
68-
extensionsToTry := []string{".gz", ".zstd", ""} // Try compressed first, then raw
69-
70-
var lastErr error
71-
for _, ext := range extensionsToTry {
72-
blobName := filename + ext
73-
blobURL := a.containerURL.NewBlockBlobURL(blobName)
74-
75-
// Attempt to download the blob properties first to check existence with less overhead
76-
// _, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
77-
// if err == nil { // Blob exists
78-
79-
// Or directly attempt download
80-
response, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
81-
82-
if err == nil {
83-
// Success! Return the response body wrapped in our decompressor
84-
// The response body needs to be closed by the caller.
85-
bodyStream := response.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3}) // Use retry reader
86-
decompressedStream := decompressStream(bodyStream, blobName) // Handles decompression based on blobName extension
87-
return decompressedStream, nil
88-
}
89-
90-
// Handle error
91-
lastErr = fmt.Errorf("failed attempt to download %s from azure container %s: %w", blobName, a.containerURL.String(), err)
92-
93-
// Check if the error is a 404 Not Found
94-
if stgErr, ok := err.(azblob.StorageError); ok {
95-
if stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
96-
// Blob not found, continue to try the next extension
97-
continue
98-
}
99-
}
100-
// If it's not a BlobNotFound error, return it immediately
101-
return nil, lastErr
102-
// }
103-
// else { // GetProperties failed
104-
// lastErr = fmt.Errorf("failed attempt to get properties for %s from azure container %s: %w", blobName, a.containerURL.String(), err)
105-
// if stgErr, ok := err.(azblob.StorageError); ok {
106-
// if stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
107-
// continue // Try next extension
108-
// }
109-
// }
110-
// return nil, lastErr // Return other errors
111-
// }
112-
}
113-
114-
// If we tried all extensions and none worked, return the last error encountered
115-
return nil, fmt.Errorf("file %s not found in azure container %s with extensions %v: %w", filename, a.containerURL.String(), extensionsToTry, lastErr)
116-
}
117-
118-
// List returns a list of blob names in the Azure container matching the prefix.
119-
func (a *AzBlobStorage) List(prefix string) ([]string, error) {
120-
ctx := context.Background()
121-
var blobNames []string
122-
123-
marker := azblob.Marker{} // Start with no marker
124-
for marker.NotDone() {
125-
// List blobs segment by segment
126-
listBlob, err := a.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{
127-
Prefix: prefix,
128-
// Details: azblob.BlobListingDetails{ /* include metadata, tags etc. if needed */ },
129-
})
130-
if err != nil {
131-
return nil, fmt.Errorf("failed to list blobs in azure container %s with prefix %s: %w", a.containerURL.String(), prefix, err)
132-
}
133-
134-
for _, blobInfo := range listBlob.Segment.BlobItems {
135-
blobNames = append(blobNames, blobInfo.Name)
136-
}
137-
138-
// Advance the marker for the next segment
139-
marker = listBlob.NextMarker
140-
}
141-
142-
return blobNames, nil
143-
}
1+
package storage
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/url"
8+
9+
"github.com/Azure/azure-storage-blob-go/azblob"
10+
)
11+
12+
type AzBlobStorage struct {
13+
containerURL azblob.ContainerURL
14+
accountName string // Store for potential use/logging
15+
}
16+
17+
// NewAzBlobStorage creates a new Azure Blob Storage client.
18+
func NewAzBlobStorage(accountName, accountKey, containerName string) (*AzBlobStorage, error) {
19+
if accountName == "" || accountKey == "" || containerName == "" {
20+
return nil, fmt.Errorf("azure storage account name, key, and container name cannot be empty")
21+
}
22+
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
23+
if err != nil {
24+
return nil, fmt.Errorf("failed to create azure shared key credential: %w", err)
25+
}
26+
27+
// Use default pipeline options
28+
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
29+
30+
// Construct the container URL
31+
// Ensure the container name is URL-encoded if necessary, though usually not needed for valid names.
32+
u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
33+
if err != nil {
34+
return nil, fmt.Errorf("failed to parse azure container URL: %w", err)
35+
}
36+
containerURL := azblob.NewContainerURL(*u, p)
37+
38+
// Optional: Verify container exists? Could add a check here.
39+
40+
return &AzBlobStorage{
41+
containerURL: containerURL,
42+
accountName: accountName,
43+
}, nil
44+
}
45+
46+
// Upload compresses and uploads data to the specified blob name (filename + compression extension).
47+
func (a *AzBlobStorage) Upload(filename string, reader io.Reader, format string, level int) error {
48+
ctx := context.Background()
49+
compressedReader, ext := compressStream(reader, format, level)
50+
blobName := filename + ext
51+
blobURL := a.containerURL.NewBlockBlobURL(blobName)
52+
53+
// Use UploadStreamToBlockBlob for efficient streaming upload
54+
_, err := azblob.UploadStreamToBlockBlob(ctx, compressedReader, blobURL, azblob.UploadStreamToBlockBlobOptions{
55+
// Can configure parallelism, buffer size, metadata, tags etc. here if needed
56+
})
57+
if err != nil {
58+
return fmt.Errorf("failed to upload %s to azure container %s: %w", blobName, a.containerURL.String(), err)
59+
}
60+
return nil
61+
}
62+
63+
// Download retrieves a blob from Azure Blob Storage and returns a reader for its decompressed content.
64+
// It tries common compression extensions (.gz, .zstd) if the base filename doesn't exist.
65+
func (a *AzBlobStorage) Download(filename string) (io.ReadCloser, error) {
66+
ctx := context.Background()
67+
extensionsToTry := []string{".gz", ".zstd", ""} // Try compressed first, then raw
68+
69+
var lastErr error
70+
for _, ext := range extensionsToTry {
71+
blobName := filename + ext
72+
blobURL := a.containerURL.NewBlockBlobURL(blobName)
73+
74+
// Attempt to download the blob properties first to check existence with less overhead
75+
// _, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
76+
// if err == nil { // Blob exists
77+
78+
// Or directly attempt download
79+
response, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
80+
81+
if err == nil {
82+
// Success! Return the response body wrapped in our decompressor
83+
// The response body needs to be closed by the caller.
84+
bodyStream := response.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3}) // Use retry reader
85+
decompressedStream := decompressStream(bodyStream, blobName) // Handles decompression based on blobName extension
86+
return decompressedStream, nil
87+
}
88+
89+
// Handle error
90+
lastErr = fmt.Errorf("failed attempt to download %s from azure container %s: %w", blobName, a.containerURL.String(), err)
91+
92+
// Check if the error is a 404 Not Found
93+
if stgErr, ok := err.(azblob.StorageError); ok {
94+
if stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
95+
// Blob not found, continue to try the next extension
96+
continue
97+
}
98+
}
99+
// If it's not a BlobNotFound error, return it immediately
100+
return nil, lastErr
101+
// }
102+
// else { // GetProperties failed
103+
// lastErr = fmt.Errorf("failed attempt to get properties for %s from azure container %s: %w", blobName, a.containerURL.String(), err)
104+
// if stgErr, ok := err.(azblob.StorageError); ok {
105+
// if stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
106+
// continue // Try next extension
107+
// }
108+
// }
109+
// return nil, lastErr // Return other errors
110+
// }
111+
}
112+
113+
// If we tried all extensions and none worked, return the last error encountered
114+
return nil, fmt.Errorf("file %s not found in azure container %s with extensions %v: %w", filename, a.containerURL.String(), extensionsToTry, lastErr)
115+
}
116+
117+
// List returns a list of blob names in the Azure container matching the prefix.
118+
func (a *AzBlobStorage) List(prefix string) ([]string, error) {
119+
ctx := context.Background()
120+
var blobNames []string
121+
122+
marker := azblob.Marker{} // Start with no marker
123+
for marker.NotDone() {
124+
// List blobs segment by segment
125+
listBlob, err := a.containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{
126+
Prefix: prefix,
127+
// Details: azblob.BlobListingDetails{ /* include metadata, tags etc. if needed */ },
128+
})
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to list blobs in azure container %s with prefix %s: %w", a.containerURL.String(), prefix, err)
131+
}
132+
133+
for _, blobInfo := range listBlob.Segment.BlobItems {
134+
blobNames = append(blobNames, blobInfo.Name)
135+
}
136+
137+
// Advance the marker for the next segment
138+
marker = listBlob.NextMarker
139+
}
140+
141+
return blobNames, nil
142+
}

storage/s3.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"strings"
8+
9+
"github.com/aws/aws-sdk-go-v2/aws"
10+
"github.com/aws/aws-sdk-go-v2/config"
11+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
12+
"github.com/aws/aws-sdk-go-v2/service/s3"
13+
)
14+
15+
type S3Storage struct {
16+
bucket string
17+
client *s3.Client
18+
uploader *manager.Uploader
19+
downloader *manager.Downloader
20+
}
21+
22+
func NewS3Storage(bucket, region string) (*S3Storage, error) {
23+
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
24+
if err != nil {
25+
return nil, err
26+
}
27+
28+
client := s3.NewFromConfig(cfg)
29+
30+
return &S3Storage{
31+
bucket: bucket,
32+
client: client,
33+
uploader: manager.NewUploader(client),
34+
downloader: manager.NewDownloader(client),
35+
}, nil
36+
}
37+
38+
func (s *S3Storage) Upload(filename string, reader io.Reader, format string, level int) error {
39+
compressedReader, ext := compressStream(reader, format, level)
40+
_, err := s.uploader.Upload(context.TODO(), &s3.PutObjectInput{
41+
Bucket: aws.String(s.bucket),
42+
Key: aws.String(filename + ext),
43+
Body: compressedReader,
44+
})
45+
return err
46+
}
47+
48+
func (s *S3Storage) Download(filename string) (io.ReadCloser, error) {
49+
writer, err := os.CreateTemp(filename, strings.ReplaceAll(filename, "/", "_"))
50+
if err != nil {
51+
return nil, err
52+
}
53+
_, err = s.downloader.Download(context.TODO(), writer, &s3.GetObjectInput{
54+
Bucket: aws.String(s.bucket),
55+
Key: aws.String(filename),
56+
})
57+
if err != nil {
58+
return nil, err
59+
}
60+
return io.NopCloser(decompressStream(writer, filename)), nil
61+
}

0 commit comments

Comments
 (0)