Skip to content

Commit

Permalink
Merge pull request gbif#314 from gbif/gbif-dev
Browse files Browse the repository at this point in the history
Gbif dev
  • Loading branch information
muttcg authored Jul 1, 2020
2 parents 8144951 + 0cf1f4b commit 3274254
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.gbif.common.messaging.MessageListener;
import org.gbif.common.messaging.api.MessagePublisher;
import org.gbif.pipelines.common.configs.StepConfiguration;
import org.gbif.pipelines.ingest.java.utils.PipelinesConfigFactory;
import org.gbif.pipelines.ingest.java.utils.ConfigFactory;
import org.gbif.pipelines.keygen.config.KeygenConfig;
import org.gbif.pipelines.parsers.config.model.PipelinesConfig;
import org.gbif.registry.ws.client.pipelines.PipelinesHistoryWsClient;
Expand Down Expand Up @@ -68,7 +68,9 @@ protected void shutDown() {
}

private KeygenConfig readConfig(String hdfsSiteConfig, String pipelinesConfig){
PipelinesConfig c = PipelinesConfigFactory.getInstance(hdfsSiteConfig, pipelinesConfig).get();
PipelinesConfig c =
ConfigFactory.getInstance(hdfsSiteConfig, pipelinesConfig, PipelinesConfig.class)
.get();

String zk = c.getKeygen().getZkConnectionString();
zk = zk == null || zk.isEmpty() ? c.getZkConnectionString() : zk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.gbif.pipelines.ingest.java.transforms.DefaultValuesTransform;
import org.gbif.pipelines.ingest.java.transforms.OccurrenceExtensionTransform;
import org.gbif.pipelines.ingest.java.transforms.UniqueGbifIdTransform;
import org.gbif.pipelines.ingest.java.utils.PipelinesConfigFactory;
import org.gbif.pipelines.ingest.java.utils.ConfigFactory;
import org.gbif.pipelines.ingest.options.InterpretationPipelineOptions;
import org.gbif.pipelines.ingest.options.PipelinesOptionsFactory;
import org.gbif.pipelines.ingest.utils.FsUtils;
Expand Down Expand Up @@ -149,7 +149,9 @@ public static void run(InterpretationPipelineOptions options, ExecutorService ex
String targetPath = options.getTargetPath();
String endPointType = options.getEndPointType();
String hdfsSiteConfig = options.getHdfsSiteConfig();
PipelinesConfig config = PipelinesConfigFactory.getInstance(hdfsSiteConfig, options.getProperties()).get();
PipelinesConfig config =
ConfigFactory.getInstance(hdfsSiteConfig, options.getProperties(), PipelinesConfig.class)
.get();

FsUtils.deleteInterpretIfExist(hdfsSiteConfig, targetPath, datasetId, attempt, types);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.gbif.pipelines.ingest.java.utils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.gbif.pipelines.ingest.utils.FsUtils;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@SuppressWarnings("all")
public class ConfigFactory<T> {

  /**
   * Factory that reads a YAML config file (via {@link FsUtils#readConfigFile}) once and caches the
   * deserialized object for reuse.
   *
   * <p>NOTE(review): the previous implementation kept ONE static singleton for ALL type parameters,
   * so the first {@code getInstance} call pinned its config forever — a later call with a different
   * {@code clazz} or properties path silently got the first config back, causing a
   * {@code ClassCastException} (or simply the wrong config) at the {@code get()} call site. The
   * cache is now keyed by config source AND target class, which is backward-compatible for the
   * single-class usage and correct for multi-class usage.
   */

  // One cached factory per (hdfsSiteConfig, propertiesPath, clazz) combination.
  // ConcurrentHashMap#computeIfAbsent gives the same "create at most once" guarantee
  // the old double-checked locking tried to provide, without raw-type singletons.
  private static final Map<String, ConfigFactory> CACHE = new ConcurrentHashMap<>();

  // Deserialized config object; immutable after construction.
  private final T config;

  @SneakyThrows
  private ConfigFactory(String hdfsSiteConfig, String propertiesPath, Class<T> clazz) {
    this.config = FsUtils.readConfigFile(hdfsSiteConfig, propertiesPath, clazz);
  }

  /**
   * Returns the cached factory for the given config source and target class, reading and
   * deserializing the file on the first request only.
   *
   * @param hdfsSiteConfig path to the hdfs-site.xml config (may be null/empty for local FS)
   * @param propertiesPath path to the YAML properties file to deserialize
   * @param clazz target type the file is deserialized into
   */
  public static <T> ConfigFactory<T> getInstance(String hdfsSiteConfig, String propertiesPath, Class<T> clazz) {
    // String-concat handles null hdfsSiteConfig gracefully ("null" literal in the key).
    String key = hdfsSiteConfig + "#" + propertiesPath + "#" + clazz.getName();
    return CACHE.computeIfAbsent(key, k -> new ConfigFactory<>(hdfsSiteConfig, propertiesPath, clazz));
  }

  /** Returns the deserialized config instance. */
  public T get() {
    return config;
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public static void run(EsIndexingPipelineOptions options, Runnable pipeline) {

pipeline.run();

PipelinesConfig config = FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties());
PipelinesConfig config =
FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties(), PipelinesConfig.class);

String zk = config.getIndexLock().getZkConnectionString();
zk = zk == null || zk.isEmpty() ? config.getZkConnectionString() : zk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static void run(InterpretationPipelineOptions options) {
Integer attempt = options.getAttempt();
String targetPath = options.getTargetPath();
String hdfsSiteConfig = options.getHdfsSiteConfig();
PipelinesConfig config = FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties());
PipelinesConfig config =
FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties(), PipelinesConfig.class);

FsUtils.deleteInterpretIfExist(hdfsSiteConfig, targetPath, datasetId, attempt, options.getInterpretationTypes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ public static void run(InterpretationPipelineOptions options) {

String datasetId = options.getDatasetId();
Integer attempt = options.getAttempt();
boolean tripletValid = options.isTripletValid();
boolean occurrenceIdValid = options.isOccurrenceIdValid();
boolean useExtendedRecordId = options.isUseExtendedRecordId();
String endPointType = options.getEndPointType();
Set<String> types = options.getInterpretationTypes();
String targetPath = options.getTargetPath();
String hdfsSiteConfig = options.getHdfsSiteConfig();
PipelinesConfig config = FsUtils.readConfigFile(hdfsSiteConfig, options.getProperties());
PipelinesConfig config =
FsUtils.readConfigFile(hdfsSiteConfig, options.getProperties(), PipelinesConfig.class);

FsUtils.deleteInterpretIfExist(hdfsSiteConfig, targetPath, datasetId, attempt, types);

Expand All @@ -130,7 +127,7 @@ public static void run(InterpretationPipelineOptions options) {
MetadataTransform.builder()
.clientSupplier(MetadataServiceClientFactory.createSupplier(config))
.attempt(attempt)
.endpointType(endPointType)
.endpointType(options.getEndPointType())
.create();

TaggedValuesTransform taggedValuesTransform = TaggedValuesTransform.builder().create();
Expand All @@ -139,9 +136,9 @@ public static void run(InterpretationPipelineOptions options) {
BasicTransform basicTransform =
BasicTransform.builder()
.keygenServiceSupplier(KeygenServiceFactory.createSupplier(config, datasetId))
.isTripletValid(tripletValid)
.isOccurrenceIdValid(occurrenceIdValid)
.useExtendedRecordId(useExtendedRecordId)
.isTripletValid(options.isTripletValid())
.isOccurrenceIdValid(options.isOccurrenceIdValid())
.useExtendedRecordId(options.isUseExtendedRecordId())
.create();

VerbatimTransform verbatimTransform = VerbatimTransform.create();
Expand All @@ -168,7 +165,7 @@ public static void run(InterpretationPipelineOptions options) {
ImageTransform imageTransform = ImageTransform.create();

// Extra
UniqueGbifIdTransform gbifIdTransform = UniqueGbifIdTransform.create(useExtendedRecordId);
UniqueGbifIdTransform gbifIdTransform = UniqueGbifIdTransform.create(options.isUseExtendedRecordId());

log.info("Creating beam pipeline");
// Create and write metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.gbif.pipelines.common.PipelinesVariables.Pipeline.Interpretation;
import org.gbif.pipelines.ingest.options.BasePipelineOptions;
import org.gbif.pipelines.ingest.options.InterpretationPipelineOptions;
import org.gbif.pipelines.parsers.config.model.PipelinesConfig;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -288,7 +287,7 @@ public static boolean deleteIfExist(String hdfsSiteConfig, String directoryPath)
* @param filePath properties file path
*/
@SneakyThrows
public static PipelinesConfig readConfigFile(String hdfsSiteConfig, String filePath) {
public static <T> T readConfigFile(String hdfsSiteConfig, String filePath, Class<T> clazz) {
FileSystem fs = FsUtils.getLocalFileSystem(hdfsSiteConfig);
Path fPath = new Path(filePath);
if (fs.exists(fPath)) {
Expand All @@ -297,7 +296,7 @@ public static PipelinesConfig readConfigFile(String hdfsSiteConfig, String fileP
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
mapper.findAndRegisterModules();
return mapper.readValue(br, PipelinesConfig.class);
return mapper.readValue(br, clazz);
}
}
throw new FileNotFoundException("The properties file doesn't exist - " + filePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public static void doInReadLock(LockConfig config, Mutex.Action action) {

/** A write lock is acquired to avoid concurrent modifications while this operation is running */
public static void doHdfsPrefixLock(InterpretationPipelineOptions options, Mutex.Action action) {
PipelinesConfig config = FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties());
PipelinesConfig config =
FsUtils.readConfigFile(options.getHdfsSiteConfig(), options.getProperties(), PipelinesConfig.class);

String zk = config.getHdfsLock().getZkConnectionString();
zk = zk == null || zk.isEmpty() ? config.getZkConnectionString() : zk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
import java.nio.file.Path;
import java.util.function.Function;

import org.gbif.pipelines.parsers.config.model.PipelinesConfig;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class PipelinesConfigFactory {
public class ConfigFactory {

private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());

Expand All @@ -23,7 +21,7 @@ public class PipelinesConfigFactory {
MAPPER.findAndRegisterModules();
}

public static PipelinesConfig read(Path path) {
public static <T> T read(Path path, Class<T> clazz) {
Function<Path, InputStream> absolute =
p -> {
try {
Expand All @@ -41,7 +39,7 @@ public static PipelinesConfig read(Path path) {

try (InputStream in = function.apply(path)) {
// read properties from input stream
return MAPPER.readValue(in, PipelinesConfig.class);
return MAPPER.readValue(in, clazz);
} catch (Exception ex) {
String msg = "Properties with absolute path could not be read from " + path;
throw new IllegalArgumentException(msg, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

import static org.junit.Assert.assertEquals;

public class PipelinesConfigFactoryTest {
public class ConfigFactoryTest {

private final String inpPath = getClass().getResource("/pipelines.yaml").getFile();

@Test
public void test() {

PipelinesConfig config = PipelinesConfigFactory.read(Paths.get(inpPath));
PipelinesConfig config = ConfigFactory.read(Paths.get(inpPath), PipelinesConfig.class);

WsConfig gbifApi = config.getGbifApi();
assertEquals("http://test.test", gbifApi.getWsUrl());
Expand Down

0 comments on commit 3274254

Please sign in to comment.