Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

api, chunk: progress bar support #1649

Merged
merged 12 commits into from
Aug 26, 2019
6 changes: 3 additions & 3 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool, toPin bool)
return "", err
}
req.ContentLength = size
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))
req.Header.Set(swarmhttp.TagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

// Set the pinning header if the file needs to be pinned
if toPin {
Expand Down Expand Up @@ -544,7 +544,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set(swarmhttp.SwarmTagHeaderName, tag)
req.Header.Set(swarmhttp.TagHeaderName, tag)

// Set the pinning header if the file is to be pinned
if toPin {
Expand Down Expand Up @@ -616,7 +616,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader, toPin bool) (st

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
req.Header.Set(swarmhttp.TagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
if toPin {
req.Header.Set(swarmhttp.PinHeaderName, "true")
}
Expand Down
4 changes: 2 additions & 2 deletions api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler {
}

// InitUploadTag creates a new tag for an upload to the local HTTP proxy
// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used
// if a tag is not named using the TagHeaderName, a fallback name will be used
// when the Content-Length header is set, an ETA on chunking will be available since the
// number of chunks to be split is known in advance (not including enclosing manifest chunks)
// the tag can later be accessed using the appropriate identifier in the request context
Expand All @@ -99,7 +99,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
err error
estimatedTotal int64 = 0
contentType = r.Header.Get("Content-Type")
headerTag = r.Header.Get(SwarmTagHeaderName)
headerTag = r.Header.Get(TagHeaderName)
)
if headerTag != "" {
tagName = headerTag
Expand Down
76 changes: 73 additions & 3 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ var (
getFileFail = metrics.NewRegisteredCounter("api.http.get.file.fail", nil)
getListCount = metrics.NewRegisteredCounter("api.http.get.list.count", nil)
getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil)
getTagCount = metrics.NewRegisteredCounter("api.http.get.tag.count", nil)
getTagNotFound = metrics.NewRegisteredCounter("api.http.get.tag.notfound", nil)
getTagFail = metrics.NewRegisteredCounter("api.http.get.tag.fail", nil)
)

const (
SwarmTagHeaderName = "x-swarm-tag" // Presence of this in header indicates the tag
PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required
TagHeaderName = "x-swarm-tag"
zelig marked this conversation as resolved.
Show resolved Hide resolved
PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required
)

type methodHandler map[string]http.Handler
Expand Down Expand Up @@ -162,7 +165,12 @@ func NewServer(api *api.API, pinAPI *pin.API, corsString string) *Server {
defaultMiddlewares...,
),
})

mux.Handle("/bzz-tag:/", methodHandler{
"GET": Adapt(
http.HandlerFunc(server.HandleGetTag),
defaultMiddlewares...,
),
})
mux.Handle("/", methodHandler{
"GET": Adapt(
http.HandlerFunc(server.HandleRootPaths),
Expand Down Expand Up @@ -304,6 +312,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
Expand Down Expand Up @@ -396,6 +405,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
Expand Down Expand Up @@ -897,6 +907,66 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize))
}

// HandleGetTag responds to the following request
// - bzz-tag:/<manifest> and
// - bzz-tag:/?tagId=<tagId>
// Clients should use root hash or the tagID to get the tag counters
func (s *Server) HandleGetTag(w http.ResponseWriter, r *http.Request) {
getTagCount.Inc(1)
uri := GetURI(r.Context())
if uri == nil {
getTagFail.Inc(1)
respondError(w, r, "Error decoding uri", http.StatusBadRequest)
return
}
fileAddr := uri.Address()
acud marked this conversation as resolved.
Show resolved Hide resolved

var tag *chunk.Tag
if fileAddr == nil {
tagString := r.URL.Query().Get("Id")
if tagString == "" {
getTagFail.Inc(1)
respondError(w, r, "Missing one of the mandatory argument", http.StatusBadRequest)
acud marked this conversation as resolved.
Show resolved Hide resolved
return
}

u64, err := strconv.ParseUint(tagString, 10, 32)
if err != nil {
getTagFail.Inc(1)
respondError(w, r, "Invalid Id argument", http.StatusBadRequest)
return
}
tagId := uint32(u64)

tag, err = s.api.Tags.Get(tagId)
if err != nil {
getTagNotFound.Inc(1)
respondError(w, r, "Tag not found", http.StatusNotFound)
acud marked this conversation as resolved.
Show resolved Hide resolved
return
}
} else {

tagByFile, err := s.api.Tags.GetByAddress(fileAddr)
if err != nil {
getTagNotFound.Inc(1)
respondError(w, r, "Tag not found", http.StatusNotFound)
acud marked this conversation as resolved.
Show resolved Hide resolved
return
}
tag = tagByFile
}

w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
r.Header.Del("ETag")
w.WriteHeader(http.StatusOK)
err := json.NewEncoder(w).Encode(&tag)
if err != nil {
getTagFail.Inc(1)
respondError(w, r, "marshalling error", http.StatusInternalServerError)
return
}
}

// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length
func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
if contentLength < 4096 {
Expand Down
125 changes: 123 additions & 2 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"archive/tar"
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"flag"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/api"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/feed/lookup"
Expand All @@ -66,6 +68,125 @@ func newTestSigner() (*feed.GenericSigner, error) {
return feed.NewGenericSigner(privKey), nil
}

// TestGetTag uploads a file, retrieves the tag using http GET and check if it matches
func TestGetTagUsingHash(t *testing.T) {
srv := NewTestSwarmServer(t, serverFunc, nil, nil)
defer srv.Close()

// upload a file
data := testutil.RandomBytes(1, 10000)
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader([]byte(data)))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
}
rootHash, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}

