Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import java.io.File;
import java.io.IOException;
Expand All @@ -30,6 +32,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -100,6 +103,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
// Fixed size LRU cache with TableName - SegmentName pair as key, and segment related
// errors as the value.
protected LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
// Cache used for identifying segments which could not be acquired since they were recently deleted.
protected Cache<String, String> _recentlyDeletedSegments;

@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
Expand Down Expand Up @@ -137,6 +142,11 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
_resourceTmpDir);
}
_errorCache = errorCache;
_recentlyDeletedSegments =
CacheBuilder.newBuilder()
.maximumSize(tableDataManagerConfig.getTableDeletedSegmentsCacheSize())
.expireAfterWrite(tableDataManagerConfig.getTableDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES)
.build();
_streamSegmentDownloadUntarRateLimitBytesPerSec =
tableDataManagerParams.getStreamSegmentDownloadUntarRateLimitBytesPerSec();
_isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
Expand Down Expand Up @@ -200,7 +210,7 @@ public void addSegment(ImmutableSegment immutableSegment) {
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);

ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager);
if (oldSegmentManager == null) {
_logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType);
} else {
Expand Down Expand Up @@ -231,7 +241,7 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
@Override
public void removeSegment(String segmentName) {
_logger.info("Removing segment: {} from table: {}", segmentName, _tableNameWithType);
SegmentDataManager segmentDataManager = _segmentDataManagerMap.remove(segmentName);
SegmentDataManager segmentDataManager = unregisterSegment(segmentName);
if (segmentDataManager != null) {
releaseSegment(segmentDataManager);
_logger.info("Removed segment: {} from table: {}", segmentName, _tableNameWithType);
Expand All @@ -240,6 +250,15 @@ public void removeSegment(String segmentName) {
}
}

/**
* Returns true if the given segment has been deleted recently. The time range is determined by
* {@link org.apache.pinot.spi.config.instance.InstanceDataManagerConfig#getDeletedSegmentsCacheTtlMinutes()}.
*/
@Override
public boolean isSegmentDeletedRecently(String segmentName) {
return _recentlyDeletedSegments.getIfPresent(segmentName) != null;
}

@Override
public List<SegmentDataManager> acquireAllSegments() {
List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
Expand Down Expand Up @@ -411,6 +430,31 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
zkMetadata.getCrc());
}

/**
* _segmentDataManagerMap is used for fetching segments that need to be queried. If a new segment is created,
* calling this method ensures that all queries in the future can use the new segment. This method may replace an
* existing segment with the same name.
*/
@Nullable
protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) {
SegmentDataManager oldSegmentDataManager = _segmentDataManagerMap.put(segmentName, segmentDataManager);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may wanna make _segmentDataManagerMap private so concrete classes that extend BaseTableDataManager use the map only via the provided methods here.

_recentlyDeletedSegments.invalidate(segmentName);
return oldSegmentDataManager;
}

/**
* De-registering a segment ensures that no query uses the given segment until a segment with that name is
* re-registered. There may be scenarios where the broker thinks that a segment is available even though it has
* been de-registered in the servers (either due to manual deletion or retention). In such cases, acquireSegments
* will mark those segments as missingSegments. The caller can use {@link #isSegmentDeletedRecently(String)} to
* identify this scenario.
*/
@Nullable
protected SegmentDataManager unregisterSegment(String segmentName) {
_recentlyDeletedSegments.put(segmentName, segmentName);
return _segmentDataManagerMap.remove(segmentName);
}

protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
}

_logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
_segmentDataManagerMap.put(segmentName, segmentDataManager);
registerSegment(segmentName, segmentDataManager);
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
}

Expand Down Expand Up @@ -417,7 +417,7 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
immutableSegment.getSegmentMetadata().getTotalDocs());
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager);
if (oldSegmentManager == null) {
partitionUpsertMetadataManager.addSegment(immutableSegment);
_logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
Expand Down Expand Up @@ -191,8 +192,9 @@ private DataTable processQueryInternal(ServerQueryRequest queryRequest, Executor
}

List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
List<String> missingSegments = new ArrayList<>();
List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery, missingSegments);
List<String> notAcquiredSegments = new ArrayList<>();
List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(
segmentsToQuery, notAcquiredSegments);
int numSegmentsAcquired = segmentDataManagers.size();
List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
Expand Down Expand Up @@ -291,12 +293,18 @@ private DataTable processQueryInternal(ServerQueryRequest queryRequest, Executor
//
// After step 2 but before step 4, segment will be missing on server side
// TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
int numMissingSegments = missingSegments.size();
if (numMissingSegments != 0) {
dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
String.format("%d segments %s missing on server: %s", numMissingSegments, missingSegments,
_instanceDataManager.getInstanceId())));
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
if (notAcquiredSegments.size() > 0) {
List<String> missingSegments =
notAcquiredSegments.stream()
.filter(segmentName -> !tableDataManager.isSegmentDeletedRecently(segmentName))
.collect(Collectors.toList());
int numMissingSegments = missingSegments.size();
if (numMissingSegments > 0) {
dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
String.format("%d segments %s missing on server: %s", numMissingSegments, missingSegments,
_instanceDataManager.getInstanceId())));
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
}
}

