Skip to content

Commit

Permalink
Reader interface documentation (apache#1096)
Browse files Browse the repository at this point in the history
* add basic description in concepts/architecture doc

* begin adding section to java API doc

* finish draft of java API section

* finish draft of concepts/architecture section

* add python example
  • Loading branch information
lucperkins authored and merlimat committed Jan 26, 2018
1 parent 4e0812b commit 57e9d39
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
40 changes: 32 additions & 8 deletions site/docs/latest/clients/Java.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,30 @@ CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

Async receive operations return a {% javadoc Message client org.apache.pulsar.client.api.Message %} wrapped in a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).

## Reader interface

The Pulsar [Reader API](../../getting-started/ConceptsAndArchitecture#reader-interface) enables applications to access messages on Pulsar {% popover topics %}

With the Reader API, Pulsar clients can "manually position" themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create {% javadoc Reader client org.apache.pulsar.client.api.Reader %} objects by specifying a {% popover topic %}, a {% javadoc MessageId client org.apache.pulsar.client.api.MessageId %}, and {% javadoc ReaderConfiguration client org.apache.pulsar.client.api.ReaderConfiguration %}.

Here's an example:

```java
ReaderConfiguration conf = new ReaderConfiguration();
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.createReader(topic, id, conf);

while (true) {
Message message = reader.readNext();
// Process message
}
```

In the example above, a `Reader` object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by `msgIdBytes` (how that value is obtained depends on the application).

The code sample above shows pointing the `Reader` object to a specific message (by ID), but you can also use `MessageId.earliest` to point to the earliest available message on the topic of `MessageId.latest` to point to the most recent available message.

## Authentication

Pulsar currently supports two authentication schemes: [TLS](../../admin/Authz#tls-client-auth) and [Athenz](../../admin/Authz#athenz). The Pulsar Java client can be used with both.
Expand All @@ -235,8 +259,7 @@ authParams.put("tlsCertFile", "/path/to/client-cert.pem");
authParams.put("tlsKeyFile", "/path/to/client-key.pem");
conf.setAuthentication(AuthenticationTls.class.getName(), authParams);

PulsarClient client = PulsarClient.create(
"pulsar+ssl://my-broker.com:6651", conf);
PulsarClient client = PulsarClient.create("pulsar+ssl://my-broker.com:6651", conf);
```

### Athenz
Expand Down Expand Up @@ -270,10 +293,11 @@ PulsarClient client = PulsarClient.create(
"pulsar+ssl://my-broker.com:6651", conf);
```

**Note**: *`privateKey` parameter supports following three patterns format*.
{% include admonition.html type="info" title="Supported pattern formats"
content='
The `privateKey` parameter supports the following three pattern formats:

* `file:///path/to/file`
* `file:/path/to/file`
* `data:application/x-pem-file;base64,<base64-encoded value>`' %}

```
file:///path/to/file
file:/path/to/file
data:application/x-pem-file;base64,<base64-encoded value>
```
16 changes: 16 additions & 0 deletions site/docs/latest/clients/Python.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,19 @@ while True:

client.close()
```

### Reader interface example

You can use the Pulsar Python API to use the Pulsar [reader interface](../../getting-started/ConceptsAndArchitecture#reader-interface). Here's an example:

```python
# MessageId taken from a previously fetched message
msg_id = msg.message_id()

reader = client.create_reader(TOPIC, msg_id)

while True:
msg = reader.receive()
print("Received message '%s' id='%s'", msg.data(), msg.message_id())
# No acknowledgment
```
51 changes: 51 additions & 0 deletions site/docs/latest/getting-started/ConceptsAndArchitecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ As in other pub-sub systems, topics in Pulsar are named channels for transmittin
content="Application does not explicitly create the topic but attempting to write or receive message on a topic that does not yet exist, Pulsar will automatically create that topic under the [namespace](#namespace)." %}

### Namespace

A namespace is a logical nomenclature within a property. A property can create multiple namespaces via [admin API](../../admin-api/namespaces#create). For instance, a property with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics.
For e.g. `my-property/my-cluster/my-property-app1` is a namespace for the application `my-property-app1` in cluster `my-cluster` for `my-property`.
Application can create any number of [topics](#topics) under the namespace.
Expand Down Expand Up @@ -311,3 +312,53 @@ Whenever the TCP connection breaks, the client will immediately re-initiate this
[Clients](../../getting-started/Clients) connecting to Pulsar {% popover brokers %} need to be able to communicate with an entire Pulsar {% popover instance %} using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the [Deploying a Pulsar instance](../../deployment/InstanceSetup#service-discovery-setup) guide.

You can use your own service discovery system if you'd like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as `http://pulsar.us-west.example.com:8080`, the client needs to be redirected to *some* active broker in the desired {% popover cluster %}, whether via DNS, an HTTP or IP redirect, or some other means.

## Reader interface

In Pulsar, the "standard" [consumer interface](#consumers) involves using {% popover consumers %} to listen on {% popover topics %}, process incoming messages, and finally {% popover acknowledge %} those messages when they've been processed. Whenever a consumer disconnects from and then reconnects to a topic, it automatically begins reading from the earliest un-acked message onward because the topic's cursor is automatically managed by Pulsar.

The **reader interface** for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify *which* message the reader begins reading from. When specifying that initial message, the reader interface gives you three options:

* The **earliest** available message in the topic
* The **latest** available message in the topic
* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.

The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://streaml.io/blog/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.

{% include admonition.html type="warning" title="Non-partitioned topics only"
content="The reader interface for Pulsar cannot currently be used with [partitioned topics](#partitioned-topics)." %}

Here's a Java example that begins reading from the earliest available message on a topic:

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

String topic = "persistent://sample/standalone/ns1/reader-api-test";
MessageId id = MessageId.earliest;

// Create a reader on a topic and for a specific message (and onward)
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());

while (true) {
Message message = reader.readNext();

// Process the message
}
```

To create a reader that will read from the latest available message:

```java
MessageId id = MessageId.latest;
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
```

To create a reader that will read from some message between earliest and latest:

```java
byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
```

0 comments on commit 57e9d39

Please sign in to comment.