Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client function to list topics in a namespace. #101

Merged
merged 5 commits into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Ensure producer is removed from connection listener on close.
  • Loading branch information
cckellogg committed Nov 16, 2019
commit fe0fccb185aa3f4ac4ed691853343142587e267a
31 changes: 21 additions & 10 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package pulsar

import (
"fmt"
"github.com/stretchr/testify/assert"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
)

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -201,7 +200,7 @@ func TestTopicPartitions(t *testing.T) {
defer client.Close()

// Create topic with 5 partitions
httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
5)

partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
Expand Down Expand Up @@ -243,27 +242,26 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {
func TestNamespaceTopics(t *testing.T) {
name := generateRandomName()
namespace := fmt.Sprintf("public/%s", name)
namespaceUrl := fmt.Sprintf("http://localhost:8080/admin/v2/namespaces/%s", namespace)
err := httpPut(namespaceUrl, nil)
namespaceUrl := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
err := httpPut(namespaceUrl, anonymousNamespacePolicy())
if err != nil {
t.Fatal()
}
defer func() {
_ = httpDelete(fmt.Sprintf("http://localhost:8080/admin/v2/namespaces/%s", namespace))
_ = httpDelete(fmt.Sprintf("admin/v2/namespaces/%s", namespace))
}()

// create topics
topic1 := fmt.Sprintf("%s/topic-1", namespace)
if err := httpPut("http://localhost:8080/admin/v2/persistent/"+topic1, nil); err != nil {
if err := httpPut("admin/v2/persistent/"+topic1, nil); err != nil {
t.Fatal(err)
}
topic2 := fmt.Sprintf("%s/topic-2", namespace)
if err := httpPut("http://localhost:8080/admin/v2/persistent/"+topic2, namespace); err != nil {
if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
t.Fatal(err)
}
defer func() {
_ = httpDelete("http://localhost:8080/admin/v2/persistent/"+topic1,
"http://localhost:8080/admin/v2/persistent/"+topic2)
_ = httpDelete("admin/v2/persistent/"+topic1, "admin/v2/persistent/"+topic2)
}()

c, err := NewClient(ClientOptions{
Expand Down Expand Up @@ -302,4 +300,17 @@ func TestNamespaceTopics(t *testing.T) {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))

//time.Sleep(60 * time.Second)
}

func anonymousNamespacePolicy() map[string]interface{} {
return map[string]interface{}{
"auth_policies": map[string]interface{}{
"namespace_auth": map[string]interface{}{
"anonymous": []string{"produce", "consume"},
},
},
"replication_clusters": []string{"standalone"},
}
}
9 changes: 4 additions & 5 deletions pulsar/impl_partition_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}

func (p *partitionProducer) internalClose(req *closeProducer) {
defer req.waitGroup.Done()
if p.state != producerReady {
req.waitGroup.Done()
return
}

Expand All @@ -417,12 +417,11 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.log.WithError(err).Warn("Failed to close producer")
} else {
p.log.Info("Closed producer")
p.state = producerClosed
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
}

req.waitGroup.Done()
p.state = producerClosed
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
}

func (p *partitionProducer) LastSequenceID() int64 {
Expand Down
6 changes: 2 additions & 4 deletions pulsar/impl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {

type ProducerError struct {
partition int
prod *partitionProducer
prod Producer
err error
}

Expand All @@ -92,9 +92,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
pe, ok := <-c
if ok {
err = pe.err
if pe.err != nil {
p.producers[pe.partition] = pe.prod
}
p.producers[pe.partition] = pe.prod
}
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {

func TestMessageRouter(t *testing.T) {
// Create topic with 5 partitions
err := httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
if err != nil {
t.Fatal(err)
}
Expand Down
25 changes: 17 additions & 8 deletions pulsar/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"path"
"strings"
"testing"
"time"
Expand All @@ -33,6 +34,8 @@ const (
serviceURL = "pulsar://localhost:6650"
serviceURLTLS = "pulsar+ssl://localhost:6651"

webServiceURL = "http://localhost:8080"

caCertsPath = "../integration-tests/certs/cacert.pem"
tlsClientCertPath = "../integration-tests/certs/client-cert.pem"
tlsClientKeyPath = "../integration-tests/certs/client-key.pem"
Expand All @@ -47,11 +50,16 @@ func newAuthTopicName() string {
return fmt.Sprintf("private/auth/my-topic-%v", time.Now().Nanosecond())
}

func httpDelete(urls ...string) error {
func testEndpoint(parts ...string) string {
return webServiceURL + "/" + path.Join(parts...)
}

func httpDelete(requestPaths ...string) error {
client := http.DefaultClient
var errs error
doFn := func(url string) error {
req, err := http.NewRequest(http.MethodDelete, url, nil)
doFn := func(requestPath string) error {
endpoint := testEndpoint(requestPath)
req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
if err != nil {
return err
}
Expand All @@ -72,19 +80,20 @@ func httpDelete(urls ...string) error {
}
return nil
}
for _, url := range urls {
if err := doFn(url); err != nil {
err = pkgerrors.Wrap(err, "unable to delete url: "+url)
for _, requestPath := range requestPaths {
if err := doFn(requestPath); err != nil {
err = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
}
}
return errs
}

func httpPut(url string, body interface{}) error {
func httpPut(requestPath string, body interface{}) error {
client := http.DefaultClient

data, _ := json.Marshal(body)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(data))
endpoint := testEndpoint(requestPath)
req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(data))
if err != nil {
return err
}
Expand Down