Skip to content

Commit

Permalink
chore: update what needed
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas Prochazka committed Aug 15, 2024
1 parent 7f5bb4c commit ca5ea3c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
matrix:
# version must be string, otherwise it will be converted to float
# and the trailing zero will be removed. 1.20 -> 1.2
go-version: [ "1.21", "1.22" ]
go-version: [ "1.22", "1.23" ]
postgres-version: [ 16, 15, 14, 13, 12, 11 ]
# Ensure that all combinations of Go and Postgres versions will run
continue-on-error: true
Expand Down
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,4 +390,23 @@ SET infinite_time_partitions = true,
retention_keep_table = false,
retention_keep_index = false
WHERE parent_table = 'my_queue_name';
```
```

## Contribution

We are open to any contribution to the pgq package, but since we use it in our production environment, we have to be very careful about the changes.
We don't need to add any new features, but we are open to any bug fixes, performance improvements, and documentation enhancements.

### Run integration tests

The unit tests will run without any additional setup, but the integration tests require the running postgres instance, otherwise are skipped.

In one shell start the postgres docker container:
```shell
docker run --rm -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres:15-alpine
```

In another shell run the tests:
```shell
TEST_POSTGRES_DSN=postgres://postgres:postgres@localhost:5432/postgres go test ./...
```
22 changes: 9 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,12 @@ func (c *Consumer) generateQuery() (*query.Builder, error) {
qb.WriteString(` AND consumed_count < :max_consume_count`)
}

if c.cfg.MetadataFilters != nil && len(c.cfg.MetadataFilters) > 0 {
for i, filter := range c.cfg.MetadataFilters {
if len(filter.Operation) == 0 {
return nil, fatalError{Err: fmt.Errorf("metadata filter operation is empty")}
}

qb.WriteString(fmt.Sprintf(" AND metadata->>:metadata_key_%d %s :metadata_value_%d", i, filter.Operation, i))
for i, filter := range c.cfg.MetadataFilters {
if len(filter.Operation) == 0 {
return nil, fatalError{Err: fmt.Errorf("metadata filter operation is empty")}
}

qb.WriteString(fmt.Sprintf(" AND metadata->>:metadata_key_%d %s :metadata_value_%d", i, filter.Operation, i))
}

qb.WriteString(` AND processed_at IS NULL`)
Expand Down Expand Up @@ -467,7 +465,7 @@ func prepareCtxTimeout() (func(td time.Duration) context.Context, context.Cancel
parent, cancel := context.WithCancel(context.Background())
fn := func(td time.Duration) context.Context {
// ctx will be released by parent cancellation
ctx, _ := context.WithTimeout(parent, td)
ctx, _ := context.WithTimeout(parent, td) //nolint:govet
return ctx
}
return fn, cancel
Expand Down Expand Up @@ -544,11 +542,9 @@ func (c *Consumer) tryConsumeMessages(ctx context.Context, query *query.Builder,
namedParams["max_consume_count"] = c.cfg.MaxConsumeCount
}

if c.cfg.MetadataFilters != nil && len(c.cfg.MetadataFilters) > 0 {
for i, filter := range c.cfg.MetadataFilters {
namedParams[fmt.Sprintf("metadata_key_%d", i)] = filter.Key
namedParams[fmt.Sprintf("metadata_value_%d", i)] = filter.Value
}
for i, filter := range c.cfg.MetadataFilters {
namedParams[fmt.Sprintf("metadata_key_%d", i)] = filter.Key
namedParams[fmt.Sprintf("metadata_value_%d", i)] = filter.Value
}

queryString, err := query.Build(namedParams)
Expand Down
9 changes: 8 additions & 1 deletion example_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func ExamplePublisher() {
defer db.Close()
const queueName = "test_queue"
p := pgq.NewPublisher(db)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
payload, _ := json.Marshal(PayloadStruct{Foo: "bar"})
messages := []*pgq.MessageOutgoing{
{
Expand All @@ -37,7 +36,15 @@ func ExamplePublisher() {
},
Payload: json.RawMessage(payload),
},
{
Metadata: pgq.Metadata{
"version": "1.0",
},
Payload: json.RawMessage(payload),
},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ids, err := p.Publish(ctx, queueName, messages...)
if err != nil {
log.Fatal("Error publishing message:", err)
Expand Down
37 changes: 20 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
module go.dataddo.com/pgq

go 1.21
go 1.22

require (
github.com/google/uuid v1.5.0
github.com/jackc/pgtype v1.14.1
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.2
github.com/google/uuid v1.6.0
github.com/jackc/pgtype v1.14.3
github.com/jmoiron/sqlx v1.4.0
github.com/pkg/errors v0.9.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0
golang.org/x/sync v0.6.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0
golang.org/x/sync v0.8.0
)

require (
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/jackc/pgio v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
)

// Test dependencies
require github.com/jackc/pgx/v4 v4.18.1
require github.com/jackc/pgx/v4 v4.18.2

// dependencies from github.com/jackc/pgx/v4 v4.18.1, that's used only in tests.
// Prevent forcing someones to use a vulnerable version of pgx/v4
// https://devhub.checkmarx.com/cve-details/CVE-2024-27289/
exclude github.com/jackc/pgx/v4 v4.18.3

// dependencies from github.com/jackc/pgx/v4 v4.18.2, that's used only in tests.
require (
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/text v0.12.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
)
Loading

0 comments on commit ca5ea3c

Please sign in to comment.