Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ public synchronized void buildRouting(String tableNameWithType) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
if (_routingEntryMap.containsKey(realtimeTableName)) {
LOGGER.info("Adding time boundary manager for table: {}", tableNameWithType);
timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics);
timeBoundaryManager.init(idealState, externalView, preSelectedOnlineSegments);
}
} else {
Expand Down Expand Up @@ -477,7 +477,7 @@ public synchronized void buildRouting(String tableNameWithType) {
Set<String> offlineTablePreSelectedOnlineSegments =
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
TimeBoundaryManager offlineTableTimeBoundaryManager =
new TimeBoundaryManager(offlineTableConfig, _propertyStore);
new TimeBoundaryManager(offlineTableConfig, _propertyStore, _brokerMetrics);
offlineTableTimeBoundaryManager.init(offlineTableIdealState, offlineTableExternalView,
offlineTablePreSelectedOnlineSegments);
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand All @@ -52,23 +55,27 @@
*/
public class TimeBoundaryManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeBoundaryManager.class);
private static final long INVALID_END_TIME_MS = -1;
private static final long INVALID_TIME_MS = -1;

private final String _offlineTableName;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final BrokerMetrics _brokerMetrics;
private final String _segmentZKMetadataPathPrefix;
private final String _timeColumn;
private final DateTimeFormatSpec _timeFormatSpec;
private final long _timeOffsetMs;
private final Map<String, Long> _endTimeMsMap = new HashMap<>();

private long _explicitlySetTimeBoundaryMs = INVALID_TIME_MS;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be marked @volatile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to be because it is always under the synchronized block. Maybe worth adding a comment

private volatile TimeBoundaryInfo _timeBoundaryInfo;

public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics) {
Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
"Cannot construct TimeBoundaryManager for real-time table: %s", tableConfig.getTableName());
_offlineTableName = tableConfig.getTableName();
_propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
_segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(_offlineTableName) + "/";

Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _offlineTableName);
Expand Down Expand Up @@ -104,6 +111,8 @@ public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecor
*/
@SuppressWarnings("unused")
public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
updateExplicitlySetTimeBoundary(idealState);

// Bulk load time info for all online segments
int numSegments = onlineSegments.size();
List<String> segments = new ArrayList<>(numSegments);
Expand All @@ -113,7 +122,7 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
}
List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
long maxEndTimeMs = INVALID_END_TIME_MS;
long maxEndTimeMs = INVALID_TIME_MS;
for (int i = 0; i < numSegments; i++) {
String segment = segments.get(i);
long endTimeMs = extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
Expand All @@ -123,37 +132,66 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
updateTimeBoundaryInfo(maxEndTimeMs);
}

/**
 * Refreshes the cached explicitly set time boundary from the given ideal state.
 *
 * <p>The value is read from the {@code HYBRID_TABLE_TIME_BOUNDARY} simple field of the ideal state record and
 * parsed as a long; when the field is absent, {@code INVALID_TIME_MS} is stored instead. The cached value is only
 * rewritten (and logged) when it actually changes.
 *
 * @param idealState ideal state whose record carries the optional time boundary simple field
 * @throws NumberFormatException if the field is present but is not a parsable long
 */
private void updateExplicitlySetTimeBoundary(IdealState idealState) {
  String configuredBoundary =
      idealState.getRecord().getSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY);

  long newBoundaryMs;
  if (configuredBoundary == null) {
    // Field not present: fall back to the sentinel value
    newBoundaryMs = INVALID_TIME_MS;
  } else {
    newBoundaryMs = Long.parseLong(configuredBoundary);
  }

  // Nothing to do when the value is unchanged
  if (newBoundaryMs == _explicitlySetTimeBoundaryMs) {
    return;
  }
  LOGGER.info("Updating explicitly set time boundary to: {} for table: {}", newBoundaryMs, _offlineTableName);
  _explicitlySetTimeBoundaryMs = newBoundaryMs;
}

private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
if (znRecord == null) {
LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName);
return INVALID_END_TIME_MS;
return INVALID_TIME_MS;
}
long totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
long endTimeMs = INVALID_END_TIME_MS;
if (totalDocs != 0) {
long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
if (endTime > 0) {
TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
endTimeMs = timeUnit.toMillis(endTime);
} else {
LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
if (segmentZKMetadata.getTotalDocs() == 0) {
return INVALID_TIME_MS;
}
long endTimeMs = segmentZKMetadata.getEndTimeMs();
if (endTimeMs > 0) {
return endTimeMs;
} else {
LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
return INVALID_TIME_MS;
}
return endTimeMs;
}

private void updateTimeBoundaryInfo(long maxEndTimeMs) {
if (maxEndTimeMs > 0) {
String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;

long timeBoundaryMs;
if (_explicitlySetTimeBoundaryMs > 0) {
// Use explicitly set time boundary
timeBoundaryMs = _explicitlySetTimeBoundaryMs;
LOGGER.debug("Using explicitly set time boundary: {} for table: {}", _explicitlySetTimeBoundaryMs,
_offlineTableName);
} else {
// No explicit time boundary set
if (maxEndTimeMs > 0) {
timeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
} else {
LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
_offlineTableName);
timeBoundaryMs = INVALID_TIME_MS;
}
}

if (timeBoundaryMs > 0) {
String timeBoundary = _timeFormatSpec.fromMillisToFormat(timeBoundaryMs);
if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
_timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
}
// Convert formatted time boundary to millis in case the time boundary is rounded
long formattedTimeBoundaryMs = _timeFormatSpec.fromFormatToMillis(timeBoundary);
_brokerMetrics.setValueOfTableGauge(_offlineTableName, BrokerGauge.TIME_BOUNDARY_DIFFERENCE,
maxEndTimeMs - formattedTimeBoundaryMs);
} else {
LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
_offlineTableName);
_timeBoundaryInfo = null;
_brokerMetrics.removeTableGauge(_offlineTableName, BrokerGauge.TIME_BOUNDARY_DIFFERENCE);
}
}

