Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ type Namespaces interface {
// SetNamespaceMessageTTLWithContext sets the messages Time to Live for all the topics within a namespace
SetNamespaceMessageTTLWithContext(ctx context.Context, namespace string, ttlInSeconds int) error

// GetNamespaceMessageTTL returns the message TTL for a namespace
// GetNamespaceMessageTTL returns the message TTL for a namespace. Returns -1 if not set
GetNamespaceMessageTTL(namespace string) (int, error)

// GetNamespaceMessageTTLWithContext returns the message TTL for a namespace
// GetNamespaceMessageTTLWithContext returns the message TTL for a namespace. Returns -1 if not set
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error)

// GetRetention returns the retention configuration for a namespace
Expand Down Expand Up @@ -226,10 +226,11 @@ type Namespaces interface {
// SetOffloadDeleteLagWithContext sets the offload deletion lag for a namespace
SetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName, timeMs int64) error

// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds
// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds. Returns -1 if not set
GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error)

// GetOffloadDeleteLagWithContext returns the offload deletion lag for a namespace, in milliseconds
// GetOffloadDeleteLagWithContext returns the offload deletion lag for a namespace, in milliseconds.
// Returns -1 if not set
GetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)

// SetOffloadThreshold sets the offloadThreshold for a namespace
Expand All @@ -238,10 +239,10 @@ type Namespaces interface {
// SetOffloadThresholdWithContext sets the offloadThreshold for a namespace
SetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error

// GetOffloadThreshold returns the offloadThreshold for a namespace
// GetOffloadThreshold returns the offloadThreshold for a namespace. Returns -1 if not set
GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)

// GetOffloadThresholdWithContext returns the offloadThreshold for a namespace
// GetOffloadThresholdWithContext returns the offloadThreshold for a namespace. Returns -1 if not set
GetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)

// SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace
Expand All @@ -250,10 +251,10 @@ type Namespaces interface {
// SetOffloadThresholdInSecondsWithContext sets the offloadThresholdInSeconds for a namespace
SetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error

// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace
// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace. Returns -1 if not set
GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error)

// GetOffloadThresholdInSecondsWithContext returns the offloadThresholdInSeconds for a namespace
// GetOffloadThresholdInSecondsWithContext returns the offloadThresholdInSeconds for a namespace. Returns -1 if not set
GetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)

// SetCompactionThreshold sets the compactionThreshold for a namespace
Expand All @@ -262,10 +263,10 @@ type Namespaces interface {
// SetCompactionThresholdWithContext sets the compactionThreshold for a namespace
SetCompactionThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error

// GetCompactionThreshold returns the compactionThreshold for a namespace
// GetCompactionThreshold returns the compactionThreshold for a namespace. Returns -1 if not set
GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error)

// GetCompactionThresholdWithContext returns the compactionThreshold for a namespace
// GetCompactionThresholdWithContext returns the compactionThreshold for a namespace. Returns -1 if not set
GetCompactionThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)

// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace.
Expand All @@ -276,10 +277,11 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxConsumersPerSubscriptionWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error

// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace.
// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace. Returns -1 if not set
GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int, error)

// GetMaxConsumersPerSubscriptionWithContext returns the maxConsumersPerSubscription for a namespace.
// Returns -1 if not set
GetMaxConsumersPerSubscriptionWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)

// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
Expand All @@ -290,10 +292,10 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxConsumersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error

// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace.
// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace. Returns -1 if not set
GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error)

// GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic for a namespace.
// GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic for a namespace. Returns -1 if not set
GetMaxConsumersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)

// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
Expand All @@ -304,10 +306,10 @@ type Namespaces interface {
//nolint: revive // It's ok here to use a built-in function name (max)
SetMaxProducersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error

// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace. Returns -1 if not set
GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)

// GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic for a namespace.
// GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic for a namespace. Returns -1 if not set
GetMaxProducersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)

// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
Expand Down Expand Up @@ -851,7 +853,7 @@ func (n *namespaces) GetNamespaceMessageTTL(namespace string) (int, error) {
}

