Skip to content

Commit

Permalink
Make numTasks read-only metadata property in get datastream (#824)
Browse files Browse the repository at this point in the history
numTasks is an additional datastream property which is calculated as part of elastic task count and saves in a separate node in /dms/<datastream>/numTasks. Rest client should be able to expose this information and any user which cache this information and call update to modify some other field should be able to do it.

Note: It will be a read-only property and the server will encapsulate how numTasks is stored. For external user, who has no access to zookeeper cannot differentiate between this metadata v/s any other metadata using the get output.
  • Loading branch information
vmaheshw authored May 12, 2021
1 parent 3e03420 commit d411dbc
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ public class DatastreamMetadataConstants {
* Datastream override for custom checkpointing. This overrides the connector level flag if present.
*/
public static final String CUSTOM_CHECKPOINT = "system.customCheckpoint";

/**
* Datastream metadata that represents number of tasks
*/
public static final String NUM_TASKS = "numTasks";
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.CollectionResourceTemplate;

import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;


/**
* Resources classes are used by rest.li to process corresponding HTTP request.
Expand Down Expand Up @@ -194,6 +196,12 @@ private void doUpdateDatastreams(Map<String, Datastream> datastreamMap) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update status in update request."
+ " old: %s new: %s", key, oldDatastream, datastream));
}

if (datastream.getMetadata().containsKey(NUM_TASKS) &&
!datastream.getMetadata().get(NUM_TASKS).equals(oldDatastream.getMetadata().get(NUM_TASKS))) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update numTasks."
+ " old: %s new: %s", key, oldDatastream, datastream));
}
} catch (DatastreamValidationException e) {
_dynamicMetricsManager.createOrUpdateMeter(CLASS_NAME, CALL_ERROR, 1);
_errorLogger.logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, e.getMessage());
Expand Down Expand Up @@ -759,6 +767,7 @@ public CreateResponse create(Datastream datastream) {
StringMap metadataMap = datastream.getMetadata();
Validate.isTrue(metadataMap.containsKey(DatastreamMetadataConstants.OWNER_KEY),
"Must specify owner of Datastream");
Validate.isTrue(!metadataMap.containsKey(NUM_TASKS), "Cannot set numTasks in datastream");

if (datastream.hasDestination() && datastream.getDestination().hasConnectionString()) {
metadataMap.put(DatastreamMetadataConstants.IS_USER_MANAGED_DESTINATION_KEY, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.datastream.server.zk.KeyBuilder;
import com.linkedin.datastream.server.zk.ZkAdapter;

import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;
import static com.linkedin.datastream.server.Coordinator.PAUSED_INSTANCE;


Expand Down Expand Up @@ -74,7 +75,18 @@ public Datastream getDatastream(String key) {
if (json == null) {
return null;
}
return DatastreamUtils.fromJSON(json);

Datastream datastream = DatastreamUtils.fromJSON(json);
if (datastream == null) {
return null;
}
String numTasksPath = KeyBuilder.datastreamNumTasks(_cluster, key);
String numTasks = _zkClient.readData(numTasksPath, true /* returnNullIfPathNotExists */);
if (numTasks != null) {
Objects.requireNonNull(datastream.getMetadata()).put(NUM_TASKS, numTasks);
}

return datastream;
}