Expand All @@ -167,6 +205,8 @@ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
@SuppressWarnings("unused")
public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
updateExplicitlySetTimeBoundary(idealState);

for (String segment : onlineSegments) {
// NOTE: Only update the segment end time when there are ONLINE instances in the external view to prevent moving
// the time boundary before the new segment is picked up by the servers
Expand All @@ -181,7 +221,7 @@ public synchronized void onAssignmentChange(IdealState idealState, ExternalView
}

private long getMaxEndTimeMs() {
long maxEndTimeMs = INVALID_END_TIME_MS;
long maxEndTimeMs = INVALID_TIME_MS;
for (long endTimeMs : _endTimeMsMap.values()) {
maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.Mockito;
Expand All @@ -48,7 +50,6 @@

import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand Down Expand Up @@ -112,10 +113,11 @@ private void testDailyPushTable(String rawTableName, TableConfig tableConfig, Ti
Map<String, String> offlineInstanceStateMap = Collections.singletonMap("server", OFFLINE);
Set<String> onlineSegments = new HashSet<>();
// NOTE: Ideal state is only read for the explicitly set time boundary (HYBRID_TABLE_TIME_BOUNDARY simple field).
IdealState idealState = mock(IdealState.class);
IdealState idealState = new IdealState("");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change this?


// Start with no segment
TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
TimeBoundaryManager timeBoundaryManager =
new TimeBoundaryManager(tableConfig, _propertyStore, Mockito.mock(BrokerMetrics.class));
timeBoundaryManager.init(idealState, externalView, onlineSegments);
assertNull(timeBoundaryManager.getTimeBoundaryInfo());

Expand Down Expand Up @@ -170,14 +172,34 @@ private void testDailyPushTable(String rawTableName, TableConfig tableConfig, Ti
// Refresh the changed segment should update the time boundary
timeBoundaryManager.refreshSegment(segment2);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS));

// Setting the enforced time boundary in ideal state should update the time boundary
idealState.getRecord().setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY,
Long.toString(TimeUnit.MILLISECONDS.convert(50, TimeUnit.DAYS)));
timeBoundaryManager.onAssignmentChange(idealState, externalView, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(50, TimeUnit.DAYS));

// Refresh with more recent segment should not update the enforced time boundary
String segment3 = "segment3";
setSegmentZKMetadata(rawTableName, segment3, 100, timeUnit);
onlineSegments.add(segment3);
segmentAssignment.put(segment3, onlineInstanceStateMap);
timeBoundaryManager.refreshSegment(segment3);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(50, TimeUnit.DAYS));

// Unsetting the enforced time boundary should change it to most recent time boundary
idealState.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY);
timeBoundaryManager.onAssignmentChange(idealState, externalView, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(99, TimeUnit.DAYS));
}

private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) {
// NOTE: External view is mocked; ideal state is only read for the explicitly set time boundary.
ExternalView externalView = Mockito.mock(ExternalView.class);
IdealState idealState = Mockito.mock(IdealState.class);
IdealState idealState = new IdealState("");

TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
TimeBoundaryManager timeBoundaryManager =
new TimeBoundaryManager(tableConfig, _propertyStore, Mockito.mock(BrokerMetrics.class));
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
onlineSegments.add(segment0);
Expand All @@ -192,6 +214,17 @@ private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, T
expectedTimeValue = timeUnit.convert(47, TimeUnit.HOURS);
}
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue);

// Setting the enforced time boundary in ideal state should update the time boundary
idealState.getRecord().setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY,
Long.toString(TimeUnit.MILLISECONDS.convert(50, TimeUnit.HOURS)));
timeBoundaryManager.onAssignmentChange(idealState, externalView, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(50, TimeUnit.HOURS));

// Unsetting the enforced time boundary should change it back to original time boundary
idealState.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY);
timeBoundaryManager.onAssignmentChange(idealState, externalView, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue);
}

private TableConfig getTableConfig(String rawTableName, String pushFrequency) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true),
REQUEST_SIZE("requestSize", false),
RESIZE_TIME_MS("milliseconds", false),
UNHEALTHY_SERVERS("servers", true);
UNHEALTHY_SERVERS("servers", true),
TIME_BOUNDARY_DIFFERENCE("milliseconds", false);

private final String _brokerGaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;


/**
 * Immutable value holder pairing a validity flag with a maximum end time in milliseconds.
 *
 * <p>Instances are created through the {@link JsonCreator}-annotated constructor, which binds the JSON
 * properties {@code valid} and {@code maxEndTimeMs}.
 */
public class TableSegmentValidationInfo {
  private final long _maxEndTimeMs;
  private final boolean _valid;

  /**
   * Creates a new validation info.
   *
   * @param valid value returned by {@link #isValid()}
   * @param maxEndTimeMs value returned by {@link #getMaxEndTimeMs()}
   */
  @JsonCreator
  public TableSegmentValidationInfo(
      @JsonProperty("valid") boolean valid,
      @JsonProperty("maxEndTimeMs") long maxEndTimeMs) {
    _maxEndTimeMs = maxEndTimeMs;
    _valid = valid;
  }

  /**
   * @return the validity flag supplied at construction
   */
  public boolean isValid() {
    return _valid;
  }

  /**
   * @return the maximum end time in milliseconds supplied at construction
   */
  public long getMaxEndTimeMs() {
    return _maxEndTimeMs;
  }
}
Loading