Skip to content

Commit

Permalink
PDI-5211: As an ETL Designer, I want a BulkLoad step for Greenplum th…
Browse files Browse the repository at this point in the history
…at wraps the gpload facility.

git-svn-id: svn://source.pentaho.org/svnkettleroot/Kettle/trunk@15067 5fb7f6ec-07c1-534a-b4ca-9155e429e800
  • Loading branch information
sflatley committed Apr 25, 2011
1 parent e3079a0 commit 4dd3537
Show file tree
Hide file tree
Showing 11 changed files with 1,439 additions and 619 deletions.
916 changes: 454 additions & 462 deletions src-ui/org/pentaho/di/ui/trans/steps/gpload/GPLoadDialog.java

Large diffs are not rendered by default.

318 changes: 214 additions & 104 deletions src/org/pentaho/di/trans/steps/gpload/GPLoad.java

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions src/org/pentaho/di/trans/steps/gpload/GPLoadData.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ public class GPLoadData extends BaseStepData implements StepDataInterface
{
public Database db;

public int keynrs[]; // nr of keylookup -value in row...
public int keynrs2[]; // nr of keylookup2-value in row...
public int valuenrs[]; // Stream valuename nrs to prevent searches.

/**
* Default constructor.
*/
Expand Down
6 changes: 5 additions & 1 deletion src/org/pentaho/di/trans/steps/gpload/GPLoadDataOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class GPLoadDataOutput
private boolean first = true;
private int fieldNumbers[] = null;
private String enclosure = null;
private String delimiter = null;
private SimpleDateFormat sdfDate = null;
private SimpleDateFormat sdfDateTime = null;

Expand Down Expand Up @@ -139,6 +140,7 @@ public void writeLine(RowMetaInterface mi, Object row[]) throws KettleException
first = false;

enclosure = meta.getEnclosure();
delimiter = meta.getDelimiter();

// Setup up the fields we need to take for each of the rows
// as this speeds up processing.
Expand All @@ -160,9 +162,11 @@ public void writeLine(RowMetaInterface mi, Object row[]) throws KettleException
// Write the data to the output
ValueMetaInterface v = null;
int number = 0;

for (int i=0;i<fieldNumbers.length;i++)
{
if ( i!=0 ) output.print(",");
// TODO: variable substitution
if ( i!=0 ) output.print(delimiter);
number = fieldNumbers[i];
v = mi.getValueMeta(number);
if ( row[number] == null)
Expand Down
132 changes: 94 additions & 38 deletions src/org/pentaho/di/trans/steps/gpload/GPLoadMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,23 @@ public class GPLoadMeta extends BaseStepMeta implements StepMetaInterface
/** database connection */
private DatabaseMeta databaseMeta;

/** Field value to dateMask after lookup */
/** Specified database field */
private String fieldTable[];

/** Field name in the stream */
private String fieldStream[];

/** boolean indicating if field needs to be updated */

/** Database column to match on for an update or merge operation */
private boolean matchColumn[];

/** Database columns to update */
private boolean updateColumn[];

/** the date mask to use if the value is a date */
private String dateMask[];

/** maximum errors */
private int maxErrors;
private String maxErrors;

/** Load method */
private String loadMethod;
Expand All @@ -107,12 +113,15 @@ public class GPLoadMeta extends BaseStepMeta implements StepMetaInterface
/** Erase files after use */
private boolean eraseFiles;

/** Database name override */
private String dbNameOverride;

/** Boolean to indicate that numbers are to be enclosed*/
/** Boolean to indicate that numbers are to be enclosed */
private boolean encloseNumbers;

/** Data file delimiter */
private String delimiter;

/** Default number of maximum errors allowed on a load */
public static String MAX_ERRORS_DEFAULT = "50";

/*
* Do not translate following values!!! They are will end up in the job export.
*/
Expand Down Expand Up @@ -245,6 +254,9 @@ public void allocate(int nrvalues)
fieldTable = new String[nrvalues];
fieldStream = new String[nrvalues];
dateMask = new String[nrvalues];
matchColumn = new boolean[nrvalues];
updateColumn = new boolean[nrvalues];

}

public void allocateLocalHosts(int numberOfLocalHosts) {
Expand All @@ -263,6 +275,8 @@ public Object clone()
retval.fieldTable[i] = fieldTable[i];
retval.fieldStream[i] = fieldStream[i];
retval.dateMask[i] = dateMask[i];
retval.matchColumn[i] = matchColumn[i];
retval.updateColumn[i] = updateColumn[i];
}
return retval;
}
Expand All @@ -274,23 +288,19 @@ private void readData(Node stepnode, List<? extends SharedObjectInterface> datab
{
String con = XMLHandler.getTagValue(stepnode, "connection"); //$NON-NLS-1$
databaseMeta = DatabaseMeta.findDatabase(databases, con);

String serror = XMLHandler.getTagValue(stepnode, "errors"); //$NON-NLS-1$
maxErrors = Const.toInt(serror, 50); // default to 50.

schemaName = XMLHandler.getTagValue(stepnode, "schema"); //$NON-NLS-1$
maxErrors = XMLHandler.getTagValue(stepnode, "errors"); //$NON-NLS-1$
schemaName = XMLHandler.getTagValue(stepnode, "schema"); //$NON-NLS-1$
tableName = XMLHandler.getTagValue(stepnode, "table"); //$NON-NLS-1$
errorTableName = XMLHandler.getTagValue(stepnode, "error_table"); //$NON-NLS-1$

errorTableName = XMLHandler.getTagValue(stepnode, "error_table"); //$NON-NLS-1$
loadMethod = XMLHandler.getTagValue(stepnode, "load_method"); //$NON-NLS-1$
loadAction = XMLHandler.getTagValue(stepnode, "load_action"); //$NON-NLS-1$
gploadPath = XMLHandler.getTagValue(stepnode, "gpload_path"); //$NON-NLS-1$
controlFile = XMLHandler.getTagValue(stepnode, "control_file"); //$NON-NLS-1$
dataFile = XMLHandler.getTagValue(stepnode, "data_file"); //$NON-NLS-1$
delimiter = XMLHandler.getTagValue(stepnode, "delimiter"); //$NON-NLS-1$
logFile = XMLHandler.getTagValue(stepnode, "log_file"); //$NON-NLS-1$
eraseFiles = "Y".equalsIgnoreCase( XMLHandler.getTagValue(stepnode, "erase_files")); //$NON-NLS-1$
encoding = XMLHandler.getTagValue(stepnode, "encoding"); //$NON-NLS-1$
dbNameOverride = XMLHandler.getTagValue(stepnode, "dbname_override"); //$NON-NLS-1$

Node localHostsNode = XMLHandler.getSubNode(stepnode, "local_hosts");
int nLocalHosts = XMLHandler.countNodes(localHostsNode, "local_host");//$NON-NLS-1$
Expand Down Expand Up @@ -327,6 +337,10 @@ private void readData(Node stepnode, List<? extends SharedObjectInterface> datab
dateMask[i] = "";
}
}


matchColumn[i] = ("Y".equalsIgnoreCase(XMLHandler.getTagValue(vnode, "match_column")));
updateColumn[i] = ("Y".equalsIgnoreCase(XMLHandler.getTagValue(vnode, "update_column"))); //$NON-NLS-1$
}
}
catch(Exception e)
Expand All @@ -337,21 +351,24 @@ private void readData(Node stepnode, List<? extends SharedObjectInterface> datab

public void setDefault()
{

// TODO: Make non empty defaults public static Strings

fieldTable = null;
databaseMeta = null;
maxErrors = 50;
maxErrors = GPLoadMeta.MAX_ERRORS_DEFAULT;
schemaName = ""; //$NON-NLS-1$
masterPort =
masterPort = "";
tableName = BaseMessages.getString(PKG, "GPLoadMeta.DefaultTableName"); //$NON-NLS-1$
errorTableName = BaseMessages.getString(PKG, "GPLocal.ErrorTable.Prefix")+tableName;
errorTableName = ""; //BaseMessages.getString(PKG, "GPLocal.ErrorTable.Prefix")+tableName;
loadMethod = METHOD_AUTO_END;
loadAction = ACTION_INSERT;
gploadPath = "/usr/local/greenplum-db/bin/gpload"; //$NON-NLS-1$
controlFile = "control${Internal.Step.CopyNr}.cfg"; //$NON-NLS-1$
dataFile = "load${Internal.Step.CopyNr}.dat"; //$NON-NLS-1$
logFile = ""; //$NON-NLS-1$
encoding = ""; //$NON-NLS-1$
dbNameOverride = "";
delimiter = ",";
encloseNumbers = false;
eraseFiles = true;

Expand All @@ -365,18 +382,18 @@ public String getXML()

retval.append(" ").append(XMLHandler.addTagValue("connection", databaseMeta==null?"":databaseMeta.getName())); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
retval.append(" ").append(XMLHandler.addTagValue("errors", maxErrors)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("schema", schemaName)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("schema", schemaName)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("table", tableName)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("error_table", errorTableName)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("load_method", loadMethod)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("load_action", loadAction)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("gpload_path", gploadPath)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("control_file", controlFile)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("data_file", dataFile)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("delimiter", delimiter)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("log_file", logFile)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("erase_files", eraseFiles)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("encoding", encoding)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("dbname_override", dbNameOverride)); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("enclose_numbers", (encloseNumbers?"Y":"N"))); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("master_port", masterPort)); //$NON-NLS-1$ //$NON-NLS-2$

