Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use json.Number #474

Merged
merged 9 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ clean: ## Clean the project, set it up for a new build
@mkdir -p packaging/output/upstart
@mkdir -p packaging/output/systemd
@mkdir -p tmp/linux
@go clean -testcache

run: clean generate ## Run Faktory daemon locally
FAKTORY_PASSWORD=${PASSWORD} go run cmd/faktory/daemon.go -l debug -e development
Expand Down
4 changes: 3 additions & 1 deletion client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package client
import (
"encoding/json"
"fmt"

"github.com/contribsys/faktory/util"
)

type BatchStatus struct {
Expand Down Expand Up @@ -176,7 +178,7 @@ func (c *Client) BatchStatus(bid string) (*BatchStatus, error) {
}

var stat BatchStatus
err = json.Unmarshal(data, &stat)
err = util.JsonUnmarshal(data, &stat)
if err != nil {
return nil, err
}
Expand Down
85 changes: 44 additions & 41 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/contribsys/faktory/internal/pool"
"github.com/contribsys/faktory/util"
)

const (
Expand Down Expand Up @@ -152,7 +153,7 @@ func DefaultServer() *Server {
//
// Use the URL to configure any necessary password:
//
// tcp://:mypassword@localhost:7419
// tcp://:mypassword@localhost:7419
//
// By default Open assumes localhost with no password
// which is appropriate for local development.
Expand Down Expand Up @@ -182,8 +183,7 @@ func OpenWithDialer(dialer Dialer) (*Client, error) {
// a *tls.Dialer if "tcp+tls" and a *net.Dialer if
// not.
//
// client.Dial(client.Localhost, "topsecret")
//
// client.Dial(client.Localhost, "topsecret")
func Dial(srv *Server, password string) (*Client, error) {
d := &net.Dialer{Timeout: srv.Timeout}
dialer := Dialer(d)
Expand All @@ -198,6 +198,12 @@ func DialWithDialer(srv *Server, password string, dialer Dialer) (*Client, error
return dial(srv, password, dialer)
}

type HIv2 struct {
V int `json:"v"` // version, should be 2
I int `json:"i,omitempty"` // iterations
S string `json:"s,omitempty"` // salt
}

// dial connects to the remote faktory server.
func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
client := emptyClientData()
Expand Down Expand Up @@ -227,27 +233,19 @@ func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
if strings.HasPrefix(line, "HI ") {
str := strings.TrimSpace(line)[3:]

var hi map[string]interface{}
err = json.Unmarshal([]byte(str), &hi)
var hi HIv2
err = util.JsonUnmarshal([]byte(str), &hi)
if err != nil {
conn.Close()
return nil, err
}
v, ok := hi["v"].(float64)
if ok {
if ExpectedProtocolVersion != int(v) {
fmt.Println("Warning: server and client protocol versions out of sync:", v, ExpectedProtocolVersion)
}
if ExpectedProtocolVersion != hi.V {
util.Infof("Warning: server and client protocol versions out of sync: want %d, got %d", ExpectedProtocolVersion, hi.V)
}

salt, ok := hi["s"].(string)
if ok {
iter := 1
iterVal, ok := hi["i"]
if ok {
iter = int(iterVal.(float64))
}

salt := hi.S
if salt != "" {
iter := hi.I
client.PasswordHash = hash(password, salt, iter)
}
} else {
Expand Down Expand Up @@ -303,7 +301,7 @@ func (c *Client) PushBulk(jobs []*Job) (map[string]string, error) {
return nil, err
}
results := map[string]string{}
err = json.Unmarshal(data, &results)
err = util.JsonUnmarshal(data, &results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,7 +340,7 @@ func (c *Client) Fetch(q ...string) (*Job, error) {
}

var job Job
err = json.Unmarshal(data, &job)
err = util.JsonUnmarshal(data, &job)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -421,7 +419,11 @@ func (c *Client) ResumeQueues(names ...string) error {
return c.ok(c.rdr)
}

// deprecated, this returns an untyped map.
// use CurrentState() instead which provides strong typing
func (c *Client) Info() (map[string]interface{}, error) {
util.Info("client.Info() is deprecated, use client.CurrentState() instead")

err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
Expand All @@ -435,42 +437,43 @@ func (c *Client) Info() (map[string]interface{}, error) {
return nil, nil
}

var hash map[string]interface{}
err = json.Unmarshal(data, &hash)
var cur map[string]interface{}
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}

return hash, nil
return cur, nil
}

func (c *Client) QueueSizes() (map[string]uint64, error) {
hash, err := c.Info()
func (c *Client) CurrentState() (*FaktoryState, error) {
err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
}

faktory, ok := hash["faktory"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
data, err := c.readResponse(c.rdr)
if err != nil {
return nil, err
}

queues, ok := faktory["queues"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
if len(data) == 0 {
return nil, nil
}

sizes := make(map[string]uint64)
for name, size := range queues {
size, ok := size.(float64)
if !ok {
return nil, fmt.Errorf("invalid queue size: %v", size)
}

sizes[name] = uint64(size)
var cur FaktoryState
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}
return &cur, nil
}

return sizes, nil
func (c *Client) QueueSizes() (map[string]uint64, error) {
state, err := c.CurrentState()
if err != nil {
return nil, err
}
return state.Data.Queues, nil
}

func (c *Client) Generic(cmdline string) (string, error) {
Expand Down
8 changes: 3 additions & 5 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"bufio"
"fmt"
"log"
"net"
"os"
Expand Down Expand Up @@ -122,13 +121,13 @@ func TestClientOperations(t *testing.T) {
resp <- "$36\r\n{\"faktory\":{\"queues\":{\"default\":2}}}\r\n"
sizes, err := cl.QueueSizes()
assert.NoError(t, err)
assert.Equal(t, sizes["default"], uint64(2))
assert.EqualValues(t, 2, sizes["default"])
assert.Contains(t, <-req, "INFO")

resp <- "$39\r\n{\"faktory\":{\"queues\":{\"invalid\":null}}}\r\n"
sizes, err = cl.QueueSizes()
assert.Error(t, err)
assert.Nil(t, sizes)
assert.NoError(t, err)
assert.EqualValues(t, 0, sizes["invalid"])
assert.Contains(t, <-req, "INFO")

err = cl.Close()
Expand Down Expand Up @@ -157,7 +156,6 @@ func withFakeServer(t *testing.T, fn func(chan string, chan string, string)) {
buf := bufio.NewReader(conn)
line, err := buf.ReadString('\n')
if err != nil {
fmt.Println(err)
conn.Close()
break
}
Expand Down
27 changes: 27 additions & 0 deletions client/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,30 @@ var (
Name = "Faktory"
Version = "1.9.0"
)

// Structs for parsing the INFO response
type FaktoryState struct {
Now string `json:"now"`
ServerUtcTime string `json:"server_utc_time"`
Data DataSnapshot `json:"faktory"`
Server ServerSnapshot `json:"server"`
}

type DataSnapshot struct {
TotalFailures uint64 `json:"total_failures"`
TotalProcessed uint64 `json:"total_processed"`
TotalEnqueued uint64 `json:"total_enqueued"`
TotalQueues uint64 `json:"total_queues"`
Queues map[string]uint64 `json:"queues"`
Sets map[string]uint64 `json:"sets"`
Tasks map[string]map[string]interface{} `json:"tasks"` // deprecated
}

type ServerSnapshot struct {
Description string `json:"description"`
Version string `json:"faktory_version"`
Uptime uint64 `json:"uptime"`
Connections uint64 `json:"connections"`
CommandCount uint64 `json:"command_count"`
UsedMemoryMB uint64 `json:"used_memory_mb"`
}
2 changes: 1 addition & 1 deletion client/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Client) TrackGet(jid string) (*JobTrack, error) {
}

var trck JobTrack
err = json.Unmarshal(data, &trck)
err = util.JsonUnmarshal(data, &trck)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/contribsys/faktory

go 1.21
go 1.22

require (
github.com/BurntSushi/toml v0.4.1
github.com/justinas/nosurf v1.1.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
)

require github.com/redis/go-redis/v9 v9.2.0
Expand Down
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk=
github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand All @@ -19,15 +18,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.2.0 h1:zwMdX0A4eVzse46YN18QhuDiM4uf3JmkOB4VZrdt5uI=
github.com/redis/go-redis/v9 v9.2.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3 changes: 1 addition & 2 deletions manager/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -158,7 +157,7 @@ func (el *simpleLease) Job() (*client.Job, error) {
}
if el.job == nil {
var job client.Job
err := json.Unmarshal(el.payload, &job)
err := util.JsonUnmarshal(el.payload, &job)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal job payload: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions manager/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -38,7 +37,7 @@ func (m *manager) schedule(ctx context.Context, when time.Time, set storage.Sort
for {
count, err := set.RemoveBefore(ctx, util.Thens(when), 100, func(data []byte) error {
var job client.Job
if err := json.Unmarshal(data, &job); err != nil {
if err := util.JsonUnmarshal(data, &job); err != nil {
return fmt.Errorf("cannot unmarshal job payload: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions manager/working.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *manager) loadWorkingSet(ctx context.Context) error {
addedCount := 0
err := m.store.Working().Each(ctx, func(idx int, entry storage.SortedEntry) error {
var res Reservation
err := json.Unmarshal(entry.Value(), &res)
err := util.JsonUnmarshal(entry.Value(), &res)
if err != nil {
// We can't return an error here, this method is best effort
// as we are booting the server. We can't allow corrupted data
Expand Down Expand Up @@ -193,7 +193,7 @@ func (m *manager) ReapExpiredJobs(ctx context.Context, when time.Time) (int64, e
tm := util.Thens(when)
count, err := m.store.Working().RemoveBefore(ctx, tm, 10, func(data []byte) error {
var res Reservation
err := json.Unmarshal(data, &res)
err := util.JsonUnmarshal(data, &res)
if err != nil {
return fmt.Errorf("cannot unmarshal reservation payload: %w", err)
}
Expand Down
Loading