Skip to content

Commit

Permalink
support for both INSERT and UPDATE during tuple sinking.
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed Apr 21, 2009
1 parent 540d301 commit ac6f0dc
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 55 deletions.
93 changes: 76 additions & 17 deletions src/java/cascading/jdbc/JDBCScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,84 @@
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

/** Class JDBCScheme ... */
/**
* Class JDBCScheme defines what its parent Tap will select and insert/update into the sql database.
* <p/>
* If updateBy column names are given, a SQL UPDATE statement will be generated unless the values in those columns
* for the given Tuple are all {@code null}, in which case an INSERT statement will be generated.
*/
public class JDBCScheme extends Scheme
{
private Class<? extends DBInputFormat> inputFormatClass;
private Class<? extends DBOutputFormat> outputFormatClass;
private String[] columns;
private String orderBy;
private String[] orderBy;
private String[] updateBy;
private Fields updateValueFields;
private Fields updateByFields;
private Fields columnFields;
private Tuple updateIfTuple;

public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String orderBy )
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param inputFormatClass of type Class<? extends DBInputFormat>
* @param outputFormatClass of type Class<? extends DBOutputFormat>
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateBy of type String[]
*/
public JDBCScheme( Class<? extends DBInputFormat> inputFormatClass, Class<? extends DBOutputFormat> outputFormatClass, String[] columns, String[] orderBy, String[] updateBy )
{
super( new Fields( columns ), new Fields( columns ) );
this.columnFields = new Fields( columns );

setSinkFields( columnFields );
setSourceFields( columnFields );

if( updateBy != null && updateBy.length != 0 )
{
this.updateBy = updateBy;
this.updateByFields = new Fields( updateBy );

if( !this.columnFields.contains( this.updateByFields ) )
throw new IllegalArgumentException( "columns must contain updateBy column names" );

this.updateValueFields = columnFields.subtract( updateByFields ).append( updateByFields );
this.updateIfTuple = Tuple.size( updateByFields.size() ); // all nulls
}

this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
this.columns = columns;
this.orderBy = orderBy;

this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
}

/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
*
* @param columns of type String[]
* @param orderBy of type String
* @param columns of type String[]
* @param orderBy of type String[]
* @param updateBy of type String[]
*/
public JDBCScheme( String[] columns, String orderBy )
public JDBCScheme( String[] columns, String[] orderBy, String[] updateBy )
{
super( new Fields( columns ), new Fields( columns ) );
this( null, null, columns, orderBy, updateBy );
}

this.columns = columns;
this.orderBy = orderBy;
/**
* Constructor JDBCScheme creates a new JDBCScheme instance.
* <p/>
* Delegates to the five argument constructor, passing {@code null} for the input format class,
* the output format class and the updateBy column names.
*
* @param columns of type String[]
* @param orderBy of type String[]
*/
public JDBCScheme( String[] columns, String[] orderBy )
{
this( null, null, columns, orderBy, null );
}

/**
Expand All @@ -63,13 +108,14 @@ public JDBCScheme( String[] columns, String orderBy )
*/
public JDBCScheme( String[] columns )
{
this( columns, null );
this( null, null, columns, null, null );
}

public void sourceInit( Tap tap, JobConf conf ) throws IOException
{
String tableName = ( (JDBCTap) tap ).getTableName();
DBInputFormat.setInput( conf, TupleRecord.class, tableName, null, orderBy, columns );
String joinedOrderBy = orderBy != null ? Util.join( orderBy, ", " ) : null;
DBInputFormat.setInput( conf, TupleRecord.class, tableName, null, joinedOrderBy, columns );

if( inputFormatClass != null )
conf.setInputFormat( inputFormatClass );
Expand All @@ -78,7 +124,7 @@ public void sourceInit( Tap tap, JobConf conf ) throws IOException
public void sinkInit( Tap tap, JobConf conf ) throws IOException
{
String tableName = ( (JDBCTap) tap ).getTableName();
DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns );
DBOutputFormat.setOutput( conf, DBOutputFormat.class, tableName, columns, updateBy );

if( outputFormatClass != null )
conf.setOutputFormat( outputFormatClass );
Expand All @@ -91,8 +137,21 @@ public Tuple source( Object key, Object value )

public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException
{
Tuple tuple = tupleEntry.selectTuple( getSinkFields() );
if( updateBy != null )
{
Tuple allValues = tupleEntry.selectTuple( updateValueFields );
Tuple updateValues = tupleEntry.selectTuple( updateByFields );

TupleRecord key = new TupleRecord( allValues );

if( updateValues.equals( updateIfTuple ) )
outputCollector.collect( key, null );
else
outputCollector.collect( key, key );

return;
}

outputCollector.collect( new TupleRecord( tuple ), null );
outputCollector.collect( new TupleRecord( tupleEntry.selectTuple( getSinkFields() ) ), null );
}
}
16 changes: 8 additions & 8 deletions src/java/cascading/jdbc/TableDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class TableDesc implements Serializable
String[] columnNames;
/** Field columnDefs */
String[] columnDefs;
/** Field primaryKey */
String primaryKey;
/** Field primaryKeys */
String[] primaryKeys;