Expand All @@ -386,6 +403,8 @@ public String getXML()
retval.append(" ").append(XMLHandler.addTagValue("stream_name", fieldTable[i])); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("field_name", fieldStream[i])); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("date_mask", dateMask[i])); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("match_column", (matchColumn[i]?"Y":"N"))); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" ").append(XMLHandler.addTagValue("update_column", (updateColumn[i]?"Y":"N"))); //$NON-NLS-1$ //$NON-NLS-2$
retval.append(" </mapping>").append(Const.CR); //$NON-NLS-1$
}

Expand All @@ -404,7 +423,7 @@ public void readRep(Repository rep, ObjectId id_step, List<DatabaseMeta> databas
try
{
databaseMeta = rep.loadDatabaseMetaFromStepAttribute(id_step, "id_connection", databases);
maxErrors = (int)rep.getStepAttributeInteger(id_step, "errors"); //$NON-NLS-1$
maxErrors = rep.getStepAttributeString(id_step, "errors"); //$NON-NLS-1$
schemaName = rep.getStepAttributeString(id_step, "schema"); //$NON-NLS-1$
tableName = rep.getStepAttributeString(id_step, "table"); //$NON-NLS-1$
errorTableName = rep.getStepAttributeString(id_step, "error_table"); //$NON-NLS-1$
Expand All @@ -413,10 +432,10 @@ public void readRep(Repository rep, ObjectId id_step, List<DatabaseMeta> databas
gploadPath = rep.getStepAttributeString(id_step, "gpload_path"); //$NON-NLS-1$
controlFile = rep.getStepAttributeString(id_step, "control_file"); //$NON-NLS-1$
dataFile = rep.getStepAttributeString(id_step, "data_file"); //$NON-NLS-1$
logFile = rep.getStepAttributeString(id_step, "log_file"); //$NON-NLS-1$
delimiter = rep.getStepAttributeString(id_step, "delimiter"); //$NON-NLS-1$
logFile = rep.getStepAttributeString(id_step, "log_file"); //$NON-NLS-1$
eraseFiles = rep.getStepAttributeBoolean(id_step, "erase_files"); //$NON-NLS-1$
encoding = rep.getStepAttributeString(id_step, "encoding"); //$NON-NLS-1$
dbNameOverride = rep.getStepAttributeString(id_step, "dbname_override");//$NON-NLS-1$
masterPort = rep.getStepAttributeString(id_step, "master_port"); //$NON-NLS-1$
encloseNumbers = (rep.getStepAttributeString(id_step, "enclose_numbers").equalsIgnoreCase("Y")?true:false); //$NON-NLS-1$

Expand All @@ -434,6 +453,8 @@ public void readRep(Repository rep, ObjectId id_step, List<DatabaseMeta> databas
fieldTable[i] = rep.getStepAttributeString(id_step, i, "stream_name"); //$NON-NLS-1$
fieldStream[i] = rep.getStepAttributeString(id_step, i, "field_name"); //$NON-NLS-1$
dateMask[i] = rep.getStepAttributeString(id_step, i, "date_mask"); //$NON-NLS-1$
matchColumn[i] = rep.getStepAttributeBoolean(id_step, i, "match_column"); //$NON-NLS-1$
updateColumn[i] = rep.getStepAttributeBoolean(id_step, i, "update_column"); //$NON-NLS-1$
}
}
catch(Exception e)
Expand All @@ -457,10 +478,10 @@ public void saveRep(Repository rep, ObjectId id_transformation, ObjectId id_step
rep.saveStepAttribute(id_transformation, id_step, "gpload_path", gploadPath); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "control_file", controlFile); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "data_file", dataFile); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "log_file", logFile); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "delimiter", delimiter); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "log_file", logFile); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "erase_files", eraseFiles); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "encoding", encoding); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "dbname_override", dbNameOverride);//$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "enclose_numbers", (encloseNumbers?"Y":"N"));//$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, "master_port", masterPort);//$NON-NLS-1$

