Merged
39 commits
5a00631
[SPARK-45442][PYTHON][DOCS] Refine docstring of DataFrame.show
allisonwang-db Oct 12, 2023
e69752a
[SPARK-45488][SQL] XML: Add support for value in 'rowTag' element
shujingyang-db Oct 12, 2023
385a2f5
[SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to `sc…
Hisoka-X Oct 12, 2023
470aaf3
[SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with Pandas-o…
itholic Oct 12, 2023
f0b2e6d
[SPARK-45132][SQL] Fix IDENTIFIER for function invocation
srielau Oct 12, 2023
d1bd21a
[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0
dengziming Oct 12, 2023
b0576ff
[SPARK-45501][CORE][SQL] Use pattern matching for type checking and c…
LuciferYang Oct 12, 2023
7663fdf
[SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` inst…
LuciferYang Oct 12, 2023
e720cce
[SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto m…
heyihong Oct 12, 2023
280f6b3
[SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable
allisonwang-db Oct 13, 2023
12638b8
[SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column …
michaelzhan-db Oct 13, 2023
4df9fa2
[SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`
zhengruifeng Oct 13, 2023
f72b87b
[SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace…
LuciferYang Oct 13, 2023
12880c8
[SPARK-45266][PYTHON][FOLLOWUP] Fix to resolve UnresolvedPolymorphicP…
ueshin Oct 13, 2023
fb3b707
[SPARK-45498][CORE] Followup: Ignore task completion from old stage a…
mayurdb Oct 13, 2023
f1f856d
[SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Wr…
HyukjinKwon Oct 13, 2023
96bac6c
[SPARK-45508][CORE] Add "--add-opens=java.base/jdk.internal.ref=ALL-U…
JoshRosen Oct 13, 2023
0257b77
[SPARK-45532][DOCS] Restore codetabs for the Protobuf Data Source Guide
yaooqinn Oct 13, 2023
3e2470d
[SPARK-45536][BUILD] Lower the default `-Xmx` of `build/mvn` to 3g
LuciferYang Oct 13, 2023
632eabd
[SPARK-45495][CORE] Support stage level task resource profile for k8s…
wbo4958 Oct 13, 2023
bc7094e
[SPARK-45426][CORE] Add support for a ReloadingX509TrustManager
hasnain-db Oct 13, 2023
d4ceedd
[SPARK-45487] Fix SQLSTATEs and temp errors
srielau Oct 13, 2023
6f46ea2
[SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple…
beliefer Oct 14, 2023
2f6cca5
[SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumI…
JoshRosen Oct 14, 2023
26aaf1c
[SPARK-45427][CORE] Add RPC SSL settings to SSLOptions and SparkTrans…
hasnain-db Oct 14, 2023
30d9570
[SPARK-45530][CORE] Use `java.lang.ref.Cleaner` instead of `finalize`…
LuciferYang Oct 14, 2023
41420ca
[SPARK-45429][CORE] Add helper classes for SSL RPC communication
hasnain-db Oct 15, 2023
c921ade
Revert "[SPARK-45536][BUILD] Lower the default `-Xmx` of `build/mvn` …
LuciferYang Oct 15, 2023
c93d40d
[SPARK-44594][SS] Remove redundant method parameter in kafka connector
zhaomin1423 Oct 15, 2023
8c3923a
[SPARK-45486][CONNECT] Make add_artifact request idempotent
cdkrot Oct 16, 2023
db35193
[SPARK-45476][SQL][FOLLOWUP] Raise exception directly instead of call…
itholic Oct 16, 2023
d8dbb66
[SPARK-45514][SQL][MLLIB] Replace `scala.runtime.Tuple3Zipped` to `sc…
beliefer Oct 16, 2023
4fd2d68
[SPARK-45009][SQL] Decorrelate predicate subqueries in join condition
andylam-db Oct 16, 2023
c35ca7c
[SPARK-45539][SS] Add assert and log to indicate watermark definition…
anishshri-db Oct 16, 2023
c691177
[SPARK-45491] Add missing SQLSTATES 2/2
srielau Oct 16, 2023
7796d8a
[SPARK-45531][SQL][DOCS] Add more comments and rename some variable n…
beliefer Oct 16, 2023
9bdad31
[SPARK-45538][PYTHON][CONNECT] pyspark connect overwrite_partitions bug
Oct 16, 2023
6994bad
[SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` to JdbcDi…
Hisoka-X Oct 16, 2023
62653b9
Revert "[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0"
LuciferYang Oct 16, 2023
@@ -29,8 +29,7 @@ public class ArrayKeyIndexType {

  @Override
  public boolean equals(Object o) {
-    if (o instanceof ArrayKeyIndexType) {
-      ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+    if (o instanceof ArrayKeyIndexType other) {
      return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
    }
    return false;
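This hunk, and the similar equals() hunks below, replace an instanceof test followed by an explicit cast with Java's pattern matching for instanceof (JEP 394, standard since Java 16). A minimal sketch of the idiom, using a hypothetical Point class rather than any Spark type:

public class Point {
  private final int x;
  private final int y;

  public Point(int x, int y) {
    this.x = x;
    this.y = y;
  }

  @Override
  public boolean equals(Object o) {
    // The pattern variable `other` is declared and bound in one step, so the
    // separate cast line from the pre-Java-16 version is no longer needed.
    if (o instanceof Point other) {
      return x == other.x && y == other.y;
    }
    return false;
  }
}

The pattern variable is only in scope where the test has succeeded, so the rewrite does not change behavior.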
@@ -36,8 +36,7 @@ public class CustomType1 {

  @Override
  public boolean equals(Object o) {
-    if (o instanceof CustomType1) {
-      CustomType1 other = (CustomType1) o;
+    if (o instanceof CustomType1 other) {
      return id.equals(other.id) && name.equals(other.name);
    }
    return false;
@@ -30,8 +30,7 @@ public class CustomType2 {

  @Override
  public boolean equals(Object o) {
-    if (o instanceof CustomType2) {
-      CustomType2 other = (CustomType2) o;
+    if (o instanceof CustomType2 other) {
      return id.equals(other.id) && parentId.equals(other.parentId);
    }
    return false;
@@ -31,8 +31,7 @@ public class IntKeyType {

  @Override
  public boolean equals(Object o) {
-    if (o instanceof IntKeyType) {
-      IntKeyType other = (IntKeyType) o;
+    if (o instanceof IntKeyType other) {
      return key == other.key && id.equals(other.id) && values.equals(other.values);
    }
    return false;
@@ -62,8 +62,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchFailure) {
-      ChunkFetchFailure o = (ChunkFetchFailure) other;
+    if (other instanceof ChunkFetchFailure o) {
      return streamChunkId.equals(o.streamChunkId) && errorString.equals(o.errorString);
    }
    return false;
@@ -56,8 +56,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchRequest) {
-      ChunkFetchRequest o = (ChunkFetchRequest) other;
+    if (other instanceof ChunkFetchRequest o) {
      return streamChunkId.equals(o.streamChunkId);
    }
    return false;
@@ -75,8 +75,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchSuccess) {
-      ChunkFetchSuccess o = (ChunkFetchSuccess) other;
+    if (other instanceof ChunkFetchSuccess o) {
      return streamChunkId.equals(o.streamChunkId) && super.equals(o);
    }
    return false;
@@ -0,0 +1,148 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.protocol;

import java.io.EOFException;
import java.io.InputStream;
import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedInput;

import org.apache.spark.network.buffer.ManagedBuffer;

/**
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be any InputStream or ChunkedStream
*/
public class EncryptedMessageWithHeader implements ChunkedInput<ByteBuf> {

@Nullable private final ManagedBuffer managedBuffer;
private final ByteBuf header;
private final int headerLength;
private final Object body;
private final long bodyLength;
private long totalBytesTransferred;

/**
* Construct a new EncryptedMessageWithHeader.
*
* @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
* be passed in so that the buffer can be freed when this message is
* deallocated. Ownership of the caller's reference to this buffer is
* transferred to this class, so if the caller wants to continue to use the
* ManagedBuffer in other messages then they will need to call retain() on
* it before passing it to this constructor.
* @param header the message header.
* @param body the message body.
* @param bodyLength the length of the message body, in bytes.
*/

public EncryptedMessageWithHeader(
@Nullable ManagedBuffer managedBuffer, ByteBuf header, Object body, long bodyLength) {
Preconditions.checkArgument(body instanceof InputStream || body instanceof ChunkedStream,
"Body must be an InputStream or a ChunkedStream.");
this.managedBuffer = managedBuffer;
this.header = header;
this.headerLength = header.readableBytes();
this.body = body;
this.bodyLength = bodyLength;
this.totalBytesTransferred = 0;
}

@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
return readChunk(ctx.alloc());
}

@Override
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
if (isEndOfInput()) {
return null;
}

if (totalBytesTransferred < headerLength) {
totalBytesTransferred += headerLength;
return header.retain();
} else if (body instanceof InputStream) {
InputStream stream = (InputStream) body;
int available = stream.available();
if (available <= 0) {
available = (int) (length() - totalBytesTransferred);
} else {
available = (int) Math.min(available, length() - totalBytesTransferred);
}
ByteBuf buffer = allocator.buffer(available);
int toRead = Math.min(available, buffer.writableBytes());
int read = buffer.writeBytes(stream, toRead);
if (read >= 0) {
totalBytesTransferred += read;
return buffer;
} else {
throw new EOFException("Unable to read bytes from InputStream");
}
} else if (body instanceof ChunkedStream) {
ChunkedStream stream = (ChunkedStream) body;
long old = stream.transferredBytes();
ByteBuf buffer = stream.readChunk(allocator);
long read = stream.transferredBytes() - old;
if (read >= 0) {
totalBytesTransferred += read;
assert(totalBytesTransferred <= length());
return buffer;
} else {
throw new EOFException("Unable to read bytes from ChunkedStream");
}
} else {
return null;
}
}

@Override
public long length() {
return headerLength + bodyLength;
}

@Override
public long progress() {
return totalBytesTransferred;
}

@Override
public boolean isEndOfInput() throws Exception {
return (headerLength + bodyLength) == totalBytesTransferred;
}

@Override
public void close() throws Exception {
header.release();
if (managedBuffer != null) {
managedBuffer.release();
}
if (body instanceof InputStream) {
((InputStream) body).close();
} else if (body instanceof ChunkedStream) {
((ChunkedStream) body).close();
}
}
}
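EncryptedMessageWithHeader is a Netty ChunkedInput<ByteBuf>, so in practice it would be written to a channel via a ChunkedWriteHandler; the sketch below instead drives it by hand to show the chunking order (the retained header first, then buffers read from the body). The buffer contents, the null ManagedBuffer, and the example class name are assumptions for illustration, not part of the patch.

import java.io.ByteArrayInputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;

import org.apache.spark.network.protocol.EncryptedMessageWithHeader;

public class EncryptedMessageWithHeaderExample {
  public static void main(String[] args) throws Exception {
    ByteBuf header = Unpooled.wrappedBuffer(new byte[] {1, 2, 3, 4});
    byte[] body = {5, 6, 7, 8, 9};

    // No ManagedBuffer is passed in this standalone sketch, so close() only
    // has the header and the InputStream body to release.
    EncryptedMessageWithHeader msg = new EncryptedMessageWithHeader(
      null, header, new ByteArrayInputStream(body), body.length);

    // readChunk() first returns the retained header, then buffers filled from
    // the body, until progress() reaches length() = headerLength + bodyLength.
    while (!msg.isEndOfInput()) {
      ByteBuf chunk = msg.readChunk(UnpooledByteBufAllocator.DEFAULT);
      try {
        System.out.println("chunk of " + chunk.readableBytes()
          + " bytes, progress " + msg.progress() + "/" + msg.length());
      } finally {
        chunk.release();
      }
    }
    msg.close();
  }
}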
@@ -84,8 +84,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof MergedBlockMetaRequest) {
-      MergedBlockMetaRequest o = (MergedBlockMetaRequest) other;
+    if (other instanceof MergedBlockMetaRequest o) {
      return requestId == o.requestId && shuffleId == o.shuffleId &&
        shuffleMergeId == o.shuffleMergeId && reduceId == o.reduceId &&
        Objects.equal(appId, o.appId);
@@ -55,23 +55,23 @@ enum Type implements Encodable {

    public static Type decode(ByteBuf buf) {
      byte id = buf.readByte();
-      switch (id) {
-        case 0: return ChunkFetchRequest;
-        case 1: return ChunkFetchSuccess;
-        case 2: return ChunkFetchFailure;
-        case 3: return RpcRequest;
-        case 4: return RpcResponse;
-        case 5: return RpcFailure;
-        case 6: return StreamRequest;
-        case 7: return StreamResponse;
-        case 8: return StreamFailure;
-        case 9: return OneWayMessage;
-        case 10: return UploadStream;
-        case 11: return MergedBlockMetaRequest;
-        case 12: return MergedBlockMetaSuccess;
-        case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
-        default: throw new IllegalArgumentException("Unknown message type: " + id);
-      }
+      return switch (id) {
+        case 0 -> ChunkFetchRequest;
+        case 1 -> ChunkFetchSuccess;
+        case 2 -> ChunkFetchFailure;
+        case 3 -> RpcRequest;
+        case 4 -> RpcResponse;
+        case 5 -> RpcFailure;
+        case 6 -> StreamRequest;
+        case 7 -> StreamResponse;
+        case 8 -> StreamFailure;
+        case 9 -> OneWayMessage;
+        case 10 -> UploadStream;
+        case 11 -> MergedBlockMetaRequest;
+        case 12 -> MergedBlockMetaSuccess;
+        case -1 -> throw new IllegalArgumentException("User type messages cannot be decoded.");
+        default -> throw new IllegalArgumentException("Unknown message type: " + id);
+      };
    }
  }
}
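The decode methods above and below now use switch expressions with arrow labels (JEP 361, standard since Java 14), which yield a value directly and do not fall through. A small self-contained sketch of the form, with an illustrative enum unrelated to Spark:

public class SwitchExpressionExample {
  enum Shape { CIRCLE, TRIANGLE, SQUARE }

  static int sides(Shape shape) {
    // Each arrow label is a single expression; no break statements are needed,
    // and the compiler requires the switch to cover every possible input.
    return switch (shape) {
      case CIRCLE -> 0;
      case TRIANGLE -> 3;
      case SQUARE -> 4;
    };
  }

  public static void main(String[] args) {
    System.out.println(sides(Shape.TRIANGLE)); // prints 3
  }
}

Because every branch of the old statement form returned immediately, the expression form is behaviorally identical while dropping the per-case return and the fall-through risk.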
@@ -49,48 +49,21 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  }

  private Message decode(Message.Type msgType, ByteBuf in) {
-    switch (msgType) {
-      case ChunkFetchRequest:
-        return ChunkFetchRequest.decode(in);
-
-      case ChunkFetchSuccess:
-        return ChunkFetchSuccess.decode(in);
-
-      case ChunkFetchFailure:
-        return ChunkFetchFailure.decode(in);
-
-      case RpcRequest:
-        return RpcRequest.decode(in);
-
-      case RpcResponse:
-        return RpcResponse.decode(in);
-
-      case RpcFailure:
-        return RpcFailure.decode(in);
-
-      case OneWayMessage:
-        return OneWayMessage.decode(in);
-
-      case StreamRequest:
-        return StreamRequest.decode(in);
-
-      case StreamResponse:
-        return StreamResponse.decode(in);
-
-      case StreamFailure:
-        return StreamFailure.decode(in);
-
-      case UploadStream:
-        return UploadStream.decode(in);
-
-      case MergedBlockMetaRequest:
-        return MergedBlockMetaRequest.decode(in);
-
-      case MergedBlockMetaSuccess:
-        return MergedBlockMetaSuccess.decode(in);
-
-      default:
-        throw new IllegalArgumentException("Unexpected message type: " + msgType);
-    }
+    return switch (msgType) {
+      case ChunkFetchRequest -> ChunkFetchRequest.decode(in);
+      case ChunkFetchSuccess -> ChunkFetchSuccess.decode(in);
+      case ChunkFetchFailure -> ChunkFetchFailure.decode(in);
+      case RpcRequest -> RpcRequest.decode(in);
+      case RpcResponse -> RpcResponse.decode(in);
+      case RpcFailure -> RpcFailure.decode(in);
+      case OneWayMessage -> OneWayMessage.decode(in);
+      case StreamRequest -> StreamRequest.decode(in);
+      case StreamResponse -> StreamResponse.decode(in);
+      case StreamFailure -> StreamFailure.decode(in);
+      case UploadStream -> UploadStream.decode(in);
+      case MergedBlockMetaRequest -> MergedBlockMetaRequest.decode(in);
+      case MergedBlockMetaSuccess -> MergedBlockMetaSuccess.decode(in);
+      default -> throw new IllegalArgumentException("Unexpected message type: " + msgType);
+    };
  }
}
@@ -59,8 +59,7 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
      isBodyInFrame = in.isBodyInFrame();
    } catch (Exception e) {
      in.body().release();
-      if (in instanceof AbstractResponseMessage) {
-        AbstractResponseMessage resp = (AbstractResponseMessage) in;
+      if (in instanceof AbstractResponseMessage resp) {
        // Re-encode this message as a failure response.
        String error = e.getMessage() != null ? e.getMessage() : "null";
        logger.error(String.format("Error processing %s for client %s",
@@ -66,8 +66,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof OneWayMessage) {
-      OneWayMessage o = (OneWayMessage) other;
+    if (other instanceof OneWayMessage o) {
      return super.equals(o);
    }
    return false;
@@ -60,8 +60,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof RpcFailure) {
-      RpcFailure o = (RpcFailure) other;
+    if (other instanceof RpcFailure o) {
      return requestId == o.requestId && errorString.equals(o.errorString);
    }
    return false;
@@ -72,8 +72,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof RpcRequest) {
-      RpcRequest o = (RpcRequest) other;
+    if (other instanceof RpcRequest o) {
      return requestId == o.requestId && super.equals(o);
    }
    return false;
@@ -72,8 +72,7 @@ public int hashCode() {

  @Override
  public boolean equals(Object other) {
-    if (other instanceof RpcResponse) {
-      RpcResponse o = (RpcResponse) other;
+    if (other instanceof RpcResponse o) {
      return requestId == o.requestId && super.equals(o);
    }
    return false;