Skip to content

Commit

Permalink
[BACKLOG-15764] Introducing new Logger for use inside Operation Funct…
Browse files Browse the repository at this point in the history
…ions. Modifying the Trans Adapter to subscribe to Operation Logging
  • Loading branch information
pentaho-nbaker committed Apr 24, 2017
1 parent fdc9641 commit 6a3351c
Showing 1 changed file with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package org.pentaho.di.trans.ael.adapters;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.engine.api.Engine;
import org.pentaho.di.engine.api.ExecutionContext;
import org.pentaho.di.engine.api.ExecutionResult;
Expand All @@ -38,6 +39,7 @@
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -66,7 +68,8 @@ public class TransEngineAdapter extends Trans {


public static final Map<org.pentaho.di.core.logging.LogLevel, LogLevel> LEVEL_MAP = new HashMap<>();
static{

static {
LEVEL_MAP.put( org.pentaho.di.core.logging.LogLevel.BASIC, LogLevel.BASIC );
LEVEL_MAP.put( org.pentaho.di.core.logging.LogLevel.DEBUG, LogLevel.DEBUG );
LEVEL_MAP.put( org.pentaho.di.core.logging.LogLevel.DETAILED, LogLevel.DETAILED );
Expand All @@ -93,43 +96,68 @@ public TransEngineAdapter( Engine engine, TransMeta transMeta ) {
@Override public void prepareExecution( String[] arguments ) throws KettleException {
setSteps( new ArrayList<>( opsToSteps() ) );
wireStatusToTransListeners();

subscribeToOpLogging();

executionContext.subscribe( transformation, LogEntry.class, new Subscriber<PDIEvent<Transformation, LogEntry>>() {
@Override public void onSubscribe( Subscription subscription ) {
subscription.request( Long.MAX_VALUE );
}

@Override public void onNext( PDIEvent<Transformation, LogEntry> event ) {
LogEntry data = event.getData();
LogLevel logLogLevel = data.getLogLogLevel();
switch( logLogLevel ) {
case ERROR:
getLogChannel().logError( data.getMessage() );
break;
case MINIMAL:
getLogChannel().logMinimal( data.getMessage() );
break;
case BASIC:
getLogChannel().logBasic( data.getMessage() );
break;
case DETAILED:
getLogChannel().logDetailed( data.getMessage() );
break;
case DEBUG:
getLogChannel().logDebug( data.getMessage() );
break;
case TRACE:
getLogChannel().logRowlevel( data.getMessage() );
break;
}
logToChannel( getLogChannel(), data );

}

@Override public void onError( Throwable throwable ) {}
@Override public void onError( Throwable throwable ) {
}

@Override public void onComplete() {}
@Override public void onComplete() {
}
} );
setReadyToStart( true );
}

private void logToChannel( LogChannelInterface logChannel, LogEntry data ) {
LogLevel logLogLevel = data.getLogLogLevel();
switch ( logLogLevel ) {
case ERROR:
logChannel.logError( data.getMessage() );
break;
case MINIMAL:
logChannel.logMinimal( data.getMessage() );
break;
case BASIC:
logChannel.logBasic( data.getMessage() );
break;
case DETAILED:
logChannel.logDetailed( data.getMessage() );
break;
case DEBUG:
logChannel.logDebug( data.getMessage() );
break;
case TRACE:
logChannel.logRowlevel( data.getMessage() );
break;
}
}

private void subscribeToOpLogging() {
transformation.getOperations().forEach( operation -> {
executionContext.subscribe( operation, LogEntry.class, logEntry -> {
StepInterface stepInterface = findStepInterface( operation.getId(), 0 );
if ( stepInterface != null ) {
LogChannelInterface logChannel = stepInterface.getLogChannel();
logToChannel( logChannel, logEntry );
} else {
// Could not find step, log at transformation level instead
logToChannel( getLogChannel(), logEntry );
}
} );
} );
}

private void wireStatusToTransListeners() {
executionContext.subscribe( transformation, Status.class,
new Subscriber<PDIEvent<Transformation, Status>>() {
Expand Down Expand Up @@ -166,7 +194,9 @@ private void wireStatusToTransListeners() {
getLogChannel().logError( "Error Executing Transformation", t );
setFinished( true );
// emit error on all steps
getSteps().stream().map( stepMetaDataCombi -> stepMetaDataCombi.step ).forEach( step-> { step.setStopped( true ); step.setRunning( false ); } );
getSteps().stream().map( stepMetaDataCombi -> stepMetaDataCombi.step ).forEach( step -> {
step.setStopped( true ); step.setRunning( false );
} );
getTransListeners().forEach( l -> {
try {
l.transFinished( TransEngineAdapter.this );
Expand Down

0 comments on commit 6a3351c

Please sign in to comment.