// get the tag for the above upload using root hash of the file
getBzzURL := fmt.Sprintf("%s/bzz-tag:/%s", srv.URL, string(rootHash))
getResp, err := http.Get(getBzzURL)
if err != nil {
t.Fatal(err)
}
defer getResp.Body.Close()
if getResp.StatusCode != http.StatusOK {
t.Fatalf("err %s", getResp.Status)
}
retrievedData, err := ioutil.ReadAll(getResp.Body)
if err != nil {
t.Fatal(err)
}
tag := &chunk.Tag{}
err = json.Unmarshal(retrievedData, &tag)
if err != nil {
t.Fatal(err)
}

// check if the tag has valid values
rcvdAddress, err := hex.DecodeString(string(rootHash))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tag.Address, rcvdAddress) {
t.Fatalf("retrieved address mismatch, expected %x, got %x", rcvdAddress, tag.Address)
}

if tag.Total() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.Total())
}

if !strings.HasPrefix(tag.Name, "unnamed_tag_") {
t.Fatalf("retrieved name prefix mismatch, expected %x, got %x", "unnamed_tag_", tag.Name)
}

}

// TestGetTag uploads a file, retrieves the tag using http GET and check if it matches
func TestGetTagUsingTagId(t *testing.T) {
srv := NewTestSwarmServer(t, serverFunc, nil, nil)
defer srv.Close()

// upload a file
data := testutil.RandomBytes(1, 10000)
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader([]byte(data)))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
}
rootHash, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
tidString := resp.Header.Get(TagHeaderName)

// get the tag of the above upload using the tagId
getBzzURL := fmt.Sprintf("%s/bzz-tag:/?Id=%s", srv.URL, tidString)
getResp, err := http.Get(getBzzURL)
if err != nil {
t.Fatal(err)
}
defer getResp.Body.Close()
if getResp.StatusCode != http.StatusOK {
t.Fatalf("err %s", getResp.Status)
}
retrievedData, err := ioutil.ReadAll(getResp.Body)
if err != nil {
t.Fatal(err)
}
tag := &chunk.Tag{}
err = json.Unmarshal(retrievedData, &tag)
if err != nil {
t.Fatal(err)
}

// check if the received tags has valid values
rcvdAddress, err := hex.DecodeString(string(rootHash))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tag.Address, rcvdAddress) {
t.Fatalf("retrieved address mismatch, expected %x, got %x", rcvdAddress, tag.Address)
}

if tag.Total() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.Total())
}

if !strings.HasPrefix(tag.Name, "unnamed_tag_") {
t.Fatalf("retrieved name prefix mismatch, expected %x, got %x", "unnamed_tag_", tag.Name)
}

}

// Test the transparent resolving of feed updates with bzz:// scheme
//
// First upload data to bzz:, and store the Swarm hash to the resulting manifest in a feed update.
Expand Down Expand Up @@ -753,7 +874,7 @@ func testBzzTar(encrypted bool, t *testing.T) {
t.Fatal(err)
}
req.Header.Add("Content-Type", "application/x-tar")
req.Header.Add(SwarmTagHeaderName, "test-upload")
req.Header.Add(TagHeaderName, "test-upload")
client := &http.Client{}
resp2, err := client.Do(req)
if err != nil {
Expand Down Expand Up @@ -872,7 +993,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {

req = req.WithContext(ctx)
req.ContentLength = 1000000
req.Header.Add(SwarmTagHeaderName, "1000000")
req.Header.Add(TagHeaderName, "1000000")

go func() {
for {
Expand Down
7 changes: 6 additions & 1 deletion api/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Parse(rawuri string) (*URI, error) {

// check the scheme is valid
switch uri.Scheme {
case "bzz", "bzz-raw", "bzz-immutable", "bzz-list", "bzz-hash", "bzz-feed":
case "bzz", "bzz-raw", "bzz-immutable", "bzz-list", "bzz-hash", "bzz-feed", "bzz-tag":
default:
return nil, fmt.Errorf("unknown scheme %q", u.Scheme)
}
Expand All @@ -108,6 +108,11 @@ func Parse(rawuri string) (*URI, error) {
}
return uri, nil
}

func (u *URI) Tag() bool {
return u.Scheme == "bzz-tag"
}

func (u *URI) Feed() bool {
return u.Scheme == "bzz-feed"
}
Expand Down
Loading