Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/com/treasure_data/td_import/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ public interface Constants extends com.treasure_data.client.Constants {
String BI_PREPARE_TIMEVALUE_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEVALUE;
String BI_PREPARE_PARTS_TIMEVALUE_DESC = "long value of the time column";

// time-offset-value
String BI_PREPARE_PARTS_TIMEOFFSET = "time-offset";
String BI_PREPARE_TIMEOFFSETVALUE_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEOFFSET;
String BI_PREPARE_PARTS_TIMEOFFSET_DESC = "long value of the time offset";

// time-format STRF_FORMAT; default=auto detect
String BI_PREPARE_PARTS_TIMEFORMAT = "time-format";
String BI_PREPARE_TIMEFORMAT_HYPHEN_HYPHEN = HYPHENHYPHEN + BI_PREPARE_PARTS_TIMEFORMAT;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/treasure_data/td_import/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public void initPrepareOptionParser(Properties props) {
.withRequiredArg()
.describedAs("TIME")
.ofType(String.class);
op.acceptsAll(Arrays.asList(
Configuration.BI_PREPARE_PARTS_TIMEOFFSET),
Configuration.BI_PREPARE_PARTS_TIMEOFFSET_DESC)
.withRequiredArg()
.describedAs("TIME")
.ofType(String.class);
op.acceptsAll(Arrays.asList(
Configuration.BI_PREPARE_PARTS_PRIMARY_KEY),
Configuration.BI_PREPARE_PARTS_PRIMARY_KEY_DESC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.treasure_data.td_import.prepare.Strftime;

public class AliasTimeColumnValue extends TimeColumnValue {
public AliasTimeColumnValue(int index, Strftime timeFormat) {
super(index, timeFormat);
public AliasTimeColumnValue(int index, Strftime timeFormat, long offset) {
super(index, timeFormat, offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
public class TimeColumnValue {
protected int index;
protected Strftime timeFormat;
protected final long offset;

public TimeColumnValue(int index, Strftime timeFormat) {
this(index, timeFormat, 0);
}

public TimeColumnValue(int index, Strftime timeFormat, long offset) {
this.index = index;
this.timeFormat = timeFormat;
this.offset = offset;
}

public int getIndex() {
Expand All @@ -39,6 +45,10 @@ public Strftime getTimeFormat() {
return timeFormat;
}

public long getOffset() {
return offset;
}

public void write(ColumnValue v, RecordWriter with) throws PreparePartsException {
v.getColumnType().filterAndWrite(v, this, with);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ static InvalidColumnsHandling get(String key) {
protected String aliasTimeColumn;
protected TimeValueTimeColumnValue timeValue = new TimeValueTimeColumnValue(-1);
protected String timeFormat;
protected long timeOffset;

protected boolean hasPrimaryKey = false;
protected String primaryKey = null;
Expand Down Expand Up @@ -610,6 +611,9 @@ public void configure(Properties props, Options options) {
// time value
setTimeValue();

// time offset
setTimeOffset();

// time format
setTimeFormat();

Expand Down Expand Up @@ -933,6 +937,31 @@ public TimeValueTimeColumnValue getTimeValue() {
return timeValue;
}

public void setTimeOffset() {
if (!optionSet.has(BI_PREPARE_PARTS_TIMEOFFSET)) {
return;
}

if (optionSet.has(BI_PREPARE_PARTS_TIMEVALUE)) {
throw new IllegalArgumentException(
"cannot specify both of 'time-value' and 'time-offset' options");
}

String v = (String) optionSet.valueOf(BI_PREPARE_PARTS_TIMEOFFSET);
if (v != null) {
try {
timeOffset = Long.parseLong(v);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"'time-offset option requires the long type argument'", e);
}
}
}

public long getTimeOffset() {
return timeOffset;
}

public void setTimeFormat() {
if (optionSet.has(BI_PREPARE_PARTS_TIMEFORMAT)) {
if (hasPrimaryKey()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,26 +251,26 @@ public void setTimeColumnValue(TimeColumnSampling[] sampleColumnValues,
if (index < 0) {
timeColumnValue = conf.getTimeValue();
} else if (conf.getTimeFormat() != null) {
timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat());
timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(), conf.getTimeOffset());
} else {
String suggested = sampleColumnValues[index].getSTRFTimeFormatRank();
if (suggested != null) {
if (suggested.equals(TimeColumnSampling.HHmmss_STRF)) {
timeColumnValue = createTimeColumnValue(index, isAlias, new HHmmssStrftime());
timeColumnValue = createTimeColumnValue(index, isAlias, new HHmmssStrftime(), conf.getTimeOffset());
} else {
timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(suggested));
timeColumnValue = createTimeColumnValue(index, isAlias, conf.getTimeFormat(suggested), conf.getTimeOffset());
}
} else {
timeColumnValue = createTimeColumnValue(index, isAlias, null);
timeColumnValue = createTimeColumnValue(index, isAlias, null, conf.getTimeOffset());
}
}
}

private TimeColumnValue createTimeColumnValue(int index, boolean isAlias, Strftime strftime) {
private TimeColumnValue createTimeColumnValue(int index, boolean isAlias, Strftime strftime, long offset) {
if (!isAlias) {
return new TimeColumnValue(index, strftime);
return new TimeColumnValue(index, strftime, offset);
} else {
return new AliasTimeColumnValue(index, strftime);
return new AliasTimeColumnValue(index, strftime, offset);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ private void setTimeColumnValue(int timeColumnIndex, int aliasTimeColumnIndex)
timeColumnValue = conf.getTimeValue();
} else {
if (!isAlias) {
timeColumnValue = new TimeColumnValue(index, conf.getTimeFormat());
timeColumnValue = new TimeColumnValue(index, conf.getTimeFormat(), conf.getTimeOffset());
} else {
timeColumnValue = new AliasTimeColumnValue(index, conf.getTimeFormat());
timeColumnValue = new AliasTimeColumnValue(index, conf.getTimeFormat(), conf.getTimeOffset());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ public void setTimeColumnValue() throws PreparePartsException {
}
} else {
if (timeColumnIndex >= 0) {
timeColumnValue = new TimeColumnValue(timeColumnIndex, conf.getTimeFormat());
timeColumnValue = new TimeColumnValue(timeColumnIndex, conf.getTimeFormat(), conf.getTimeOffset());
} else if (aliasTimeColumnIndex >= 0) {
timeColumnValue = new AliasTimeColumnValue(aliasTimeColumnIndex, conf.getTimeFormat());
timeColumnValue = new AliasTimeColumnValue(aliasTimeColumnIndex, conf.getTimeFormat(), conf.getTimeOffset());
} else if (conf.getTimeValue().getTimeValue() >= 0) {
timeColumnValue = conf.getTimeValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public void write(TimeColumnValue filter, StringColumnValue v) throws PreparePar
}
}

time += filter.getOffset();

write(time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class TestBulkImportOptions {
private final String sampleTimeColumn = "timestamp";
private final String sampleTimeFormat = "timeformat";
private final String sampleTimeValue = "100";
private final String sampleTimeOffset = "86400";
private final String sampleOutput = "output_dir";
private final String sampleSplitSize = "100";
private final String sampleErrorRecordsHandling = "skip";
Expand Down Expand Up @@ -73,6 +74,7 @@ private String[] createPrepareArguments() {
"--time-column", sampleTimeColumn,
"--time-format", sampleTimeFormat,
"--time-value", sampleTimeValue,
"--time-offset", sampleTimeOffset,
"--output", sampleOutput,
"--split-size", sampleSplitSize,
"--error-records-handling", sampleErrorRecordsHandling,
Expand Down Expand Up @@ -114,6 +116,7 @@ public void assertPrepareOptionEquals(Options actualOpts) throws Exception {
assertOptionEquals("T", sampleTimeFormat, actualOpts);
assertOptionEquals("time-format", sampleTimeFormat, actualOpts);
assertOptionEquals("time-value", sampleTimeValue, actualOpts);
assertOptionEquals("time-offset", sampleTimeOffset, actualOpts);
assertOptionEquals("output", sampleOutput, actualOpts);
assertOptionEquals("split-size", sampleSplitSize, actualOpts);
assertOptionEquals("error-records-handling", sampleErrorRecordsHandling, actualOpts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ public void assertContextEquals(TestCSVFileReader test) {
}
}

public static class Context07 extends Context06 {

public void assertContextEquals(TestCSVFileReader test) {
super.assertContextEquals(test);
assertEquals(test.reader.getTimeColumnValue().getOffset(), 86400);
}
}

protected String fileName = "./file.csv";
protected int numLine;

Expand Down Expand Up @@ -398,6 +406,30 @@ public void checkContextWhenReaderConfigurationWithTimeFormat() throws Exception
checkContextWhenReaderConfiguration(context);
}

@Test
public void checkContextWhenReaderConfigurationWithTimeFormatWithTimeOffset() throws Exception {
Context07 context = new Context07();

// override system properties:-(
options = new Options();
options.initPrepareOptionParser(props);
options.setOptions(new String[] {
"--format",
"csv",
"--column-header",
"--time-format",
context.getSTRFTimeFormat(),
"--time-offset",
"86400"
});

createPrepareConfiguration();
createFileWriter();
createFileReader();

checkContextWhenReaderConfiguration(context);
}

private void checkContextWhenReaderConfiguration(Context context) throws Exception {
// create context
context.createContext(this);
Expand Down