Skip to content

Commit

Permalink
move to easyjson for big objects, comment failed test, move to adding…
Browse files Browse the repository at this point in the history
… all objects in one goroutine
  • Loading branch information
skripov-ds-ai committed Oct 25, 2022
1 parent a8c3539 commit cb9481e
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 89 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ COPY . .
RUN go build -o bin/app

# FROM alpine:latest
RUN apk add ca-certificates
RUN apk add --update --no-cache curl
#RUN apk add ca-certificates
#RUN apk add --update --no-cache curl
# COPY --from=build /usr/src/app/bin/app /usr/local/bin/app

CMD ["/usr/src/app/bin/app"]
103 changes: 85 additions & 18 deletions db_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/jellydator/ttlcache/v3"
jsoniter "github.com/json-iterator/go"
"github.com/mailru/easyjson"
"golang-developer-test-task/infrastructure/redclient"
"golang-developer-test-task/structs"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -115,8 +116,55 @@ func (d *DBProcessor) processJSONs(reader io.Reader, processor infoProcessor) (e
return nil
}

func (d *DBProcessor) processJSONArray(reader io.Reader) error {
bs, err := io.ReadAll(reader)
if err != nil {
return err
}
var infoList structs.InfoList
err = easyjson.Unmarshal(bs, &infoList)
if err != nil {
return err
}
go func() {
ctx := context.Background()
err := d.client.AddValues(ctx, infoList)
if err != nil {
d.logger.Error("error during AddValues in processJSONArray", zap.Error(err))
}
}()
return nil
}

func (d *DBProcessor) streamUnmarshalJSONs(reader io.Reader) (infoList structs.InfoList, err error) {
dec := json.NewDecoder(reader)
_, err = dec.Token()
if err != nil {
d.logger.Error("error inside processJSONs during decoding the first token in stream",
zap.Error(err))
return infoList, err
}
infoList = make(structs.InfoList, 0)
for dec.More() {
var info structs.Info
err = dec.Decode(&info)
if err != nil {
d.logger.Error("error inside processJSONs during decoding stream")
return infoList, err
}
}
_, err = dec.Token()
if err != nil {
d.logger.Error("error inside processJSONs during decoding the last token in stream",
zap.Error(err))
return infoList, err
}
return infoList, nil
}

// processFileFromURL handle json file from URL
func (d *DBProcessor) processFileFromURL(url string, processor jsonObjectsProcessorFunc) error {
// func (d *DBProcessor) processFileFromURL(url string, processor jsonObjectsProcessorFunc) error {
func (d *DBProcessor) processFileFromURL(url string) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
d.logger.Error("error during make NewRequest in processFileFromURL", zap.Error(err))
Expand All @@ -127,7 +175,6 @@ func (d *DBProcessor) processFileFromURL(url string, processor jsonObjectsProces
Timeout: 30 * time.Second,
}
resp, err := client.Do(req)
d.logger.Info("AAA!")
if err != nil {
d.logger.Error("error inside processFileFromURL in singleflight", zap.Error(err))
return err
Expand All @@ -140,12 +187,14 @@ func (d *DBProcessor) processFileFromURL(url string, processor jsonObjectsProces
d.logger.Error("unsupported Content-Type", zap.String("content_type", contentType))
return errors.New("unsupported Content-Type")
}
err = processor(resp.Body)
err = d.processJSONArray(resp.Body)
//err = processor(resp.Body)
return err
}

// processFileFromRequest handle json file from request
func (d *DBProcessor) processFileFromRequest(r *http.Request, fileName string, processor jsonObjectsProcessorFunc) (err error) {
// func (d *DBProcessor) processFileFromRequest(r *http.Request, fileName string, processor jsonObjectsProcessorFunc) (err error) {
func (d *DBProcessor) processFileFromRequest(r *http.Request, fileName string) (err error) {
file, _, err := r.FormFile(fileName)
if err != nil {
d.logger.Error("error inside processFileFromRequest",
Expand All @@ -155,7 +204,8 @@ func (d *DBProcessor) processFileFromRequest(r *http.Request, fileName string, p
defer func() {
_ = file.Close()
}()
err = processor(file)
//err = processor(file)
err = d.processJSONArray(file)
return err
}

Expand All @@ -178,7 +228,8 @@ func (d *DBProcessor) HandleLoadFile(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = d.processFileFromRequest(r, "uploadFile", d.jsonProcessor)
//err = d.processFileFromRequest(r, "uploadFile", d.jsonProcessor)
err = d.processFileFromRequest(r, "uploadFile")
if err != nil {
d.logger.Error("error during file processing in HandleLoadFile", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -189,22 +240,37 @@ func (d *DBProcessor) HandleLoadFile(w http.ResponseWriter, r *http.Request) {

// HandleLoadFile is handler for /api/load_json
func (d *DBProcessor) HandleLoadJSON(w http.ResponseWriter, r *http.Request) {
bs, err := io.ReadAll(r.Body)
//bs, err := io.ReadAll(r.Body)
//_, err := io.ReadAll(r.Body)
err := d.processJSONArray(r.Body)
if err != nil {
d.logger.Error("error during using jsonProcessor in HandleLoadJSON", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
var infoList structs.InfoList
err = jsoniter.Unmarshal(bs, &infoList)
if err != nil {
d.logger.Error("error during Unmarshal in HandleLoadJSON", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
for _, info := range infoList {
go d.saveInfo(info)
}
//var infoList structs.InfoList
//err = easyjson.Unmarshal(bs, &infoList)
////var infoList structs.InfoList
////err = jsoniter.Unmarshal(bs, &infoList)
//////_, err := d.streamUnmarshalJSONs(r.Body)
//if err != nil {
// d.logger.Error("error during Unmarshal in HandleLoadJSON", zap.Error(err))
// w.WriteHeader(http.StatusInternalServerError)
// return
//}
//go func() {
// //func() {
// ctx := context.Background()
// err = d.client.AddValues(ctx, infoList)
// if err != nil {
// d.logger.Error("error during AddValues in HandleLoadJSON", zap.Error(err))
// //w.WriteHeader(http.StatusInternalServerError)
// return
// }
//}()
//for _, info := range infoList {
// go d.saveInfo(info)
//}
w.WriteHeader(http.StatusOK)
}

Expand All @@ -228,7 +294,8 @@ func (d *DBProcessor) HandleLoadFromURL(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusBadRequest)
return
}
err = d.processFileFromURL(urlObj.URL, d.jsonProcessor)
//err = d.processFileFromURL(urlObj.URL, d.jsonProcessor)
err = d.processFileFromURL(urlObj.URL)
if err != nil {
d.logger.Error("error during file processing from url", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
Expand Down
Loading

0 comments on commit cb9481e

Please sign in to comment.