Skip to content
This repository was archived by the owner on Jan 5, 2026. It is now read-only.

Commit f9e26b5

Browse files
davetaaxelsrz
authored and committed
Add Transcript (#295)
* Transcript * Add tests PrivateConversation back in
1 parent 9632e44 commit f9e26b5

File tree

6 files changed

+476
-0
lines changed

6 files changed

+476
-0
lines changed

libraries/botbuilder-core/botbuilder/core/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from .intent_score import IntentScore
2121
from .invoke_response import InvokeResponse
2222
from .memory_storage import MemoryStorage
23+
from .memory_transcript_store import MemoryTranscriptStore
2324
from .message_factory import MessageFactory
2425
from .middleware_set import AnonymousReceiveMiddleware, Middleware, MiddlewareSet
2526
from .null_telemetry_client import NullTelemetryClient
@@ -55,6 +56,7 @@
5556
"IntentScore",
5657
"InvokeResponse",
5758
"MemoryStorage",
59+
"MemoryTranscriptStore",
5860
"MessageFactory",
5961
"Middleware",
6062
"MiddlewareSet",
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
"""The memory transcript store stores transcripts in volatile memory."""
4+
import datetime
5+
from typing import List, Dict
6+
from botbuilder.schema import Activity
7+
from .transcript_logger import PagedResult, TranscriptInfo, TranscriptStore
8+
9+
# pylint: disable=line-too-long
10+
class MemoryTranscriptStore(TranscriptStore):
    """Transcript store that keeps every transcript in in-process memory.

    This provider is most useful for simulating production storage when running
    locally against the emulator or as part of a unit test. Nothing is written
    anywhere else: transcripts live only in the ``channels`` dictionary of the
    instance that logged them.

    Activities are grouped by channel id and then by conversation id. Results
    are returned in pages of at most ``_PAGE_SIZE`` items; a full page carries
    a continuation token (the id of its last item) that can be passed back to
    fetch the items that follow it.
    """

    # Maximum number of items returned in a single page of results.
    _PAGE_SIZE = 20

    def __init__(self):
        super().__init__()
        # channel id -> conversation id -> activities in the order they were
        # logged. Created per instance so that separate stores never share
        # (or leak) transcripts through a class-level dictionary.
        self.channels: Dict[str, Dict[str, List[Activity]]] = {}

    @classmethod
    def _fill_page(cls, paged_result, ordered, continuation_token) -> None:
        """Store one page of ``ordered`` in ``paged_result``.

        :param paged_result: PagedResult whose ``items`` (and, for a full page,
            ``continuation_token``) are assigned.
        :param ordered: Fully sorted and filtered list of items exposing ``id``.
        :param continuation_token: Id of the last item of the previous page, or
            a falsy value to start from the first item. When the id is not
            found in ``ordered`` the page is empty.
        """
        start = 0
        if continuation_token:
            # Resume right after the item the token points at.
            start = next(
                (
                    index + 1
                    for index, item in enumerate(ordered)
                    if item.id == continuation_token
                ),
                len(ordered),
            )

        page = ordered[start : start + cls._PAGE_SIZE]
        paged_result.items = page
        # A full page may be followed by more items, so hand out a token.
        if len(page) == cls._PAGE_SIZE:
            paged_result.continuation_token = page[-1].id

    async def log_activity(self, activity: Activity) -> None:
        """Append an activity to the transcript of its conversation.

        :param activity: Activity being logged. Its ``channel_id`` and
            ``conversation.id`` select the transcript it is appended to; the
            transcript is created on first use.
        :raises TypeError: If ``activity`` is ``None`` (or otherwise falsy).
        """
        if not activity:
            raise TypeError("activity cannot be None for log_activity()")

        channel = self.channels.setdefault(activity.channel_id, {})
        channel.setdefault(activity.conversation.id, []).append(activity)

    async def get_transcript_activities(
        self,
        channel_id: str,
        conversation_id: str,
        continuation_token: str = None,
        start_date: datetime.datetime = datetime.datetime.min,
    ) -> PagedResult:
        """Get one page of the activities logged for a conversation.

        Activities are ordered by ascending ``timestamp``; every logged
        activity is therefore expected to carry a timestamp.

        :param channel_id: Channel id where the conversation took place.
        :param conversation_id: Id of the conversation.
        :param continuation_token: Token from the previous page (the id of its
            last activity), or ``None`` for the first page.
        :param start_date: Earliest timestamp to include. ``datetime.min`` (the
            default) or ``None`` disables the filter.
        :return: PagedResult of Activity objects. ``items`` is left at its
            default of ``None`` when the conversation is unknown.
        :raises TypeError: If ``channel_id`` or ``conversation_id`` is missing.
        """
        if not channel_id:
            raise TypeError("Missing channel_id")

        if not conversation_id:
            raise TypeError("Missing conversation_id")

        paged_result = PagedResult()
        transcript = self.channels.get(channel_id, {}).get(conversation_id)
        if transcript is None:
            return paged_result

        # With no lower bound the comparison is skipped entirely; this also
        # avoids the TypeError Python raises when the naive datetime.min
        # default is ordered against timezone-aware timestamps.
        unbounded = start_date is None or start_date == datetime.datetime.min
        ordered = [
            activity
            for activity in sorted(transcript, key=lambda item: item.timestamp)
            if unbounded or activity.timestamp >= start_date
        ]
        self._fill_page(paged_result, ordered, continuation_token)
        return paged_result

    async def delete_transcript(self, channel_id: str, conversation_id: str) -> None:
        """Delete a conversation and all of its logged activities.

        Unknown channels or conversations are ignored.

        :param channel_id: Channel id where the conversation took place.
        :param conversation_id: Id of the conversation to delete.
        :raises TypeError: If ``channel_id`` or ``conversation_id`` is missing.
        """
        if not channel_id:
            raise TypeError("channel_id should not be None")

        if not conversation_id:
            raise TypeError("conversation_id should not be None")

        channel = self.channels.get(channel_id)
        if channel is not None:
            channel.pop(conversation_id, None)

    async def list_transcripts(
        self, channel_id: str, continuation_token: str = None
    ) -> PagedResult:
        """List one page of the conversations stored for a channel.

        Each conversation is described by a TranscriptInfo whose ``created``
        value is the timestamp of the first activity logged for it. Results
        are ordered by ``created``, newest first.

        :param channel_id: Channel id to list conversations for.
        :param continuation_token: Token from the previous page (the id of its
            last conversation), or ``None`` for the first page.
        :return: PagedResult of TranscriptInfo objects. ``items`` is left at
            its default of ``None`` when the channel is unknown.
        :raises TypeError: If ``channel_id`` is missing.
        """
        if not channel_id:
            raise TypeError("Missing channel_id")

        paged_result = PagedResult()
        channel = self.channels.get(channel_id)
        if channel is None:
            return paged_result

        infos = [
            TranscriptInfo(
                channel_id,
                transcript[0].timestamp if transcript else None,
                conversation_id,
            )
            for conversation_id, transcript in channel.items()
        ]
        # Entries without a creation time cannot be ordered against datetimes;
        # the leading boolean places them after all dated entries.
        infos.sort(
            key=lambda info: (info.created is not None, info.created), reverse=True
        )
        self._fill_page(paged_result, infos, continuation_token)
        return paged_result
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
"""Logs incoming and outgoing activities to a TranscriptStore."""
4+
5+
import datetime
6+
import copy
7+
from queue import Queue
8+
from abc import ABC, abstractmethod
9+
from typing import Awaitable, Callable, List
10+
from botbuilder.schema import Activity, ActivityTypes, ConversationReference
11+
from .middleware_set import Middleware
12+
from .turn_context import TurnContext
13+
14+
15+
class TranscriptLogger(ABC):
    """Abstract sink that records the activities of a conversation.

    Concrete loggers implement ``log_activity`` to decide where each activity
    is written.
    """

    @abstractmethod
    async def log_activity(self, activity: Activity) -> None:
        """Record a single activity in the transcript.

        :param activity: Activity being logged.
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError()
24+
25+
26+
class TranscriptLoggerMiddleware(Middleware):
    """Middleware that records incoming and outgoing activities.

    During a turn, copies of the incoming activity and of every sent, updated
    or deleted activity are queued in memory. Once the rest of the middleware
    chain has run, the queue is flushed, in order, to the configured
    TranscriptLogger.
    """

    def __init__(self, logger: TranscriptLogger):
        """
        :param logger: TranscriptLogger that receives the queued activities.
        :raises TypeError: If no logger is supplied.
        """
        if not logger:
            raise TypeError(
                "TranscriptLoggerMiddleware requires a TranscriptLogger instance."
            )
        self.logger = logger

    async def on_process_request(
        self, context: TurnContext, logic: Callable[[TurnContext], Awaitable]
    ):
        """Initialization for middleware.

        :param context: Context for the current turn of conversation with the user.
        :param logic: Function to call at the end of the middleware chain.
        """
        transcript = Queue()
        activity = context.activity
        # Log incoming activity at beginning of turn
        if activity:
            sender = activity.from_property
            # An incoming activity without a sender has no role to default.
            if sender and not sender.role:
                sender.role = "user"
            self.log_activity(transcript, copy.copy(activity))

        # NOTE: log_activity is a plain (non-async) method that returns None,
        # so the handlers below call it without ``await``; awaiting its result
        # would raise TypeError.

        # hook up onSend pipeline
        # pylint: disable=unused-argument
        async def send_activities_handler(
            ctx: TurnContext,
            activities: List[Activity],
            next_send: Callable[[], Awaitable[None]],
        ):
            # Run full pipeline
            responses = await next_send()
            for sent_activity in activities:
                self.log_activity(transcript, copy.copy(sent_activity))
            return responses

        context.on_send_activities(send_activities_handler)

        # hook up update activity pipeline
        async def update_activity_handler(
            ctx: TurnContext, activity: Activity, next_update: Callable[[], Awaitable]
        ):
            # Run full pipeline
            response = await next_update()
            # Log a copy flagged as an update; the caller's activity is untouched.
            update_activity = copy.copy(activity)
            update_activity.type = ActivityTypes.message_update
            self.log_activity(transcript, update_activity)
            return response

        context.on_update_activity(update_activity_handler)

        # hook up delete activity pipeline
        async def delete_activity_handler(
            ctx: TurnContext,
            reference: ConversationReference,
            next_delete: Callable[[], Awaitable],
        ):
            # Run full pipeline
            await next_delete()

            # Record the deletion as a message_delete activity addressed via
            # the conversation reference of the deleted message.
            delete_msg = Activity(
                type=ActivityTypes.message_delete, id=reference.activity_id
            )
            deleted_activity: Activity = TurnContext.apply_conversation_reference(
                delete_msg, reference, False
            )
            self.log_activity(transcript, deleted_activity)

        context.on_delete_activity(delete_activity_handler)

        if logic:
            await logic()

        # Flush transcript at end of turn
        while not transcript.empty():
            activity = transcript.get()
            if activity is None:
                break
            await self.logger.log_activity(activity)
            transcript.task_done()

    def log_activity(self, transcript: Queue, activity: Activity) -> None:
        """Queue an activity for logging at the end of the turn.

        This method is synchronous and must not be awaited.

        :param transcript: Queue holding the activities of the current turn.
        :param activity: Activity to log.
        """
        transcript.put(activity)
115+
116+
117+
class TranscriptStore(TranscriptLogger):
    """Abstract transcript storage: a logger whose transcripts can also be
    read back, listed and deleted."""

    @abstractmethod
    async def get_transcript_activities(
        self,
        channel_id: str,
        conversation_id: str,
        continuation_token: str,
        start_date: datetime,
    ) -> "PagedResult":
        """Fetch the activities of one conversation (its transcript).

        :param channel_id: Channel id where the conversation took place.
        :param conversation_id: Id of the conversation.
        :param continuation_token: Continuation token used to page through
            the results.
        :param start_date: Earliest time to include.
        :result: Page of results holding Activity objects.
        """
        raise NotImplementedError()

    @abstractmethod
    async def list_transcripts(
        self, channel_id: str, continuation_token: str
    ) -> "PagedResult":
        """List the conversations recorded for a channel.

        :param channel_id: Channel id where the conversations took place.
        :param continuation_token: Continuation token used to page through
            the results.
        :result: Page of results holding TranscriptInfo objects.
        """
        raise NotImplementedError()

    @abstractmethod
    async def delete_transcript(self, channel_id: str, conversation_id: str) -> None:
        """Delete a specific conversation and all of its activities.

        :param channel_id: Channel id where the conversation took place.
        :param conversation_id: Id of the conversation to delete.
        :result: None
        """
        raise NotImplementedError()
156+
157+
158+
class ConsoleTranscriptLogger(TranscriptLogger):
    """Transcript logger that prints each activity to standard output."""

    async def log_activity(self, activity: Activity) -> None:
        """Print an activity to the console.

        :param activity: Activity being logged.
        :raises TypeError: If ``activity`` is ``None`` (or otherwise falsy).
        """
        # Reject a missing activity up front so the happy path stays flat.
        if not activity:
            raise TypeError("Activity is required")

        print(f"Activity Log: {activity}")
169+
170+
171+
class TranscriptInfo:
    """Describes one stored transcript: its channel, creation time and
    conversation id."""

    # pylint: disable=invalid-name
    def __init__(
        self,
        channel_id: str = None,
        created: datetime = None,
        conversation_id: str = None,
    ):
        """
        :param channel_id: Channel id the transcript was taken from.
        :param created: Timestamp of when the transcript was created.
        :param conversation_id: Conversation id; exposed as the ``id`` attribute.
        """
        self.channel_id = channel_id
        self.created = created
        # Stored under the short name ``id`` (hence the pylint exemption above).
        self.id = conversation_id
189+
190+
191+
class PagedResult:
    """A single page of transcript data together with its paging token."""

    # The items making up this page; defaults to None until assigned.
    items: List[object] = None
    # Token to pass back in order to request the next page; defaults to None.
    continuation_token: str = None

0 commit comments

Comments
 (0)