Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,22 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Api(tags = Constants.TABLE_TAG)
@Path("/")
public class PinotTableInstances {

private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableInstances.class);

@Inject
PinotHelixResourceManager _pinotHelixResourceManager;

Expand Down Expand Up @@ -116,16 +123,21 @@ public String getTableInstances(
}

@GET
@Path("/tables/{tableNameWithType}/livebrokers")
@Path("/tables/{tableName}/livebrokers")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "List the brokers serving a table", notes = "List live brokers of the given table based on EV")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 404, message = "Table not found"),
@ApiResponse(code = 500, message = "Internal server error")})
@ApiResponse(code = 500, message = "Internal server error")
})
public List<String> getLiveBrokersForTable(
@ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType") String tableNameWithType) {
return _pinotHelixResourceManager.getLiveBrokersForTable(tableNameWithType);
@ApiParam(value = "Table name (with or without type)", required = true)
@PathParam("tableName") String tableName) {
try {
return _pinotHelixResourceManager.getLiveBrokersForTable(tableName);
} catch (TableNotFoundException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3181,15 +3181,52 @@ public TableStats getTableStats(String tableNameWithType) {
}

/**
* Return the list of live brokers serving the corresponding table.
* Each entry in the broker list is of the following format:
* Broker_hostname_port
* Return the list of live brokers serving the corresponding table. Based on the
* input tableName, there can be 3 cases:
*
* 1. If the tableName has a type-suffix, then brokers for only that table-type
* will be returned.
* 2. If the tableName doesn't have a type-suffix and there's only 1 type for that
* table, then the brokers for that table-type would be returned.
* 3. If the tableName doesn't have a type-suffix and there are both REALTIME
* and OFFLINE tables, then the intersection of the brokers for the two table-types
* would be returned. Intersection is taken since the method guarantees to return
* brokers which can serve the given table.
*
* @param tableName name of table with or without type suffix.
* @return list of brokers serving the given table in the format: Broker_hostname_port.
* @throws TableNotFoundException when no table exists with the given name.
*/
public List<String> getLiveBrokersForTable(String tableNameWithType) {
public List<String> getLiveBrokersForTable(String tableName)
throws TableNotFoundException {
ExternalView ev = _helixDataAccessor.getProperty(_keyBuilder.externalView(Helix.BROKER_RESOURCE_INSTANCE));
if (ev == null) {
return Collections.EMPTY_LIST;
throw new IllegalStateException("Failed to find external view for " + Helix.BROKER_RESOURCE_INSTANCE);
}
TableType inputTableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (inputTableType != null) {
if (!hasTable(tableName)) {
throw new TableNotFoundException(String.format("Table=%s not found", tableName));
}
return getLiveBrokersForTable(ev, tableName);
}
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
boolean hasOfflineTable = hasTable(offlineTableName);
boolean hasRealtimeTable = hasTable(realtimeTableName);
if (!hasOfflineTable && !hasRealtimeTable) {
throw new TableNotFoundException(String.format("Table=%s not found", tableName));
}
if (hasOfflineTable && hasRealtimeTable) {
Set<String> offlineTables = new HashSet<>(getLiveBrokersForTable(ev, offlineTableName));
return getLiveBrokersForTable(ev, realtimeTableName).stream().filter(offlineTables::contains)
.collect(Collectors.toList());
} else {
return getLiveBrokersForTable(ev, hasOfflineTable ? offlineTableName : realtimeTableName);
}
}

private List<String> getLiveBrokersForTable(ExternalView ev, String tableNameWithType) {
Map<String, String> brokerToStateMap = ev.getStateMap(tableNameWithType);
List<String> hosts = new ArrayList<>();
if (brokerToStateMap != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
Expand All @@ -53,6 +54,7 @@
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerTestUtils;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
Expand All @@ -79,7 +81,9 @@


public class PinotHelixResourceManagerTest {
private static final int NUM_INSTANCES = 2;
private static final int NUM_REALTIME_SERVER_INSTANCES = 2;
private static final int NUM_OFFLINE_SERVER_INSTANCES = 2;
private static final int NUM_INSTANCES = NUM_REALTIME_SERVER_INSTANCES + NUM_OFFLINE_SERVER_INSTANCES;
private static final String BROKER_TENANT_NAME = "rBrokerTenant";
private static final String SERVER_TENANT_NAME = "rServerTenant";
private static final String TABLE_NAME = "resourceTestTable";
Expand All @@ -104,7 +108,8 @@ public void setUp()
ControllerTestUtils.setupClusterAndValidate();

// Create server tenant on all Servers
Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_INSTANCES, NUM_INSTANCES, 0);
Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_INSTANCES, NUM_OFFLINE_SERVER_INSTANCES,
NUM_REALTIME_SERVER_INSTANCES);
ControllerTestUtils.getHelixResourceManager().createServerTenant(serverTenant);

// Enable lead controller resource
Expand Down Expand Up @@ -904,7 +909,7 @@ private void waitForSegmentsToDelete(String tableNameWithType, int expectedNumSe

@Test
public void testGetLiveBrokersForTable()
throws IOException {
throws IOException, TableNotFoundException {
// Create broker tenant
Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 2, 0, 0);
PinotResourceManagerResponse response =
Expand Down Expand Up @@ -936,11 +941,70 @@ public void testGetLiveBrokersForTable()
List<String> liveBrokersForTable =
ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(OFFLINE_TABLE_NAME);
Assert.assertEquals(liveBrokersForTable.size(), 2);
for (String broker: liveBrokersForTable) {
for (String broker : liveBrokersForTable) {
Assert.assertTrue(broker.startsWith("Broker_localhost"));
}

// Test retrieving the live broker for table without table-type suffix.
liveBrokersForTable = ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(TABLE_NAME);
Assert.assertEquals(liveBrokersForTable.size(), 2);

// Test retrieving the live broker for table with non-existent table-type.
try {
ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(REALTIME_TABLE_NAME);
Assert.fail("Method call above should have failed");
} catch (TableNotFoundException tableNotFoundException) {
Assert.assertTrue(tableNotFoundException.getMessage().contains(REALTIME_TABLE_NAME));
}

// Create the realtime table.
ControllerTestUtils.addDummySchema(REALTIME_TABLE_NAME);
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setNumReplicas(ControllerTestUtils.MIN_NUM_REPLICAS).setBrokerTenant(BROKER_TENANT_NAME)
.setStreamConfigs(FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs().getStreamConfigsMap())
.setSchemaName(REALTIME_TABLE_NAME).setServerTenant(SERVER_TENANT_NAME).build();
ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
// Wait for EV to be updated with realtime table.
TestUtils.waitForCondition(aVoid -> {
ExternalView externalView = ControllerTestUtils.getHelixResourceManager().getHelixAdmin()
.getResourceExternalView(ControllerTestUtils.getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
int onlineBrokersCnt = 0;
Map<String, String> brokerToStateMap = externalView.getStateMap(REALTIME_TABLE_NAME);
if (brokerToStateMap == null) {
return false;
}
for (Map.Entry<String, String> entry : brokerToStateMap.entrySet()) {
if ("ONLINE".equalsIgnoreCase(entry.getValue())) {
onlineBrokersCnt++;
}
}
return onlineBrokersCnt == 2;
}, TIMEOUT_IN_MS, "");

// Test retrieving using table name without type suffix.
liveBrokersForTable = ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(TABLE_NAME);
Assert.assertEquals(liveBrokersForTable.size(), 2);

// Test case when table with given name doesn't exist.
String fakeNonExistentTableName = "fake_non_existent_table_name";
try {
ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(fakeNonExistentTableName);
Assert.fail("Method call above should have failed");
} catch (TableNotFoundException tableNotFoundException) {
Assert.assertTrue(tableNotFoundException.getMessage().contains(fakeNonExistentTableName));
}

try {
ControllerTestUtils.getHelixResourceManager().getLiveBrokersForTable(fakeNonExistentTableName + "_OFFLINE");
Assert.fail("Method call above should have failed");
} catch (TableNotFoundException tableNotFoundException) {
Assert.assertTrue(tableNotFoundException.getMessage().contains(fakeNonExistentTableName + "_OFFLINE"));
}

// Delete the table
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
ControllerTestUtils.getHelixResourceManager().deleteRealtimeTable(TABLE_NAME);
// Clean up.
untagBrokers();
}
Expand Down