Skip to content

Commit

Permalink
updated logs messages
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed May 27, 2009
1 parent 92d067e commit d6c624f
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions src/java/cascading/jdbc/db/DBOutputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K,
/** A RecordWriter that writes the reduce output to a SQL table */
protected class DBRecordWriter implements RecordWriter<K, V>
{

private Connection connection;
private PreparedStatement insertStatement;
private PreparedStatement updateStatement;
Expand All @@ -79,12 +78,12 @@ protected DBRecordWriter( Connection connection, PreparedStatement insertStateme
/** {@inheritDoc} */
public void close( Reporter reporter ) throws IOException
{

executeBatch();

try
{
insertStatement.close();
if( insertStatement != null )
insertStatement.close();

if( updateStatement != null )
updateStatement.close();
Expand All @@ -95,7 +94,7 @@ public void close( Reporter reporter ) throws IOException
{
rollBack();

createThrowMessage( "unable to commit batch", exception );
createThrowMessage( "unable to commit batch", 0, exception );
}
finally
{
Expand All @@ -115,29 +114,37 @@ private void executeBatch() throws IOException
try
{
if( insertStatementsCurrent != 0 )
{
LOG.info( "executing insert batch " + createBatchMessage( insertStatementsCurrent ) );

insertStatement.executeBatch();
}

insertStatementsCurrent = 0;
}
catch( SQLException exception )
{
rollBack();

createThrowMessage( "unable to execute insert batch", exception );
createThrowMessage( "unable to execute insert batch", insertStatementsCurrent, exception );
}

try
{
if( updateStatementsCurrent != 0 )
{
LOG.info( "executing update batch " + createBatchMessage( updateStatementsCurrent ) );

updateStatement.executeBatch();
}

updateStatementsCurrent = 0;
}
catch( SQLException exception )
{
rollBack();

createThrowMessage( "unable to execute update batch", exception );
createThrowMessage( "unable to execute update batch", updateStatementsCurrent, exception );
}
}

Expand All @@ -153,21 +160,29 @@ private void rollBack()
}
}

private void createThrowMessage( String stateMessage, SQLException exception ) throws IOException
private String createBatchMessage( long currentStatements )
{
return String.format( "[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute );
}

private void createThrowMessage( String stateMessage, long currentStatements, SQLException exception ) throws IOException
{
String message = exception.getMessage();

message = message.substring( 0, Math.min( 75, message.length() ) );

String errorMessage = String.format( "%s [length: %d][stmts: %d]: %s", stateMessage, exception.getMessage().length(), statementsAdded, message );
int messageLength = exception.getMessage().length();
String batchMessage = createBatchMessage( currentStatements );
String template = "%s [msglength: %d]%s %s";
String errorMessage = String.format( template, stateMessage, messageLength, batchMessage, message );

LOG.error( errorMessage, exception.getNextException() );

throw new IOException( errorMessage, exception.getNextException() );
}

/** {@inheritDoc} */
public void write( K key, V value ) throws IOException
public synchronized void write( K key, V value ) throws IOException
{
try
{
Expand Down Expand Up @@ -290,7 +305,6 @@ public void checkOutputSpecs( FileSystem filesystem, JobConf job ) throws IOExce
{
}


/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter( FileSystem filesystem, JobConf job, String name, Progressable progress ) throws IOException
{
Expand All @@ -305,14 +319,13 @@ public RecordWriter<K, V> getRecordWriter( FileSystem filesystem, JobConf job, S

configureConnection( connection );


String sqlInsert = constructInsertQuery( tableName, fieldNames );
PreparedStatement insertPreparedStatement;

try
{
insertPreparedStatement = connection.prepareStatement( sqlInsert );
insertPreparedStatement.setEscapeProcessing( true ); // should be on be default
insertPreparedStatement.setEscapeProcessing( true ); // should be on by default
}
catch( SQLException exception )
{
Expand All @@ -331,7 +344,6 @@ public RecordWriter<K, V> getRecordWriter( FileSystem filesystem, JobConf job, S
throw new IOException( "unable to create statement for: " + sqlUpdate, exception );
}


return new DBRecordWriter( connection, insertPreparedStatement, updatePreparedStatement, batchStatements );
}

Expand Down

0 comments on commit d6c624f

Please sign in to comment.