Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Fill assignment param of bucket shuffle and colocate shuffle for debug #5167

Merged
merged 43 commits into from
Jan 16, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1a15f53
udf: replace function
Aug 13, 2020
7d072b9
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 14, 2020
391158f
udf: replace function
Aug 14, 2020
5eb52e1
udf: replace function
Aug 14, 2020
a79ea91
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 18, 2020
86f841f
udf: replace function
Aug 18, 2020
ade1afa
udf: replace function
Aug 18, 2020
1d95a49
udf: replace function
Aug 18, 2020
433eb87
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 24, 2020
c62d239
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
0a8db8c
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
02d3f86
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 1, 2020
41da0ba
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 8, 2020
de4e523
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 11, 2020
a863b0d
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 17, 2020
024c422
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 22, 2020
e1656c7
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 23, 2020
837e037
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 27, 2020
a36d3e7
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 17, 2020
43cfa2b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 23, 2020
641efb5
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 24, 2020
5066131
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 26, 2020
dde044b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 27, 2020
8f8c823
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 28, 2020
f7704ba
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 1, 2020
8726e7a
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 4, 2020
4dfc785
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 6, 2020
d4bd91c
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 11, 2020
2eb8ddf
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 12, 2020
5aa7afc
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 14, 2020
ce251b2
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 17, 2020
04c5899
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 18, 2020
2e614cf
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 24, 2020
ede2e00
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 25, 2020
0aed91f
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 26, 2020
bd28b73
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 30, 2020
e56e17d
Merge remote-tracking branch 'upstream/master' into str_replace
Dec 10, 2020
7d5d011
Merge remote-tracking branch 'upstream/master' into str_replace
Dec 17, 2020
22ebef7
fill assignment for instrance trace
Dec 19, 2020
ff2dec7
fill bucket shuffle join and colocate join assignment
Dec 29, 2020
e0b83ea
update
Dec 29, 2020
c13a20f
Merge remote-tracking branch 'upstream/master' into fill_assignment
Dec 29, 2020
5d9a64b
Merge remote-tracking branch 'upstream/master' into fill_assignment
Jan 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1183,9 +1183,10 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par
}
addressToScanRanges.get(address).add(filteredNodeScanRanges);
}

FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Map<Integer, List<TScanRangeParams>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<Integer, List<TScanRangeParams>>());
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablets num
Expand All @@ -1200,13 +1201,17 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par
for (List<Map<Integer, List<TScanRangeParams>>> perInstanceScanRange : perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);


for (Map<Integer, List<TScanRangeParams>> nodeScanRangeMap : perInstanceScanRange) {
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), nodeScanRange.getValue());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
} else {
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
}

}
}
params.instanceExecParams.add(instanceParam);
Expand Down Expand Up @@ -1615,7 +1620,6 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();

// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
Expand All @@ -1630,9 +1634,10 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns
}
addressToScanRanges.get(address).add(filteredScanRanges);
}

FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<Integer, List<TScanRangeParams>>());
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablets num
Expand All @@ -1651,8 +1656,10 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), nodeScanRange.getValue());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
} else {
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
}
}
Expand Down
94 changes: 92 additions & 2 deletions fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,46 @@ public void testComputeScanRangeAssignmentByBucketq() {
Assert.assertEquals(targetBeCount, 3);
}

/**
 * Checks that invoking {@code computeColocateJoinInstanceParam} leaves a scan range
 * assignment on the fragment params: the text written by
 * {@code FragmentExecParams.appendTo} must contain the entry for scan node 1
 * with an empty range list.
 */
@Test
public void testColocateJoinAssignment() {
    Coordinator coordinator = new Coordinator(context, analyzer, planner);

    PlanFragmentId fragmentId = new PlanFragmentId(1);
    int nodeId = 1;

    // The fragment owns exactly one scan node.
    Set<Integer> scanNodeIds = new HashSet<>();
    scanNodeIds.add(nodeId);
    Map<PlanFragmentId, Set<Integer>> scanNodeIdsByFragment = new HashMap<>();
    scanNodeIdsByFragment.put(fragmentId, scanNodeIds);
    Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", scanNodeIdsByFragment);

    // 1. set fragmentIdToSeqToAddressMap in coordinator:
    //    bucket sequences 0..2 all map to the same address object.
    TNetworkAddress backend = new TNetworkAddress();
    Map<Integer, TNetworkAddress> addressByBucketSeq = new HashMap<>();
    for (int seq = 0; seq < 3; seq++) {
        addressByBucketSeq.put(seq, backend);
    }
    Map<PlanFragmentId, Map<Integer, TNetworkAddress>> addressByFragment = new HashMap<>();
    addressByFragment.put(fragmentId, addressByBucketSeq);
    Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", addressByFragment);

    // 2. set bucketSeqToScanRange in coordinator:
    //    bucket sequences 0..2 all share one map (scan node id -> empty range list).
    Map<Integer, List<TScanRangeParams>> scanRangeMap = new HashMap<>();
    scanRangeMap.put(nodeId, new ArrayList<>());
    BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
    for (int seq = 0; seq < 3; seq++) {
        bucketSeqToScanRange.put(seq, scanRangeMap);
    }
    Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange);

    // Build a fragment rooted at an OLAP scan node so the params can be rendered.
    TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
    OlapScanNode scanNode = new OlapScanNode(new PlanNodeId(nodeId), tupleDescriptor, "test");
    DataPartition partition = new DataPartition(TPartitionType.UNPARTITIONED);
    PlanFragment fragment = new PlanFragment(fragmentId, scanNode, partition);
    FragmentExecParams params = new FragmentExecParams(fragment);

    // Parallel instance number is 1.
    Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", fragmentId, 1, params);

    StringBuilder rendered = new StringBuilder();
    params.appendTo(rendered);
    String text = rendered.toString();
    Assert.assertTrue(text.contains("range=[id1,range=[]]"));
}