Expand All @@ -473,6 +494,8 @@ public void saveRep(Repository rep, ObjectId id_transformation, ObjectId id_step
rep.saveStepAttribute(id_transformation, id_step, i, "stream_name", fieldTable[i]); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, i, "field_name", fieldStream[i]); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, i, "date_mask", dateMask[i]); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, i, "match_column", matchColumn[i]); //$NON-NLS-1$
rep.saveStepAttribute(id_transformation, id_step, i, "update_column", updateColumn[i]); //$NON-NLS-1$
}

// Also, save the step-database relationship!
Expand Down Expand Up @@ -870,8 +893,12 @@ public void setEncoding(String encoding) {
this.encoding = encoding;
}

public void setDelimiter(String delimiter) {
this.delimiter = delimiter;
}

public String getDelimiter() {
return ",";
return delimiter;
}

public String getEnclosure() {
Expand All @@ -886,21 +913,13 @@ public void setEraseFiles(boolean eraseFiles) {
this.eraseFiles = eraseFiles;
}

public int getMaxErrors() {
public String getMaxErrors() {
return maxErrors;
}

public void setMaxErrors(int maxErrors) {
public void setMaxErrors(String maxErrors) {
this.maxErrors = maxErrors;
}

public String getDbNameOverride() {
return dbNameOverride;
}

public void setDbNameOverride(String dbNameOverride) {
this.dbNameOverride = dbNameOverride;
}

public void setEncloseNumbers(boolean encloseNumbers) {
this.encloseNumbers = encloseNumbers;
Expand All @@ -926,4 +945,41 @@ public String getMasterPort() {
return masterPort;
}

public void setMatchColumns(boolean[] matchColumn) {
this.matchColumn = matchColumn;
}

public boolean[] getMatchColumn() {
return matchColumn;
}

public void setUpdateColumn(boolean[] updateColumn) {
this.updateColumn = updateColumn;
}

public boolean[] getUpdateColumn() {
return updateColumn;
}

public boolean hasMatchColumn() {

for (boolean matchColumn: this.matchColumn) {
if (matchColumn) {
return true;
}
}
return false;
}

public boolean hasUpdateColumn() {

for (boolean updateColumn: this.updateColumn) {
if (updateColumn) {
return true;
}
}

return false;
}

}
Loading

0 comments on commit 4dd3537

Please sign in to comment.