A zero-dependency streaming XML-like parser with FSM-based state transitions, token boundary preservation, and Aho-Corasick pattern matching for structured text processing.
ContentStreamAdapter parses XML-like sectioned text that arrives token-by-token (e.g., from LLM streaming responses), extracts content while preserving path context, and outputs structured tokens in real-time.
- O(1) State Transitions: HashMap-based fast transition table
- Token Boundary Preservation: Maintains original token segmentation
- Aho-Corasick Algorithm: O(n) multi-pattern matching
- Multi-depth Path Support: Hierarchical structures like
/section/subsection/content - Alias Support: Map multiple tag names to the same path
- Attribute Support: Parse and filter tag attributes (e.g.,
<cite id="ref">) - Fault-tolerant: Unrecognized or invalid transitions output as text
- Java 21 or higher
- Zero runtime dependencies
This library is available via JitPack.
Step 1. Add JitPack repository to your settings.gradle:
dependencyResolutionManagement {
repositories {
mavenCentral()
maven { url 'https://jitpack.io' }
}
}Step 2. Add the dependency:
dependencies {
implementation 'com.github.agent-hanju:content-stream-adapter:0.1.6'
}Step 1. Add JitPack repository to your pom.xml:
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>Step 2. Add the dependency:
<dependency>
<groupId>com.github.agent-hanju</groupId>
<artifactId>content-stream-adapter</artifactId>
<version>0.1.6</version>
</dependency>import me.hanju.adapter.ContentStreamAdapter;
import me.hanju.adapter.transition.TransitionSchema;
import me.hanju.adapter.payload.TaggedToken;
import java.util.List;
// 1. Define schema
TransitionSchema schema = TransitionSchema.root()
.tag("section", section -> section
.tag("subsection", subsection -> subsection
.tag("content"))
.tag("metadata"))
.tag("result");
// 2. Create adapter
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
// 3. Feed tokens
String input = "Hello <section><subsection><content>world</content></subsection></section>!";
List<TaggedToken> tokens = adapter.feedToken(input);
// 4. Process output
for (TaggedToken token : tokens) {
System.out.println("Path: " + token.path() + ", Content: " + token.content());
}
// 5. Flush buffer (on stream end)
List<TaggedToken> remaining = adapter.flush();Path: /, Content: Hello
Path: /section/subsection/content, Content: world
Path: /, Content: !
Map multiple tag names to the same path:
TransitionSchema schema = TransitionSchema.root()
.tag("cite").alias("rag") // Both <cite> and <rag> map to /cite
.tag("think").alias("thinking"); // Both <think> and <thinking> map to /think
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
// Both processed as /cite path
adapter.feedToken("Reference: <cite>source1</cite>");
adapter.feedToken("RAG: <rag>source2</rag>");Parse tag attributes and filter them through schema-defined allowed attributes:
TransitionSchema schema = TransitionSchema.root()
.tag("cite").attr("id", "source") // Allow only "id" and "source" attributes
.tag("think");
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
for (TaggedToken token : adapter.feedToken("<cite id=\"ref1\" source=\"wiki\" extra=\"ignored\">content</cite>")) {
if ("OPEN".equals(token.event())) {
// token.attributes() contains only allowed attributes: {id: "ref1", source: "wiki"}
// "extra" is filtered out
System.out.println("Cite opened with: " + token.attributes());
}
}Key behaviors:
- Attributes are parsed from open tags (e.g.,
<cite id="ref">) - Only schema-defined attributes are included in the output
- Attributes support both double and single quotes
- Incomplete attributes (unclosed quotes) are ignored on flush
- Tags without defined attributes have empty
attributes()map
TaggedToken includes an event field that notifies when tags open or close:
TransitionSchema schema = TransitionSchema.root()
.tag("cite")
.tag("think");
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
for (TaggedToken token : adapter.feedToken("Start <cite>source</cite> end")) {
if ("OPEN".equals(token.event())) {
System.out.println("Tag opened: " + token.path());
} else if ("CLOSE".equals(token.event())) {
System.out.println("Tag closed: " + token.path());
} else {
// Regular content (event is null)
System.out.println("[" + token.path() + "] " + token.content());
}
}Output:
[/] Start
Tag opened: /cite
[/cite] source
Tag closed: /cite
[/] end
This is useful for tracking section boundaries, triggering UI updates, or collecting metadata about tag structure.
Retrieve the accumulated raw input at any time using getRaw():
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
adapter.feedToken("Hello ");
adapter.feedToken("<cite>");
adapter.feedToken("content");
adapter.feedToken("</cite>");
// Get all accumulated input as-is
String raw = adapter.getRaw(); // "Hello <cite>content</cite>"This is useful for debugging, logging, or when you need the original unprocessed input.
TransitionSchema schema = TransitionSchema.root()
.tag("think")
.tag("cite");
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
// Process LLM streaming tokens
for (String token : llmStreamingTokens) {
List<TaggedToken> results = adapter.feedToken(token);
for (TaggedToken taggedToken : results) {
// Real-time processing per path
switch (taggedToken.path()) {
case "/think" -> logThinkingProcess(taggedToken.content());
case "/cite" -> collectCitation(taggedToken.content());
default -> outputToUser(taggedToken.content());
}
}
}
// Flush remaining buffer on stream end
adapter.flush().forEach(token -> processToken(token));import reactor.core.publisher.Flux;
TransitionSchema schema = TransitionSchema.root()
.tag("think")
.tag("cite");
// SSE or WebFlux streaming endpoint
public Flux<ServerSentEvent<String>> streamLlmResponse() {
ContentStreamAdapter adapter = new ContentStreamAdapter(schema);
return llmClient.streamTokens()
.flatMapIterable(adapter::feedToken) // Feed each token
.filter(token -> !"/think".equals(token.path())) // Filter out thinking
.map(TaggedToken::content)
.concatWith(Flux.defer(() ->
Flux.fromIterable(adapter.flush()) // Flush on stream end
.filter(token -> !"/think".equals(token.path()))
.map(TaggedToken::content)
))
.map(content -> ServerSentEvent.builder(content).build());
}public class StreamingConsumer {
private final ContentStreamAdapter adapter;
private final Consumer<String> onUserContent;
private final Consumer<String> onCitation;
public StreamingConsumer(
TransitionSchema schema,
Consumer<String> onUserContent,
Consumer<String> onCitation) {
this.adapter = new ContentStreamAdapter(schema);
this.onUserContent = onUserContent;
this.onCitation = onCitation;
}
public void accept(String token) {
adapter.feedToken(token).forEach(taggedToken -> {
switch (taggedToken.path()) {
case "/" -> onUserContent.accept(taggedToken.content());
case "/cite" -> onCitation.accept(taggedToken.content());
// Silently ignore "/think" path
}
});
}
public void end() {
adapter.flush().forEach(taggedToken -> {
switch (taggedToken.path()) {
case "/" -> onUserContent.accept(taggedToken.content());
case "/cite" -> onCitation.accept(taggedToken.content());
}
});
}
}
// Usage
StreamingConsumer consumer = new StreamingConsumer(
schema,
content -> sendToClient(content), // User-visible content
citation -> storeCitation(citation) // Background processing
);
llmStream.forEach(consumer::accept);
consumer.end();-
ContentStreamAdapter: Main adapter class
- Accepts tokens and returns TaggedToken lists
- FSM-based state management
-
TransitionSchema: Hierarchical tag schema builder
- Fluent API for intuitive schema definition
- Alias support (
.alias()) - Attribute whitelist (
.attr())
-
TaggedToken: Output token (record)
path: Current FSM path (e.g., "/", "/section", "/section/subsection")content: Text content excluding tagsevent: Event type ("OPEN", "CLOSE", or null for regular content)attributes: Filtered attribute map (OPEN event only)
-
StreamPatternMatcher: Aho-Corasick based pattern matching
- O(n) multi-pattern detection
- Token boundary preservation
-
TransitionTable: State transition table
- O(1) transitions using TransitionNode tree
- Alias support
- Attribute whitelist lookup
-
OpenTagParser: Streaming open tag parser
- State machine-based attribute parsing
- Handles quotes spanning multiple tokens
- Supports both single and double quotes
- State Transitions: O(1) - HashMap lookup
- Pattern Matching: O(n) - Aho-Corasick algorithm (n = input length)
- Token Processing: Preserves original token boundaries
- No support for self-closing tags (
<tag/>) - No support for nested identical tags (
<a><a></a></a>)
MIT License - see LICENSE file for details.
Issues and Pull Requests are welcome.
- Feature: Tag attribute parsing support (
<cite id="ref">) - Feature: Schema-based attribute whitelist (
.attr("id", "source")) - Feature:
TaggedToken.attributes()for accessing parsed attributes - Architecture:
OpenTagParserfor streaming attribute parsing with state machine - Architecture:
TransitionTable.getAllowedAttributes()for attribute filtering
- Feature:
getRaw()method to retrieve accumulated raw input
- Fix: Multiple patterns in single token now processed correctly
- Fix: Non-prefix text after pattern detection now flushed immediately
- Fix: CLOSE event now shows the closed path instead of post-transition path
- Build: Upgraded to Java 21 (toolchain-based)
- Build: Updated JUnit 5.10.1 → 5.11.4
- Build: Updated AssertJ 3.24.2 → 3.27.6
- Performance: Optimized string buffer output with direct StringBuilder usage
- Performance: Optimized TokenBuffer with O(1) split and remove operations
- Feature: Added event field to TaggedToken (OPEN/CLOSE events)