Skip to content

Commit

Permalink
Merge pull request #895 from ctripcorp/feature/aggregate_pull_instanc…
Browse files Browse the repository at this point in the history
…e_status

聚合console前端实例延迟查询
  • Loading branch information
LanternLee authored Oct 17, 2024
2 parents 3aa0383 + eee86c1 commit 90873da
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public List<RedisMeta> getAllInstancesOfShard(String cluster, String shard) {
return null;
}

@Override
public List<RedisMeta> getAllInstanceOfDc(String cluster, String dc) {
return null;
}

@Override
public boolean isAsymmetricCluster(String clusterName) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.ctrip.xpipe.tuple.Pair;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -23,6 +24,8 @@ public interface ConsoleService extends CheckerService {

Long getInstanceDelayStatus(String ip, int port);

Map<HostPort, Long> getInstancesDelayStatus(List<HostPort> hostPorts);

Long getShardDelay(long shardId);

Long getInstanceDelayStatusFromParallelService(String ip, int port);
Expand All @@ -45,4 +48,6 @@ public interface ConsoleService extends CheckerService {

class ShardCheckerHealthCheckModels extends ArrayList<ShardCheckerHealthCheckModel> {}

class InstancesDelayStatusModels extends HashMap<HostPort, Long> {};

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public long getDelay(String ip, int port, String activeIdc) {
return service.getInstanceDelayStatus(ip, port);
}

public Map<HostPort, Long> getDelay(List<HostPort> hostPorts, String activeIdc) {
ConsoleService service = getServiceByDc(activeIdc);
return service.getInstancesDelayStatus(hostPorts);
}

public long getShardDelay(long shardId, String activeIdc) {
ConsoleService service = getServiceByDc(activeIdc);
return service.getShardDelay(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class DefaultConsoleService extends AbstractService implements ConsoleSer

private final String innerDelayStatusUrl;

private final String instancesDelayStatusUrl;

private final String delayStatusUrl;

private final String allDelayStatusUrl;
Expand Down Expand Up @@ -83,6 +85,7 @@ public DefaultConsoleService(String address){
pingStatusUrl = String.format("%s/api/redis/ping/{ip}/{port}", this.address);
innerShardDelayStatusUrl = String.format("%s/api/shard/inner/delay/{shardId}", this.address);
innerDelayStatusUrl = String.format("%s/api/redis/inner/delay/{ip}/{port}", this.address);
instancesDelayStatusUrl = String.format("%s/api/redises/inner/delay", this.address);
delayStatusUrl = String.format("%s/api/redis/delay/{ip}/{port}", this.address);
allDelayStatusUrl = String.format("%s/api/redis/inner/delay/all", this.address);
unhealthyInstanceUrl = String.format("%s/api/redis/inner/unhealthy", this.address);
Expand Down Expand Up @@ -140,6 +143,11 @@ public Long getInstanceDelayStatus(String ip, int port) {
return restTemplate.getForObject(innerDelayStatusUrl, Long.class, ip, port);
}

@Override
public Map<HostPort, Long> getInstancesDelayStatus(List<HostPort> hostPorts) {
return restTemplate.postForObject(instancesDelayStatusUrl, hostPorts, InstancesDelayStatusModels.class);
}

@Override
public Long getShardDelay(long shardId) {
return restTemplate.getForObject(innerShardDelayStatusUrl, Long.class, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import com.ctrip.xpipe.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -63,6 +62,11 @@ public Long getInnerReplDelayMillis(@PathVariable String redisIp, @PathVariable
return delayService.getLocalCachedDelay(new HostPort(redisIp, redisPort));
}

@RequestMapping(value = "/redises/inner/delay", method = RequestMethod.POST)
public Map<HostPort, Long> getInnerReplDelaysMillis(@RequestBody List<HostPort> hostPorts) {
return hostPorts.stream().collect(Collectors.toMap(instance -> instance, instance -> delayService.getLocalCachedDelay(instance)));
}

@RequestMapping(value = "/redis/inner/delay/all", method = RequestMethod.GET)
public Map<HostPort, Long> getAllInnerReplDelayMills() {
return delayService.getDcCachedDelay(FoundationService.DEFAULT.getDataCenter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public Map<String, Long> getReplDelayMillis(@PathVariable String redisIp, @PathV
return ImmutableMap.of("delay", delayService.getDelay(new HostPort(redisIp, redisPort)));
}

@RequestMapping(value = "/redis/delay/{clusterType}/{redisIp}/{redisPort}", method = RequestMethod.GET)
public Map<String, Long> getReplDelayMillis(@PathVariable String clusterType, @PathVariable String redisIp, @PathVariable int redisPort) {
ClusterType type = ClusterType.lookup(clusterType);
return ImmutableMap.of("delay", delayService.getDelay(type, new HostPort(redisIp, redisPort)));
@RequestMapping(value = "/redises/delay/{dcId}/{clusterId}", method = RequestMethod.GET)
public Map<String, Map<HostPort, Long>> getAllReplDelayMillis(@PathVariable String dcId, @PathVariable String clusterId) {
return ImmutableMap.of("delay", delayService.getDelay(dcId, clusterId));
}


@RequestMapping(value = "/cross-master/delay/{dcId}/" + CLUSTER_ID_PATH_VARIABLE + "/" + SHARD_ID_PATH_VARIABLE, method = RequestMethod.GET)
public Map<String, Pair<HostPort, Long>> getCrossMasterReplHealthStatus(@PathVariable String dcId, @PathVariable String clusterId, @PathVariable String shardId) {
return crossMasterDelayService.getPeerMasterDelayFromSourceDc(dcId, clusterId, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ public List<RedisMeta> getAllInstancesOfShard(String cluster, String shard) {
return instances;
}

@Override
public List<RedisMeta> getAllInstanceOfDc(String cluster, String dc) {
return meta.getValue().getRedises(dc, cluster);
}

@Override
public String getDc(HostPort hostPort) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.ctrip.xpipe.redis.console.service;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.RedisDelayManager;
import com.ctrip.xpipe.redis.console.model.consoleportal.UnhealthyInfoModel;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -22,7 +22,7 @@ public interface DelayService extends RedisDelayManager {

long getDelay(HostPort hostPort);

long getDelay(ClusterType clusterType, HostPort hostPort);
Map<HostPort, Long> getDelay(String dcId, String clusterId);

long getLocalCachedDelay(HostPort hostPort);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR;

Expand Down Expand Up @@ -101,18 +100,7 @@ public long getShardDelay(String clusterId, String shardId, Long shardDbId) {

@Override
public long getDelay(HostPort hostPort) {
Pair<String, String> clusterShard = metaCache.findClusterShard(hostPort);
if (null == clusterShard) return -1L;

ClusterType clusterType = metaCache.getClusterType(clusterShard.getKey());
ClusterType azGroupType = metaCache.getAzGroupType(hostPort);
String dcId = null;
if (clusterType.supportSingleActiveDC() && azGroupType != ClusterType.SINGLE_DC) {
dcId = metaCache.getActiveDc(hostPort);
} else if (clusterType.supportMultiActiveDC() || azGroupType == ClusterType.SINGLE_DC) {
dcId = metaCache.getDc(hostPort);
}

String dcId = getClusterActiveDc(hostPort);
if (StringUtil.isEmpty(dcId)) {
return -1L;
}
Expand All @@ -131,13 +119,60 @@ public long getDelay(HostPort hostPort) {
}

@Override
public long getDelay(ClusterType clusterType, HostPort hostPort) {
public Map<HostPort, Long> getDelay(String dcId, String clusterId) {
Map<HostPort, Long> result = new HashMap<>();
List<RedisMeta> allInstancesOfDc = metaCache.getAllInstanceOfDc(clusterId, dcId);
if (allInstancesOfDc == null || allInstancesOfDc.isEmpty()) {
return Collections.emptyMap();
}
List<HostPort> hostPorts = new ArrayList<>();
for (RedisMeta redisMeta : allInstancesOfDc) {
hostPorts.add(new HostPort(redisMeta.getIp(), redisMeta.getPort()));
}
ClusterType clusterType = metaCache.getClusterType(clusterId);
if (consoleConfig.getOwnClusterType().contains(clusterType.toString())) {
return getDelay(hostPort);
try {
String activeDcId = getClusterActiveDc(hostPorts.get(0));
if(!foundationService.getDataCenter().equals(activeDcId)) {
try {
Map<HostPort, Long> delay = consoleServiceManager.getDelay(hostPorts, activeDcId);
result = delay.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, longEntry -> TimeUnit.NANOSECONDS.toMillis(longEntry.getValue())));
} catch (Exception e) {
result.putAll(getFailDelayResult(hostPorts));
}
} else {
for (HostPort hostPort : hostPorts) {
result.put(hostPort, TimeUnit.NANOSECONDS.toMillis(hostPort2Delay.getOrDefault(hostPort, DelayAction.SAMPLE_LOST_AND_NO_PONG)));
}
}
} catch (Exception e) {
result = getFailDelayResult(hostPorts);
}
} else {
return consoleServiceManager.getDelayFromParallelService(hostPort.getHost(), hostPort.getPort());
result = getFailDelayResult(hostPorts);
}

return result;
}

private Map<HostPort, Long> getFailDelayResult(List<HostPort> instances) {
return instances.stream().collect(Collectors.toMap(instance -> instance, instance -> -1L));
}

protected String getClusterActiveDc(HostPort hostPort) {
Pair<String, String> clusterShard = metaCache.findClusterShard(hostPort);
if (null == clusterShard) return null;

ClusterType clusterType = metaCache.getClusterType(clusterShard.getKey());
ClusterType azGroupType = metaCache.getAzGroupType(hostPort);
String dcId = null;
if (clusterType.supportSingleActiveDC() && azGroupType != ClusterType.SINGLE_DC) {
dcId = metaCache.getActiveDc(hostPort);
} else if (clusterType.supportMultiActiveDC() || azGroupType == ClusterType.SINGLE_DC) {
dcId = metaCache.getDc(hostPort);
}
return dcId;
}

@Override
Expand Down
Loading

0 comments on commit 90873da

Please sign in to comment.