Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.relocation.SegmentTierAssigner;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
Expand Down Expand Up @@ -151,6 +152,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
protected BrokerResourceValidationManager _brokerResourceValidationManager;
protected SegmentRelocator _segmentRelocator;
protected SegmentTierAssigner _segmentTierAssigner;
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
protected PinotTaskManager _taskManager;
Expand Down Expand Up @@ -679,6 +681,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentRelocator);
_segmentTierAssigner =
new SegmentTierAssigner(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_segmentTierAssigner);
_minionInstancesCleanupTask =
new MinionInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_minionInstancesCleanupTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static class ControllerPeriodicTasksConf {
"controller.realtimeSegmentRelocation.initialDelayInSeconds";
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
public static final String SEGMENT_TIER_ASSIGNER_FREQUENCY_PERIOD =
"controller.segmentTierAssigner.frequencyPeriod";
public static final String SEGMENT_TIER_ASSIGNER_INITIAL_DELAY_IN_SECONDS =
"controller.segmentTierAssigner.initialDelayInSeconds";

// The flag to indicate if controller periodic job will fix the missing LLC segment deep store copy.
// Default value is false.
Expand Down Expand Up @@ -214,6 +218,7 @@ private static long getRandomInitialDelayInSeconds() {

private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60;
private static final int DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled
}

private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
Expand Down Expand Up @@ -849,6 +854,17 @@ public long getSegmentRelocatorInitialDelayInSeconds() {
return segmentRelocatorInitialDelaySeconds;
}

public int getSegmentTierAssignerFrequencyInSeconds() {
return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.SEGMENT_TIER_ASSIGNER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period))
.orElse(ControllerPeriodicTasksConf.DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS);
}

public long getSegmentTierAssignerInitialDelayInSeconds() {
return getProperty(ControllerPeriodicTasksConf.SEGMENT_TIER_ASSIGNER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
}

public long getPeriodicTaskInitialDelayInSeconds() {
return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.relocation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Periodic task to calculate the target tier the segment belongs to and set it into segment ZK metadata as goal
* state, which can be checked by servers when loading the segment to put it onto the target storage tier.
*/
public class SegmentTierAssigner extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTierAssigner.class);

public SegmentTierAssigner(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super(SegmentTierAssigner.class.getSimpleName(), config.getSegmentTierAssignerFrequencyInSeconds(),
config.getSegmentTierAssignerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
}

@Override
protected void processTable(String tableNameWithType) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: {}", tableNameWithType);
List<Tier> sortedTiers;
if (CollectionUtils.isEmpty(tableConfig.getTierConfigsList())) {
LOGGER.info("No tierConfigs so use default tier for segments of table: {}", tableNameWithType);
sortedTiers = Collections.emptyList();
} else {
LOGGER.info("Checking and updating target tiers for segments of table: {}", tableNameWithType);
sortedTiers = TierConfigUtils
.getSortedTiers(tableConfig.getTierConfigsList(), _pinotHelixResourceManager.getHelixZkManager());
LOGGER.debug("Sorted tiers: {} configured for table: {}", sortedTiers, tableNameWithType);
}
for (String segmentName : _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, true)) {
updateSegmentTier(tableNameWithType, segmentName, sortedTiers);
}
}

