Skip to content

Commit

Permalink
Probe tiflash (#2640)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyuhang0 authored Mar 13, 2023
1 parent f2ab73e commit 30cab9b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/userguide_3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_cat
|------------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `spark.tispark.pd.addresses` | `127.0.0.1:2379` | The addresses of the PD cluster, separated by commas |
| `spark.tispark.grpc.framesize` | `2147483647` | The maximum frame size of gRPC response in bytes (default 2G) |
| `spark.tispark.grpc.timeout_in_sec` | `10` | The gRPC timeout time in seconds |
| `spark.tispark.grpc.timeout_in_sec` | `180` | The gRPC timeout in seconds |
| `spark.tispark.plan.allow_agg_pushdown` | `true` | Whether aggregations are allowed to push down to TiKV (in case of busy TiKV nodes) |
| `spark.tispark.plan.allow_index_read` | `true` | Whether index is enabled in planning (which might cause heavy pressure on TiKV) |
| `spark.tispark.index.scan_batch_size` | `20000` | The number of row keys per batch for the concurrent index scan |
Expand Down
2 changes: 1 addition & 1 deletion tikv-client/scripts/proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CURRENT_DIR=`pwd`
TISPARK_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
cd $TISPARK_HOME/tikv-client

tipb_hash=45e60c77588fefe421d0f6f29426a36b5b15171d
tipb_hash=4d69c6f95e683dfb5859277563bf896aca06ec34

if [ -d "tipb" ]; then
cd tipb; git fetch -p; git checkout ${tipb_hash}; cd ..
Expand Down
34 changes: 34 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/ClientSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.pingcap.tikv.util.ConvertUpstreamUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import org.tikv.common.TiSession;
Expand All @@ -38,6 +42,9 @@ public class ClientSession implements AutoCloseable {
private volatile boolean isClosed = false;
private volatile TiTimestamp snapshotTimestamp;
private volatile Catalog snapshotCatalog;
// storeStatusCache is lazily initialized in getStoreStatusCache(); see DAGIterator#isMppStoreAlive
private volatile Map<String, Boolean> storeStatusCache;
private ScheduledExecutorService storeStatusCacheExecutor;

/**
* This is used for setting call back function to invalidate cache information
Expand Down Expand Up @@ -133,6 +140,30 @@ public static ClientSession getInstance(com.pingcap.tikv.TiConfiguration config)
}
}

/**
 * Returns the cache that maps a store address to the result of its last MPP liveness probe,
 * creating the cache and its background refresher on first use.
 *
 * <p>On first call a single-threaded scheduler is started that re-probes every cached address
 * with {@link TiFlashClient#isMppAlive} immediately and then every 5 seconds. The scheduler is
 * stored in {@code storeStatusCacheExecutor} so that it can be stopped on shutdown.
 *
 * @return the shared, thread-safe status cache; never {@code null}
 */
public Map<String, Boolean> getStoreStatusCache() {
  if (storeStatusCache == null) {
    synchronized (this) {
      if (storeStatusCache == null) {
        Map<String, Boolean> cache = new ConcurrentHashMap<>();
        // Use a daemon thread so that a session which is never closed cannot keep the JVM alive.
        ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(
                1,
                runnable -> {
                  Thread thread = new Thread(runnable, "tispark-store-status-cache");
                  thread.setDaemon(true);
                  return thread;
                });
        executor.scheduleAtFixedRate(
            () ->
                cache.replaceAll(
                    (address, alive) -> {
                      try {
                        return TiFlashClient.isMppAlive(
                            this.tiKVSession
                                .getChannelFactory()
                                .getChannel(
                                    address, this.tiKVSession.getPDClient().getHostMapping()));
                      } catch (Exception ignored) {
                        // isMppAlive handles RPC failures itself; this guards the channel lookup.
                        // An exception escaping a scheduleAtFixedRate task suppresses all later
                        // runs, which would freeze the cache, so report the store as not alive.
                        return false;
                      }
                    }),
            0,
            5,
            TimeUnit.SECONDS);
        storeStatusCacheExecutor = executor;
        // Publish the volatile field last so other threads only see a fully set-up cache.
        storeStatusCache = cache;
      }
    }
  }
  return storeStatusCache;
}

@Override
public void close() throws Exception {
shutdown();
Expand All @@ -147,6 +178,9 @@ private synchronized void shutdown() throws Exception {
if (snapshotCatalog != null) {
snapshotCatalog.close();
}
if (storeStatusCacheExecutor != null) {
storeStatusCacheExecutor.shutdownNow();
}
synchronized (sessionCachedMap) {
sessionCachedMap.remove(conf.getPdAddrsString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ public class TiConfiguration implements Serializable {
// --------------- different ------------------
// |item | tispark | client-java |
// --------------------------------------------
// |timeout | 10minutes | 200ms |
// |timeout | 3minutes | 200ms |
// |maxFrameSize | 2Gb | 512MB |
// --------------------------------------------
private int maxFrameSize = DEF_MAX_FRAME_SIZE;
private static final int DEF_MAX_FRAME_SIZE = 2147483647; // 2 GB
private int timeout = DEF_TIMEOUT;
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
private static final int DEF_TIMEOUT = 10;
private static final int DEF_TIMEOUT = 3;
private static final TimeUnit DEF_TIMEOUT_UNIT = TimeUnit.MINUTES;

// -------------- only in tispark -------------
Expand Down
78 changes: 78 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/TiFlashClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2023 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pingcap.tikv;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.PDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Mpp;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.shade.io.grpc.ManagedChannel;
import org.tikv.shade.io.grpc.stub.ClientCalls;

/**
 * gRPC client for a single store, built on the {@code Tikv} service stubs.
 *
 * <p>Instances hold a blocking and a future stub that share one channel obtained from the
 * supplied {@link ChannelFactory}. The static {@link #isMppAlive(ManagedChannel)} helper can be
 * used without creating an instance.
 */
public class TiFlashClient
    extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub> {

  private static final Logger logger = LoggerFactory.getLogger(TiFlashClient.class);

  protected final PDClient pdClient;
  protected TiStore store;

  /**
   * Creates a client whose stubs talk to the address of the given store.
   *
   * @param conf client configuration passed to the base class
   * @param store store whose address is used to look up the channel
   * @param channelFactory factory the channel is obtained from
   * @param pdClient PD client that supplies the host mapping for the channel lookup
   */
  public TiFlashClient(
      TiConfiguration conf, TiStore store, ChannelFactory channelFactory, PDClient pdClient) {
    super(conf, channelFactory);
    this.store = store;
    this.pdClient = pdClient;
    ManagedChannel storeChannel =
        channelFactory.getChannel(store.getStore().getAddress(), pdClient.getHostMapping());
    this.asyncStub = TikvGrpc.newFutureStub(storeChannel);
    this.blockingStub = TikvGrpc.newBlockingStub(storeChannel);
  }

  /** Returns the blocking stub with a deadline of {@code getTimeout()} milliseconds from now. */
  @Override
  protected TikvGrpc.TikvBlockingStub getBlockingStub() {
    long deadlineMillis = getTimeout();
    return blockingStub.withDeadlineAfter(deadlineMillis, TimeUnit.MILLISECONDS);
  }

  /** Returns the future stub with a deadline of {@code getTimeout()} milliseconds from now. */
  @Override
  protected TikvGrpc.TikvFutureStub getAsyncStub() {
    long deadlineMillis = getTimeout();
    return asyncStub.withDeadlineAfter(deadlineMillis, TimeUnit.MILLISECONDS);
  }

  /**
   * Does nothing.
   *
   * <p>NOTE(review): presumably the channel's lifecycle is owned by the {@link ChannelFactory} -
   * confirm.
   */
  @Override
  public void close() throws Exception {}

  /**
   * Probes a store over the given channel with the {@code IsAlive} RPC.
   *
   * <p>The call uses a 500 ms deadline. Any exception raised by the call is logged at WARN level
   * and reported as "not alive" rather than propagated.
   *
   * @param channel channel connected to the store to probe
   * @return {@code true} only if a response was received and it reports the store as available
   */
  public static boolean isMppAlive(ManagedChannel channel) {
    TikvGrpc.TikvBlockingStub probeStub =
        TikvGrpc.newBlockingStub(channel).withDeadlineAfter(500, TimeUnit.MILLISECONDS);
    Supplier<Mpp.IsAliveRequest> requestSupplier = () -> Mpp.IsAliveRequest.newBuilder().build();
    try {
      Mpp.IsAliveResponse reply =
          ClientCalls.blockingUnaryCall(
              probeStub.getChannel(),
              TikvGrpc.getIsAliveMethod(),
              probeStub.getCallOptions(),
              requestSupplier.get());
      if (reply == null) {
        return false;
      }
      return reply.getAvailable();
    } catch (Exception e) {
      logger.warn("Call mpp isAlive fail with Exception", e);
      return false;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.pingcap.tidb.tipb.EncodeType;
import com.pingcap.tidb.tipb.SelectResponse;
import com.pingcap.tikv.ClientSession;
import com.pingcap.tikv.TiFlashClient;
import com.pingcap.tikv.meta.TiDAGRequest.PushDownType;
import com.pingcap.tikv.operation.SchemaInfer;
import java.util.ArrayDeque;
Expand All @@ -31,6 +32,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorCompletionService;
import org.slf4j.Logger;
Expand All @@ -43,6 +45,7 @@
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.common.util.RangeSplitter.RegionTask;
import org.tikv.kvproto.Coprocessor;

Expand Down Expand Up @@ -222,6 +225,15 @@ private SelectResponse process(RegionTask regionTask) {
.getTiKVSession()
.getRegionStoreClientBuilder()
.build(region, store, storeType);
// if mpp store is not alive, drop it and generate a new task.
if (storeType == TiStoreType.TiFlash && !isMppStoreAlive(store.getAddress())) {
logger.info("Re-splitting region task due to TiFlash is unavailable");
remainTasks.addAll(
RangeSplitter.newSplitter(clientSession.getTiKVSession().getRegionManager())
.splitRangeByRegion(ranges, storeType));
continue;
}

client.addResolvedLocks(startTs, resolvedLocks);
Collection<RegionTask> tasks =
client.coprocess(backOffer, dagRequest, ranges, responseQueue, startTs);
Expand Down Expand Up @@ -280,4 +292,22 @@ private Iterator<SelectResponse> processByStreaming(RegionTask regionTask) {
throw new TiClientInternalException("Error Closing Store client.", e);
}
}

/**
 * Reports whether the MPP store at the given address is considered alive.
 *
 * <p>The answer is taken from the session-wide store status cache. When the address is not
 * cached yet, the store is probed once with {@link TiFlashClient#isMppAlive} and the result is
 * stored in the cache. See https://github.com/pingcap/tispark/pull/2619 for more details.
 *
 * @param address address of the store to check
 * @return the cached or freshly probed liveness of the store
 * @throws TiClientInternalException if reading the cache or probing the store throws
 */
public Boolean isMppStoreAlive(String address) {
  try {
    Map<String, Boolean> aliveByAddress = clientSession.getStoreStatusCache();
    return aliveByAddress.computeIfAbsent(
        address,
        storeAddress -> {
          // Resolve the channel lazily so that it is only looked up on a cache miss.
          return TiFlashClient.isMppAlive(
              clientSession
                  .getTiKVSession()
                  .getChannelFactory()
                  .getChannel(
                      storeAddress,
                      clientSession.getTiKVSession().getPDClient().getHostMapping()));
        });
  } catch (Exception e) {
    throw new TiClientInternalException("Error get MppStore Status.", e);
  }
}
}

0 comments on commit 30cab9b

Please sign in to comment.