Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

api, chunk: progress bar support #1649

Merged
merged 12 commits into from
Aug 26, 2019
Prev Previous commit
Exported Tags fields to get rid of custom marshelling
  • Loading branch information
jmozah committed Aug 22, 2019
commit 8d28849c2d9497af50f0dbbfd6eb3e1ecff7a6ec
5 changes: 2 additions & 3 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package http
import (
"bufio"
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -412,7 +411,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total())
log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
tag.DoneSplit(newAddr)

// Pin the file
Expand Down Expand Up @@ -969,7 +968,7 @@ func (s *Server) HandleGetTag(w http.ResponseWriter, r *http.Request) {
}
} else {

tagByFile, err := s.api.Tags.GetByAddress(hex.EncodeToString(fileAddr))
tagByFile, err := s.api.Tags.GetByAddress(fileAddr)
if err != nil {
getTagNotFound.Inc(1)
respondError(w, r, "Tag not found", http.StatusNotFound)
acud marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
24 changes: 16 additions & 8 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestGetTagUsingHash(t *testing.T) {

// upload a file
data := testutil.RandomBytes(1, 10000)
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader([]byte(data)))
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader(data))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -110,12 +110,16 @@ func TestGetTagUsingHash(t *testing.T) {
}

// check if the tag has valid values
if tag.Address != string(rootHash) {
rcvdAddress, err := hex.DecodeString(string(rootHash))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tag.Address, rcvdAddress) {
t.Fatalf("retrieved address mismatch, expected %x, got %x", string(rootHash), tag.Address)
}

if tag.Total() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.Total())
if tag.TotalCounter() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.TotalCounter())
}

if !strings.HasPrefix(tag.Name, "unnamed_tag_") {
Expand All @@ -131,7 +135,7 @@ func TestGetTagUsingTagId(t *testing.T) {

// upload a file
data := testutil.RandomBytes(1, 10000)
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader([]byte(data)))
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader(data))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -166,12 +170,16 @@ func TestGetTagUsingTagId(t *testing.T) {
}

// check if the received tags has valid values
if tag.Address != string(rootHash) {
rcvdAddress, err := hex.DecodeString(string(rootHash))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tag.Address, rcvdAddress) {
t.Fatalf("retrieved address mismatch, expected %x, got %x", string(rootHash), tag.Address)
}

if tag.Total() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.Total())
if tag.TotalCounter() != 4 {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.TotalCounter())
}

if !strings.HasPrefix(tag.Name, "unnamed_tag_") {
Expand Down
142 changes: 45 additions & 97 deletions chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package chunk

import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"sync/atomic"
"time"
Expand Down Expand Up @@ -47,14 +45,14 @@ const (
type Tag struct {
Uid uint32 // a unique identifier for this tag
Name string // a name tag for this tag
Address string // the associated swarm hash for this tag
total int64 // total chunks belonging to a tag
split int64 // number of chunks already processed by splitter for hashing
seen int64 // number of chunks already seen
stored int64 // number of chunks already stored locally
sent int64 // number of chunks sent for push syncing
synced int64 // number of chunks synced with proof
startedAt time.Time // tag started to calculate ETA
Address Address // the associated swarm hash for this tag
Total int64 // total chunks belonging to a tag
Split int64 // number of chunks already processed by splitter for hashing
Seen int64 // number of chunks already seen
Stored int64 // number of chunks already stored locally
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
StartedAt time.Time // tag started to calculate ETA
}

// New creates a new tag, stores it by the name and returns it
Expand All @@ -63,8 +61,8 @@ func NewTag(uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
startedAt: time.Now(),
total: total,
StartedAt: time.Now(),
Total: total,
}
return t
}
Expand All @@ -74,15 +72,15 @@ func (t *Tag) Inc(state State) {
var v *int64
switch state {
case StateSplit:
v = &t.split
v = &t.Split
case StateStored:
v = &t.stored
v = &t.Stored
case StateSeen:
v = &t.seen
v = &t.Seen
case StateSent:
v = &t.sent
v = &t.Sent
case StateSynced:
v = &t.synced
v = &t.Synced
}
atomic.AddInt64(v, 1)
}
Expand All @@ -92,44 +90,44 @@ func (t *Tag) Get(state State) int64 {
var v *int64
switch state {
case StateSplit:
v = &t.split
v = &t.Split
case StateStored:
v = &t.stored
v = &t.Stored
case StateSeen:
v = &t.seen
v = &t.Seen
case StateSent:
v = &t.sent
v = &t.Sent
case StateSynced:
v = &t.synced
v = &t.Synced
}
return atomic.LoadInt64(v)
}

// GetTotal returns the total count
func (t *Tag) Total() int64 {
return atomic.LoadInt64(&t.total)
func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}

// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address Address) int64 {
total := atomic.LoadInt64(&t.split)
atomic.StoreInt64(&t.total, total)
t.Address = hex.EncodeToString(address)
total := atomic.LoadInt64(&t.Split)
atomic.StoreInt64(&t.Total, total)
t.Address = address
return total
}

