Skip to content

Commit

Permalink
fix_app_bug (DataLinkDC#2800)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Dec 26, 2023
1 parent b25694d commit 3765506
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

import org.dinky.utils.ClassPathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -854,7 +856,7 @@ private ApplicationReport startAppMaster(
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
ClassPathUtils.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

import org.dinky.utils.ClassPathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -878,7 +880,8 @@ private ApplicationReport startAppMaster(
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
ClassPathUtils.sort(systemClassPaths);

Collections.sort(userClassPaths);

// classpath assembler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

import org.dinky.utils.ClassPathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -894,7 +896,7 @@ private ApplicationReport startAppMaster(
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
ClassPathUtils.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

import org.dinky.utils.ClassPathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -897,7 +899,7 @@ private ApplicationReport startAppMaster(
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
ClassPathUtils.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;

import org.dinky.utils.ClassPathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -905,7 +907,7 @@ private ApplicationReport startAppMaster(
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
ClassPathUtils.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
Expand Down
47 changes: 47 additions & 0 deletions dinky-common/src/main/java/org/dinky/utils/ClassPathUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;

public class ClassPathUtils {
public static void sort(List<String> list) {
Collections.sort(list);
List<Integer> indexList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
String path = list.get(i);
if (StrUtil.contains(path, "dinky")) {
indexList.add(i);
}
}
if (CollUtil.isNotEmpty(indexList)) {
for (Integer i : indexList) {
int index = i;
String path = list.remove(index);
list.add(0, path);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AppBatchExecutor(ExecutorConfig executorConfig, DinkyClassLoader classLoa
Configuration configuration = Configuration.fromMap(executorConfig.getConfig());
this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
} else {
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
}
init(classLoader);
}
Expand Down
8 changes: 8 additions & 0 deletions dinky-core/src/main/java/org/dinky/trans/Operations.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;

import org.reflections.Reflections;
Expand Down Expand Up @@ -65,8 +66,15 @@ private static Operation[] getAllOperations() {
| NoSuchMethodException e) {
log.error(String.format("getAllOperations error, class %s, err: %s", t, e));
throw new RuntimeException(e);
} catch (NoClassDefFoundError e) {
log.warn(
"getAllOperations error, If you do not have this class, please add the corresponding dependency. Operation: {}.",
t,
e);
return null;
}
})
.filter(Objects::nonNull)
.toArray(Operation[]::new);
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<maven.resource.version>3.2.0</maven.resource.version>
<maven.resource.version>3.3.1</maven.resource.version>
<mockito.version>3.12.4</mockito.version>
<mybatis-plus-boot-starter.version>3.5.3.1</mybatis-plus-boot-starter.version>
<mysql-connector-java.version>8.0.28</mysql-connector-java.version>
Expand Down

0 comments on commit 3765506

Please sign in to comment.