Skip to content

Commit d278e3a

Browse files
shiyuhang0iosmanthussunxiaoguang
authored
[close #699] Rebase release 3.3.1.11 (#704)
Co-authored-by: iosmanthus <myosmanthustree@gmail.com> Co-authored-by: Xiaoguang Sun <sunxiaoguang@users.noreply.github.com> close #699
1 parent 3e02966 commit d278e3a

File tree

9 files changed

+163
-60
lines changed

9 files changed

+163
-60
lines changed

pom.xml

+8-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<protobuf.version>3.5.1</protobuf.version>
5858
<log4j.version>1.2.17</log4j.version>
5959
<slf4j.version>1.7.16</slf4j.version>
60-
<grpc.version>1.38.0</grpc.version>
60+
<grpc.version>1.48.0</grpc.version>
6161
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
6262
<gson.version>2.8.9</gson.version>
6363
<powermock.version>1.6.6</powermock.version>
@@ -75,12 +75,12 @@
7575
<dependency>
7676
<groupId>com.google.protobuf</groupId>
7777
<artifactId>protobuf-java</artifactId>
78-
<version>3.16.1</version>
78+
<version>3.19.6</version>
7979
</dependency>
8080
<dependency>
8181
<groupId>com.google.protobuf</groupId>
8282
<artifactId>protobuf-java-util</artifactId>
83-
<version>3.16.1</version>
83+
<version>3.19.6</version>
8484
</dependency>
8585
<dependency>
8686
<groupId>io.perfmark</groupId>
@@ -232,6 +232,11 @@
232232
<version>3.9</version>
233233
<scope>compile</scope>
234234
</dependency>
235+
<dependency>
236+
<groupId>commons-codec</groupId>
237+
<artifactId>commons-codec</artifactId>
238+
<version>1.15</version>
239+
</dependency>
235240
<dependency>
236241
<groupId>org.apache.httpcomponents</groupId>
237242
<artifactId>httpclient</artifactId>

src/main/java/io/grpc/internal/ClientCallImpl.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static io.grpc.Status.DEADLINE_EXCEEDED;
2525
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
2626
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
27+
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
2728
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
2829
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
2930
import static java.lang.Math.max;
@@ -33,6 +34,7 @@
3334
import io.grpc.Attributes;
3435
import io.grpc.CallOptions;
3536
import io.grpc.ClientCall;
37+
import io.grpc.ClientStreamTracer;
3638
import io.grpc.Codec;
3739
import io.grpc.Compressor;
3840
import io.grpc.CompressorRegistry;
@@ -166,6 +168,7 @@ static void prepareHeaders(
166168
DecompressorRegistry decompressorRegistry,
167169
Compressor compressor,
168170
boolean fullStreamDecompression) {
171+
headers.discardAll(CONTENT_LENGTH_KEY);
169172
headers.discardAll(MESSAGE_ENCODING_KEY);
170173
if (compressor != Codec.Identity.NONE) {
171174
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
@@ -260,10 +263,13 @@ public void runInContext() {
260263
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
261264
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
262265
} else {
266+
ClientStreamTracer[] tracers =
267+
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
263268
stream =
264269
new FailingClientStream(
265270
DEADLINE_EXCEEDED.withDescription(
266-
"ClientCall started after deadline exceeded: " + effectiveDeadline));
271+
"ClientCall started after deadline exceeded: " + effectiveDeadline),
272+
tracers);
267273
}
268274

269275
if (callExecutorIsDirect) {
@@ -363,12 +369,14 @@ private static void logIfContextNarrowedTimeout(
363369
StringBuilder builder =
364370
new StringBuilder(
365371
String.format(
366-
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
372+
Locale.US,
373+
"Call timeout set to '%d' ns, due to context deadline.",
374+
effectiveTimeout));
367375
if (callDeadline == null) {
368376
builder.append(" Explicit call timeout was not set.");
369377
} else {
370378
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
371-
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
379+
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
372380
}
373381

374382
log.fine(builder.toString());
@@ -562,6 +570,9 @@ public void setMessageCompression(boolean enabled) {
562570

563571
@Override
564572
public boolean isReady() {
573+
if (halfCloseCalled) {
574+
return false;
575+
}
565576
return stream.isReady();
566577
}
567578

@@ -711,11 +722,6 @@ private void runInternal() {
711722
}
712723
}
713724

714-
@Override
715-
public void closed(Status status, Metadata trailers) {
716-
closed(status, RpcProgress.PROCESSED, trailers);
717-
}
718-
719725
@Override
720726
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
721727
PerfMark.startTask("ClientStreamListener.closed", tag);

src/main/java/io/netty/buffer/PoolArena.java

+51-40
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ enum SizeClass {
5757

5858
final int numSmallSubpagePools;
5959
final int directMemoryCacheAlignment;
60-
final int directMemoryCacheAlignmentMask;
6160
private final PoolSubpage<T>[] smallSubpagePools;
6261

6362
private final PoolChunkList<T> q050;
@@ -97,7 +96,6 @@ protected PoolArena(
9796
super(pageSize, pageShifts, chunkSize, cacheAlignment);
9897
this.parent = parent;
9998
directMemoryCacheAlignment = cacheAlignment;
100-
directMemoryCacheAlignmentMask = cacheAlignment - 1;
10199

102100
numSmallSubpagePools = nSubpages;
103101
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
@@ -183,17 +181,23 @@ private void tcacheAllocateSmall(
183181
return;
184182
}
185183

186-
/**
187-
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
188-
* PoolChunk#free(long)} may modify the doubly linked list as well.
184+
/*
185+
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
186+
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
189187
*/
190188
final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
191189
final boolean needsNormalAllocation;
192190
synchronized (head) {
193191
final PoolSubpage<T> s = head.next;
194192
needsNormalAllocation = s == head;
195193
if (!needsNormalAllocation) {
196-
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
194+
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx)
195+
: "doNotDestroy="
196+
+ s.doNotDestroy
197+
+ ", elemSize="
198+
+ s.elemSize
199+
+ ", sizeIdx="
200+
+ sizeIdx;
197201
long handle = s.allocate();
198202
assert handle >= 0;
199203
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
@@ -221,7 +225,7 @@ private void tcacheAllocateNormal(
221225
}
222226
}
223227

224-
// Method must be called inside synchronized(this) { ... } block
228+
// Method must be called inside synchronized(this) { ... } block
225229
private void allocateNormal(
226230
PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
227231
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache)
@@ -272,7 +276,7 @@ void free(
272276
}
273277
}
274278

275-
private SizeClass sizeClass(long handle) {
279+
private static SizeClass sizeClass(long handle) {
276280
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
277281
}
278282

@@ -499,6 +503,25 @@ public long numActiveBytes() {
499503
return max(0, val);
500504
}
501505

506+
/**
507+
* Return the number of bytes that are currently pinned to buffer instances, by the arena. The
508+
* pinned memory is not accessible for use by any other allocation, until the buffers using have
509+
* all been released.
510+
*/
511+
public long numPinnedBytes() {
512+
long val =
513+
activeBytesHuge
514+
.value(); // Huge chunks are exact-sized for the buffers they were allocated to.
515+
synchronized (this) {
516+
for (int i = 0; i < chunkListMetrics.size(); i++) {
517+
for (PoolChunkMetric m : chunkListMetrics.get(i)) {
518+
val += ((PoolChunk<?>) m).pinnedBytes();
519+
}
520+
}
521+
}
522+
return max(0, val);
523+
}
524+
502525
protected abstract PoolChunk<T> newChunk(
503526
int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
504527

@@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
588611

589612
static final class HeapArena extends PoolArena<byte[]> {
590613

591-
HeapArena(
592-
PooledByteBufAllocator parent,
593-
int pageSize,
594-
int pageShifts,
595-
int chunkSize,
596-
int directMemoryCacheAlignment) {
597-
super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
614+
HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) {
615+
super(parent, pageSize, pageShifts, chunkSize, 0);
598616
}
599617

600618
private static byte[] newByteArray(int size) {
@@ -610,12 +628,12 @@ boolean isDirect() {
610628
protected PoolChunk<byte[]> newChunk(
611629
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
612630
return new PoolChunk<byte[]>(
613-
this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
631+
this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
614632
}
615633

616634
@Override
617635
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
618-
return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
636+
return new PoolChunk<byte[]>(this, null, newByteArray(capacity), capacity);
619637
}
620638

621639
@Override
@@ -656,40 +674,33 @@ boolean isDirect() {
656674
return true;
657675
}
658676

659-
// mark as package-private, only for unit test
660-
int offsetCacheLine(ByteBuffer memory) {
661-
// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...)
662-
// will
663-
// throw an NPE.
664-
int remainder =
665-
HAS_UNSAFE
666-
? (int)
667-
(PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
668-
: 0;
669-
670-
// offset = alignment - address & (alignment - 1)
671-
return directMemoryCacheAlignment - remainder;
672-
}
673-
674677
@Override
675678
protected PoolChunk<ByteBuffer> newChunk(
676679
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
677680
if (directMemoryCacheAlignment == 0) {
681+
ByteBuffer memory = allocateDirect(chunkSize);
678682
return new PoolChunk<ByteBuffer>(
679-
this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
683+
this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
680684
}
681-
final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);
685+
686+
final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
687+
final ByteBuffer memory =
688+
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
682689
return new PoolChunk<ByteBuffer>(
683-
this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory));
690+
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
684691
}
685692

686693
@Override
687694
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
688695
if (directMemoryCacheAlignment == 0) {
689-
return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity, 0);
696+
ByteBuffer memory = allocateDirect(capacity);
697+
return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
690698
}
691-
final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
692-
return new PoolChunk<ByteBuffer>(this, memory, capacity, offsetCacheLine(memory));
699+
700+
final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
701+
final ByteBuffer memory =
702+
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
703+
return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
693704
}
694705

695706
private static ByteBuffer allocateDirect(int capacity) {
@@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) {
701712
@Override
702713
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
703714
if (PlatformDependent.useDirectBufferNoCleaner()) {
704-
PlatformDependent.freeDirectNoCleaner(chunk.memory);
715+
PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
705716
} else {
706-
PlatformDependent.freeDirectBuffer(chunk.memory);
717+
PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
707718
}
708719
}
709720

src/main/java/org/tikv/common/AbstractGRPCClient.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
193193
HealthCheckResponse resp = stub.check(req);
194194
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
195195
} catch (Exception e) {
196-
logger.warn("check health failed.", e);
196+
logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage());
197197
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
198198
}
199199
}
200200
}
201201

202202
protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
203-
return doCheckHealth(backOffer, addressStr, hostMapping);
203+
try {
204+
return doCheckHealth(backOffer, addressStr, hostMapping);
205+
} catch (Exception e) {
206+
return false;
207+
}
204208
}
205209
}

src/main/java/org/tikv/common/PDClient.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -462,14 +462,19 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
462462
}
463463
return resp;
464464
} catch (Exception e) {
465-
logger.warn("failed to get member from pd server.", e);
465+
logger.warn(
466+
"failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
466467
backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
467468
}
468469
}
469470
}
470471

