Skip to content

Commit

Permalink
Speedup the RAWX (#2017)
Browse files Browse the repository at this point in the history
* Better syslog usage
* beanstalkd: Remove unused code, Avoid string to bytes conversions
* beanstalkd: Parallelize on several beanstalkd connections
* Help the GC, prevent escape to the heap
* Lighter log management
* Rename a config directive (tcp_keepalive -> http_keepalive, tcp_keepalive is now deprecated)
* oio-rawxr-harass: allow to reuse the connections
* Allow to turn events down (config)
* Fix the open() flags used to just fsync
* rawx: Centralize in the repo the logic around sync* flags
  • Loading branch information
jfsmig authored Apr 23, 2020
1 parent be4bf24 commit c447d7b
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 405 deletions.
140 changes: 21 additions & 119 deletions rawx/beanstalk.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// OpenIO SDS Go rawx
// Copyright (C) 2018-2019 OpenIO SAS
// Copyright (C) 2018-2020 OpenIO SAS
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Affero General Public
Expand All @@ -18,12 +18,11 @@ package main

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -68,13 +67,9 @@ type Beanstalkd struct {
bufReader *bufio.Reader
}

type Job struct {
ID uint64
Data []byte
}

func itoa(i int) string { return strconv.Itoa(i) }
func utoa(i uint64) string { return strconv.FormatUint(i, 10) }
func itoa(i int) string { return strconv.Itoa(i) }
func utoa(i uint64) string { return strconv.FormatUint(i, 10) }
func itoa64(i int64) string { return strconv.FormatInt(i, 10) }

func DialBeanstalkd(addr string) (*Beanstalkd, error) {
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
Expand All @@ -99,37 +94,18 @@ func (beanstalkd *Beanstalkd) Close() {
}
}

func (beanstalkd *Beanstalkd) Watch(tubename string) error {
cmd := strings.Builder{}
cmd.Grow(len(tubename) + 16)
cmd.WriteString("watch ")
cmd.WriteString(tubename)
cmd.WriteString("\r\n")
resp, err := beanstalkd.sendCommand(cmd.String())
if err != nil {
return err
}

var tubeCount int
_, err = fmt.Sscanf(resp, "WATCHING %d\r\n", &tubeCount)
if err != nil {
return parseBeanstalkError(resp)
}
return nil
}

func (beanstalkd *Beanstalkd) Use(tubename string) error {
cmd := strings.Builder{}
cmd.Grow(len(tubename) + 16)
cmd := bytes.Buffer{}
cmd.Grow(256)
cmd.WriteString("use ")
cmd.WriteString(tubename)
cmd.WriteString("\r\n")
expected := fmt.Sprintf("USING %s\r\n", tubename)
return beanstalkd.sendCommandAndCheck(cmd.String(), expected)
return beanstalkd.sendCommandAndCheck(cmd.Bytes(), expected)
}

func (beanstalkd *Beanstalkd) Put(data []byte) (uint64, error) {
cmd := strings.Builder{}
cmd := bytes.Buffer{}
cmd.Grow(len(data) + 64)
cmd.WriteString("put ")
cmd.WriteString(utoa(defaultPriority))
Expand All @@ -140,93 +116,29 @@ func (beanstalkd *Beanstalkd) Put(data []byte) (uint64, error) {
cmd.WriteString("\r\n")
cmd.Write(data)
cmd.WriteString("\r\n")
resp, err := beanstalkd.sendCommand(cmd.String())
resp, err := beanstalkd.sendCommand(cmd.Bytes())
if err != nil {
return 0, err
}

switch {
case strings.HasPrefix(resp, "IN"):
var id uint64
if len(resp) <= 0 {
return 0, parseBeanstalkError(resp)
}

var id uint64
switch resp[0] {
case 'I':
_, err := fmt.Sscanf(resp, "INSERTED %d\r\n", &id)
return id, err
case strings.HasPrefix(resp, "BU"):
var id uint64
case 'B':
_, _ = fmt.Sscanf(resp, "BURIED %d\r\n", &id)
return id, errBuried
default:
return 0, parseBeanstalkError(resp)
}
}

func (beanstalkd *Beanstalkd) Delete(id uint64) error {
cmd := strings.Builder{}
cmd.Grow(128)
cmd.WriteString("delete ")
cmd.WriteString(utoa(id))
cmd.WriteString("\r\n")
expected := "DELETED\r\n"
return beanstalkd.sendCommandAndCheck(cmd.String(), expected)
}

func (beanstalkd *Beanstalkd) Reserve() (*Job, error) {
command := "reserve\r\n"
resp, err := beanstalkd.sendCommand(command)
if err != nil {
return nil, err
}

switch {
case strings.HasPrefix(resp, "RESERVED"):
job := new(Job)
var dataLen int
_, err = fmt.Sscanf(resp, "RESERVED %d %d\r\n", &(job.ID), &dataLen)
if err != nil {
return nil, err
}
job.Data, err = beanstalkd.readData(dataLen)
return job, err
default:
return nil, parseBeanstalkError(resp)
}
}

func (beanstalkd *Beanstalkd) Bury(id uint64) error {
command := fmt.Sprintf("bury %d %d\r\n", id, defaultPriority)
expected := "BURIED\r\n"
return beanstalkd.sendCommandAndCheck(command, expected)
}

func (beanstalkd *Beanstalkd) Release(id uint64) error {
command := fmt.Sprintf("release %d %d %d\r\n", id, defaultPriority, 0)
expected := "RELEASED\r\n"
return beanstalkd.sendCommandAndCheck(command, expected)
}

func (beanstalkd *Beanstalkd) KickJob(id uint64) error {
command := fmt.Sprintf("kick-job %d\r\n", id)
expected := "KICKED\r\n"
return beanstalkd.sendCommandAndCheck(command, expected)
}

func (beanstalkd *Beanstalkd) Kick(bound uint64) (uint64, error) {
command := fmt.Sprintf("kick %d\r\n", bound)
resp, err := beanstalkd.sendCommand(command)
if err != nil {
return 0, err
}

switch {
case strings.HasPrefix(resp, "KICKED"):
var kicked uint64
_, err := fmt.Sscanf(resp, "KICKED %d\r\n", &kicked)
return kicked, err
default:
return 0, parseBeanstalkError(resp)
}
}

func (beanstalkd *Beanstalkd) sendCommandAndCheck(command, expected string) error {
func (beanstalkd *Beanstalkd) sendCommandAndCheck(command []byte, expected string) error {
resp, err := beanstalkd.sendCommand(command)
if err != nil {
return err
Expand All @@ -238,8 +150,8 @@ func (beanstalkd *Beanstalkd) sendCommandAndCheck(command, expected string) erro
return nil
}

func (beanstalkd *Beanstalkd) sendCommand(command string) (string, error) {
_, err := beanstalkd.sendAll([]byte(command))
func (beanstalkd *Beanstalkd) sendCommand(command []byte) (string, error) {
_, err := beanstalkd.sendAll(command)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -274,16 +186,6 @@ func (beanstalkd *Beanstalkd) sendAll(data []byte) (int, error) {
return totalWritten, nil
}

func (beanstalkd *Beanstalkd) readData(dataLen int) ([]byte, error) {
data := make([]byte, dataLen+2) //+2 is for trailing \r\n
n, err := io.ReadFull(beanstalkd.bufReader, data)
if err != nil {
return nil, err
}

return data[:n-2], nil //strip \r\n trail
}

func parseBeanstalkError(str string) error {
if err, ok := errorTable[str]; ok {
return err
Expand Down
2 changes: 1 addition & 1 deletion rawx/buffer_pool.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// OpenIO SDS Go rawx
// Copyright (C) 2015-2019 OpenIO SAS
// Copyright (C) 2020 OpenIO SAS
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Affero General Public
Expand Down
Loading

0 comments on commit c447d7b

Please sign in to comment.