/**
Expand All @@ -97,6 +109,7 @@ public void updateDatastream(String key, Datastream datastream, boolean notifyLe
throw new DatastreamException("Datastream does not exists, can not be updated: " + key);
}

Objects.requireNonNull(datastream.getMetadata()).remove("numTasks");
String json = DatastreamUtils.toJSON(datastream);
_zkClient.writeData(getZnodePath(key), json);
if (notifyLeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ public void testUpdateDatastream() throws Exception {
modifyStatus.setTransportProviderName("Random");
checkBadRequest(() -> resource.update(modifyTransport.getName(), modifyTransport), HttpStatus.S_400_BAD_REQUEST);

Datastream modifyNumTasks = generateDatastream(1);
modifyNumTasks.getMetadata().put("numTasks", "10");
checkBadRequest(() -> resource.update(modifyNumTasks.getName(), modifyNumTasks), HttpStatus.S_400_BAD_REQUEST);

// Create another datastream that gets deduped to datastream1.
Datastream originalDatastream2 = generateDatastream(2);
originalDatastream2.getDestination().setConnectionString("a different destination");
Expand Down Expand Up @@ -624,6 +628,11 @@ public void testCreateDatastream() throws Exception {
Assert.assertNotNull(e.getMessage());
Assert.assertTrue(e.getMessage().contains(undefinedProviderName));
}

// Test datastream creation with numTasks
Datastream badDatastream = generateDatastream(0);
badDatastream.getMetadata().put("numTasks", "100");
checkBadRequest(() -> resource.create(badDatastream));
}

private Datastream createDatastream(DatastreamResources resource, String name, int seed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -38,20 +40,20 @@ public class TestZookeeperBackedDatastreamStore {
private String _zkConnectionString;
private ZkClient _zkClient;
private ZookeeperBackedDatastreamStore _store;
private String _clusterName = "testcluster";

@BeforeMethod
public void setup() throws IOException {
String clusterName = "testcluster";
_embeddedZookeeper = new EmbeddedZookeeper();
_zkConnectionString = _embeddedZookeeper.getConnection();
_embeddedZookeeper.startup();
_zkClient = new ZkClient(_zkConnectionString);
CachedDatastreamReader datastreamCache = new CachedDatastreamReader(_zkClient, clusterName);
_store = new ZookeeperBackedDatastreamStore(datastreamCache, _zkClient, clusterName);
CachedDatastreamReader datastreamCache = new CachedDatastreamReader(_zkClient, _clusterName);
_store = new ZookeeperBackedDatastreamStore(datastreamCache, _zkClient, _clusterName);
}

@AfterMethod
public void teardown() throws IOException {
public void teardown() {
_embeddedZookeeper.shutdown();
}

Expand All @@ -77,7 +79,7 @@ private Datastream generateDatastream(int seed) {
* Test Datastream store with single Datastream for creating, reading, deleting
*/
@Test
public void testSingleDatastreamBasics() throws Exception {
public void testSingleDatastreamBasics() {
Datastream ds = generateDatastream(0);

Assert.assertNull(_store.getDatastream(ds.getName()));
Expand All @@ -94,7 +96,7 @@ public void testSingleDatastreamBasics() throws Exception {
try {
_store.createDatastream(ds.getName(), ds);
Assert.fail();
} catch (DatastreamAlreadyExistsException e) {
} catch (DatastreamAlreadyExistsException ignored) {
}

// deleting the Datastream
Expand Down Expand Up @@ -146,7 +148,7 @@ public void testUpdatePartitionAssignments() throws Exception {
Assert.assertTrue(nodes.size() > 0);

String touchedTimestamp = _zkClient.readData(KeyBuilder.getTargetAssignmentBase(clusterName, ds.getConnectorName()));
long touchedTime = Long.valueOf(touchedTimestamp);
long touchedTime = Long.parseLong(touchedTimestamp);
Assert.assertTrue(endTime >= touchedTime && touchedTime >= startTime);
}

Expand Down Expand Up @@ -199,15 +201,15 @@ public void testCreateDuplicateDatastreams() {
* Test invalid parameters or data on DatastreamStore
*/
@Test(expectedExceptions = IllegalArgumentException.class)
public void testCreateWithNullName() throws Exception {
public void testCreateWithNullName() {
_store.createDatastream(null, generateDatastream(0));
}

/**
* Test invalid parameters or data on DatastreamStore
*/
@Test(expectedExceptions = IllegalArgumentException.class)
public void testCreateWithNullDatastream() throws Exception {
public void testCreateWithNullDatastream() {
_store.createDatastream("name_0", null);
}

Expand All @@ -223,4 +225,41 @@ public void testGetCorruptedDatastream() {
_zkClient.writeData("/testcluster/dms/datastream1", data);
Assert.assertNull(_store.getDatastream("datastream1"));
}

/**
* Test Datastream store with single Datastream for creating, reading, deleting
*/
@Test
public void testSingleDatastreamBasicsWithNumTasks() throws DatastreamException {
Datastream ds = generateDatastream(0);

Assert.assertNull(_store.getDatastream(ds.getName()));

// creating a Datastream
_store.createDatastream(ds.getName(), ds);

_zkClient.create(KeyBuilder.datastreamNumTasks(_clusterName, ds.getName()), "10", CreateMode.PERSISTENT);

// get the same Datastream back
Datastream ds2 = _store.getDatastream(ds.getName());
Assert.assertNotNull(ds2);
Assert.assertNotEquals(ds2, ds);
Objects.requireNonNull(ds.getMetadata()).put("numTasks", "10");
Assert.assertEquals(ds2, ds);

// try modifying the numTasks. It should not work.
Objects.requireNonNull(ds.getMetadata()).put("numTasks", "20");
_store.updateDatastream(ds.getName(), ds, false);
// get the same Datastream back
Datastream ds3 = _store.getDatastream(ds.getName());
Assert.assertNotNull(ds3);
Assert.assertNotEquals(ds3, ds);
Assert.assertEquals(ds3.getMetadata().get("numTasks"), "10");

// deleting the Datastream
_store.deleteDatastream(ds.getName());
Assert.assertEquals(_store.getDatastream(ds.getName()).getStatus(), DatastreamStatus.DELETING);

Assert.assertNull(_store.getDatastream(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package com.linkedin.datastream.server.zk;

import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;


/**
* Helper class to build commonly accessed ZooKeeper znodes
*/
Expand Down Expand Up @@ -93,7 +96,7 @@ private KeyBuilder() {
/**
* numTasks information stored for each datastream that uses elastic task assignment
*/
private static final String DATASTREAM_NUMTASKS = DATASTREAM + "/numTasks";
private static final String DATASTREAM_NUMTASKS = DATASTREAM + "/" + NUM_TASKS;

/**
* Get the root level ZooKeeper znode of a Brooklin cluster
Expand Down

0 comments on commit d411dbc

Please sign in to comment.