Skip to content

[FLINK-7190] Activate checkstyle flink-java/* #4343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,53 +40,52 @@
*/
@Internal
public class ClosureCleaner {
private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);

private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);

/**
* Tries to clean the closure of the given object, if the object is a non-static inner
* class.
*
*
* @param func The object whose closure should be cleaned.
* @param checkSerializable Flag to indicate whether serializability should be checked after
* the closure cleaning attempt.
*
*
* @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
* not serializable after the closure cleaning.
*
*
* @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not
* be loaded, in order to process during teh closure cleaning.
*/
public static void clean(Object func, boolean checkSerializable) {
if (func == null) {
return;
}

final Class<?> cls = func.getClass();

// First find the field name of the "this$0" field, this can
// be "this$x" depending on the nesting
boolean closureAccessed = false;

for (Field f: cls.getDeclaredFields()) {
if (f.getName().startsWith("this$")) {
// found a closure referencing field - now try to clean
closureAccessed |= cleanThis0(func, cls, f.getName());
}
}

if (checkSerializable) {
try {
InstantiationUtil.serializeObject(func);
}
catch (Exception e) {
String functionType = getSuperClassOrInterfaceName(func.getClass());

String msg = functionType == null ?
(func + " is not serializable.") :
("The implementation of the " + functionType + " is not serializable.");



if (closureAccessed) {
msg += " The implementation accesses fields of its enclosing class, which is " +
"a common reason for non-serializability. " +
Expand All @@ -96,7 +94,7 @@ public static void clean(Object func, boolean checkSerializable) {
} else {
msg += " The object probably contains or references non serializable fields.";
}

throw new InvalidProgramException(msg, e);
}
}
Expand All @@ -109,14 +107,14 @@ public static void ensureSerializable(Object obj) {
throw new InvalidProgramException("Object " + obj + " is not serializable", e);
}
}

private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {

This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
getClassReader(cls).accept(this0Finder, 0);

final boolean accessesClosure = this0Finder.isThis0Accessed();

if (LOG.isDebugEnabled()) {
LOG.debug(this0Name + " is accessed: " + accessesClosure);
}
Expand All @@ -129,7 +127,7 @@ private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
// has no this$0, just return
throw new RuntimeException("Could not set " + this0Name + ": " + e);
}

try {
this0.setAccessible(true);
this0.set(func, null);
Expand All @@ -139,10 +137,10 @@ private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
}
}

return accessesClosure;
}

private static ClassReader getClassReader(Class<?> cls) {
String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
try {
Expand All @@ -151,8 +149,7 @@ private static ClassReader getClassReader(Class<?> cls) {
throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e);
}
}



private static String getSuperClassOrInterfaceName(Class<?> cls) {
Class<?> superclass = cls.getSuperclass();
if (superclass.getName().startsWith("org.apache.flink")) {
Expand All @@ -176,7 +173,6 @@ class This0AccessFinder extends ClassVisitor {

private final String this0Name;
private boolean isThis0Accessed;


public This0AccessFinder(String this0Name) {
super(Opcodes.ASM5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.CollectionExecutor;

/**
* Version of {@link ExecutionEnvironment} that allows serial, local, collection-based executions of Flink programs.
*/
@PublicEvolving
public class CollectionEnvironment extends ExecutionEnvironment {

Expand All @@ -40,7 +43,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
public int getParallelism() {
return 1; // always serial
}

@Override
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
Expand Down
Loading