Skip to content

Commit 973cec2

Browse files
committed
[RSDK-7443] paginate data (viamrobotics#598)
1 parent d88d943 commit 973cec2

File tree

3 files changed

+172
-85
lines changed

3 files changed

+172
-85
lines changed

src/viam/app/data_client.py

+112-80
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import warnings
2+
from dataclasses import dataclass
23
from datetime import datetime
34
from pathlib import Path
45
from typing import Any, List, Mapping, Optional, Sequence, Tuple
@@ -33,6 +34,7 @@
3334
Filter,
3435
GetDatabaseConnectionRequest,
3536
GetDatabaseConnectionResponse,
37+
Order,
3638
RemoveBinaryDataFromDatasetByIDsRequest,
3739
RemoveBoundingBoxFromImageByIDRequest,
3840
RemoveTagsFromBinaryDataByFilterRequest,
@@ -109,6 +111,7 @@ async def main():
109111
110112
"""
111113

114+
@dataclass
112115
class TabularData:
113116
"""Class representing a piece of tabular data and associated metadata.
114117
@@ -119,16 +122,17 @@ class TabularData:
119122
time_received (datetime): the time the requested data was received.
120123
"""
121124

122-
def __init__(self, data: Mapping[str, Any], metadata: CaptureMetadata, time_requested: datetime, time_received: datetime) -> None:
123-
self.data = data
124-
self.metadata = metadata
125-
self.time_requested = time_requested
126-
self.time_received = time_received
127-
128125
data: Mapping[str, Any]
126+
"""The requested data"""
127+
129128
metadata: CaptureMetadata
129+
"""The metadata associated with the data"""
130+
130131
time_requested: datetime
132+
"""The time the data were requested"""
133+
131134
time_received: datetime
135+
"""The time the data were received"""
132136

133137
def __str__(self) -> str:
134138
return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n"
@@ -139,6 +143,7 @@ def __eq__(self, other: object) -> bool:
139143
return False
140144

141145
# TODO (RSDK-6684): Revisit if this shadow type is necessary
146+
@dataclass
142147
class BinaryData:
143148
"""Class representing a piece of binary data and associated metadata.
144149
@@ -147,12 +152,11 @@ class BinaryData:
147152
metadata (viam.proto.app.data.BinaryMetadata): the metadata from the request.
148153
"""
149154

150-
def __init__(self, data: bytes, metadata: BinaryMetadata) -> None:
151-
self.data = data
152-
self.metadata = metadata
153-
154155
data: bytes
156+
"""The request data"""
157+
155158
metadata: BinaryMetadata
159+
"""The metadata associated with the data"""
156160

157161
def __str__(self) -> str:
158162
return f"{self.data}\n{self.metadata}"
@@ -184,47 +188,68 @@ def __init__(self, channel: Channel, metadata: Mapping[str, str]):
184188
async def tabular_data_by_filter(
185189
self,
186190
filter: Optional[Filter] = None,
191+
limit: Optional[int] = None,
192+
sort_order: Optional[Order.ValueType] = None,
193+
last: Optional[str] = None,
194+
count_only: bool = False,
195+
include_internal_data: bool = False,
187196
dest: Optional[str] = None,
188-
) -> List[TabularData]:
189-
"""Filter and download tabular data.
197+
) -> Tuple[List[TabularData], int, str]:
198+
"""Filter and download tabular data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
199+
in the returned tuple. If a destination is provided, the data will be saved to that file.
200+
If the file is not empty, it will be overwritten.
190201
191202
::
192203
193204
from viam.proto.app.data import Filter
194205
206+
my_data = []
207+
last = None
195208
my_filter = Filter(component_name="left_motor")
196-
tabular_data = await data_client.tabular_data_by_filter(my_filter)
209+
while True:
210+
tabular_data, last = await data_client.tabular_data_by_filter(my_filter, last)
211+
if not tabular_data:
212+
break
213+
my_data.extend(tabular_data)
214+
197215
198216
Args:
199217
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all tabular
200218
data.
219+
limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
220+
sort_order (viam.proto.app.data.Order): The desired sort order of the data.
221+
last (str): Optional string indicating the ID of the last-returned data.
222+
If provided, the server will return the next data entries after the `last` ID.
223+
count_only (bool): Whether to return only the total count of entries.
224+
include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
225+
like cloud SLAM. Defaults to `False`
201226
dest (str): Optional filepath for writing retrieved data.
202227
203228
Returns:
204229
List[TabularData]: The tabular data.
230+
int: The count (number of entries)
231+
str: The last-returned page ID.
205232
"""
206233
filter = filter if filter else Filter()
207-
last = ""
208-
data = []
209-
210-
# `DataRequest`s are limited to 100 pieces of data, so we loop through calls until
211-
# we are certain we've received everything.
212-
while True:
213-
data_request = DataRequest(filter=filter, limit=100, last=last)
214-
request = TabularDataByFilterRequest(data_request=data_request, count_only=False)
215-
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
216-
if not response.data or len(response.data) == 0:
217-
break
218-
data += [
219-
DataClient.TabularData(
220-
struct_to_dict(struct.data),
221-
response.metadata[struct.metadata_index],
222-
struct.time_requested.ToDatetime(),
223-
struct.time_received.ToDatetime(),
224-
)
225-
for struct in response.data
226-
]
227-
last = response.last
234+
235+
data_request = DataRequest(filter=filter)
236+
if limit:
237+
data_request.limit = limit
238+
if sort_order:
239+
data_request.sort_order = sort_order
240+
if last:
241+
data_request.last = last
242+
request = TabularDataByFilterRequest(data_request=data_request, count_only=count_only, include_internal_data=include_internal_data)
243+
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
244+
data = [
245+
DataClient.TabularData(
246+
struct_to_dict(struct.data),
247+
response.metadata[struct.metadata_index],
248+
struct.time_requested.ToDatetime(),
249+
struct.time_received.ToDatetime(),
250+
)
251+
for struct in response.data
252+
]
228253

