Skip to content

Commit 1bb8f97

Browse files
author
Michael
committed
Combine poll funcs and remove logging
1 parent a1ce569 commit 1bb8f97

1 file changed

Lines changed: 78 additions & 136 deletions

File tree

substreams/substream.py

Lines changed: 78 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -173,118 +173,15 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
173173
name_map[mt.name] = pf.name
174174
return name_map
175175

176-
# This method indexes all blocks from start_block until end_block
177-
# Returns one single dataframe with all data in rows once all blocks have been indexed
178176
def poll(
179177
self,
180178
output_modules: list[str],
181179
start_block: int,
182180
end_block: int,
181+
stream_callback=None,
182+
return_first_result=False,
183183
initial_snapshot=False,
184-
):
185-
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
186-
187-
for module in output_modules:
188-
if module not in self.output_modules:
189-
raise Exception(f"module '{module}' is not supported for {self.name}")
190-
self._class_from_module(module)
191-
192-
stream = self.service.Blocks(
193-
Request(
194-
start_block_num=start_block,
195-
stop_block_num=end_block,
196-
fork_steps=[STEP_IRREVERSIBLE],
197-
modules=self.pkg.modules,
198-
output_modules=output_modules,
199-
initial_store_snapshot_for_modules=output_modules
200-
if initial_snapshot
201-
else None,
202-
)
203-
)
204-
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
205-
results = []
206-
try:
207-
for response in stream:
208-
snapshot = MessageToDict(response.snapshot_data)
209-
data = MessageToDict(response.data)
210-
if snapshot:
211-
module_name: str = snapshot["moduleName"]
212-
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
213-
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
214-
if data:
215-
print("data block #", data["clock"]["number"])
216-
if self.output_modules[module]["is_map"]:
217-
parsed = self._parse_data_outputs(data)
218-
else:
219-
parsed = self._parse_data_deltas(data)
220-
module_name: str = data["outputs"][0]["name"]
221-
raw_results[module_name]["data"].extend(parsed)
222-
print('FINISH STREAM')
223-
for output_module in output_modules:
224-
result = SubstreamOutput(module_name=output_module)
225-
data_dict: dict = raw_results.get(output_module)
226-
for k, v in data_dict.items():
227-
df = pd.DataFrame(v)
228-
df["output_module"] = output_module
229-
setattr(result, k, df)
230-
results.append(result)
231-
except Exception as e:
232-
results.append({"error": e})
233-
return results
234-
235-
# This method executes a function passed as a parameter on every block for which data has been streamed
236-
def poll_callback_on_data(
237-
self,
238-
output_modules: list[str],
239-
start_block: int,
240-
end_block: int,
241-
stream_callback,
242-
initial_snapshot=False,
243-
):
244-
# TODO make this general
245-
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
246-
247-
for module in output_modules:
248-
if module not in self.output_modules:
249-
raise Exception(f"module '{module}' is not supported for {self.name}")
250-
self._class_from_module(module)
251-
252-
stream = self.service.Blocks(
253-
Request(
254-
start_block_num=start_block,
255-
stop_block_num=end_block,
256-
fork_steps=[STEP_IRREVERSIBLE],
257-
modules=self.pkg.modules,
258-
output_modules=output_modules,
259-
initial_store_snapshot_for_modules=output_modules
260-
if initial_snapshot
261-
else None,
262-
)
263-
)
264-
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
265-
for response in stream:
266-
data = MessageToDict(response.data)
267-
if data:
268-
module_name: str = data["outputs"][0]["name"]
269-
if self.output_modules[module]["is_map"]:
270-
parsed = self._parse_data_outputs(data)
271-
else:
272-
parsed = self._parse_data_deltas(data)
273-
if len(parsed) > 0:
274-
print('data block #', data["clock"]["number"])
275-
stream_callback(module_name, parsed)
276-
print('FINISH STREAM')
277-
return
278-
279-
# This method indexes all blocks until the first block for which data is received
280-
# Ideal for live streaming events and receiving them on the front end
281-
def poll_return_first_dict(
282-
self,
283-
output_modules: list[str],
284-
start_block: int,
285-
end_block: int,
286184
highest_processed_block: int = 0,
287-
initial_snapshot=False,
288185
return_progress=False
289186
):
290187
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
@@ -294,7 +191,6 @@ def poll_return_first_dict(
294191
raise Exception(f"module '{module}' is not supported for {self.name}")
295192
self._class_from_module(module)
296193

297-
# initial_store_snapshot_for_modules could possibly import the starting snapshot to start indexing at chain head?
298194
stream = self.service.Blocks(
299195
Request(
300196
start_block_num=start_block,
@@ -307,33 +203,79 @@ def poll_return_first_dict(
307203
else None,
308204
)
309205
)
310-
module_name = ""
311-
parsed = None
312-
data_block = 0
313-
314-
print(stream.time_remaining(), start_block, end_block, highest_processed_block)
315-
try:
316-
for response in stream:
317-
data = MessageToDict(response.data)
318-
progress = MessageToDict(response.progress)
319-
if data:
320-
data_block = data["clock"]["number"]
321-
module_name: str = data["outputs"][0]["name"]
322-
if self.output_modules[module]["is_map"]:
323-
parsed = self._parse_data_outputs(data)
324-
else:
325-
parsed = self._parse_data_deltas(data)
326-
module_name: str = data["outputs"][0]["name"]
327-
if len(parsed) > 0:
328-
print('data block #', data["clock"]["number"])
329-
break
330-
elif progress and return_progress is True:
331-
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
332-
data_block = endBlock
333-
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
334-
print(data_block, 'datablock')
335-
return {"block": int(endBlock)}
336-
print('FINISH STREAM')
337-
return {"data": parsed, "module_name": module_name, "data_block": data_block}
338-
except Exception as e:
339-
return {"error": e}
206+
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
207+
results = []
208+
if callable(stream_callback):
209+
# This logic executes a function passed as a parameter on every block for which data has been streamed
210+
try:
211+
for response in stream:
212+
data = MessageToDict(response.data)
213+
if data:
214+
module_name: str = data["outputs"][0]["name"]
215+
if self.output_modules[module]["is_map"]:
216+
parsed = self._parse_data_outputs(data)
217+
else:
218+
parsed = self._parse_data_deltas(data)
219+
if len(parsed) > 0:
220+
stream_callback(module_name, parsed)
221+
except Exception as e:
222+
return {"error": e}
223+
return
224+
elif return_first_result is True:
225+
# This logic indexes all blocks until the first block for which data is received
226+
# Ideal for live streaming events and receiving them on the front end
227+
module_name = ""
228+
parsed = None
229+
data_block = 0
230+
try:
231+
for response in stream:
232+
data = MessageToDict(response.data)
233+
progress = MessageToDict(response.progress)
234+
if data:
235+
data_block = data["clock"]["number"]
236+
module_name: str = data["outputs"][0]["name"]
237+
if self.output_modules[module]["is_map"]:
238+
parsed = self._parse_data_outputs(data)
239+
else:
240+
parsed = self._parse_data_deltas(data)
241+
module_name: str = data["outputs"][0]["name"]
242+
if len(parsed) > 0:
243+
break
244+
elif progress and return_progress is True:
245+
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
246+
data_block = endBlock
247+
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
248+
return {"block": int(endBlock)}
249+
return {"data": parsed, "module_name": module_name, "data_block": data_block}
250+
except Exception as e:
251+
return {"error": e}
252+
else:
253+
# This logic indexes all blocks from start_block until end_block
254+
# Returns one single dataframe with all data in rows once all blocks have been indexed
255+
try:
256+
for response in stream:
257+
snapshot = MessageToDict(response.snapshot_data)
258+
data = MessageToDict(response.data)
259+
if snapshot:
260+
module_name: str = snapshot["moduleName"]
261+
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
262+
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
263+
if data:
264+
if self.output_modules[module]["is_map"]:
265+
parsed = self._parse_data_outputs(data)
266+
else:
267+
parsed = self._parse_data_deltas(data)
268+
module_name: str = data["outputs"][0]["name"]
269+
raw_results[module_name]["data"].extend(parsed)
270+
for output_module in output_modules:
271+
result = SubstreamOutput(module_name=output_module)
272+
data_dict: dict = raw_results.get(output_module)
273+
for k, v in data_dict.items():
274+
df = pd.DataFrame(v)
275+
df["output_module"] = output_module
276+
setattr(result, k, df)
277+
results.append(result)
278+
except Exception as e:
279+
results.append({"error": e})
280+
return results
281+

0 commit comments

Comments
 (0)