In function framework, when source.read() method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted,
you can use the PushSource class and extend it to quickly implement the push message model.
It overrides the read
method and provides the consume
method for the user to call.
However, if the source connector extends from the class,
it cannot notify the function framework if it encounters an exception while consuming data internally,
in other words, the function call source.read()
never triggers an exception and never exits the process.
Add notifyError
method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is read
, will throws exception.
public Record<T> read() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
}
return record;
}
/**
* Allows the source to notify errors asynchronously.
* @param ex
*/
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
}
Just like the implementation of the current BatchPushSource
This PIP is to provide a method for users rather than introducing a new interface.
- So it is forward compatible
- However, connectors using this method are not backward compatible.
For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the
notifyError
method, when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation.
After this PIP, the source connectors can extends the PushSource
, and use notifyError
method to throw exception. Such as:
None
- Abstract BatchPushSource logic to AbstractPushSource.
- Let PushSource to extends AbstractPushSource to extend a new method(notifyError).
Please refer this PR: #20791
None
- Not possible
public class PushSourceTest {
PushSource testBatchSource = new PushSource() {
@Override
public void open(Map config, SourceContext context) throws Exception {
}
@Override
public void close() throws Exception {
}
};
@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
public void testNotifyErrors() throws Exception {
testBatchSource.notifyError(new RuntimeException("test exception"));
testBatchSource.readNext();
}
}
None