Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof WebApplicationException restException) {
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
"Tenant or namespace or topic does not exist: " + topicName.getNamespace() ,
requestId));
lookupSemaphore.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception {
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -56,31 +58,35 @@
public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);

private final static String ADMIN_ROLE = "admin";
private final String ADMIN_TOKEN;
private final String USER_TOKEN;
private final String TOKEN_PUBLIC_KEY;
private final KeyPair kp;

TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException {
KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
KeyPair kp = kpg.generateKeyPair();
kp = kpg.generateKeyPair();

byte[] encodedPublicKey = kp.getPublic().getEncoded();
TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey);
ADMIN_TOKEN = generateToken(kp);
ADMIN_TOKEN = generateToken(ADMIN_ROLE);
USER_TOKEN = generateToken("user");
}

private String generateToken(KeyPair kp) {
private String generateToken(String subject) {
PrivateKey pkey = kp.getPrivate();
long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis();
Date exp = new Date(expMillis);

return Jwts.builder()
.setSubject("admin")
.setSubject(subject)
.setExpiration(exp)
.signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
.compact();
}

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
Expand Down Expand Up @@ -118,7 +124,7 @@ protected final void clientSetup() throws Exception {
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -172,4 +178,53 @@ public void testTokenProducerAndConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@DataProvider
public static Object[][] provider() {
// The 1st element specifies whether to use TCP service URL
// The 2nd element specifies whether to use a token with correct permission
return new Object[][] {
{ true, true },
{ true, false },
{ false, true },
{ false, false },
};
}

@Test(dataProvider = "provider")
public void testTopicNotFound(boolean useTcpServiceUrl, boolean useCorrectToken) throws Exception {
final var operationTimeoutMs = 10000;
final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() : pulsar.getWebServiceAddress();
final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN;
@Cleanup final var client = PulsarClient.builder().serviceUrl(url)
.operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS)
.authentication(AuthenticationFactory.token(token)).build();
final var topic = "my-property/not-exist/tp"; // the namespace does not exist
var start = System.currentTimeMillis();
try {
client.newProducer().topic(topic).create();
Assert.fail();
} catch (PulsarClientException e) {
final var elapsedMs = System.currentTimeMillis() - start;
log.info("Failed to create producer after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage());
Assert.assertTrue(elapsedMs < operationTimeoutMs);
if (useTcpServiceUrl) {
Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException);
} else {
Assert.assertTrue(e instanceof PulsarClientException.NotFoundException);
}
}
start = System.currentTimeMillis();
try {
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
} catch (PulsarClientException e) {
final var elapsedMs = System.currentTimeMillis() - start;
log.info("Failed to subscribe after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage());
Assert.assertTrue(elapsedMs < operationTimeoutMs);
if (useTcpServiceUrl) {
Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException);
} else {
Assert.assertTrue(e instanceof PulsarClientException.NotFoundException);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNo
}
}).exceptionally(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (forceNoPartitioned && actEx instanceof PulsarClientException.NotFoundException
if (forceNoPartitioned && (actEx instanceof PulsarClientException.NotFoundException
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
|| actEx instanceof PulsarAdminException.NotFoundException)) {
checkPartitions.complete(0);
} else {
checkPartitions.completeExceptionally(ex);
Expand Down