if (tableDataManager instanceof RealtimeTableDataManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.manager;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,6 +58,8 @@
public class BaseTableDataManagerAcquireSegmentTest {
private static final String TABLE_NAME = "testTable";
private static final String SEGMENT_PREFIX = "segment";
private static final int DELETED_SEGMENTS_CACHE_SIZE = 100;
private static final int DELETED_SEGMENTS_TTL_MINUTES = 2;

// Set once for the suite
private File _tmpDir;
Expand Down Expand Up @@ -121,6 +124,8 @@ private TableDataManager makeTestableManager()
when(config.getTableName()).thenReturn(TABLE_NAME);
when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
when(config.getAuthConfig()).thenReturn(new MapConfiguration(new HashMap<>()));
when(config.getTableDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
}
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
Expand Down Expand Up @@ -174,6 +179,19 @@ public void basicTest()
List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
Assert.assertEquals(segmentDataManagers.size(), 0);

// If a caller tries to acquire the deleted segment using acquireSegments, it will be returned in
// notAcquiredSegments. The isSegmentDeletedRecently method should return true.
List<String> notAcquiredSegments = new ArrayList<>();
tableDataManager.acquireSegments(ImmutableList.of(segmentName), notAcquiredSegments);
Assert.assertEquals(notAcquiredSegments.size(), 1);
Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(segmentName));

// Adding and removing the segment again is fine. After adding the segment back, isSegmentDeletedRecently should
// return false.
tableDataManager.addSegment(immutableSegment);
Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(segmentName));
tableDataManager.removeSegment(segmentName);

// Removing the segment again is fine.
tableDataManager.removeSegment(segmentName);
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConf
*/
void removeSegment(String segmentName);

/**
* Returns true if the segment was deleted in the last few minutes.
*/
boolean isSegmentDeletedRecently(String segmentName);

/**
* Acquires all segments of the table.
* <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
Expand All @@ -134,6 +139,8 @@ void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConf
/**
* Acquires the segments with the given segment names.
* <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
* This method may return some recently deleted segments in missingSegments. The caller can identify those segments
* by using {@link #isSegmentDeletedRecently(String)}.
*
* @param segmentNames List of names of the segment to acquire
* @param missingSegments Holder for segments unable to be acquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TableDataManagerConfig {
private static final String TABLE_DATA_MANAGER_NAME = "name";
private static final String TABLE_IS_DIMENSION = "isDimTable";
private static final String TABLE_DATA_MANAGER_AUTH = "auth";
private static final String TABLE_DELETED_SEGMENTS_CACHE_SIZE = "deletedSegmentsCacheSize";
private static final String TABLE_DELETED_SEGMENTS_CACHE_TTL_MINUTES = "deletedSegmentsCacheTTL";

private final Configuration _tableDataManagerConfig;

Expand Down Expand Up @@ -72,6 +74,14 @@ public Configuration getAuthConfig() {
return _tableDataManagerConfig.subset(TABLE_DATA_MANAGER_AUTH);
}

public int getTableDeletedSegmentsCacheSize() {
return _tableDataManagerConfig.getInt(TABLE_DELETED_SEGMENTS_CACHE_SIZE);
}

public int getTableDeletedSegmentsCacheTtlMinutes() {
return _tableDataManagerConfig.getInt(TABLE_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
}

public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
InstanceDataManagerConfig instanceDataManagerConfig, String tableNameWithType) {
Configuration defaultConfig = new PropertiesConfiguration();
Expand All @@ -82,6 +92,10 @@ public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Preconditions.checkNotNull(tableType);
defaultConfig.addProperty(TABLE_DATA_MANAGER_TYPE, tableType.name());
defaultConfig.addProperty(TABLE_DELETED_SEGMENTS_CACHE_SIZE,
instanceDataManagerConfig.getDeletedSegmentsCacheSize());
defaultConfig.addProperty(TABLE_DELETED_SEGMENTS_CACHE_TTL_MINUTES,
instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes());

// copy auth-related configs
instanceDataManagerConfig.getConfig().subset(TABLE_DATA_MANAGER_AUTH).toMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
// Size of cache that holds errors.
private static final String ERROR_CACHE_SIZE = "error.cache.size";

private static final String DELETED_SEGMENTS_CACHE_SIZE = "table.deleted.segments.cache.size";
private static final String DELETED_SEGMENTS_CACHE_TTL_MINUTES = "table.deleted.segments.cache.ttl.minutes";

private final static String[] REQUIRED_KEYS = {INSTANCE_ID, INSTANCE_DATA_DIR, READ_MODE};
private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
private static final int DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES = 2;
private PinotConfiguration _instanceDataManagerConfiguration = null;

public HelixInstanceDataManagerConfig(PinotConfiguration serverConfig)
Expand Down Expand Up @@ -257,6 +262,18 @@ public long getStreamSegmentDownloadUntarRateLimit() {
DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT);
}

@Override
public int getDeletedSegmentsCacheSize() {
return _instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_SIZE,
DEFAULT_DELETED_SEGMENTS_CACHE_SIZE);
}

@Override
public int getDeletedSegmentsCacheTtlMinutes() {
return _instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_TTL_MINUTES,
DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
}

@Override
public String toString() {
String configString = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,8 @@ public interface InstanceDataManagerConfig {
boolean isStreamSegmentDownloadUntar();

long getStreamSegmentDownloadUntarRateLimit();

int getDeletedSegmentsCacheSize();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: getRecentlyDeletedSegmentsCacheSize()


int getDeletedSegmentsCacheTtlMinutes();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}