@VisibleForTesting
void updateSegmentTier(String tableNameWithType, String segmentName, List<Tier> sortedTiers) {
ZNRecord segmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
if (segmentMetadataZNRecord == null) {
LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
return;
}
Tier targetTier = null;
for (Tier tier : sortedTiers) {
TierSegmentSelector tierSegmentSelector = tier.getSegmentSelector();
if (tierSegmentSelector.selectSegment(tableNameWithType, segmentName)) {
targetTier = tier;
break;
}
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadataZNRecord);
String targetTierName = null;
if (targetTier == null) {
if (segmentZKMetadata.getTier() == null) {
LOGGER.debug("Segment: {} of table: {} is already on the default tier", segmentName, tableNameWithType);
return;
}
LOGGER.info("Segment: {} of table: {} is put back on default tier", segmentName, tableNameWithType);
} else {
targetTierName = targetTier.getName();
if (targetTierName.equals(segmentZKMetadata.getTier())) {
LOGGER.debug("Segment: {} of table: {} is already on the target tier: {}", segmentName, tableNameWithType,
targetTierName);
return;
}
LOGGER.info("Segment: {} of table: {} is put onto new tier: {}", segmentName, tableNameWithType, targetTierName);
}
// Update the tier in segment ZK metadata and write it back to ZK.
segmentZKMetadata.setTier(targetTierName);
_pinotHelixResourceManager
.updateZkMetadata(tableNameWithType, segmentZKMetadata, segmentMetadataZNRecord.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public Map<String, TableTierInfo> getTableTierInfoFromServers(BiMap<String, Stri
}
serverUrls.add(tierUri);
}
LOGGER.debug("Getting table tier info with serverUrls: {}", serverUrls);
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.pinot.controller.util;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,6 +32,7 @@
import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.TableTierInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;

Expand Down Expand Up @@ -86,10 +89,20 @@ public TableTierDetails getTableTierDetails(String tableNameWithType, @Nullable
List<String> expectedSegmentsOnServer = entry.getValue();
TableTierInfo tableTierInfo = serverToTableTierInfoMap.get(server);
for (String expectedSegment : expectedSegmentsOnServer) {
tableTierDetails._segmentTiers.computeIfAbsent(expectedSegment, (k) -> new HashMap<>()).put(server,
tableTierDetails._segmentCurrentTiers.computeIfAbsent(expectedSegment, (k) -> new HashMap<>()).put(server,
(tableTierInfo == null) ? ERROR_RESP_NO_RESPONSE : getSegmentTier(expectedSegment, tableTierInfo));
}
}
if (segmentName == null) {
for (SegmentZKMetadata segmentZKMetadata : _helixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
tableTierDetails._segmentTargetTiers.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTier());
}
} else {
SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
Preconditions.checkState(segmentZKMetadata != null,
"No segmentZKMetadata for segment: %s of table: %s to find the target tier", segmentName, tableNameWithType);
tableTierDetails._segmentTargetTiers.put(segmentName, segmentZKMetadata.getTier());
}
return tableTierDetails;
}