func (n *namespaces) GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error) {
var ttl int
var ttl = -1
nsName, err := utils.GetNamespaceName(namespace)
if err != nil {
return 0, err
Expand Down Expand Up @@ -1114,7 +1116,7 @@ func (n *namespaces) GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64,
}

func (n *namespaces) GetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error) {
var result int64
var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand Down Expand Up @@ -1143,7 +1145,7 @@ func (n *namespaces) GetMaxConsumersPerSubscriptionWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
var result int
var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerSubscription")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand All @@ -1167,7 +1169,7 @@ func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64,
}

func (n *namespaces) GetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error) {
var result int64
var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThreshold")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand All @@ -1194,7 +1196,7 @@ func (n *namespaces) GetOffloadThresholdInSecondsWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int64, error) {
var result int64
var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand Down Expand Up @@ -1223,7 +1225,7 @@ func (n *namespaces) GetMaxConsumersPerTopicWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
var result int
var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand All @@ -1250,7 +1252,7 @@ func (n *namespaces) GetCompactionThresholdWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int64, error) {
var result int64
var result int64 = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "compactionThreshold")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand Down Expand Up @@ -1279,7 +1281,7 @@ func (n *namespaces) GetMaxProducersPerTopicWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
var result int
var result = -1
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxProducersPerTopic")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand Down Expand Up @@ -2014,7 +2016,7 @@ func (n *namespaces) GetMaxTopicsPerNamespaceWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
) (int, error) {
var result int
var result int // This method does not require a sentinel value of -1 since the API never returns empty
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
return result, err
Expand Down
166 changes: 165 additions & 1 deletion pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {

namespace, _ := utils.GetNamespaceName("public/default")

// set the subscription expiration time and get it
// Get default (should be -1)
threshold, err := admin.Namespaces().GetOffloadThreshold(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(-1), threshold)

err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
60)
assert.Equal(t, nil, err)
Expand Down Expand Up @@ -599,3 +603,163 @@ func TestNamespaces_GetMaxTopicsPerNamespace(t *testing.T) {
expected = 0
assert.Equal(t, expected, maxTopics)
}

func TestNamespaces_MessageTTL(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// Get default (should be -1)
ttl, err := admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
assert.Equal(t, -1, ttl)

// Set to 0 explicitly
err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 0)
assert.NoError(t, err)

// Verify returns 0
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
assert.Equal(t, 0, ttl)

// Set to positive value
err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 3600)
assert.NoError(t, err)

// Verify returns value
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
assert.NoError(t, err)
assert.Equal(t, 3600, ttl)
}

func TestNamespaces_OffloadDeleteLag(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// Get default (should be -1)
lag, err := admin.Namespaces().GetOffloadDeleteLag(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(-1), lag)

// Set to 0 explicitly
err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 0)
assert.NoError(t, err)

// Verify returns 0
lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(0), lag)

// Set to positive value
err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 1000)
assert.NoError(t, err)

// Verify returns value
lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(1000), lag)
}

func TestNamespaces_MaxConsumersPerTopic(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// Get default (should be -1)
maxConsumers, err := admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, -1, maxConsumers)

// Set to 0 explicitly
err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 0)
assert.NoError(t, err)

// Verify returns 0
maxConsumers, err = admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, 0, maxConsumers)

// Set to positive value
err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
assert.NoError(t, err)

// Verify returns value
maxConsumers, err = admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, 100, maxConsumers)
}

func TestNamespaces_CompactionThreshold(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// Get default (should be -1)
threshold, err := admin.Namespaces().GetCompactionThreshold(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(-1), threshold)

// Set to 0 explicitly
err = admin.Namespaces().SetCompactionThreshold(*namespace, 0)
assert.NoError(t, err)

// Verify returns 0
threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(0), threshold)

// Set to positive value
err = admin.Namespaces().SetCompactionThreshold(*namespace, 1024*1024) // 1MB
assert.NoError(t, err)

// Verify returns value
threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
assert.NoError(t, err)
assert.Equal(t, int64(1024*1024), threshold)
}

func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// Get default (should be -1)
maxProducers, err := admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, -1, maxProducers)

// Set to 0 explicitly
err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 0)
assert.NoError(t, err)

// Verify returns 0
maxProducers, err = admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, 0, maxProducers)

// Set to positive value
err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
assert.NoError(t, err)

// Verify returns value
maxProducers, err = admin.Namespaces().GetMaxProducersPerTopic(*namespace)
assert.NoError(t, err)
assert.Equal(t, 50, maxProducers)
}
Loading