229254
if dest:
230255
try:
@@ -233,59 +258,72 @@ async def tabular_data_by_filter(
233258
file.flush()
234259
except Exception as e:
235260
LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e)
236-
return data
261+
return data, response.count, response.last
237262

238263
async def binary_data_by_filter(
239-
self, filter: Optional[Filter] = None, dest: Optional[str] = None, include_file_data: bool = True, num_files: Optional[int] = None
240-
) -> List[BinaryData]:
241-
"""Filter and download binary data.
264+
self,
265+
filter: Optional[Filter] = None,
266+
limit: Optional[int] = None,
267+
sort_order: Optional[Order.ValueType] = None,
268+
last: Optional[str] = None,
269+
include_binary_data: bool = True,
270+
count_only: bool = False,
271+
include_internal_data: bool = False,
272+
dest: Optional[str] = None,
273+
) -> Tuple[List[BinaryData], int, str]:
274+
"""Filter and download binary data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
275+
in the returned tuple. If a destination is provided, the data will be saved to that file.
276+
If the file is not empty, it will be overwritten.
242277
243278
::
244279
245280
from viam.proto.app.data import Filter
246281
247-
my_filter = Filter(component_type="camera")
248-
binary_data = await data_client.binary_data_by_filter(my_filter)
249282
250-
Args:
251-
filter (Optional[viam.proto.app.data.Filter]): Optional `Filter` specifying binary data to retrieve. No `Filter` implies all
252-
binary data.
253-
dest (Optional[str]): Optional filepath for writing retrieved data.
254-
include_file_data (bool): Boolean specifying whether to actually include the binary file data with each retrieved file. Defaults
255-
to true (i.e., both the files' data and metadata are returned).
256-
num_files (Optional[str]): Number of binary data to return. Passing 0 returns all binary data matching the filter no matter.
257-
Defaults to 100 if no binary data is requested, otherwise 10. All binary data or the first `num_files` will be returned,
258-
whichever comes first.
283+
my_data = []
284+
last = None
285+
my_filter = Filter(component_name="camera")
286+
while True:
287+
data, last = await data_client.binary_data_by_filter(my_filter, last)
288+
if not data:
289+
break
290+
my_data.extend(data)
259291
260-
Raises:
261-
ValueError: If `num_files` is less than 0.
292+
Args:
293+
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all binary
294+
data.
295+
limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
296+
sort_order (viam.proto.app.data.Order): The desired sort order of the data.
297+
last (str): Optional string indicating the ID of the last-returned data.
298+
If provided, the server will return the next data entries after the `last` ID.
299+
include_binary_data (bool): Boolean specifying whether to actually include the binary file data with each retrieved file.
300+
Defaults to true (i.e., both the files' data and metadata are returned).
301+
count_only (bool): Whether to return only the total count of entries.
302+
include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
303+
like cloud SLAM. Defaults to `False`
304+
dest (str): Optional filepath for writing retrieved data.
262305
263306
Returns:
264307
List[BinaryData]: The binary data.
308+
int: The count (number of entries)
309+
str: The last-returned page ID.
265310
"""
266-
num_files = num_files if num_files else 10 if include_file_data else 100
267-
if num_files < 0:
268-
raise ValueError("num_files must be at least 0.")
269-
filter = filter if filter else Filter()
270-
limit = 1 if include_file_data else 100
271-
last = ""
272-
data = []
273-
274-
# `DataRequest`s are limited in pieces of data, so we loop through calls until
275-
# we are certain we've received everything.
276-
while True:
277-
new_data, last = await self._binary_data_by_filter(filter=filter, limit=limit, include_binary=include_file_data, last=last)
278-
if not new_data or len(new_data) == 0:
279-
break
280-
elif num_files != 0 and len(new_data) > num_files:
281-
data += new_data[0:num_files]
282-
break
283-
else:
284-
data += new_data
285-
num_files -= len(new_data)
286-
if num_files == 0:
287-
break
288311

312+
data_request = DataRequest(filter=filter)
313+
if limit:
314+
data_request.limit = limit
315+
if sort_order:
316+
data_request.sort_order = sort_order
317+
if last:
318+
data_request.last = last
319+
request = BinaryDataByFilterRequest(
320+
data_request=data_request,
321+
include_binary=include_binary_data,
322+
count_only=count_only,
323+
include_internal_data=include_internal_data,
324+
)
325+
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
326+
data = [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
289327
if dest:
290328
try:
291329
file = open(dest, "w")
@@ -294,13 +332,7 @@ async def binary_data_by_filter(
294332
except Exception as e:
295333
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
296334

297-
return data
298-
299-
async def _binary_data_by_filter(self, filter: Filter, limit: int, include_binary: bool, last: str) -> Tuple[List[BinaryData], str]:
300-
data_request = DataRequest(filter=filter, limit=limit, last=last)
301-
request = BinaryDataByFilterRequest(data_request=data_request, count_only=False, include_binary=include_binary)
302-
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
303-
return [DataClient.BinaryData(data.binary, data.metadata) for data in response.data], response.last
335+
return data, response.count, response.last
304336

305337
async def binary_data_by_ids(
306338
self,

tests/mocks/services.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,10 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T
775775
await stream.send_message(TabularDataByFilterResponse(data=None))
776776
return
777777
self.filter = request.data_request.filter
778+
self.order = request.data_request.sort_order
779+
self.limit = request.data_request.limit
780+
self.count_only = request.count_only
781+
self.last = request.data_request.last
778782
tabular_response_structs = []
779783
tabular_metadata = [data.metadata for data in self.tabular_response]
780784
for idx, tabular_data in enumerate(self.tabular_response):
@@ -786,7 +790,14 @@ async def TabularDataByFilter(self, stream: Stream[TabularDataByFilterRequest, T
786790
time_received=datetime_to_timestamp(tabular_data.time_received),
787791
)
788792
)
789-
await stream.send_message(TabularDataByFilterResponse(data=tabular_response_structs, metadata=tabular_metadata))
793+
await stream.send_message(
794+
TabularDataByFilterResponse(
795+
data=tabular_response_structs,
796+
metadata=tabular_metadata,
797+
count=len(tabular_response_structs),
798+
last="LAST_TABULAR_DATA_PAGE_ID",
799+
)
800+
)
790801
self.was_tabular_data_requested = True
791802

792803
async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, BinaryDataByFilterResponse]) -> None:
@@ -796,9 +807,18 @@ async def BinaryDataByFilter(self, stream: Stream[BinaryDataByFilterRequest, Bin
796807
await stream.send_message(BinaryDataByFilterResponse())
797808
return
798809
self.filter = request.data_request.filter
810+
self.order = request.data_request.sort_order
811+
self.limit = request.data_request.limit
799812
self.include_binary = request.include_binary
813+
self.count_only = request.count_only
814+
self.include_internal_data = request.include_internal_data
815+
self.last = request.data_request.last
800816
await stream.send_message(
801-
BinaryDataByFilterResponse(data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response])
817+
BinaryDataByFilterResponse(
818+
data=[BinaryData(binary=data.data, metadata=data.metadata) for data in self.binary_response],
819+
count=len(self.binary_response),
820+
last="LAST_BINARY_DATA_PAGE_ID",
821+
)
802822
)
803823
self.was_binary_data_requested = True
804824

0 commit comments

Comments
 (0)