[server] Dropping unassigned partitions #1196

Open. Wants to merge 45 commits into base: main.

Changes from 40 commits (45 commits total).

Commits
cc1d7ff
Initial commit
kristyelee Sep 12, 2024
c4b6682
First test for removal of partition
kristyelee Sep 17, 2024
8093581
Write function to remove unsubscribed (unassigned) partitions
kristyelee Sep 18, 2024
c8ebe2d
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 18, 2024
3eaeafb
Write test function to test removing unsubscribed (unassigned) partit…
kristyelee Sep 18, 2024
0e24cec
Update StorageEngine initializer in VeniceServer
kristyelee Sep 23, 2024
ff6a165
StorageService arguments set to initial state in VeniceServer
kristyelee Sep 24, 2024
01ca67f
Standardize code
kristyelee Sep 24, 2024
fd14574
Standardize code
kristyelee Sep 24, 2024
56e7711
Update ideal state
kristyelee Sep 24, 2024
04c4e9e
Update initialized StorageService object [with revised import]
kristyelee Sep 25, 2024
e09f7bf
[Placeholder]
kristyelee Sep 25, 2024
6c50513
Updated StorageService constructor and initializer with functionToChe…
kristyelee Sep 27, 2024
06dd6c2
Updated StorageService constructor and initializer [modified]
kristyelee Sep 27, 2024
53d177c
Revised StorageService constructor
kristyelee Sep 27, 2024
d8bad05
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 27, 2024
df1c0b9
Code restructure: verifying storage partition
kristyelee Sep 30, 2024
54d8708
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 1, 2024
14924e1
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 1, 2024
eb5dd89
[Commented modified code]
kristyelee Oct 2, 2024
d55f704
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 2, 2024
3ffd26d
Code restructure
kristyelee Oct 2, 2024
c63d62f
Retain relevant/used code changes
kristyelee Oct 3, 2024
3979b63
StorageService unit test
kristyelee Oct 7, 2024
b926839
Updates to StorageService unit test
kristyelee Oct 7, 2024
9a8b968
StorageService + unit test update
kristyelee Oct 8, 2024
a32b55c
Update to StorageService unit test
kristyelee Oct 10, 2024
8b27c8d
Apply review comments and code addition for hostname comparison.
kristyelee Oct 11, 2024
3431081
Apply review comments
kristyelee Oct 12, 2024
528f7a9
Apply review comments
kristyelee Oct 14, 2024
cdf99f7
[server] Remove storage partitions not assigned to host
kristyelee Oct 18, 2024
4ba8f75
[server]
kristyelee Oct 21, 2024
f8c1365
[server] Drop unassigned partitions
kristyelee Oct 30, 2024
48067cc
Integration Test [In Writing]
kristyelee Oct 31, 2024
d6f835e
Integration Test [In Writing]
kristyelee Nov 1, 2024
0730b55
[server] Remove partitions not assigned to current host. Wrote relate…
kristyelee Nov 4, 2024
850f586
Simplified Integration Test
kristyelee Nov 5, 2024
3f17c45
Merge branch 'main' into kristy_lee/650
kristyelee Nov 6, 2024
4547ce7
Apply review comments
kristyelee Nov 9, 2024
93e9076
Modified Integration Test
kristyelee Nov 11, 2024
9186d1f
Modified Integration Test
kristyelee Nov 12, 2024
fa55047
Modified Integration Test
kristyelee Nov 12, 2024
494423c
Modified Integration Test
kristyelee Nov 13, 2024
ef9e08c
[server] Remove storage partitions not assigned to current host. Drop…
kristyelee Nov 13, 2024
9bc2b7e
Modified Integration Test
kristyelee Nov 13, 2024
Diff view
@@ -17,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
@@ -42,6 +44,8 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.RocksDBException;
@@ -371,6 +375,32 @@ public synchronized AbstractStorageEngine openStore(
return engine;
}

public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
if (manager == null) {
return;
}
for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) {
String storeName = storageEngine.getStoreVersionName();
Set<Integer> storageEnginePartitionIds = new HashSet<>(storageEngine.getPartitionIds());
String instanceHostName = manager.getInstanceName();
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

if (idealState != null) {
Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
for (Integer partitionId: storageEnginePartitionIds) {
String partitionDbName = storeName + "_" + partitionId;
if (!mapFields.containsKey(partitionDbName)
|| !mapFields.get(partitionDbName).containsKey(instanceHostName)) {
storageEngine.dropPartition(partitionId);
}
}
}
}
}

/**
* Drops the partition of the specified store version in the storage service. When all data partitions are dropped,
* it will also drop the storage engine of the specific store version.
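For reference, a minimal self-contained sketch of the keep-or-drop decision the new checkWhetherStoragePartitionsShouldBeKeptOrNot method performs against the ideal state's map fields. The store, partition, and host names below are hypothetical; the key format <storeVersionName>_<partitionId> matches the partitionDbName built in the method above.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class KeepOrDropSketch {
  public static void main(String[] args) {
    // Ideal-state map fields: "<storeVersionName>_<partitionId>" -> { host -> replica state }.
    Map<String, Map<String, String>> mapFields = new HashMap<>();
    mapFields.put("test_store_v1_0", Collections.singletonMap("host_a", "LEADER"));

    String instanceHostName = "host_a";
    String storeVersionName = "test_store_v1";
    for (int partitionId: new int[] { 0, 1 }) {
      String partitionDbName = storeVersionName + "_" + partitionId;
      boolean keep = mapFields.containsKey(partitionDbName)
          && mapFields.get(partitionDbName).containsKey(instanceHostName);
      // Partition 0 is listed for host_a and is kept; partition 1 has no row and would be dropped.
      System.out.println(partitionDbName + (keep ? " -> keep" : " -> dropPartition"));
    }
  }
}

A partition is retained only when the ideal state has a row for its partitionDbName that lists this instance; otherwise the engine's dropPartition is invoked.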
@@ -1,11 +1,16 @@
package com.linkedin.davinci.storage;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceClusterConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
@@ -14,6 +19,8 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StorageEngineFactory;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PartitionerConfig;
@@ -23,13 +30,21 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.Utils;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

@@ -121,4 +136,74 @@ public void testGetStoreAndUserPartitionsMapping() {
expectedMapping.put(resourceName, partitionSet);
Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping);
}

@Test
public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException {
StorageService mockStorageService = mock(StorageService.class);
SafeHelixManager manager = mock(SafeHelixManager.class);
StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);

String resourceName = "test_store_v1";
String storeName = "test_store";

when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName);
abstractStorageEngine.addStoragePartition(0);
abstractStorageEngine.addStoragePartition(1);

String clusterName = "test_cluster";
VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class);
VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class);
when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig);
when(mockVeniceConfigLoader.getVeniceClusterConfig().getClusterName()).thenReturn(clusterName);

List<AbstractStorageEngine> localStorageEngines = new ArrayList<>();
localStorageEngines.add(abstractStorageEngine);

SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class);
when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
IdealState idealState = mock(IdealState.class);
when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
ZNRecord record = new ZNRecord("testId");
Map<String, Map<String, String>> mapFields = new HashMap<>();
Map<String, String> testPartitionZero = new HashMap<>();
Map<String, String> testPartitionOne = new HashMap<>();
testPartitionZero.put("host_1430", "LEADER");
testPartitionZero.put("host_1435", "STANDBY");
testPartitionZero.put("host_1440", "STANDBY");
testPartitionOne.put("host_1520", "LEADER");
testPartitionOne.put("host_1525", "STANDBY");
testPartitionOne.put("host_1530", "STANDBY");
mapFields.put("test_store_v1_0", testPartitionZero);
mapFields.put("test_store_v1_1", testPartitionOne);
record.setMapFields(mapFields);
when(idealState.getRecord()).thenReturn(record);
when(manager.getInstanceName()).thenReturn("host_1520");

Set<Integer> partitionSet = new HashSet<>(Arrays.asList(0, 1));
when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
int partitionId = invocation.getArgument(0);
abstractStorageEngine.getPartitionIds().remove(partitionId);
return null;
}
}).when(abstractStorageEngine).dropPartition(anyInt());

Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository");
storageEngineRepositoryField.setAccessible(true);
storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository);
when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository);
when(mockStorageService.getStorageEngineRepository().getAllLocalStorageEngines()).thenReturn(localStorageEngines);
Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
configLoaderField.setAccessible(true);
configLoaderField.set(mockStorageService, mockVeniceConfigLoader);

doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
verify(abstractStorageEngine).dropPartition(0);
Assert.assertFalse(abstractStorageEngine.getPartitionIds().contains(0));
}
}
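A possible extension of the unit test above (a sketch, not part of this PR): the mocked ideal state assigns partition 1 of test_store_v1 to host_1520, so the test could additionally assert that partition 1 is retained. This would require one extra static import.

// At the end of testCheckWhetherStoragePartitionsShouldBeKeptOrNot
// (requires: import static org.mockito.Mockito.never;)
verify(abstractStorageEngine, never()).dropPartition(1);
Assert.assertTrue(abstractStorageEngine.getPartitionIds().contains(1));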
@@ -9,6 +9,7 @@
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
@@ -182,6 +183,37 @@ public void testCheckBeforeJointClusterBeforeHelixInitializingCluster() throws E
}
}

@Test
public void testStartServerAndShutdownWithPartitionAssignmentVerification() {
try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 0)) {
Properties featureProperties = new Properties();
featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true));
featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true));
cluster.addVeniceServer(featureProperties, new Properties());
VeniceServerWrapper server = cluster.getVeniceServers().get(0);
Assert.assertTrue(server.getVeniceServer().isStarted());
StorageService storageService = server.getVeniceServer().getStorageService();
StorageEngineRepository repository = storageService.getStorageEngineRepository();
Assert
.assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine.");

// Create a storage engine.
String storeName = Version.composeKafkaTopic(cluster.createStore(1), 1);
Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1);
Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning());
Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 3);

server.getVeniceServer().shutdown();
cluster.getControllerClient().deleteStore(storeName);

cluster.stopVeniceServer(server.getPort());
cluster.restartVeniceServer(server.getPort());
repository = server.getVeniceServer().getStorageService().getStorageEngineRepository();
Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1);
Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 0);
}
}

@Test
public void testMetadataFetchRequest() throws ExecutionException, InterruptedException, IOException {
Utils.thisIsLocalhost();
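The new integration test above encodes the expected end state of this change: after the store is deleted and the server restarted, the node still loads one local storage engine, but the assignment check leaves it with zero partitions, presumably because the host no longer holds any Helix assignment for that resource.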
@@ -6331,8 +6331,10 @@ private void createClusterIfRequired(String clusterName) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime));
}
// Topology and fault zone type fields are used by the CRUSH/CRUSHED/WAGED/etc. algorithms. Helix applies these
// constraints in those algorithms to choose a proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true));
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to
// choose proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterProperties
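A note on the cluster-config change above: setting PERSIST_BEST_POSSIBLE_ASSIGNMENT to true tells Helix to persist the rebalancer's computed replica placement into the ideal state's map fields, which is presumably what lets the ideal-state lookup in checkWhetherStoragePartitionsShouldBeKeptOrNot see per-partition host assignments for a full-auto managed resource.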
@@ -355,6 +355,11 @@ private List<AbstractVeniceService> createServices() {
return helixData;
});

managerFuture.thenApply(manager -> {
storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
return true;
});

heartbeatMonitoringService = new HeartbeatMonitoringService(
metricsRepository,
metadataRepo,
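For context, a minimal sketch of the chaining pattern used above, with a plain String standing in for the SafeHelixManager that the PR's managerFuture actually produces: the partition check is deferred until the Helix manager future completes, so it only runs after the server has joined the cluster.

import java.util.concurrent.CompletableFuture;

public final class ManagerChainingSketch {
  // Stand-in for storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager).
  private static void checkPartitions(String manager) {
    System.out.println("Checking partition assignments via " + manager);
  }

  public static void main(String[] args) {
    // Stand-in for the managerFuture created earlier in createServices().
    CompletableFuture<String> managerFuture = CompletableFuture.supplyAsync(() -> "helix-manager");
    // Same shape as the PR's wiring: the check only runs once the manager is available.
    CompletableFuture<Boolean> checkDone = managerFuture.thenApply(manager -> {
      checkPartitions(manager);
      return true;
    });
    checkDone.join();
  }
}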