@Test
public void testComputeScanRangeAssignmentByBucket() {
PlanFragmentId planFragmentId = new PlanFragmentId(1);
Expand Down Expand Up @@ -370,11 +410,16 @@ public void testComputeBucketShuffleJoinInstanceParam() {
fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange);
Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);


FragmentExecParams params = new FragmentExecParams(null);
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
Assert.assertEquals(1, params.instanceExecParams.size());

try {
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
System.out.println(sb);
} catch (Exception e) {
e.printStackTrace();
}
params = new FragmentExecParams(null);
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
Assert.assertEquals(2, params.instanceExecParams.size());
Expand All @@ -388,6 +433,51 @@ public void testComputeBucketShuffleJoinInstanceParam() {
Assert.assertEquals(3, params.instanceExecParams.size());
}

/**
 * Checks that {@code BucketShuffleJoinController.computeInstanceParam}, invoked with a
 * parallel instance number of 1, creates one instance and leaves a scan range
 * assignment on the fragment params: the text written by
 * {@code FragmentExecParams.appendTo} must contain the entry for scan node 1
 * with an empty range list.
 */
@Test
public void testBucketShuffleAssignment() {
    PlanFragmentId fragmentId = new PlanFragmentId(1);
    int nodeId = 1;

    // set fragment id to scan node ids map: the fragment owns exactly one scan node
    Set<Integer> scanNodeIds = new HashSet<>();
    scanNodeIds.add(nodeId);
    Map<PlanFragmentId, Set<Integer>> scanNodeIdsByFragment = new HashMap<>();
    scanNodeIdsByFragment.put(fragmentId, scanNodeIds);
    Coordinator.BucketShuffleJoinController controller
            = new Coordinator.BucketShuffleJoinController(scanNodeIdsByFragment);

    // 1. set fragmentIdToSeqToAddressMap in the controller:
    //    bucket sequences 0..2 all map to the same address object.
    TNetworkAddress backend = new TNetworkAddress();
    Map<Integer, TNetworkAddress> addressByBucketSeq = new HashMap<>();
    for (int seq = 0; seq < 3; seq++) {
        addressByBucketSeq.put(seq, backend);
    }
    Map<PlanFragmentId, Map<Integer, TNetworkAddress>> addressByFragment = new HashMap<>();
    addressByFragment.put(fragmentId, addressByBucketSeq);
    Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", addressByFragment);

    // 2. set fragmentIdBucketSeqToScanRangeMap in the controller:
    //    bucket sequences 0..2 all share one map (scan node id -> empty range list).
    Map<PlanFragmentId, BucketSeqToScanRange> scanRangeByFragment = new HashMap<>();
    BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
    Map<Integer, List<TScanRangeParams>> scanRangeMap = new HashMap<>();
    scanRangeMap.put(nodeId, new ArrayList<>());
    for (int seq = 0; seq < 3; seq++) {
        bucketSeqToScanRange.put(seq, scanRangeMap);
    }
    scanRangeByFragment.put(fragmentId, bucketSeqToScanRange);
    Deencapsulation.setField(controller, "fragmentIdBucketSeqToScanRangeMap", scanRangeByFragment);

    // Build a fragment rooted at an OLAP scan node so the params can be rendered.
    TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
    OlapScanNode scanNode = new OlapScanNode(new PlanNodeId(nodeId), tupleDescriptor, "test");
    DataPartition partition = new DataPartition(TPartitionType.UNPARTITIONED);
    PlanFragment fragment = new PlanFragment(fragmentId, scanNode, partition);

    FragmentExecParams params = new FragmentExecParams(fragment);
    Deencapsulation.invoke(controller, "computeInstanceParam", fragmentId, 1, params);
    Assert.assertEquals(1, params.instanceExecParams.size());

    StringBuilder rendered = new StringBuilder();
    params.appendTo(rendered);
    String text = rendered.toString();
    Assert.assertTrue(text.contains("range=[id1,range=[]]"));
}

@Test
public void testComputeScanRangeAssignmentByScheduler() {
Coordinator coordinator = new Coordinator(context, analyzer, planner);
Expand Down