Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2871,7 +2871,7 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen
// at any time in case of REFRESH use case.
if (forceCleanup) {
if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils
.isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
.isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
LOGGER.info(
"Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage "
+ "entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, "
Expand Down Expand Up @@ -3078,6 +3078,25 @@ public void revertReplaceSegments(String tableNameWithType, String segmentLineag
throw new RuntimeException(errorMsg);
}

// We do not allow to revert the lineage entry which segments in 'segmentsTo' appear in 'segmentsFrom' of other
// 'IN_PROGRESS' or 'COMPLETED' entries. E.g. we do not allow reverting entry1 because it will block reverting
// entry2.
// entry1: {(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5), COMPLETED}
// entry2: {(Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8), IN_PROGRESS/COMPLETED}
// TODO: need to expand the logic to revert multiple entries in one go when we support > 2 data snapshots
for (String currentEntryId : segmentLineage.getLineageEntryIds()) {
LineageEntry currentLineageEntry = segmentLineage.getLineageEntry(currentEntryId);
if (currentLineageEntry.getState() == LineageEntryState.IN_PROGRESS
|| currentLineageEntry.getState() == LineageEntryState.COMPLETED) {
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), currentLineageEntry
.getSegmentsFrom()), String.format("Cannot revert lineage entry, found segments from 'segmentsTo' "
+ "appear in 'segmentsFrom' of another lineage entry. (tableNameWithType='%s', "
+ "segmentLineageEntryId='%s', segmentsTo = '%s', segmentLineageEntryId='%s' "
+ "segmentsFrom = '%s')", tableNameWithType, segmentLineageEntryId, lineageEntry.getSegmentsTo(),
currentEntryId, currentLineageEntry.getSegmentsFrom()));
}
}

// Update segment lineage entry to 'REVERTED'
updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static org.apache.pinot.spi.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
import static org.testng.Assert.fail;


public class PinotHelixResourceManagerTest {
Expand Down Expand Up @@ -491,6 +492,7 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
fail();
} catch (Exception e) {
// expected
}
Expand All @@ -499,6 +501,7 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
fail();
} catch (Exception e) {
// expected
}
Expand All @@ -508,20 +511,23 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
fail();
} catch (Exception e) {
// expected
}

// Invalid table
try {
ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId);
fail();
} catch (Exception e) {
// expected
}

// Invalid lineage entry id
try {
ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "aaa");
fail();
} catch (Exception e) {
// expected
}
Expand All @@ -530,6 +536,7 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
fail();
} catch (Exception e) {
// expected
}
Expand Down Expand Up @@ -575,6 +582,7 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false);
fail();
} catch (Exception e) {
// expected
}
Expand Down Expand Up @@ -613,6 +621,7 @@ public void testSegmentReplacement()
try {
ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
fail();
} catch (Exception e) {
// expected
}
Expand Down Expand Up @@ -755,6 +764,14 @@ public void testSegmentReplacementForRefresh()
Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
.getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
new HashSet<>(Arrays.asList("s3", "s4", "s5")));
// Try to revert the first entry should fail
try {
ControllerTestUtils.getHelixResourceManager()
.revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId, false);
fail();
} catch (Exception e) {
// expected
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put a fail after line 761 then please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jtao15 Can we add fail() for all the places where we try{} catch{} ?

I think that we have a lot of tests like this and I remember that I didn't add fail().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

}

// Add partial segments to indicate incomplete protocol
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
Expand Down Expand Up @@ -799,6 +816,7 @@ public void testSegmentReplacementForRefresh()
try {
ControllerTestUtils.getHelixResourceManager()
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId2);
fail();
} catch (Exception e) {
// expected
}
Expand Down