Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[pip][design] PIP-281: Add notifyError method on PushSource (apache#2…
Browse files Browse the repository at this point in the history
…0807)

Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
  • Loading branch information
shibd and Anonymitaet authored Aug 15, 2023
1 parent 3ab420c commit 9127d89
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions pip/pip-281.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Title: [io] Add notifyError method on PushSource

## Motivation

In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted,
you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model.
It overrides the `read` method and provides the `consume` method for the user to call.

However, if the source connector extends from the class,
it cannot notify the function framework if it encounters an exception while consuming data internally,
in other words, the function call `source.read()` never triggers an exception and never exits the process.


## Goals

Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception.
```java

public Record<T> read() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
}
return record;
}


/**
* Allows the source to notify errors asynchronously.
* @param ex
*/
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
}
```

Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java)


### Compatibility

This PIP is to provide a method for users rather than introducing a new interface.

- So it is forward compatible
- However, connectors using this method are not backward compatible.
For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the `notifyError` method,
when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation.

### In Scope

After this PIP, the source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as:
- [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
- [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
- [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)
- etc.

### Out of Scope
None

## Design & Implementation Details

- Abstract BatchPushSource logic to AbstractPushSource.
- Let PushSource to extends AbstractPushSource to extend a new method(notifyError).

Please refer this PR: https://github.com/apache/pulsar/pull/20791

## Note
None


## Concrete Example

### BEFORE
- Not possible

### AFTER

```java
public class PushSourceTest {

PushSource testBatchSource = new PushSource() {
@Override
public void open(Map config, SourceContext context) throws Exception {

}

@Override
public void close() throws Exception {

}
};

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
public void testNotifyErrors() throws Exception {
testBatchSource.notifyError(new RuntimeException("test exception"));
testBatchSource.readNext();
}
}
```

## Links
None

0 comments on commit 9127d89

Please sign in to comment.