Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 111 additions & 60 deletions agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.hook.PostSummaryEvent;
import io.agentscope.core.hook.PreActingEvent;
import io.agentscope.core.hook.PreReasoningEvent;
import io.agentscope.core.hook.PreSummaryEvent;
import io.agentscope.core.hook.ReasoningChunkEvent;
import io.agentscope.core.hook.SummaryChunkEvent;
import io.agentscope.core.interruption.InterruptContext;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.memory.LongTermMemory;
Expand Down Expand Up @@ -619,6 +622,50 @@ private Mono<PostActingEvent> notifyPostActingHook(
protected Mono<Msg> summarizing() {
log.debug("Maximum iterations reached. Generating summary...");

List<Msg> messageList = prepareSummaryMessages();
GenerateOptions generateOptions = buildGenerateOptions();

return notifyPreSummaryHook(messageList, generateOptions)
.flatMap(
preSummaryEvent -> {
List<Msg> effectiveMessages = preSummaryEvent.getInputMessages();
GenerateOptions effectiveOptions =
preSummaryEvent.getEffectiveGenerateOptions();

return streamAndAccumulateSummary(effectiveMessages, effectiveOptions)
.flatMap(
msg ->
notifyPostSummaryHook(msg, effectiveOptions)
.map(
postEvent -> {
Msg finalMsg =
postEvent
.getSummaryMessage();
memory.addMessage(finalMsg);
return finalMsg;
}));
})
.onErrorResume(this::handleSummaryError);
}

private Mono<Msg> streamAndAccumulateSummary(
List<Msg> messages, GenerateOptions generateOptions) {
return model.stream(messages, null, generateOptions)
.concatMap(chunk -> checkInterruptedAsync().thenReturn(chunk))
.reduce(
new ReasoningContext(getName()),
(ctx, chunk) -> {
List<Msg> streamedMessages = ctx.processChunk(chunk);
for (Msg streamedMessage : streamedMessages) {
notifySummaryChunk(streamedMessage, ctx, generateOptions)
.subscribe();
}
return ctx;
})
.map(ReasoningContext::buildFinalMessage);
}

private List<Msg> prepareSummaryMessages() {
List<Msg> messageList = prepareMessages();
messageList.add(
Msg.builder()
Expand All @@ -632,67 +679,29 @@ protected Mono<Msg> summarizing() {
+ " summarizing the current situation.")
.build())
.build());
return messageList;
}

return model.stream(messageList, null, buildGenerateOptions())
.concatMap(chunk -> checkInterruptedAsync().thenReturn(chunk))
.reduce(
new ReasoningContext(getName()),
(ctx, chunk) -> {
ctx.processChunk(chunk);
return ctx;
})
.map(ReasoningContext::buildFinalMessage)
.flatMap(
msg -> {
if (msg != null) {
memory.addMessage(msg);
return Mono.just(msg);
}
Msg fallback =
Msg.builder()
.name(getName())
.role(MsgRole.ASSISTANT)
.content(
TextBlock.builder()
.text(
String.format(
"Maximum iterations"
+ " (%d) reached."
+ " Unable to"
+ " generate"
+ " summary.",
maxIters))
.build())
.build();
memory.addMessage(fallback);
return Mono.just(fallback);
})
.onErrorResume(
error -> {
if (error instanceof InterruptedException) {
return Mono.error(error);
}
log.error("Error generating summary", error);
Msg errorMsg =
Msg.builder()
.name(getName())
.role(MsgRole.ASSISTANT)
.content(
TextBlock.builder()
.text(
String.format(
"Maximum iterations"
+ " (%d) reached."
+ " Error"
+ " generating"
+ " summary: %s",
maxIters,
error.getMessage()))
.build())
.build();
memory.addMessage(errorMsg);
return Mono.just(errorMsg);
});
private Mono<Msg> handleSummaryError(Throwable error) {
if (error instanceof InterruptedException) {
return Mono.error(error);
}
log.error("Error generating summary", error);
Msg errorMsg =
Msg.builder()
.name(getName())
.role(MsgRole.ASSISTANT)
.content(
TextBlock.builder()
.text(
String.format(
"Maximum iterations (%d) reached."
+ " Error generating summary: %s",
maxIters, error.getMessage()))
.build())
.build();
memory.addMessage(errorMsg);
return Mono.just(errorMsg);
}