Expand All @@ -108,7 +121,9 @@ private static String getSegmentTier(String expectedSegment, TableTierInfo table
@JsonIgnoreProperties(ignoreUnknown = true)
public static class TableTierDetails {
private final String _tableName;
private final Map<String/*segment*/, Map<String/*server*/, String/*tier or err*/>> _segmentTiers = new HashMap<>();
private final Map<String/*segment*/, Map<String/*server*/, String/*tier or err*/>> _segmentCurrentTiers =
new HashMap<>();
private final Map<String/*segment*/, String/*target tier*/> _segmentTargetTiers = new HashMap<>();

TableTierDetails(String tableName) {
_tableName = tableName;
Expand All @@ -123,7 +138,21 @@ public String getTableName() {
@JsonPropertyDescription("Storage tiers of segments for the given table")
@JsonProperty("segmentTiers")
public Map<String, Map<String, String>> getSegmentTiers() {
return _segmentTiers;
HashMap<String, Map<String, String>> segmentTiers = new HashMap<>(_segmentCurrentTiers);
for (Map.Entry<String, String> entry : _segmentTargetTiers.entrySet()) {
segmentTiers.computeIfAbsent(entry.getKey(), (s) -> new HashMap<>()).put("targetTier", entry.getValue());
}
return segmentTiers;
}

@JsonIgnore
public Map<String, Map<String, String>> getSegmentCurrentTiers() {
return _segmentCurrentTiers;
}

@JsonIgnore
public Map<String, String> getSegmentTargetTiers() {
return _segmentTargetTiers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.TableTierInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableTierReader;
Expand Down Expand Up @@ -295,11 +296,15 @@ public void testGetTableTierInfoFromAllServers()
@Test
public void testGetSegmentTierInfoFromAllServers()
throws InvalidConfigException {
SegmentZKMetadata segZKMeta = mock(SegmentZKMetadata.class);
when(segZKMeta.getTier()).thenReturn("coolTier");
when(_helix.getSegmentZKMetadata(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(segZKMeta);
final String[] servers = {"server6", "server7", "server8", "server9"};
TableTierReader.TableTierDetails tableTierDetails = testRunner(servers, "myTable_OFFLINE", "segX");
assertEquals(tableTierDetails.getSegmentTiers().size(), 1);
Map<String, String> tiersByServer = tableTierDetails.getSegmentTiers().get("segX");
assertEquals(tiersByServer.size(), 4);
assertEquals(tiersByServer.size(), 5);
assertEquals(tiersByServer.get("targetTier"), "coolTier");
assertEquals(tiersByServer.get("server6"), "someTier");
assertEquals(tiersByServer.get("server7"), "SEGMENT_MISSED_ON_SERVER");
assertEquals(tiersByServer.get("server8"), "NO_RESPONSE_FROM_SERVER");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ControllerStarter getControllerStarter() {
}

private class MockControllerStarter extends ControllerStarter {
private static final int NUM_PERIODIC_TASKS = 9;
private static final int NUM_PERIODIC_TASKS = 10;

public MockControllerStarter() {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.relocation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.tier.FixedTierSegmentSelector;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;


public class SegmentTierAssignerTest {
@Test
public void testUpdateSegmentTierBackToDefault() {
String tableName = "table01_OFFLINE";
String segmentName = "seg01";
PinotHelixResourceManager helixMgrMock = mock(PinotHelixResourceManager.class);
when(helixMgrMock.getSegmentMetadataZnRecord(tableName, segmentName))
.thenReturn(createSegmentMetadataZNRecord(segmentName, "hotTier"));
SegmentTierAssigner assigner = new SegmentTierAssigner(helixMgrMock, null, new ControllerConf(), null);

// Move back to default as not tier configs.
List<Tier> sortedTiers = new ArrayList<>();
assigner.updateSegmentTier(tableName, "seg01", sortedTiers);
ArgumentCaptor<SegmentZKMetadata> recordCapture = ArgumentCaptor.forClass(SegmentZKMetadata.class);
verify(helixMgrMock).updateZkMetadata(eq(tableName), recordCapture.capture(), eq(10));
SegmentZKMetadata record = recordCapture.getValue();
assertNull(record.getTier());
}

@Test
public void testUpdateSegmentTierToNewTier() {
String tableName = "table01_OFFLINE";
String segmentName = "seg01";
PinotHelixResourceManager helixMgrMock = mock(PinotHelixResourceManager.class);
when(helixMgrMock.getSegmentMetadataZnRecord(tableName, segmentName))
.thenReturn(createSegmentMetadataZNRecord(segmentName, "hotTier"));
SegmentTierAssigner assigner = new SegmentTierAssigner(helixMgrMock, null, new ControllerConf(), null);

// Move back to default as not tier configs.
List<Tier> sortedTiers = new ArrayList<>();
sortedTiers.add(new Tier("coldTier", new FixedTierSegmentSelector(null, Collections.singleton("seg01")), null));
assigner.updateSegmentTier(tableName, "seg01", sortedTiers);
ArgumentCaptor<SegmentZKMetadata> recordCapture = ArgumentCaptor.forClass(SegmentZKMetadata.class);
verify(helixMgrMock).updateZkMetadata(eq(tableName), recordCapture.capture(), eq(10));
SegmentZKMetadata record = recordCapture.getValue();
assertEquals(record.getTier(), "coldTier");
}

private static ZNRecord createSegmentMetadataZNRecord(String segmentName, String tierName) {
ZNRecord segmentMetadataZNRecord = new ZNRecord(segmentName);
segmentMetadataZNRecord.setVersion(10);
segmentMetadataZNRecord.setSimpleField(CommonConstants.Segment.TIER, tierName);
return segmentMetadataZNRecord;
}
}