[SPARK-49795][CORE][SQL][SS][DSTREAM][ML][MLLIB][K8S][YARN][EXAMPLES] Clean up deprecated Guava API usage

### What changes were proposed in this pull request?
To clean up the usage of deprecated Guava APIs, this PR makes the following changes:

1. Replaced `Files.write(from, to, charset)` with `Files.asCharSink(to, charset).write(from)`. This change was made with reference to:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L275-L291

```java
  /**
   * Writes a character sequence (such as a string) to a file using the given character set.
   *
   * @param from the character sequence to write
   * @param to the destination file
   * @param charset the charset used to encode the output stream; see {@link StandardCharsets} for
   *     helpful predefined constants
   * @throws IOException if an I/O error occurs
   * @deprecated Prefer {@code asCharSink(to, charset).write(from)}.
   */
  @Deprecated
  @InlineMe(
      replacement = "Files.asCharSink(to, charset).write(from)",
      imports = "com.google.common.io.Files")
  public static void write(CharSequence from, File to, Charset charset) throws IOException {
    asCharSink(to, charset).write(from);
  }
```
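
For illustration only (not code from this PR), a minimal sketch of the replacement pattern; the class name, temp file, and content are hypothetical:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.google.common.io.Files;

public class CharSinkWriteExample {
  public static void main(String[] args) throws IOException {
    File target = File.createTempFile("charsink-example", ".txt");
    // Deprecated form: Files.write("hello guava", target, StandardCharsets.UTF_8);
    // Replacement: obtain a CharSink for the file, then write the sequence.
    Files.asCharSink(target, StandardCharsets.UTF_8).write("hello guava");
  }
}
```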

2. Replaced `Files.append(from, to, charset)` with `Files.asCharSink(to, charset, FileWriteMode.APPEND).write(from)`. This change was made with reference to:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L350-L368

```java
  /**
   * Appends a character sequence (such as a string) to a file using the given character set.
   *
   * @param from the character sequence to append
   * @param to the destination file
   * @param charset the charset used to encode the output stream; see {@link StandardCharsets} for
   *     helpful predefined constants
   * @throws IOException if an I/O error occurs
   * @deprecated Prefer {@code asCharSink(to, charset, FileWriteMode.APPEND).write(from)}. This
   *     method is scheduled to be removed in October 2019.
   */
  @Deprecated
  @InlineMe(
      replacement = "Files.asCharSink(to, charset, FileWriteMode.APPEND).write(from)",
      imports = {"com.google.common.io.FileWriteMode", "com.google.common.io.Files"})
  public static void append(CharSequence from, File to, Charset charset) throws IOException {
    asCharSink(to, charset, FileWriteMode.APPEND).write(from);
  }
```
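
Similarly, a hypothetical sketch of the append-mode replacement; the `FileWriteMode.APPEND` option is what distinguishes it from the plain sink:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;

public class CharSinkAppendExample {
  public static void main(String[] args) throws IOException {
    File log = File.createTempFile("charsink-example", ".log");
    // Deprecated form: Files.append("a new line\n", log, StandardCharsets.UTF_8);
    // Replacement: open the sink in APPEND mode, then write.
    Files.asCharSink(log, StandardCharsets.UTF_8, FileWriteMode.APPEND).write("a new line\n");
  }
}
```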

3. Replaced `Files.toString(file, charset)` with `Files.asCharSource(file, charset).read()`. This change was made with reference to:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L243-L259

```java
  /**
   * Reads all characters from a file into a {@link String}, using the given character set.
   *
   * @param file the file to read from
   * @param charset the charset used to decode the input stream; see {@link StandardCharsets} for
   *     helpful predefined constants
   * @return a string containing all the characters from the file
   * @throws IOException if an I/O error occurs
   * @deprecated Prefer {@code asCharSource(file, charset).read()}.
   */
  @Deprecated
  @InlineMe(
      replacement = "Files.asCharSource(file, charset).read()",
      imports = "com.google.common.io.Files")
  public static String toString(File file, Charset charset) throws IOException {
    return asCharSource(file, charset).read();
  }
```
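
And a hypothetical sketch of the read-side replacement (again, file and content are illustrative):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.google.common.io.Files;

public class CharSourceReadExample {
  public static void main(String[] args) throws IOException {
    File source = File.createTempFile("charsource-example", ".txt");
    Files.asCharSink(source, StandardCharsets.UTF_8).write("payload");
    // Deprecated form: String content = Files.toString(source, StandardCharsets.UTF_8);
    String content = Files.asCharSource(source, StandardCharsets.UTF_8).read();
    System.out.println(content); // payload
  }
}
```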

4. Replaced `Hashing.murmur3_32()` with `Hashing.murmur3_32_fixed()`. This change was made with reference to:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Hashing.java#L99-L115

```java
  /**
   * Returns a hash function implementing the <a
   * href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">32-bit murmur3
   * algorithm, x86 variant</a> (little-endian variant), using the given seed value, <b>with a known
   * bug</b> as described in the deprecation text.
   *
   * <p>The C++ equivalent is the MurmurHash3_x86_32 function (Murmur3A), which however does not
   * have the bug.
   *
   * @deprecated This implementation produces incorrect hash values from the {@link
   *     HashFunction#hashString} method if the string contains non-BMP characters. Use {@link
   *     #murmur3_32_fixed()} instead.
   */
  @Deprecated
  public static HashFunction murmur3_32() {
    return Murmur3_32HashFunction.MURMUR3_32;
  }
```

This change is safe for Spark. The only difference between `MURMUR3_32` and `MURMUR3_32_FIXED` is the `supplementaryPlaneFix` flag passed when constructing `Murmur3_32HashFunction`:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L56-L59

```java
  static final HashFunction MURMUR3_32 =
      new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ false);
  static final HashFunction MURMUR3_32_FIXED =
      new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ true);
```

However, the `supplementaryPlaneFix` flag is only consulted in `Murmur3_32HashFunction#hashString`, and Spark only uses `Murmur3_32HashFunction#hashInt`, so the hash values Spark computes are unchanged.

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L108-L114

```java
  @Override
  public HashCode hashInt(int input) {
    int k1 = mixK1(input);
    int h1 = mixH1(seed, k1);

    return fmix(h1, Ints.BYTES);
  }
```
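
A small sanity check (illustrative only, not part of this PR) of that argument: `hashInt` agrees between the two functions, while `hashString` may disagree for non-BMP input:

```java
import java.nio.charset.StandardCharsets;

import com.google.common.hash.Hashing;

public class MurmurFixedCheck {
  @SuppressWarnings("deprecation") // murmur3_32() is called only to compare against the fixed variant
  public static void main(String[] args) {
    // hashInt never consults supplementaryPlaneFix, so both functions agree.
    int legacy = Hashing.murmur3_32().hashInt(42).asInt();
    int fixed = Hashing.murmur3_32_fixed().hashInt(42).asInt();
    System.out.println(legacy == fixed); // true

    // hashString may disagree when the input contains non-BMP characters
    // such as an emoji, which is exactly the bug the fixed variant addresses.
    System.out.println(
        Hashing.murmur3_32().hashString("😀", StandardCharsets.UTF_8).asInt()
            == Hashing.murmur3_32_fixed().hashString("😀", StandardCharsets.UTF_8).asInt());
  }
}
```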

5. Replaced `Throwables.propagateIfPossible(throwable, declaredType)` with `Throwables.throwIfInstanceOf(throwable, declaredType)` + `Throwables.throwIfUnchecked(throwable)`. This change was made with reference to:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/base/Throwables.java#L156-L175

```java
  /**
   * Propagates {@code throwable} exactly as-is, if and only if it is an instance of {@link
   * RuntimeException}, {@link Error}, or {@code declaredType}.
   *
   * <p><b>Discouraged</b> in favor of calling {@link #throwIfInstanceOf} and {@link
   * #throwIfUnchecked}.
   *
   * @param throwable the Throwable to possibly propagate
   * @param declaredType the single checked exception type declared by the calling method
   * @deprecated Use a combination of {@link #throwIfInstanceOf} and {@link #throwIfUnchecked},
   *     which together provide the same behavior except that they reject {@code null}.
   */
  @Deprecated
  @J2ktIncompatible
  @GwtIncompatible // propagateIfInstanceOf
  public static <X extends Throwable> void propagateIfPossible(
      @CheckForNull Throwable throwable, Class<X> declaredType) throws X {
    propagateIfInstanceOf(throwable, declaredType);
    propagateIfPossible(throwable);
  }
```
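
For illustration, a hypothetical helper (not code from this PR) showing the before/after pattern for a method that declares a single checked exception:

```java
import java.io.IOException;

import com.google.common.base.Throwables;

public class RethrowExample {
  static void rethrow(Throwable cause) throws IOException {
    // Deprecated form: Throwables.propagateIfPossible(cause, IOException.class);
    Throwables.throwIfInstanceOf(cause, IOException.class); // rethrows IOException as-is
    Throwables.throwIfUnchecked(cause);                     // rethrows RuntimeException/Error as-is
    // Unlike propagateIfPossible, the new calls reject null and simply fall
    // through for other checked exceptions, so the caller decides what to throw.
    throw new IOException(cause);
  }
}
```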

6. Replaced usages of `Throwables.propagate`, following https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate:

- For cases where the exception is known to be checked (`IOException`, `GeneralSecurityException`, `SaslException`, and `RocksDBException`, none of which are subclasses of `RuntimeException` or `Error`), `Throwables.propagate(e)` was replaced directly with `throw new RuntimeException(e);`.

- For cases where it cannot be determined whether the exception is a checked exception, an unchecked exception, or an `Error`, use

```java
throwIfUnchecked(e);
throw new RuntimeException(e);
```

 to replace `Throwables.propagate(e)`.

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/base/Throwables.java#L199-L235

```java
  /**
   * ...
   * @deprecated To preserve behavior, use {@code throw e} or {@code throw new RuntimeException(e)}
   *     directly, or use a combination of {@link #throwIfUnchecked} and {@code throw new
   *     RuntimeException(e)}. But consider whether users would be better off if your API threw a
   *     different type of exception. For background on the deprecation, read <a
   *     href="https://goo.gl/Ivn2kc">Why we deprecated {@code Throwables.propagate}</a>.
   */
  @CanIgnoreReturnValue
  @J2ktIncompatible
  @GwtIncompatible
  @Deprecated
  public static RuntimeException propagate(Throwable throwable) {
    throwIfUnchecked(throwable);
    throw new RuntimeException(throwable);
  }
```
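
A minimal sketch of the two replacement patterns described above (the exception types and method names are illustrative):

```java
import java.io.IOException;

import com.google.common.base.Throwables;

public class PropagateReplacementExample {
  // Case 1: the exception is known to be checked, so wrap it directly.
  static void wrapKnownChecked(IOException e) {
    // Deprecated form: throw Throwables.propagate(e);
    throw new RuntimeException(e);
  }

  // Case 2: the exception kind is unknown; rethrow unchecked ones as-is, wrap the rest.
  static void wrapUnknown(Exception e) {
    // Deprecated form: throw Throwables.propagate(e);
    Throwables.throwIfUnchecked(e);
    throw new RuntimeException(e);
  }
}
```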

### Why are the changes needed?
Clean up deprecated Guava API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48248 from LuciferYang/guava-deprecation.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
LuciferYang authored and dongjoon-hyun committed Oct 1, 2024
1 parent 8d0f6fb commit c0a1ea2
Showing 58 changed files with 148 additions and 145 deletions.
@@ -255,7 +255,8 @@ public Iterator<T> iterator() {
iteratorTracker.add(new WeakReference<>(it));
return it;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
};
@@ -127,7 +127,7 @@ public boolean hasNext() {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -151,7 +151,8 @@ public T next() {
next = null;
return ret;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}

@@ -287,7 +287,8 @@ public Iterator<T> iterator() {
iteratorTracker.add(new WeakReference<>(it));
return it;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
};
@@ -113,7 +113,7 @@ public boolean hasNext() {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -137,7 +137,8 @@ public T next() {
next = null;
return ret;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}

@@ -290,9 +290,11 @@ public void onFailure(Throwable e) {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
- throw Throwables.propagate(e.getCause());
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}

@@ -342,7 +342,8 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
logger.error("Exception while bootstrapping client after {} ms", e,
MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs));
client.close();
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
long postBootstrap = System.nanoTime();

@@ -22,7 +22,6 @@
import java.security.GeneralSecurityException;
import java.util.concurrent.TimeoutException;

- import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -80,7 +79,7 @@ public void doBootstrap(TransportClient client, Channel channel) {
doSparkAuth(client, channel);
client.setClientId(appId);
} catch (GeneralSecurityException | IOException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
} catch (RuntimeException e) {
// There isn't a good exception that can be caught here to know whether it's really
// OK to switch back to SASL (because the server doesn't speak the new protocol). So
@@ -132,7 +132,8 @@ protected boolean doAuthChallenge(
try {
engine.close();
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
}
@@ -29,7 +29,6 @@
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

- import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;

import org.apache.spark.internal.SparkLogger;
@@ -62,7 +61,7 @@ public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, bool
this.saslClient = Sasl.createSaslClient(new String[] { DIGEST }, null, null, DEFAULT_REALM,
saslProps, new ClientCallbackHandler());
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -72,7 +71,7 @@ public synchronized byte[] firstToken() {
try {
return saslClient.evaluateChallenge(new byte[0]);
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
} else {
return new byte[0];
@@ -98,7 +97,7 @@ public synchronized byte[] response(byte[] token) {
try {
return saslClient != null ? saslClient.evaluateChallenge(token) : new byte[0];
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -31,7 +31,6 @@
import java.util.Map;

import com.google.common.base.Preconditions;
- import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -94,7 +93,7 @@ public SparkSaslServer(
this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM, saslProps,
new DigestCallbackHandler());
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -119,7 +118,7 @@ public synchronized byte[] response(byte[] token) {
try {
return saslServer != null ? saslServer.evaluateResponse(token) : new byte[0];
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -17,8 +17,6 @@

package org.apache.spark.network.shuffledb;

- import com.google.common.base.Throwables;
-
import java.io.IOException;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -47,7 +45,7 @@ public boolean hasNext() {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -19,7 +19,6 @@

import java.io.IOException;

- import com.google.common.base.Throwables;
import org.rocksdb.RocksDBException;

/**
@@ -37,7 +36,7 @@ public void put(byte[] key, byte[] value) {
try {
db.put(key, value);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -46,7 +45,7 @@ public byte[] get(byte[] key) {
try {
return db.get(key);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -55,7 +54,7 @@ public void delete(byte[] key) {
try {
db.delete(key);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}

@@ -22,7 +22,6 @@
import java.util.Map;
import java.util.NoSuchElementException;

- import com.google.common.base.Throwables;
import org.rocksdb.RocksIterator;

/**
@@ -52,7 +51,7 @@ public boolean hasNext() {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -176,7 +176,7 @@ class KafkaTestUtils(
}

kdc.getKrb5conf.delete()
- Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8)
+ Files.asCharSink(kdc.getKrb5conf, StandardCharsets.UTF_8).write(krb5confStr)
logDebug(s"krb5.conf file content: $krb5confStr")
}

@@ -240,7 +240,7 @@ class KafkaTestUtils(
| principal="$kafkaServerUser@$realm";
|};
""".stripMargin.trim
- Files.write(content, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(content)
logDebug(s"Created JAAS file: ${file.getPath}")
logDebug(s"JAAS file content: $content")
file.getAbsolutePath()
@@ -120,7 +120,8 @@ private boolean isEndOfStream() {

private void checkReadException() throws IOException {
if (readAborted) {
- Throwables.propagateIfPossible(readException, IOException.class);
+ Throwables.throwIfInstanceOf(readException, IOException.class);
+ Throwables.throwIfUnchecked(readException);
throw new IOException(readException);
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -421,7 +421,7 @@ private[spark] object TestUtils extends SparkTestUtils {
def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = {
val file = File.createTempFile(prefix, ".sh", dir)
val script = s"cat <<EOF\n$output\nEOF\n"
- Files.write(script, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(script)
JavaFiles.setPosixFilePermissions(file.toPath,
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
file.getPath
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets

import scala.jdk.CollectionConverters._

- import com.google.common.io.Files
+ import com.google.common.io.{Files, FileWriteMode}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
@@ -216,7 +216,7 @@ private[deploy] class DriverRunner(
val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
- Files.append(header, stderr, StandardCharsets.UTF_8)
+ Files.asCharSink(stderr, StandardCharsets.UTF_8, FileWriteMode.APPEND).write(header)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
@@ -191,7 +191,7 @@ private[deploy] class ExecutorRunner(
stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)

val stderr = new File(executorDir, "stderr")
- Files.write(header, stderr, StandardCharsets.UTF_8)
+ Files.asCharSink(stderr, StandardCharsets.UTF_8).write(header)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)

state = ExecutorState.RUNNING
@@ -208,7 +208,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
*/
- private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+ private def rehash(h: Int): Int = Hashing.murmur3_32_fixed().hashInt(h).asInt()

/** Double the table's size and re-hash everything */
protected def growTable(): Unit = {
@@ -266,7 +266,7 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
*/
- private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+ private def hashcode(h: Int): Int = Hashing.murmur3_32_fixed().hashInt(h).asInt()

private def nextPowerOf2(n: Int): Int = {
if (n == 0) {
2 changes: 1 addition & 1 deletion core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -960,7 +960,7 @@ public void textFiles() throws IOException {
rdd.saveAsTextFile(outputDir);
// Read the plain text file and check it's OK
File outputFile = new File(outputDir, "part-00000");
- String content = Files.toString(outputFile, StandardCharsets.UTF_8);
+ String content = Files.asCharSource(outputFile, StandardCharsets.UTF_8).read();
assertEquals("1\n2\n3\n4\n", content);
// Also try reading it in as a text file RDD
List<String> expected = Arrays.asList("1", "2", "3", "4");
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -334,8 +334,8 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

for (i <- 0 until 8) {
val tempFile = new File(tempDir, s"part-0000$i")
Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
StandardCharsets.UTF_8)
Files.asCharSink(tempFile, StandardCharsets.UTF_8)
.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1")
}

for (p <- Seq(1, 2, 8)) {