Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#864] feat(server): Introduce Jersey to strengthen REST API #939

Merged
merged 40 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5a4e760
Support metric reporter and support Prometheus push gateway
xianjingfeng Dec 13, 2022
b7828bf
Merge branch 'master' into issue_169
xianjingfeng Dec 13, 2022
4226a0e
Optimize
xianjingfeng Dec 14, 2022
02c5161
Remove useless modification
xianjingfeng Dec 14, 2022
583db4f
Miss files
xianjingfeng Dec 14, 2022
52eaab5
Fix code style
xianjingfeng Dec 14, 2022
480d4de
Optimize
xianjingfeng Dec 15, 2022
6f76966
Add instanceId
xianjingfeng Dec 15, 2022
c4293e0
合并metric接口
xianjingfeng Dec 19, 2022
dd0ee24
ISSUE-455
xianjingfeng Feb 1, 2023
39bc96c
ISSUE-456
xianjingfeng Feb 1, 2023
9e541e2
ISSUE-468
xianjingfeng Feb 1, 2023
e36e3a3
Revert "[#772] fix(kerberos): cache proxy user ugi to avoid memory le…
jerqi Apr 13, 2023
7330d44
[#772] fix(kerberos): cache proxy user ugi to avoid memory leak (#773)
zuston Mar 29, 2023
4cdfbec
Revert "[#772] fix(kerberos): cache proxy user ugi to avoid memory le…
jerqi Apr 17, 2023
926c314
[#772][0.7] fix(kerberos): cache proxy user ugi to avoid memory leak …
zuston Apr 17, 2023
4b82788
Revert "[#886] fix(mr): MR Client may lost data or throw exception wh…
jerqi May 19, 2023
a7fa07f
Merge branch 'master' of https://github.com/xianjingfeng/incubator-un…
xianjingfeng Jun 20, 2023
69518b5
Introduce Jersey to strengthen REST API
xianjingfeng Jun 7, 2023
c13825f
Introduce Jersey to shuffle server
xianjingfeng Jun 7, 2023
61b4db7
fix ut
xianjingfeng Jun 8, 2023
e37c717
fix ut
xianjingfeng Jun 8, 2023
0784bf7
use jersey1
xianjingfeng Jun 8, 2023
921bba2
fix ut
xianjingfeng Jun 8, 2023
593ddf7
fix ut
xianjingfeng Jun 8, 2023
c31f067
fix ut
xianjingfeng Jun 8, 2023
57479a1
fix ut
xianjingfeng Jun 8, 2023
df7a321
nit
xianjingfeng Jun 8, 2023
eef176d
Adapt to other APIs of the master branch
xianjingfeng Jun 20, 2023
131c3d2
nit
xianjingfeng Jun 20, 2023
549f86f
nit
xianjingfeng Jun 20, 2023
f6e08af
use hbase thirdparty
xianjingfeng Jun 30, 2023
3c7a5bb
nit
xianjingfeng Jun 30, 2023
d62081e
fix spotbugs
xianjingfeng Jun 30, 2023
7c7f8a0
fix bug
xianjingfeng Jun 30, 2023
d650f00
fix codestyle
xianjingfeng Jun 30, 2023
621977f
introduce hbase-shaded-jackson-jaxrs-json-provider
xianjingfeng Jun 30, 2023
1bf8ec2
Revert "introduce hbase-shaded-jackson-jaxrs-json-provider"
xianjingfeng Jun 30, 2023
04c9d31
nit
xianjingfeng Jul 6, 2023
07f3daa
fix license
xianjingfeng Jul 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ISSUE-468
  • Loading branch information
xianjingfeng committed Feb 1, 2023
commit 9e541e2cae002db43cff864351a6df4a8b79c27b
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
private final ExecutorService dataTransferPool;
private final int unregisterThreadPoolSize;
private final int unregisterRequestTimeSec;
private Set<ShuffleServerInfo> defectiveServers;

public ShuffleWriteClientImpl(
String clientType,
Expand Down Expand Up @@ -133,6 +135,9 @@ public ShuffleWriteClientImpl(
this.dataCommitPoolSize = dataCommitPoolSize;
this.unregisterThreadPoolSize = unregisterThreadPoolSize;
this.unregisterRequestTimeSec = unregisterRequestTimeSec;
if (replica > 1) {
defectiveServers = Sets.newConcurrentHashSet();
}
}

private boolean sendShuffleDataAsync(
Expand Down Expand Up @@ -170,12 +175,21 @@ private boolean sendShuffleDataAsync(
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
// mark a replica of block that has been sent
serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
if (defectiveServers != null) {
defectiveServers.remove(ssi);
}
LOG.info("{} successfully.", logMsg);
} else {
if (defectiveServers != null) {
defectiveServers.add(ssi);
}
LOG.warn("{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode());
return false;
}
} catch (Exception e) {
if (defectiveServers != null) {
defectiveServers.add(ssi);
}
LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId() + "] failed.", e);
return false;
}
Expand All @@ -192,12 +206,27 @@ private boolean sendShuffleDataAsync(
return result;
}

private void genServerToBlocks(ShuffleBlockInfo sbi, List<ShuffleServerInfo> serverList,
void genServerToBlocks(
ShuffleBlockInfo sbi,
List<ShuffleServerInfo> serverList,
int replicaNum,
List<ShuffleServerInfo> excludeServers,
Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
Map<ShuffleServerInfo, List<Long>> serverToBlockIds) {
Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
boolean excludeDefectiveServers) {
if (replicaNum <= 0) {
return;
}
int partitionId = sbi.getPartitionId();
int shuffleId = sbi.getShuffleId();
int assignedNum = 0;
for (ShuffleServerInfo ssi : serverList) {
if (excludeDefectiveServers && replica > 1 && defectiveServers.contains(ssi)) {
continue;
}
if (CollectionUtils.isNotEmpty(excludeServers) && excludeServers.contains(ssi)) {
continue;
}
if (!serverToBlockIds.containsKey(ssi)) {
serverToBlockIds.put(ssi, Lists.newArrayList());
}
Expand All @@ -216,6 +245,18 @@ private void genServerToBlocks(ShuffleBlockInfo sbi, List<ShuffleServerInfo> ser
partitionToBlocks.put(partitionId, Lists.newArrayList());
}
partitionToBlocks.get(partitionId).add(sbi);
if (excludeServers != null) {
excludeServers.add(ssi);
}
assignedNum++;
if (assignedNum >= replicaNum) {
break;
}
}

if (assignedNum < replicaNum && excludeDefectiveServers) {
genServerToBlocks(sbi, serverList, replicaNum - assignedNum,
excludeServers, serverToBlocks, serverToBlockIds, false);
}
}

Expand Down Expand Up @@ -247,14 +288,15 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
if (replicaSkipEnabled) {
genServerToBlocks(sbi, allServers.subList(0, replicaWrite),
primaryServerToBlocks, primaryServerToBlockIds);
genServerToBlocks(sbi, allServers.subList(replicaWrite, replica),
secondaryServerToBlocks, secondaryServerToBlockIds);
List<ShuffleServerInfo> excludeServers = new ArrayList<>();
genServerToBlocks(sbi, allServers, replicaWrite, excludeServers,
primaryServerToBlocks, primaryServerToBlockIds, true);
genServerToBlocks(sbi, allServers,replica - replicaWrite,
excludeServers, secondaryServerToBlocks, secondaryServerToBlockIds, false);
} else {
// When replicaSkip is disabled, we send data to all replicas within one round.
genServerToBlocks(sbi, allServers,
primaryServerToBlocks, primaryServerToBlockIds);
genServerToBlocks(sbi, allServers, allServers.size(),
null, primaryServerToBlocks, primaryServerToBlockIds, false);
}
}

Expand Down Expand Up @@ -756,6 +798,11 @@ public ShuffleServerClient getShuffleServerClient(ShuffleServerInfo shuffleServe
return ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
}

/**
 * Exposes the set of shuffle servers currently recorded as defective, for use in tests.
 *
 * <p>The returned set is the live internal set, not a copy, so callers can mutate it.
 *
 * @return the defective-server set, or {@code null} when the client was created with
 *     {@code replica <= 1} (the constructor only creates the set when {@code replica > 1})
 */
@VisibleForTesting
Set<ShuffleServerInfo> getDefectiveServers() {
return defectiveServers;
}

void addShuffleServer(String appId, int shuffleId, ShuffleServerInfo serverInfo) {
Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
if (appServerMap == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.uniffle.client.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -104,4 +106,80 @@ public void testRegisterAndUnRegisterShuffleServer() {
shuffleWriteClient.unregisterShuffle(appId1, 1);
assertEquals(1, shuffleWriteClient.getAllShuffleServers(appId1).size());
}

/**
 * Verifies that servers whose send failed are tracked as defective and are deprioritized
 * when blocks are assigned to servers, and that a later successful send clears the mark.
 */
@Test
public void testSendDataWithDefectiveServers() {
// NOTE(review): the constructor signature is not visible here; the positional arguments
// presumably configure a multi-replica client with replica skipping enabled — confirm.
ShuffleWriteClientImpl shuffleWriteClient =
new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 3, 2, 2, true, 1, 1, 10, 10);
ShuffleServerClient mockShuffleServerClient = mock(ShuffleServerClient.class);
ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
// Every server lookup returns the same mock, so responses are consumed in call order.
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
// Consecutive stubbing: the first send fails with NO_BUFFER, the next two succeed.
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
new RssSendShuffleDataResponse(ResponseStatusCode.NO_BUFFER),
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));

String appId = "testSendDataWithDefectiveServers_appId";
ShuffleServerInfo ssi1 = new ShuffleServerInfo("127.0.0.1", 0);
ShuffleServerInfo ssi2 = new ShuffleServerInfo("127.0.0.1", 1);
ShuffleServerInfo ssi3 = new ShuffleServerInfo("127.0.0.1", 2);
List<ShuffleServerInfo> shuffleServerInfoList =
Lists.newArrayList(ssi1, ssi2, ssi3);
List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList(new ShuffleBlockInfo(
0, 0, 10, 10, 10, new byte[]{1}, shuffleServerInfoList, 10, 100, 0));
SendShuffleDataResult result = spyClient.sendShuffleData(appId, shuffleBlockInfoList, () -> false);
// Despite one failed send, the block must not be reported as failed.
assertEquals(0, result.getFailedBlockIds().size());

// Send data for the second time; the first shuffle server (expected to be marked
// defective after the failed send above) is skipped at first and only picked last.
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));
List<ShuffleServerInfo> excludeServers = new ArrayList<>();
// With defective servers excluded, the two requested replicas go to ssi2 and ssi3.
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
2, excludeServers, Maps.newHashMap(), Maps.newHashMap(), true);
assertEquals(2, excludeServers.size());
assertEquals(ssi2, excludeServers.get(0));
assertEquals(ssi3, excludeServers.get(1));
// Without the defective filter, the only server not yet in excludeServers is ssi1.
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
1, excludeServers, Maps.newHashMap(), Maps.newHashMap(), false);
assertEquals(3, excludeServers.size());
assertEquals(ssi1, excludeServers.get(2));
result = spyClient.sendShuffleData(appId, shuffleBlockInfoList, () -> false);
assertEquals(0, result.getFailedBlockIds().size());