// Status returns the value of state and the total count
func (t *Tag) Status(state State) (int64, int64, error) {
count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total)
count, seen, total := t.Get(state), atomic.LoadInt64(&t.Seen), atomic.LoadInt64(&t.Total)
if total == 0 {
return count, total, errNA
}
switch state {
case StateSplit, StateStored, StateSeen:
return count, total, nil
case StateSent, StateSynced:
stored := atomic.LoadInt64(&t.stored)
stored := atomic.LoadInt64(&t.Stored)
if stored < total {
return count, total - seen, errNA
}
Expand All @@ -147,31 +145,31 @@ func (t *Tag) ETA(state State) (time.Time, error) {
if cnt == 0 || total == 0 {
return time.Time{}, errNoETA
}
diff := time.Since(t.startedAt)
diff := time.Since(t.StartedAt)
dur := time.Duration(total) * diff / time.Duration(cnt)
return t.startedAt.Add(dur), nil
return t.StartedAt.Add(dur), nil
}

// MarshalBinary marshals the tag into a byte slice
func (tag *Tag) MarshalBinary() (data []byte, err error) {
buffer := make([]byte, 4)
binary.BigEndian.PutUint32(buffer, tag.Uid)
encodeInt64Append(&buffer, tag.total)
encodeInt64Append(&buffer, tag.split)
encodeInt64Append(&buffer, tag.seen)
encodeInt64Append(&buffer, tag.stored)
encodeInt64Append(&buffer, tag.sent)
encodeInt64Append(&buffer, tag.synced)
encodeInt64Append(&buffer, tag.Total)
encodeInt64Append(&buffer, tag.Split)
encodeInt64Append(&buffer, tag.Seen)
encodeInt64Append(&buffer, tag.Stored)
encodeInt64Append(&buffer, tag.Sent)
encodeInt64Append(&buffer, tag.Synced)

intBuffer := make([]byte, 8)

n := binary.PutVarint(intBuffer, tag.startedAt.Unix())
n := binary.PutVarint(intBuffer, tag.StartedAt.Unix())
buffer = append(buffer, intBuffer[:n]...)

n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
buffer = append(buffer, intBuffer[:n]...)

buffer = append(buffer, []byte(tag.Address)...)
buffer = append(buffer, tag.Address...)

buffer = append(buffer, []byte(tag.Name)...)

Expand All @@ -186,21 +184,21 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
tag.Uid = binary.BigEndian.Uint32(buffer)
buffer = buffer[4:]

tag.total = decodeInt64Splice(&buffer)
tag.split = decodeInt64Splice(&buffer)
tag.seen = decodeInt64Splice(&buffer)
tag.stored = decodeInt64Splice(&buffer)
tag.sent = decodeInt64Splice(&buffer)
tag.synced = decodeInt64Splice(&buffer)
tag.Total = decodeInt64Splice(&buffer)
tag.Split = decodeInt64Splice(&buffer)
tag.Seen = decodeInt64Splice(&buffer)
tag.Stored = decodeInt64Splice(&buffer)
tag.Sent = decodeInt64Splice(&buffer)
tag.Synced = decodeInt64Splice(&buffer)

t, n := binary.Varint(buffer)
tag.startedAt = time.Unix(t, 0)
tag.StartedAt = time.Unix(t, 0)
buffer = buffer[n:]

t, n = binary.Varint(buffer)
buffer = buffer[n:]
if t > 0 {
tag.Address = string(buffer[:t])
tag.Address = buffer[:t]
}
tag.Name = string(buffer[t:])

Expand All @@ -218,53 +216,3 @@ func decodeInt64Splice(buffer *[]byte) int64 {
*buffer = (*buffer)[n:]
return val
}

// marshall friendly tag structure
type jsonTag struct {
Uid uint32
Name string
Address string
Total int64
Split int64
Seen int64
Stored int64
Sent int64
Synced int64
StartedAt time.Time
}

// MarshalJSON marshals the tag structure in to JSON encoded byte slice
func (t *Tag) MarshalJSON() ([]byte, error) {
j := jsonTag{
Uid: t.Uid,
Name: t.Name,
Address: t.Address,
Total: t.total,
Split: t.split,
Seen: t.seen,
Stored: t.stored,
Sent: t.sent,
Synced: t.synced,
StartedAt: t.startedAt,
}
return json.Marshal(j)
}

// UnmarshalJSON unwraps the JSON encoded byte slice to tag structure
func (t *Tag) UnmarshalJSON(b []byte) error {
mTag := &jsonTag{}
if err := json.Unmarshal(b, &mTag); err != nil {
return err
}
t.Uid = mTag.Uid
t.Name = mTag.Name
t.Address = mTag.Address
t.total = mTag.Total
t.split = mTag.Split
t.seen = mTag.Seen
t.stored = mTag.Stored
t.sent = mTag.Sent
t.synced = mTag.Synced
t.startedAt = mTag.StartedAt
return nil
}
Loading