/**
* Constructor TableDesc creates a new TableDesc instance.
Expand All @@ -55,14 +55,14 @@ public TableDesc( String tableName, String[] columnNames )
* @param tableName of type String
* @param columnNames of type String[]
* @param columnDefs of type String[]
* @param primaryKey of type String
* @param primaryKeys of type String[]
*/
public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String primaryKey )
public TableDesc( String tableName, String[] columnNames, String[] columnDefs, String[] primaryKeys )
{
this.tableName = tableName;
this.columnNames = columnNames;
this.columnDefs = columnDefs;
this.primaryKey = primaryKey;
this.primaryKeys = primaryKeys;
}

/**
Expand Down Expand Up @@ -108,7 +108,7 @@ protected List<String> addDefinitionsTo( List<String> createTableStatement )
protected List<String> addPrimaryKeyTo( List<String> createTableStatement )
{
if( hasPrimaryKey() )
createTableStatement.add( String.format( "PRIMARY KEY( %s )", primaryKey ) );
createTableStatement.add( String.format( "PRIMARY KEY( %s )", Util.join( primaryKeys, ", " ) ) );

return createTableStatement;
}
Expand All @@ -135,12 +135,12 @@ public String getTableExistsQuery()

private boolean hasPrimaryKey()
{
return primaryKey != null && primaryKey.length() != 0;
return primaryKeys != null && primaryKeys.length != 0;
}

@Override
public String toString()
{
return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", primaryKey='" + primaryKey + '\'' + '}';
return "TableDesc{" + "tableName='" + tableName + '\'' + ", columnNames=" + ( columnNames == null ? null : Arrays.asList( columnNames ) ) + ", columnDefs=" + ( columnDefs == null ? null : Arrays.asList( columnDefs ) ) + ", primaryKeys=" + ( primaryKeys == null ? null : Arrays.asList( primaryKeys ) ) + '}';
}
}
1 change: 0 additions & 1 deletion src/java/cascading/jdbc/TupleRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public TupleRecord()

/**
* Constructor TupleRecord creates a new TupleRecord instance holding the given tuple.
*
* @param tuple of type Tuple
*/
public TupleRecord( Tuple tuple )
{

this.tuple = tuple;
}

Expand Down
13 changes: 13 additions & 0 deletions src/java/cascading/jdbc/db/DBConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public class DBConfiguration
/** Field names in the Output table */
public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";

/** Update field names in the Output table */
public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.update.field.names";

/** The number of statements to batch before executing */
public static final String BATCH_STATEMENTS_NUM = "mapred.jdbc.batch.statements.num";

Expand Down Expand Up @@ -260,6 +263,16 @@ void setOutputFieldNames( String... fieldNames )
job.setStrings( DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames );
}

/**
 * Reads the update field names stored in the job configuration under
 * {@link DBConfiguration#OUTPUT_UPDATE_FIELD_NAMES_PROPERTY}.
 *
 * @return the values found under that property, exactly as the job's getStrings lookup returns them
 */
String[] getOutputUpdateFieldNames()
{
  final String propertyName = DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY;
  final String[] updateFieldNames = job.getStrings( propertyName );

  return updateFieldNames;
}

/**
 * Stores the given update field names in the job configuration under
 * {@link DBConfiguration#OUTPUT_UPDATE_FIELD_NAMES_PROPERTY}.
 *
 * @param fieldNames the update field names to store
 */
void setOutputUpdateFieldNames( String... fieldNames )
{
  final String propertyName = DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY;

  job.setStrings( propertyName, fieldNames );
}

int getBatchStatementsNum()
{
return job.getInt( DBConfiguration.BATCH_STATEMENTS_NUM, 1000 );
Expand Down
Loading

0 comments on commit ac6f0dc

Please sign in to comment.