// ==================== Helper Methods ====================
Expand Down Expand Up @@ -829,6 +838,48 @@ private Mono<Void> notifyReasoningChunk(Msg chunkMsg, ReasoningContext context)
return Mono.empty();
}

// ==================== Summary Hook Notification Methods ====================

private Mono<PreSummaryEvent> notifyPreSummaryHook(
List<Msg> msgs, GenerateOptions generateOptions) {
return notifyHooks(
new PreSummaryEvent(
this, model.getModelName(), generateOptions, msgs, maxIters, maxIters));
}

private Mono<PostSummaryEvent> notifyPostSummaryHook(Msg msg, GenerateOptions generateOptions) {
return notifyHooks(new PostSummaryEvent(this, model.getModelName(), generateOptions, msg));
}

private Mono<Void> notifySummaryChunk(
Msg chunkMsg, ReasoningContext context, GenerateOptions generateOptions) {
ContentBlock content = chunkMsg.getFirstContentBlock();

ContentBlock accumulatedContent = null;
if (content instanceof TextBlock) {
accumulatedContent = TextBlock.builder().text(context.getAccumulatedText()).build();
} else if (content instanceof ThinkingBlock) {
accumulatedContent =
ThinkingBlock.builder().thinking(context.getAccumulatedThinking()).build();
}

if (accumulatedContent != null) {
Msg accumulated =
Msg.builder()
.id(chunkMsg.getId())
.name(chunkMsg.getName())
.role(chunkMsg.getRole())
.content(accumulatedContent)
.build();
SummaryChunkEvent event =
new SummaryChunkEvent(
this, model.getModelName(), generateOptions, chunkMsg, accumulated);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();
}

return Mono.empty();
}

