Skip to content

Commit 7151581

Browse files
authored
HBASE-27276 Reduce reflection overhead in Filter deserialization (#5488)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 7f3921a commit 7151581

File tree

9 files changed

+505
-11
lines changed

9 files changed

+505
-11
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
import org.apache.hadoop.hbase.util.DynamicClassLoader;
128128
import org.apache.hadoop.hbase.util.ExceptionUtil;
129129
import org.apache.hadoop.hbase.util.Methods;
130+
import org.apache.hadoop.hbase.util.ReflectedFunctionCache;
130131
import org.apache.hadoop.hbase.util.VersionInfo;
131132
import org.apache.hadoop.ipc.RemoteException;
132133
import org.apache.yetus.audience.InterfaceAudience;
@@ -306,6 +307,23 @@ public static boolean isClassLoaderLoaded() {
306307
return classLoaderLoaded;
307308
}
308309

310+
private static final String PARSE_FROM = "parseFrom";
311+
312+
// We don't bother using the dynamic CLASS_LOADER above, because currently we can't support
313+
// optimizing dynamically loaded classes. We can do it once we build for java9+, see the todo
314+
// in ReflectedFunctionCache
315+
private static final ReflectedFunctionCache<byte[], Filter> FILTERS =
316+
new ReflectedFunctionCache<>(Filter.class, byte[].class, PARSE_FROM);
317+
private static final ReflectedFunctionCache<byte[], ByteArrayComparable> COMPARATORS =
318+
new ReflectedFunctionCache<>(ByteArrayComparable.class, byte[].class, PARSE_FROM);
319+
320+
private static volatile boolean ALLOW_FAST_REFLECTION_FALLTHROUGH = true;
321+
322+
// Visible for tests
323+
public static void setAllowFastReflectionFallthrough(boolean val) {
324+
ALLOW_FAST_REFLECTION_FALLTHROUGH = val;
325+
}
326+
309327
/**
310328
* Prepend the passed bytes with four bytes of magic, {@link ProtobufMagic#PB_MAGIC}, to flag what
311329
* follows as a protobuf in hbase. Prepend these bytes to all content written to znodes, etc.
@@ -1554,13 +1572,23 @@ public static ComparatorProtos.Comparator toComparator(ByteArrayComparable compa
15541572
public static ByteArrayComparable toComparator(ComparatorProtos.Comparator proto)
15551573
throws IOException {
15561574
String type = proto.getName();
1557-
String funcName = "parseFrom";
15581575
byte[] value = proto.getSerializedComparator().toByteArray();
1576+
15591577
try {
1578+
ByteArrayComparable result = COMPARATORS.getAndCallByName(type, value);
1579+
if (result != null) {
1580+
return result;
1581+
}
1582+
1583+
if (!ALLOW_FAST_REFLECTION_FALLTHROUGH) {
1584+
throw new IllegalStateException("Failed to deserialize comparator " + type
1585+
+ " because fast reflection returned null and fallthrough is disabled");
1586+
}
1587+
15601588
Class<?> c = Class.forName(type, true, ClassLoaderHolder.CLASS_LOADER);
1561-
Method parseFrom = c.getMethod(funcName, byte[].class);
1589+
Method parseFrom = c.getMethod(PARSE_FROM, byte[].class);
15621590
if (parseFrom == null) {
1563-
throw new IOException("Unable to locate function: " + funcName + " in type: " + type);
1591+
throw new IOException("Unable to locate function: " + PARSE_FROM + " in type: " + type);
15641592
}
15651593
return (ByteArrayComparable) parseFrom.invoke(null, value);
15661594
} catch (Exception e) {
@@ -1577,12 +1605,22 @@ public static ByteArrayComparable toComparator(ComparatorProtos.Comparator proto
15771605
public static Filter toFilter(FilterProtos.Filter proto) throws IOException {
15781606
String type = proto.getName();
15791607
final byte[] value = proto.getSerializedFilter().toByteArray();
1580-
String funcName = "parseFrom";
1608+
15811609
try {
1610+
Filter result = FILTERS.getAndCallByName(type, value);
1611+
if (result != null) {
1612+
return result;
1613+
}
1614+
1615+
if (!ALLOW_FAST_REFLECTION_FALLTHROUGH) {
1616+
throw new IllegalStateException("Failed to deserialize comparator " + type
1617+
+ " because fast reflection returned null and fallthrough is disabled");
1618+
}
1619+
15821620
Class<?> c = Class.forName(type, true, ClassLoaderHolder.CLASS_LOADER);
1583-
Method parseFrom = c.getMethod(funcName, byte[].class);
1621+
Method parseFrom = c.getMethod(PARSE_FROM, byte[].class);
15841622
if (parseFrom == null) {
1585-
throw new IOException("Unable to locate function: " + funcName + " in type: " + type);
1623+
throw new IOException("Unable to locate function: " + PARSE_FROM + " in type: " + type);
15861624
}
15871625
return (Filter) parseFrom.invoke(c, value);
15881626
} catch (Exception e) {

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.hamcrest.CoreMatchers.instanceOf;
21+
import static org.hamcrest.MatcherAssert.assertThat;
2022
import static org.junit.Assert.assertEquals;
2123
import static org.junit.Assert.assertFalse;
2224
import static org.junit.Assert.assertTrue;
@@ -25,7 +27,6 @@
2527
import java.io.File;
2628
import java.io.FileOutputStream;
2729
import java.io.IOException;
28-
import java.lang.reflect.InvocationTargetException;
2930
import java.nio.ByteBuffer;
3031
import java.util.Arrays;
3132
import java.util.Base64;
@@ -34,7 +35,6 @@
3435
import org.apache.hadoop.conf.Configuration;
3536
import org.apache.hadoop.hbase.HBaseClassTestRule;
3637
import org.apache.hadoop.hbase.HBaseConfiguration;
37-
import org.apache.hadoop.hbase.exceptions.DeserializationException;
3838
import org.apache.hadoop.hbase.filter.Filter;
3939
import org.apache.hadoop.hbase.filter.FilterList;
4040
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
@@ -48,6 +48,8 @@
4848
import org.junit.Test;
4949
import org.junit.experimental.categories.Category;
5050

51+
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
52+
5153
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
5254
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
5355

@@ -226,9 +228,9 @@ public void testDynamicFilter() throws Exception {
226228
ProtobufUtil.toGet(getProto2);
227229
fail("Should not be able to load the filter class");
228230
} catch (IOException ioe) {
229-
assertTrue(ioe.getCause() instanceof InvocationTargetException);
230-
InvocationTargetException ite = (InvocationTargetException) ioe.getCause();
231-
assertTrue(ite.getTargetException() instanceof DeserializationException);
231+
// This test is deserializing a FilterList, and one of the sub-filters is not found.
232+
// So the actual caused by is buried a few levels deep.
233+
assertThat(Throwables.getRootCause(ioe), instanceOf(ClassNotFoundException.class));
232234
}
233235
FileOutputStream fos = new FileOutputStream(jarFile);
234236
fos.write(Base64.getDecoder().decode(MOCK_FILTER_JAR));
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.util;
19+
20+
import edu.umd.cs.findbugs.annotations.Nullable;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.ConcurrentMap;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Function;
25+
import org.apache.yetus.audience.InterfaceAudience;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/**
30+
* Cache to hold resolved Functions of a specific signature, generated through reflection. These can
31+
* be (relatively) costly to create, but then are much faster than typical Method.invoke calls when
32+
* executing. The cache is built-up on demand as calls are made to new classes. The functions are
33+
* cached for the lifetime of the process. If a function cannot be created (security reasons, method
34+
* not found, etc), a fallback function is cached which always returns null. Callers to
35+
* {@link #getAndCallByName(String, Object)} should have handling for null return values.
36+
* <p>
37+
* An instance is created for a specified baseClass (i.e. Filter), argClass (i.e. byte[]), and
38+
* static methodName to call. These are used to resolve a Function which delegates to that static
39+
* method, if it is found.
40+
* @param <I> the input argument type for the resolved functions
41+
* @param <R> the return type for the resolved functions
42+
*/
43+
@InterfaceAudience.Private
44+
public final class ReflectedFunctionCache<I, R> {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(ReflectedFunctionCache.class);
47+
48+
private final ConcurrentMap<String, Function<I, ? extends R>> lambdasByClass =
49+
new ConcurrentHashMap<>();
50+
private final Class<R> baseClass;
51+
private final Class<I> argClass;
52+
private final String methodName;
53+
private final ClassLoader classLoader;
54+
55+
public ReflectedFunctionCache(Class<R> baseClass, Class<I> argClass, String staticMethodName) {
56+
this.classLoader = getClass().getClassLoader();
57+
this.baseClass = baseClass;
58+
this.argClass = argClass;
59+
this.methodName = staticMethodName;
60+
}
61+
62+
/**
63+
* Get and execute the Function for the given className, passing the argument to the function and
64+
* returning the result.
65+
* @param className the full name of the class to lookup
66+
* @param argument the argument to pass to the function, if found.
67+
* @return null if a function is not found for classname, otherwise the result of the function.
68+
*/
69+
@Nullable
70+
public R getAndCallByName(String className, I argument) {
71+
// todo: if we ever make java9+ our lowest supported jdk version, we can
72+
// handle generating these for newly loaded classes from our DynamicClassLoader using
73+
// MethodHandles.privateLookupIn(). For now this is not possible, because we can't easily
74+
// create a privileged lookup in a non-default ClassLoader. So while this cache loads
75+
// over time, it will never load a custom filter from "hbase.dynamic.jars.dir".
76+
Function<I, ? extends R> lambda =
77+
ConcurrentMapUtils.computeIfAbsent(lambdasByClass, className, () -> loadFunction(className));
78+
79+
return lambda.apply(argument);
80+
}
81+
82+
private Function<I, ? extends R> loadFunction(String className) {
83+
long startTime = System.nanoTime();
84+
try {
85+
Class<?> clazz = Class.forName(className, false, classLoader);
86+
if (!baseClass.isAssignableFrom(clazz)) {
87+
LOG.debug("Requested class {} is not assignable to {}, skipping creation of function",
88+
className, baseClass.getName());
89+
return this::notFound;
90+
}
91+
return ReflectionUtils.getOneArgStaticMethodAsFunction(clazz, methodName, argClass,
92+
(Class<? extends R>) clazz);
93+
} catch (Throwable t) {
94+
LOG.debug("Failed to create function for {}", className, t);
95+
return this::notFound;
96+
} finally {
97+
LOG.debug("Populated cache for {} in {}ms", className,
98+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
99+
}
100+
}
101+
102+
/**
103+
* In order to use computeIfAbsent, we can't store nulls in our cache. So we store a lambda which
104+
* resolves to null. The contract is that getAndCallByName returns null in this case.
105+
*/
106+
private R notFound(I argument) {
107+
return null;
108+
}
109+
110+
}

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import java.io.ByteArrayOutputStream;
2222
import java.io.PrintStream;
2323
import java.io.UnsupportedEncodingException;
24+
import java.lang.invoke.CallSite;
25+
import java.lang.invoke.LambdaMetafactory;
26+
import java.lang.invoke.MethodHandle;
27+
import java.lang.invoke.MethodHandles;
28+
import java.lang.invoke.MethodType;
2429
import java.lang.management.ManagementFactory;
2530
import java.lang.management.ThreadInfo;
2631
import java.lang.management.ThreadMXBean;
@@ -29,6 +34,7 @@
2934
import java.lang.reflect.InvocationTargetException;
3035
import java.lang.reflect.Method;
3136
import java.nio.charset.Charset;
37+
import java.util.function.Function;
3238
import org.apache.yetus.audience.InterfaceAudience;
3339
import org.slf4j.Logger;
3440

@@ -208,6 +214,30 @@ private static String getTaskName(long id, String name) {
208214
return id + " (" + name + ")";
209215
}
210216

217+
/**
218+
* Creates a Function which can be called to performantly execute a reflected static method. The
219+
* creation of the Function itself may not be fast, but executing that method thereafter should be
220+
* much faster than {@link #invokeMethod(Object, String, Object...)}.
221+
* @param lookupClazz the class to find the static method in
222+
* @param methodName the method name
223+
* @param argumentClazz the type of the argument
224+
* @param returnValueClass the type of the return value
225+
* @return a function which when called executes the requested static method.
226+
* @throws Throwable exception types from the underlying reflection
227+
*/
228+
public static <I, R> Function<I, R> getOneArgStaticMethodAsFunction(Class<?> lookupClazz,
229+
String methodName, Class<I> argumentClazz, Class<R> returnValueClass) throws Throwable {
230+
MethodHandles.Lookup lookup = MethodHandles.lookup();
231+
MethodHandle methodHandle = lookup.findStatic(lookupClazz, methodName,
232+
MethodType.methodType(returnValueClass, argumentClazz));
233+
CallSite site =
234+
LambdaMetafactory.metafactory(lookup, "apply", MethodType.methodType(Function.class),
235+
methodHandle.type().generic(), methodHandle, methodHandle.type());
236+
237+
return (Function<I, R>) site.getTarget().invokeExact();
238+
239+
}
240+
211241
/**
212242
* Get and invoke the target method from the given object with given parameters
213243
* @param obj the object to get and invoke method from

hbase-common/src/test/java/org/apache/hadoop/hbase/util/ClassLoaderTestHelper.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.util;
1919

20+
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertTrue;
2122
import static org.junit.Assert.fail;
2223

@@ -202,4 +203,20 @@ public static void addJarFilesToJar(File targetJar, String libPrefix, File... sr
202203
public static String localDirPath(Configuration conf) {
203204
return conf.get(ClassLoaderBase.LOCAL_DIR_KEY) + File.separator + "jars" + File.separator;
204205
}
206+
207+
public static void deleteClass(String className, String testDir, Configuration conf)
208+
throws Exception {
209+
String jarFileName = className + ".jar";
210+
File file = new File(testDir, jarFileName);
211+
file.delete();
212+
assertFalse("Should be deleted: " + file.getPath(), file.exists());
213+
214+
file = new File(conf.get("hbase.dynamic.jars.dir"), jarFileName);
215+
file.delete();
216+
assertFalse("Should be deleted: " + file.getPath(), file.exists());
217+
218+
file = new File(ClassLoaderTestHelper.localDirPath(conf), jarFileName);
219+
file.delete();
220+
assertFalse("Should be deleted: " + file.getPath(), file.exists());
221+
}
205222
}

0 commit comments

Comments
 (0)