Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,24 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, Ou
* Needed because generating an invoker class is expensive, and to avoid generating an excessive
* number of classes consuming PermGen memory.
*
* <p>The cache uses {@link DoFnSignature} as the key rather than just {@code Class<?>} because:
*
* <ul>
* <li>While Java type erasure means {@code MyDoFn<String>} and {@code MyDoFn<Integer>} share
* the same runtime {@code Class} object, in Apache Beam a DoFn's behavior isn't defined
* solely by its Class.
* <li>Users can override {@code getInputTypeDescriptor()} or use mechanisms that capture types
* to provide different type information for the same raw DoFn class.
* <li>The {@link DoFnSignature} properly captures this type information, ensuring that DoFns
* with different {@code TypeDescriptor}s receive correctly generated invokers.
* <li>This is critical for correct serialization (Coders) and schema verification.
* </ul>
*
* <p>Note that special care must be taken to enumerate this object as concurrent hash maps are <a
* href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
* consistent</a>.
*/
private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
private final Map<DoFnSignature, Constructor<?>> byteBuddyInvokerConstructorCache =
new ConcurrentHashMap<>();

private ByteBuddyDoFnInvokerFactory() {}
Expand Down Expand Up @@ -297,19 +310,22 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
}

/**
* Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class.
* Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFnSignature}.
*
* <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link
* DoFn} class.
* DoFnSignature}. Using {@link DoFnSignature} as the cache key (rather than just {@code Class})
* ensures that DoFns with the same raw class but different type descriptors (e.g., via overriding
* {@code getInputTypeDescriptor()}) receive correctly generated invokers for proper serialization
* and schema verification.
*/
private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
return byteBuddyInvokerConstructorCache.computeIfAbsent(
fnClass,
clazz -> {
Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
signature,
sig -> {
Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(sig);
try {
return invokerClass.getConstructor(clazz);
return invokerClass.getConstructor(fnClass);
} catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1406,4 +1406,43 @@ public void processElement(BundleFinalizer bundleFinalizer) {

verify(mockBundleFinalizer).afterBundleCommit(eq(Instant.ofEpochSecond(42L)), eq(null));
}

/**
* Test that DoFn instances with different generic types but same raw class don't experience cache
* collisions in the invoker factory.
*
* <p>Due to Java type erasure, generic type parameters are erased at runtime for non-anonymous
* classes. However, when using anonymous subclasses (which preserve type information in the class
* metadata), different type instantiations should receive correctly generated invokers.
*/
@Test
public void testGenericDoFnCacheKeyingWithAnonymousSubclasses() {
// Create two anonymous subclasses with different generic types.
// Anonymous classes preserve generic type information in their class metadata,
// so they should be treated as different classes and not share cache entries.
DoFn<String, String> stringDoFn =
new DoFn<String, String>() {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> out) {
out.output(element);
}
};

DoFn<Integer, Integer> integerDoFn =
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(@Element Integer element, OutputReceiver<Integer> out) {
out.output(element);
}
};

DoFnInvoker<String, String> stringInvoker = DoFnInvokers.invokerFor(stringDoFn);
DoFnInvoker<Integer, Integer> integerInvoker = DoFnInvokers.invokerFor(integerDoFn);

// Verify that different anonymous classes produce different invoker classes
assertThat(
"Anonymous DoFn subclasses with different generic types should produce different invokers",
stringInvoker.getClass(),
org.hamcrest.Matchers.not(equalTo(integerInvoker.getClass())));
}
}
Loading