471472
private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
472-
return doGetMembers(backOffer, uri);
473+
try {
474+
return doGetMembers(backOffer, uri);
475+
} catch (Exception e) {
476+
return null;
477+
}
473478
}
474479

475480
// return whether the leader has changed to target address `leaderUrlStr`.
@@ -524,13 +529,16 @@ synchronized boolean createFollowerClientWrapper(
524529
public void tryUpdateLeaderOrForwardFollower() {
525530
if (updateLeaderNotify.compareAndSet(false, true)) {
526531
try {
527-
BackOffer backOffer = defaultBackOffer();
528532
updateLeaderService.submit(
529533
() -> {
530534
try {
531-
updateLeaderOrForwardFollower(backOffer);
535+
updateLeaderOrForwardFollower();
536+
} catch (Exception e) {
537+
logger.info("update leader or forward follower failed", e);
538+
throw e;
532539
} finally {
533540
updateLeaderNotify.set(false);
541+
logger.info("updating leader finish");
534542
}
535543
});
536544
} catch (RejectedExecutionException e) {
@@ -540,11 +548,13 @@ public void tryUpdateLeaderOrForwardFollower() {
540548
}
541549
}
542550

543-
private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
551+
private synchronized void updateLeaderOrForwardFollower() {
552+
logger.warn("updating leader or forward follower");
544553
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
545554
return;
546555
}
547556
for (URI url : this.pdAddrs) {
557+
BackOffer backOffer = this.probeBackOffer();
548558
// since resp is null, we need update leader's address by walking through all pd server.
549559
GetMembersResponse resp = getMembers(backOffer, url);
550560
if (resp == null) {
@@ -602,8 +612,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) {
602612
}
603613

604614
public void tryUpdateLeader() {
615+
logger.info("try update leader");
605616
for (URI url : this.pdAddrs) {
606-
BackOffer backOffer = defaultBackOffer();
617+
BackOffer backOffer = this.probeBackOffer();
607618
// since resp is null, we need update leader's address by walking through all pd server.
608619
GetMembersResponse resp = getMembers(backOffer, url);
609620
if (resp == null) {
@@ -856,4 +867,9 @@ public RequestKeyCodec getCodec() {
856867
private static BackOffer defaultBackOffer() {
857868
return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
858869
}
870+
871+
private BackOffer probeBackOffer() {
872+
int maxSleep = (int) getTimeout() * 2;
873+
return ConcreteBackOffer.newCustomBackOff(maxSleep);
874+
}
859875
}

0 commit comments

Comments
 (0)