@Override
protected Mono<Msg> handleInterrupt(InterruptContext context, Msg... originalArgs) {
String recoveryText = "I noticed that you have interrupted me. What can I do for you?";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ public class StreamOptions {
*/
private final boolean includeActingChunk;

/**
* Whether to include the incremental chunks from summary generation during streaming.
* <p>
* If false, intermediate summary chunk emissions should be filtered out by the stream
* implementation.
*/
private final boolean includeSummaryChunk;

/**
* Whether to include the final consolidated summary output in the response.
* <p>
* If false, final summary result emissions should be filtered out by the stream
* implementation.
*/
private final boolean includeSummaryResult;

/**
* Private constructor called by the builder.
*
Expand All @@ -99,6 +115,8 @@ private StreamOptions(Builder builder) {
this.includeReasoningChunk = builder.includeReasoningChunk;
this.includeReasoningResult = builder.includeReasoningResult;
this.includeActingChunk = builder.includeActingChunk;
this.includeSummaryChunk = builder.includeSummaryChunk;
this.includeSummaryResult = builder.includeSummaryResult;
}

/**
Expand Down Expand Up @@ -175,6 +193,30 @@ public boolean isIncludeActingChunk() {
return includeActingChunk;
}

/**
* Whether summary chunk emissions should be included.
*
* <p>Summary chunks are the incremental outputs from summary generation when max iterations
* is reached.</p>
*
* @return true if summary chunks should be included
*/
public boolean isIncludeSummaryChunk() {
return includeSummaryChunk;
}

/**
* Whether the final summary result should be included.
*
* <p>The summary result is the final consolidated summary output when max iterations
* is reached.</p>
*
* @return true if the final summary result should be included
*/
public boolean isIncludeSummaryResult() {
return includeSummaryResult;
}

/**
* Check if a specific event type should be streamed.
*
Expand All @@ -198,6 +240,16 @@ public boolean shouldIncludeReasoningEmission(boolean isChunk) {
return isChunk ? includeReasoningChunk : includeReasoningResult;
}

/**
* Convenience method for stream implementations to decide whether to emit a summary subtype.
*
* @param isChunk true if the summary emission is an incremental chunk, false if it is the final result
* @return true if this summary emission should be included
*/
public boolean shouldIncludeSummaryEmission(boolean isChunk) {
return isChunk ? includeSummaryChunk : includeSummaryResult;
}

/** Builder for {@link StreamOptions}. */
public static class Builder {
private Set<EventType> eventTypes = EnumSet.of(EventType.ALL);
Expand All @@ -207,6 +259,8 @@ public static class Builder {
private boolean includeReasoningChunk = true;
private boolean includeReasoningResult = true;
private boolean includeActingChunk = true;
private boolean includeSummaryChunk = true;
private boolean includeSummaryResult = true;

/**
* Set which event types to stream.
Expand Down Expand Up @@ -281,6 +335,34 @@ public Builder includeActingChunk(boolean includeActingChunk) {
return this;
}

/**
* Include or exclude summary chunk emissions.
*
* <p>When {@link EventType#SUMMARY} is enabled, summary generation may emit intermediate
* chunks. Set to false to hide these and only receive the final summary result.</p>
*
* @param includeSummaryChunk true to include chunk emissions, false to filter them out
* @return this builder
*/
public Builder includeSummaryChunk(boolean includeSummaryChunk) {
this.includeSummaryChunk = includeSummaryChunk;
return this;
}

/**
* Include or exclude the final consolidated summary result emission.
*
* <p>When {@link EventType#SUMMARY} is enabled, the final summary result is emitted after
* generation completes. Set to false to hide it.</p>
*
* @param includeSummaryResult true to include the final summary result, false to filter it out
* @return this builder
*/
public Builder includeSummaryResult(boolean includeSummaryResult) {
this.includeSummaryResult = includeSummaryResult;
return this;
}

public StreamOptions build() {
return new StreamOptions(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.agentscope.core.hook.HookEvent;
import io.agentscope.core.hook.PostActingEvent;
import io.agentscope.core.hook.PostReasoningEvent;
import io.agentscope.core.hook.PostSummaryEvent;
import io.agentscope.core.hook.ReasoningChunkEvent;
import io.agentscope.core.hook.SummaryChunkEvent;
import io.agentscope.core.message.ContentBlock;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
Expand Down Expand Up @@ -95,6 +97,25 @@ public <T extends HookEvent> Mono<T> onEvent(T event) {
emitEvent(EventType.TOOL_RESULT, toolMsg, false);
}
return Mono.just(event);
} else if (event instanceof PostSummaryEvent) {
PostSummaryEvent e = (PostSummaryEvent) event;
// Summary generation completed
if (options.shouldStream(EventType.SUMMARY)
&& options.shouldIncludeSummaryEmission(false)) {
emitEvent(EventType.SUMMARY, e.getSummaryMessage(), true);
}
return Mono.just(event);
} else if (event instanceof SummaryChunkEvent) {
SummaryChunkEvent e = (SummaryChunkEvent) event;
// Intermediate summary chunk
if (options.shouldStream(EventType.SUMMARY)
&& options.shouldIncludeSummaryEmission(true)) {
// Use incremental or accumulated based on StreamOptions
Msg msgToEmit =
options.isIncremental() ? e.getIncrementalChunk() : e.getAccumulated();
emitEvent(EventType.SUMMARY, msgToEmit, false);
}
return Mono.just(event);
}
return Mono.just(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* @see HookEventType
*/
public abstract sealed class HookEvent
permits PreCallEvent, PostCallEvent, ReasoningEvent, ActingEvent, ErrorEvent {
permits PreCallEvent, PostCallEvent, ReasoningEvent, ActingEvent, SummaryEvent, ErrorEvent {

private final HookEventType type;
private final Agent agent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public enum HookEventType {
/** During tool execution streaming */
ACTING_CHUNK,

/** Before summary generation (when max iterations reached) */
PRE_SUMMARY,

/** After summary generation completes */
POST_SUMMARY,

/** During summary streaming */
SUMMARY_CHUNK,

/** When an error occurs */
ERROR
}
Loading
Loading