// Send data for the third time, the first server will be removed from the defectiveServers
// and the second server will be added to the defectiveServers.
when(mockShuffleServerClient.sendShuffleData(any())).thenReturn(
new RssSendShuffleDataResponse(ResponseStatusCode.NO_BUFFER),
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS),
new RssSendShuffleDataResponse(ResponseStatusCode.SUCCESS));
// ssi2 is listed first this time, so it is the one expected to end up defective.
List<ShuffleServerInfo> shuffleServerInfoList2 = Lists.newArrayList(ssi2, ssi1, ssi3);
List<ShuffleBlockInfo> shuffleBlockInfoList2 = Lists.newArrayList(new ShuffleBlockInfo(0, 0, 10, 10, 10,
new byte[]{1}, shuffleServerInfoList2, 10, 100, 0));
result = spyClient.sendShuffleData(appId, shuffleBlockInfoList2, () -> false);
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(1, spyClient.getDefectiveServers().size());
assertEquals(ssi2, spyClient.getDefectiveServers().toArray()[0]);
excludeServers = new ArrayList<>();
// The original list order (ssi1, ssi2, ssi3) is used again; ssi2 is now the one skipped.
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
2, excludeServers, Maps.newHashMap(), Maps.newHashMap(), true);
assertEquals(2, excludeServers.size());
assertEquals(ssi1, excludeServers.get(0));
assertEquals(ssi3, excludeServers.get(1));
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
1, excludeServers, Maps.newHashMap(), Maps.newHashMap(), false);
assertEquals(3, excludeServers.size());
assertEquals(ssi2, excludeServers.get(2));

// Check the assignment when two of the three shuffle servers are in defectiveServers:
// only ssi3 is healthy, so the second replica must fall back to a defective server.
spyClient.getDefectiveServers().add(ssi1);
assertEquals(2, spyClient.getDefectiveServers().size());
excludeServers = new ArrayList<>();
spyClient.genServerToBlocks(shuffleBlockInfoList.get(0), shuffleServerInfoList,
2, excludeServers, Maps.newHashMap(), Maps.newHashMap(), true);
assertEquals(2, excludeServers.size());
assertEquals(ssi3, excludeServers.get(0));
// The fallback pass walks the list in order, so ssi1 is chosen ahead of ssi2.
assertEquals(ssi1, excludeServers.get(1));
}

}