Skip to content

Commit

Permalink
PDI-5211: As an ETL Designer, I want a BulkLoad step for Greenplum th…
Browse files Browse the repository at this point in the history
…at wraps the gpload facility. Changed "master port" on GUI, additional unit tests, validation of paths when the command line is built, support for variable substitution.

git-svn-id: svn://source.pentaho.org/svnkettleroot/Kettle/trunk@15074 5fb7f6ec-07c1-534a-b4ca-9155e429e800
  • Loading branch information
sflatley committed Apr 26, 2011
1 parent 8ddaac4 commit b616513
Show file tree
Hide file tree
Showing 12 changed files with 635 additions and 179 deletions.
52 changes: 26 additions & 26 deletions src-ui/org/pentaho/di/ui/trans/steps/gpload/GPLoadDialog.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public class GPLoadDialog extends BaseStepDialog implements StepDialogInterface
private TextVar wSchema;
private FormData fdlSchema, fdSchema;

private Label wlMasterPort;
private TextVar wMasterPort;
private FormData fdlMasterPort, fdMasterPort;
private Label wlLocalhostPort;
private TextVar wLocalhostPort;
private FormData fdlLocalhostPort, fdLocalhostPort;

private Label wlTable;
private Button wbTable;
Expand Down Expand Up @@ -823,8 +823,8 @@ public void getData() {
wLogFile.setText(input.getLogFile());
if (input.getEncoding() != null)
wEncoding.setText(input.getEncoding());
if (input.getMasterPort() != null)
wMasterPort.setText(input.getMasterPort());
if (input.getLocalhostPort() != null)
wLocalhostPort.setText(input.getLocalhostPort());
wEraseFiles.setSelection(input.isEraseFiles());

String method = input.getLoadMethod();
Expand Down Expand Up @@ -914,7 +914,7 @@ else if (BaseMessages
inf.setLogFile(wLogFile.getText());
inf.setEncoding(wEncoding.getText());
inf.setEraseFiles(wEraseFiles.getSelection());
inf.setMasterPort(wMasterPort.getText());
inf.setLocalhostPort(wLocalhostPort.getText());
inf.setDelimiter(wDelimiter.getText());

/*
Expand Down Expand Up @@ -1213,33 +1213,33 @@ private void addLocalHostsTabItem(CTabFolder tabFolder, int margin, ModifyListen
wLocalHostsComp.setLayout(formLayout);

// Localhost Port line...
wlMasterPort = new Label(wLocalHostsComp, SWT.NONE);
wlMasterPort.setText(BaseMessages.getString(PKG, "GPLoadDialog.MasterPort.Label")); //$NON-NLS-1$
props.setLook(wlMasterPort);
fdlMasterPort = new FormData();
fdlMasterPort.left = new FormAttachment(0, 0);
//fdlMasterPort.right = new FormAttachment(middle, -margin);
fdlMasterPort.top = new FormAttachment(0, margin * 2);
wlMasterPort.setLayoutData(fdlMasterPort);

wMasterPort = new TextVar(transMeta, wLocalHostsComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wMasterPort);
wMasterPort.addModifyListener(lsMod);
wMasterPort.addFocusListener(lsFocusLost);
fdMasterPort = new FormData();
fdMasterPort.left = new FormAttachment(wlMasterPort, 0);
fdMasterPort.top = new FormAttachment(0, margin * 2);
fdMasterPort.right = new FormAttachment(middle, 0);
wMasterPort.setLayoutData(fdMasterPort);
wMasterPort.addModifyListener(lsMod);
wlLocalhostPort = new Label(wLocalHostsComp, SWT.NONE);
wlLocalhostPort.setText(BaseMessages.getString(PKG, "GPLoadDialog.Port.Label")); //$NON-NLS-1$
props.setLook(wlLocalhostPort);
fdlLocalhostPort = new FormData();
fdlLocalhostPort.left = new FormAttachment(0, 0);
//fdlLocalhostPort.right = new FormAttachment(middle, -margin);
fdlLocalhostPort.top = new FormAttachment(0, margin * 2);
wlLocalhostPort.setLayoutData(fdlLocalhostPort);

wLocalhostPort = new TextVar(transMeta, wLocalHostsComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wLocalhostPort);
wLocalhostPort.addModifyListener(lsMod);
wLocalhostPort.addFocusListener(lsFocusLost);
fdLocalhostPort = new FormData();
fdLocalhostPort.left = new FormAttachment(wlLocalhostPort, 0);
fdLocalhostPort.top = new FormAttachment(0, margin * 2);
fdLocalhostPort.right = new FormAttachment(middle, 0);
wLocalhostPort.setLayoutData(fdLocalhostPort);
wLocalhostPort.addModifyListener(lsMod);

// Local Hosts Label
wlLocalHosts = new Label(wLocalHostsComp, SWT.NONE);
wlLocalHosts.setText(BaseMessages.getString(PKG, "GPLoadDialog.LocalHosts.Label")); //$NON-NLS-1$
props.setLook(wlLocalHosts);
fdlLocalHosts = new FormData();
fdlLocalHosts.left = new FormAttachment(0, 0);
fdlLocalHosts.top = new FormAttachment(wMasterPort, margin);
fdlLocalHosts.top = new FormAttachment(wLocalhostPort, margin);
wlLocalHosts.setLayoutData(fdlLocalHosts);

// Local Hosts Table
Expand Down
165 changes: 94 additions & 71 deletions src/org/pentaho/di/trans/steps/gpload/GPLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.InputStreamReader;

import org.apache.commons.vfs.FileObject;
import org.apache.commons.vfs.FileSystemException;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
Expand Down Expand Up @@ -65,7 +66,6 @@ public class GPLoad extends BaseStep implements StepInterface
private static String CLOSE_BRACKET = "]";
private static String SPACE_PADDED_DASH = " - ";
private static String COLON = ":";
private static int GPLOAD_MAX_ERRORS_DEFAULT = 0;

Process gploadProcess = null;

Expand Down Expand Up @@ -233,16 +233,16 @@ public String getControlFileContents(GPLoadMeta meta, RowMetaInterface rm, Objec
}
stringLocalHosts = sbLocalHosts.toString();
if (!Const.isEmpty(stringLocalHosts)) {
contents.append(GPLoad.INDENT).append("- LOCAL_HOSTNAME: ").append(Const.CR).append(stringLocalHosts);
contents.append(GPLoad.INDENT).append(GPLoad.INDENT).append("LOCAL_HOSTNAME: ").append(Const.CR).append(stringLocalHosts);
}
}

// Add a PORT section if we have a port
String masterPort = meta.getMasterPort();
if (!Const.isEmpty(masterPort)) {
masterPort = environmentSubstitute(meta.getMasterPort().trim());
if (!Const.isEmpty(masterPort)) {
contents.append(GPLoad.INDENT).append("- PORT: ").append(masterPort).append(Const.CR);
String localhostPort = meta.getLocalhostPort();
if (!Const.isEmpty(localhostPort)) {
localhostPort = environmentSubstitute(localhostPort).trim();
if (!Const.isEmpty(localhostPort)) {
contents.append(GPLoad.INDENT).append(GPLoad.INDENT).append("PORT: ").append(localhostPort).append(Const.CR);
}
}

Expand Down Expand Up @@ -299,13 +299,13 @@ public String getControlFileContents(GPLoadMeta meta, RowMetaInterface rm, Objec
}
}

contents.append(GPLoad.INDENT).append("- ERROR_LIMIT: ").append(meta.getMaxErrors()).append(Const.CR);
contents.append(GPLoad.INDENT).append("- ERROR_LIMIT: ").append(maxErrors).append(Const.CR);

String errorTableName = meta.getErrorTableName();
if (!Const.isEmpty(errorTableName)) {
errorTableName = environmentSubstitute(errorTableName).trim();
if (!Const.isEmpty(errorTableName)) {
contents.append(GPLoad.INDENT).append("- ERROR_TABLE: ").append(meta.getErrorTableName()).append(Const.CR);
contents.append(GPLoad.INDENT).append("- ERROR_TABLE: ").append(errorTableName).append(Const.CR);
}
}

Expand Down Expand Up @@ -356,8 +356,19 @@ public String getControlFileContents(GPLoadMeta meta, RowMetaInterface rm, Objec
* @param meta
* @throws KettleException
*/
public void createControlFile(String filename, Object[] row, GPLoadMeta meta) throws KettleException
public void createControlFile(Object[] row, GPLoadMeta meta) throws KettleException
{
String filename = meta.getControlFile();
if (Const.isEmpty(filename)) {
throw new KettleException(BaseMessages.getString(PKG, ""));
}
else {
filename = environmentSubstitute(filename).trim();
if (Const.isEmpty(filename)) {
throw new KettleException(BaseMessages.getString(PKG, ""));
}
}

File controlFile = new File(filename);
FileWriter fw = null;

Expand All @@ -380,9 +391,65 @@ public void createControlFile(String filename, Object[] row, GPLoadMeta meta) th
catch ( Exception ex ) {}
}
}

/**
 * Resolves and validates a file path taken from the step meta data.
 * <p>
 * Variables in the path are substituted and the result is trimmed. The
 * resolved path is then checked against the file system: either the file
 * itself must exist, or (when the file is allowed to be absent) its parent
 * folder must exist.
 *
 * @param pathToFile Path to the file to verify; variables are substituted before the check.
 * @param exceptionMessageKey The key of the message in the properties bundle to use when the path is not provided.
 * @param checkExistenceOfFile When true the file itself must exist; when false only its parent folder must exist.
 * @return The file name reported by KettleVFS for the resolved path.
 * @throws KettleException when the path is empty (before or after variable substitution),
 *         when the required file or folder does not exist, or when the file system cannot be queried.
 */
private String getPath(String pathToFile, String exceptionMessageKey, boolean checkExistenceOfFile)
    throws KettleException {

  // Make sure the path is not empty
  if (Const.isEmpty(pathToFile)) {
    throw new KettleException(BaseMessages.getString(PKG, exceptionMessageKey));
  }

  // Make sure the path is still not empty after variable substitution
  pathToFile = environmentSubstitute(pathToFile).trim();
  if (Const.isEmpty(pathToFile)) {
    throw new KettleException(BaseMessages.getString(PKG, exceptionMessageKey));
  }

  FileObject fileObject = KettleVFS.getFileObject(pathToFile, getTransMeta());
  try {
    if (checkExistenceOfFile) {
      // The file itself has to be there (e.g. the gpload executable).
      if (!fileObject.exists()) {
        // NOTE(review): the key is spelled "Execption"; kept as-is because the
        // properties bundle is not visible here - confirm before renaming.
        throw new KettleException(BaseMessages.getString(PKG, "GPLoad.Execption.FileDoesNotExist", pathToFile));
      }
    }
    else {
      // If the file does not have to exist, the parent, or source folder, does.
      FileObject parentFolder = fileObject.getParent();
      if (parentFolder == null) {
        // getParent() yields null for the root of a file system; there is no
        // containing folder to write into, so report it instead of failing with an NPE.
        throw new KettleException(BaseMessages.getString(PKG, "GPLoad.Exception.DirectoryDoesNotExist", pathToFile));
      }
      if (!parentFolder.exists()) {
        throw new KettleException(BaseMessages.getString(PKG, "GPLoad.Exception.DirectoryDoesNotExist", parentFolder.getURL().getPath()));
      }
    }
    return KettleVFS.getFilename(fileObject);
  }
  catch (FileSystemException fsex) {
    // Keep the original exception as the cause so the stack trace is not lost.
    throw new KettleException(BaseMessages.getString(PKG, "GPLoad.Exception.GPLoadCommandBuild", fsex.getMessage()), fsex);
  }
}


/**
* Create the command line for a psql process depending on the meta
* Create the command line for GPLoad depending on the meta
* information supplied.
*
* @param meta The meta data to create the command line from
Expand All @@ -394,66 +461,22 @@ public void createControlFile(String filename, Object[] row, GPLoadMeta meta) th
*/
public String createCommandLine(GPLoadMeta meta, boolean password) throws KettleException
{
StringBuffer sb = new StringBuffer(300);
StringBuffer sbCommandLine = new StringBuffer(300);

if ( meta.getGploadPath() != null )
{
try
{
FileObject fileObject = KettleVFS.getFileObject(environmentSubstitute(meta.getGploadPath()), getTransMeta());
String psqlexec = KettleVFS.getFilename(fileObject);
//sb.append('\'').append(psqlexec).append('\'');
sb.append(psqlexec);
}
catch ( Exception ex )
{
throw new KettleException("Error retrieving sqlldr string", ex);
}
}
else
{
throw new KettleException("No psql application specified");
}

if ( meta.getControlFile() != null )
{
try
{
FileObject fileObject = KettleVFS.getFileObject(environmentSubstitute(meta.getControlFile()), getTransMeta());

sb.append(" -f ");
//sb.append('\'').append(KettleVFS.getFilename(fileObject)).append('\'');
sb.append(KettleVFS.getFilename(fileObject));
}
catch ( Exception ex )
{
throw new KettleException("Error retrieving controlfile string", ex);
}
}
else
{
throw new KettleException("No control file specified");
}

if ( meta.getLogFile() != null )
{
try
{
FileObject fileObject = KettleVFS.getFileObject(environmentSubstitute(meta.getLogFile()), getTransMeta());
// get path to the executable
sbCommandLine.append(getPath(meta.getGploadPath(), "GPLoad.Exception.GPLoadPathMisssing", true));

sb.append(" -l ");
sb.append('\'').append(KettleVFS.getFilename(fileObject)).append('\'');
}
catch ( Exception ex )
{
throw new KettleException("Error retrieving logfile string", ex);
}
}

// hostname, port and so on are passed through the control file
//

return sb.toString();
// get the path to the control file
sbCommandLine.append(" -f ");
sbCommandLine.append(getPath(meta.getControlFile(), "GPLoad.Exception.ControlFilePathMissing", false));

// get the path to the log file, if specified
String logfile = meta.getLogFile();
if (!Const.isEmpty(logfile)) {
sbCommandLine.append(" -l ");
sbCommandLine.append(getPath(meta.getLogFile(), "GPLoad.Exception.LogFilePathMissing", false));
}
return sbCommandLine.toString();
}

public boolean execute(GPLoadMeta meta, boolean wait) throws KettleException
Expand Down Expand Up @@ -552,8 +575,8 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
if (first)
{
first=false;
createControlFile(environmentSubstitute(meta.getControlFile()), r, meta);
output = new GPLoadDataOutput(meta, log.getLogLevel());
createControlFile(r, meta);
output = new GPLoadDataOutput(this, meta, log.getLogLevel());

// if ( GPLoadMeta.METHOD_AUTO_CONCURRENT.equals(meta.getLoadMethod()) )
// {
Expand Down
Loading

0 comments on commit b616513

Please sign in to comment.