Skip to content

Commit

Permalink
Replace read/write lock in JarResource to avoid virtual threads pinning
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Jul 8, 2024
1 parent 7e98e77 commit ff4b32b
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package io.quarkus.bootstrap.runner;

import static io.quarkus.bootstrap.runner.VirtualThreadSupport.isVirtualThread;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.jar.JarFile;

import io.smallrye.common.io.jar.JarFiles;

public class JarFileReference {
// Guarded by an atomic reader counter that emulate the behaviour of a read/write lock.
// To enable virtual threads compatibility and avoid pinning it is not possible to use an explicit read/write lock
// because the jarFile access may happen inside a native call (for example triggered by the RunnerClassLoader)
// and then it is necessary to avoid blocking on it.
private final JarFile jarFile;

// The referenceCounter - 1 represents the number of effective readers (#aqcuire - #release), while the first
// reference is used to determine if a close has been required.
// The JarFileReference is created as already acquired and that's why the referenceCounter starts from 2
private final AtomicInteger referenceCounter = new AtomicInteger(2);

private JarFileReference(JarFile jarFile) {
this.jarFile = jarFile;
}

/**
* Increase the readers counter of the jarFile.
*
* @return true if the acquiring succeeded: it's now safe to access and use the inner jarFile.
* false if the jar reference is going to be closed and then no longer usable.
*/
private boolean acquire() {
while (true) {
int count = referenceCounter.get();
if (count == 0) {
return false;
}
if (referenceCounter.compareAndSet(count, count + 1)) {
return true;
}
}
}

/**
* Decrease the readers counter of the jarFile.
* If the counter drops to 0 and a release has been requested also closes the jarFile.
*
* @return true if the release also closes the underlying jarFile.
*/
private boolean release(JarResource jarResource) {
while (true) {
int count = referenceCounter.get();
if (count <= 0) {
throw new IllegalStateException(
"The reference counter cannot be negative, found: " + (referenceCounter.get() - 1));
}
if (referenceCounter.compareAndSet(count, count - 1)) {
if (count == 1) {
try {
jarFile.close();
} catch (IOException e) {
// ignore
} finally {
jarResource.jarFileReference.set(null);
}
return true;
}
return false;
}
}
}

/**
* Ask to close this reference.
* If there are no readers currently accessing the jarFile also close it, otherwise defer the closing when the last reader
* will leave.
*/
void close(JarResource jarResource) {
release(jarResource);
}

@FunctionalInterface
interface JarFileConsumer<T> {
T apply(JarFile jarFile, Path jarPath, String resource);
}

static <T> T withJarFile(JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {

// Happy path: the jar reference already exists and it's ready to be used
final var localJarFileRefFuture = jarResource.jarFileReference.get();
if (localJarFileRefFuture != null && localJarFileRefFuture.isDone()) {
JarFileReference jarFileReference = localJarFileRefFuture.join();
if (jarFileReference.acquire()) {
return consumeSharedJarFile(jarFileReference, jarResource, resource, fileConsumer);
}
}

// There's no valid jar reference, so load a new one

// Platform threads can load the jarfile asynchronously and eventually blocking till not ready
// to avoid loading the same jarfile multiple times in parallel
if (!isVirtualThread()) {
// It's ok to eventually block on a join() here since we're sure this is used only by platform thread
return consumeSharedJarFile(asyncLoadAcquiredJarFile(jarResource).join(), jarResource, resource, fileConsumer);
}

// Virtual threads needs to load the jarfile synchronously to avoid blocking. This means that eventually
// multiple threads could load the same jarfile in parallel and this duplication has to be reconciled
final var newJarFileRef = syncLoadAcquiredJarFile(jarResource);
if (jarResource.jarFileReference.compareAndSet(localJarFileRefFuture, newJarFileRef) ||
jarResource.jarFileReference.compareAndSet(null, newJarFileRef)) {
// The new file reference has been successfully published and can be used by the current and other threads
// The join() cannot be blocking here since the CompletableFuture has been created already completed
return consumeSharedJarFile(newJarFileRef.join(), jarResource, resource, fileConsumer);
}

// The newly created file reference hasn't been published, so it can be used exclusively by the current virtual thread
return consumeUnsharedJarFile(newJarFileRef, jarResource, resource, fileConsumer);
}

private static <T> T consumeSharedJarFile(JarFileReference jarFileReference,
JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {
try {
return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource);
} finally {
jarFileReference.release(jarResource);
}
}

private static <T> T consumeUnsharedJarFile(CompletableFuture<JarFileReference> jarFileReferenceFuture,
JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {
JarFileReference jarFileReference = jarFileReferenceFuture.join();
try {
return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource);
} finally {
boolean closed = jarFileReference.release(jarResource);
assert !closed;
// Check one last time if the file reference can be published and reused by other threads, otherwise close it
if (!jarResource.jarFileReference.compareAndSet(null, jarFileReferenceFuture)) {
closed = jarFileReference.release(jarResource);
assert closed;
}
}
}

private static CompletableFuture<JarFileReference> syncLoadAcquiredJarFile(JarResource jarResource) {
try {
return CompletableFuture.completedFuture(new JarFileReference(JarFiles.create(jarResource.jarPath.toFile())));
} catch (IOException e) {
throw new RuntimeException("Failed to open " + jarResource.jarPath, e);
}
}

private static CompletableFuture<JarFileReference> asyncLoadAcquiredJarFile(JarResource jarResource) {
CompletableFuture<JarFileReference> newJarFileRef = new CompletableFuture<>();
do {
if (jarResource.jarFileReference.compareAndSet(null, newJarFileRef)) {
try {
newJarFileRef.complete(new JarFileReference(JarFiles.create(jarResource.jarPath.toFile())));
return newJarFileRef;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
newJarFileRef = jarResource.jarFileReference.get();
} while (newJarFileRef == null || !newJarFileRef.join().acquire());
return newJarFileRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,28 @@
import java.security.ProtectionDomain;
import java.security.cert.Certificate;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import io.smallrye.common.io.jar.JarEntries;
import io.smallrye.common.io.jar.JarFiles;

/**
* A jar resource
*/
public class JarResource implements ClassLoadingResource {

private final ManifestInfo manifestInfo;
private final Path jarPath;

private final Lock readLock;
private final Lock writeLock;

private volatile ProtectionDomain protectionDomain;
private final ManifestInfo manifestInfo;

//Guarded by the read/write lock; open/close operations on the JarFile require the exclusive lock,
//while using an existing open reference can use the shared lock.
//If a lock is acquired, and as long as it's owned, we ensure that the zipFile reference
//points to an open JarFile instance, and read operations are valid.
//To close the jar, the exclusive lock must be owned, and reference will be set to null before releasing it.
//Likewise, opening a JarFile requires the exclusive lock.
private volatile JarFile zipFile;
final Path jarPath;
final AtomicReference<CompletableFuture<JarFileReference>> jarFileReference = new AtomicReference<>();

public JarResource(ManifestInfo manifestInfo, Path jarPath) {
this.manifestInfo = manifestInfo;
this.jarPath = jarPath;
final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
}

@Override
Expand All @@ -69,38 +53,48 @@ public void init() {

@Override
public byte[] getResourceData(String resource) {
final ZipFile zipFile = readLockAcquireAndGetJarReference();
try {
ZipEntry entry = zipFile.getEntry(resource);
return JarFileReference.withJarFile(this, resource, JarResourceDataProvider.INSTANCE);
}

private static class JarResourceDataProvider implements JarFileReference.JarFileConsumer<byte[]> {
private static final JarResourceDataProvider INSTANCE = new JarResourceDataProvider();

@Override
public byte[] apply(JarFile jarFile, Path path, String res) {
ZipEntry entry = jarFile.getEntry(res);
if (entry == null) {
return null;
}
try (InputStream is = zipFile.getInputStream(entry)) {
try (InputStream is = jarFile.getInputStream(entry)) {
byte[] data = new byte[(int) entry.getSize()];
int pos = 0;
int rem = data.length;
while (rem > 0) {
int read = is.read(data, pos, rem);
if (read == -1) {
throw new RuntimeException("Failed to read all data for " + resource);
throw new RuntimeException("Failed to read all data for " + res);
}
pos += read;
rem -= read;
}
return data;
} catch (IOException e) {
throw new RuntimeException("Failed to read zip entry " + resource, e);
throw new RuntimeException("Failed to read zip entry " + res, e);
}
} finally {
readLock.unlock();
}
}

@Override
public URL getResourceURL(String resource) {
final JarFile jarFile = readLockAcquireAndGetJarReference();
try {
JarEntry entry = jarFile.getJarEntry(resource);
return JarFileReference.withJarFile(this, resource, JarResourceURLProvider.INSTANCE);
}

private static class JarResourceURLProvider implements JarFileReference.JarFileConsumer<URL> {
private static final JarResourceURLProvider INSTANCE = new JarResourceURLProvider();

@Override
public URL apply(JarFile jarFile, Path path, String res) {
JarEntry entry = jarFile.getJarEntry(res);
if (entry == null) {
return null;
}
Expand All @@ -110,15 +104,7 @@ public URL getResourceURL(String resource) {
if (realName.endsWith("/")) {
realName = realName.substring(0, realName.length() - 1);
}
final URI jarUri = jarPath.toUri();
// first create a URI which includes both the jar file path and the relative resource name
// and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done
// for the "path" which includes the "realName"
var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2);
ssp.append(jarUri.getPath());
ssp.append("!/");
ssp.append(realName);
final URL resUrl = new URI(jarUri.getScheme(), ssp.toString(), null).toURL();
final URL resUrl = getUrl(path, realName);
// wrap it up into a "jar" protocol URL
//horrible hack to deal with '?' characters in the URL
//seems to be the only way, the URI constructor just does not let you handle them in a sane way
Expand All @@ -136,8 +122,18 @@ public URL getResourceURL(String resource) {
} catch (MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
}
} finally {
readLock.unlock();
}

private static URL getUrl(Path jarPath, String realName) throws MalformedURLException, URISyntaxException {
final URI jarUri = jarPath.toUri();
// first create a URI which includes both the jar file path and the relative resource name
// and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done
// for the "path" which includes the "realName"
var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2);
ssp.append(jarUri.getPath());
ssp.append("!/");
ssp.append(realName);
return new URI(jarUri.getScheme(), ssp.toString(), null).toURL();
}
}

Expand All @@ -151,60 +147,15 @@ public ProtectionDomain getProtectionDomain() {
return protectionDomain;
}

private JarFile readLockAcquireAndGetJarReference() {
while (true) {
readLock.lock();
final JarFile zipFileLocal = this.zipFile;
if (zipFileLocal != null) {
//Expected fast path: returns a reference to the open JarFile while owning the readLock
return zipFileLocal;
} else {
//This Lock implementation doesn't allow upgrading a readLock to a writeLock, so release it
//as we're going to need the WriteLock.
readLock.unlock();
//trigger the JarFile being (re)opened.
ensureJarFileIsOpen();
//Now since we no longer own any lock, we need to try again to obtain the readLock
//and check for the reference still being valid.
//This exposes us to a race with closing the just-opened JarFile;
//however this should be extremely rare, so we can trust we won't loop much;
//A counter doesn't seem necessary, as in fact we know that methods close()
//and resetInternalCaches() are invoked each at most once, which limits the amount
//of loops here in practice.
}
}
}

private void ensureJarFileIsOpen() {
writeLock.lock();
try {
if (this.zipFile == null) {
try {
this.zipFile = JarFiles.create(jarPath.toFile());
} catch (IOException e) {
throw new RuntimeException("Failed to open " + jarPath, e);
}
}
} finally {
writeLock.unlock();
}
}

@Override
public void close() {
writeLock.lock();
try {
final JarFile zipFileLocal = this.zipFile;
if (zipFileLocal != null) {
try {
this.zipFile = null;
zipFileLocal.close();
} catch (Throwable e) {
//ignore
}
}
} finally {
writeLock.unlock();
var futureRef = jarFileReference.get();
if (futureRef != null) {
// The jarfile has been already used and it's going to be removed from the cache,
// so the future must be already completed
var ref = futureRef.getNow(null);
assert (ref != null);
ref.close(this);
}
}

Expand Down
Loading

0 comments on commit ff4b32b

Please sign in to comment.