-
Notifications
You must be signed in to change notification settings - Fork 206
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for header transferring (#136)
- Loading branch information
Showing
6 changed files
with
311 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package rmq | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
) | ||
|
||
// Redis protocol does not define a specific way to pass additional data like header. | ||
// However, there is often need to pass them (for example for traces propagation). | ||
// | ||
// This implementation injects optional header values marked with a signature into payload body | ||
// during publishing. When message is consumed, if signature is present, header and original payload | ||
// are extracted from augmented payload. | ||
// | ||
// Header is defined as http.Header for better interoperability with existing libraries, | ||
// for example with go.opentelemetry.io/otel/propagation.HeaderCarrier. | ||
|
||
// PayloadWithHeader creates a payload string with header. | ||
func PayloadWithHeader(payload string, header http.Header) string { | ||
if len(header) == 0 { | ||
return payload | ||
} | ||
|
||
hd, _ := json.Marshal(header) // String map never fails marshaling. | ||
|
||
return jsonHeaderSignature + string(hd) + "\n" + payload | ||
} | ||
|
||
// PayloadBytesWithHeader creates payload bytes slice with header. | ||
func PayloadBytesWithHeader(payload []byte, header http.Header) []byte { | ||
if len(header) == 0 { | ||
return payload | ||
} | ||
|
||
hd, _ := json.Marshal(header) // String map never fails marshaling. | ||
|
||
res := make([]byte, 0, len(jsonHeaderSignature)+len(hd)+1+len(payload)) | ||
res = append(res, []byte(jsonHeaderSignature)...) | ||
res = append(res, hd...) | ||
res = append(res, '\n') | ||
res = append(res, payload...) | ||
|
||
return res | ||
} | ||
|
||
// ExtractHeaderAndPayload splits augmented payload into header and original payload if specific signature is present. | ||
func ExtractHeaderAndPayload(payload string) (http.Header, string, error) { | ||
if !strings.HasPrefix(payload, jsonHeaderSignature) { | ||
return nil, payload, nil | ||
} | ||
|
||
lineEnd := strings.Index(payload, "\n") | ||
if lineEnd == -1 { | ||
return nil, "", errors.New("missing line separator") | ||
} | ||
|
||
first := payload[len(jsonHeaderSignature):lineEnd] | ||
rest := payload[lineEnd+1:] | ||
|
||
header := make(http.Header) | ||
|
||
if err := json.Unmarshal([]byte(first), &header); err != nil { | ||
return nil, "", fmt.Errorf("parsing header: %w", err) | ||
} | ||
|
||
return header, rest, nil | ||
} | ||
|
||
// WithHeader is a Delivery with Header. | ||
type WithHeader interface { | ||
Header() http.Header | ||
} | ||
|
||
// jsonHeaderSignature is a signature marker to indicate JSON header presence. | ||
// Do not change the value. | ||
const jsonHeaderSignature = "\xFF\x00\xBE\xBEJ" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package rmq_test | ||
|
||
import ( | ||
"net/http" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/adjust/rmq/v5" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestPayloadWithHeader(t *testing.T) { | ||
p := `{"foo":"bar"}` | ||
|
||
h := make(http.Header) | ||
ph := rmq.PayloadWithHeader(p, h) | ||
assert.Equal(t, p, ph) // No change for empty header. | ||
h2, p2, err := rmq.ExtractHeaderAndPayload(ph) | ||
require.NoError(t, err) | ||
assert.Nil(t, h2) | ||
assert.Equal(t, p, p2) | ||
|
||
h.Set("X-Foo", "Bar") | ||
ph = rmq.PayloadWithHeader(p, h) | ||
assert.NotEqual(t, p, ph) | ||
|
||
h2, p2, err = rmq.ExtractHeaderAndPayload(ph) | ||
require.NoError(t, err) | ||
assert.Equal(t, h, h2) | ||
assert.Equal(t, p, p2) | ||
} | ||
|
||
func TestPayloadBytesWithHeader(t *testing.T) { | ||
p := `{"foo":"bar"}` | ||
|
||
h := make(http.Header) | ||
ph := rmq.PayloadBytesWithHeader([]byte(p), h) | ||
assert.Equal(t, p, string(ph)) // No change for empty header. | ||
h2, p2, err := rmq.ExtractHeaderAndPayload(string(ph)) | ||
require.NoError(t, err) | ||
assert.Nil(t, h2) | ||
assert.Equal(t, p, p2) | ||
|
||
h.Set("X-Foo", "Bar") | ||
ph = rmq.PayloadBytesWithHeader([]byte(p), h) | ||
assert.NotEqual(t, p, ph) | ||
|
||
h2, p2, err = rmq.ExtractHeaderAndPayload(string(ph)) | ||
require.NoError(t, err) | ||
assert.Equal(t, h, h2) | ||
assert.Equal(t, p, string(p2)) | ||
} | ||
|
||
func TestExtractHeaderAndPayload(t *testing.T) { | ||
t.Run("missing_line_separator", func(t *testing.T) { | ||
ph := rmq.PayloadWithHeader("foo", http.Header{"foo": []string{"bar"}}) | ||
ph = ph[0:7] // Truncating payload. | ||
h, p, err := rmq.ExtractHeaderAndPayload(ph) | ||
require.Error(t, err) | ||
assert.Nil(t, h) | ||
assert.Empty(t, p) | ||
}) | ||
|
||
t.Run("invalid_json", func(t *testing.T) { | ||
ph := rmq.PayloadWithHeader("foo", http.Header{"foo": []string{"bar"}}) | ||
ph = strings.Replace(ph, `"`, `'`, 1) // Corrupting JSON. | ||
h, p, err := rmq.ExtractHeaderAndPayload(ph) | ||
require.Error(t, err) | ||
assert.Nil(t, h) | ||
assert.Empty(t, p) | ||
}) | ||
|
||
t.Run("ok", func(t *testing.T) { | ||
ph := rmq.PayloadWithHeader("foo", http.Header{"foo": []string{"bar"}}) | ||
h, p, err := rmq.ExtractHeaderAndPayload(ph) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.Header{"foo": []string{"bar"}}, h) | ||
assert.Equal(t, "foo", p) | ||
}) | ||
|
||
t.Run("ok_line_breaks", func(t *testing.T) { | ||
ph := rmq.PayloadWithHeader("foo", http.Header{"foo": []string{"bar1\nbar2\nbar3"}}) | ||
h, p, err := rmq.ExtractHeaderAndPayload(ph) | ||
require.NoError(t, err) | ||
assert.Equal(t, http.Header{"foo": []string{"bar1\nbar2\nbar3"}}, h) | ||
assert.Equal(t, "foo", p) | ||
}) | ||
} | ||
|
||
func ExamplePayloadWithHeader() { | ||
var ( | ||
pub, con rmq.Queue | ||
) | ||
|
||
// .... | ||
|
||
h := make(http.Header) | ||
h.Set("X-Baz", "quux") | ||
|
||
// You can add header to your payload during publish. | ||
_ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h)) | ||
|
||
// .... | ||
|
||
_, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) { | ||
// And receive header back in consumer. | ||
delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux" | ||
|
||
// .... | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters