Datalake [2/3?]: Dataset and to_HF() #271

Draft
christopherfish wants to merge 10 commits into dev from feature/datalake/dataset

Conversation

@christopherfish (Contributor)

This PR adds a concept of a "Dataset", the ability to create one from the outputs of Datalake.query_data(), and the ability to convert a Dataset to HF format.

To do this it adds a concept of a "contract" to the Datalake Datum, guaranteeing the data form so HF can be sure what to expect. For now these are hard-coded into the datalake, but perhaps in the future it could be possible for users to specify contracts.

The specific forms of the contract are also not set in stone - this should be decided based on the models we use and the data formats they want, and the data formats that our input data come in. I wasn't sure what format the contour segmentation and regression contracts needed to take but this is an easy thing to add.

There are definitely things that need doing before this could be merged but creating the PR so Can can confirm that the general structure is suitable.

@christopherfish christopherfish marked this pull request as draft November 21, 2025 13:14
@christopherfish christopherfish self-assigned this Nov 21, 2025
@christopherfish christopherfish changed the title Dataset and to_HF() Datalake [2/3?]: Dataset and to_HF() Nov 21, 2025
@canelbirlik (Contributor) commented Nov 24, 2025

Thanks Chris, I'll comment as I try it out. For now, I think the datalake APIs shouldn't be async, as that forces users to manage the async context. e.g.:
datalake = Datalake(mongo_db_uri="mongodb://localhost:27017", mongo_db_name="test_db")
datalake.initialize() -> doesn't work as you'd expect at first glance; it's not awaited.

Putting await in front of datalake.initialize() is not ideal either, because the calling function/method is then forced to be async and run through asyncio.run() etc. So whenever someone needs to call the datalake, they need to create/manage an async context.
Instead, we could expose a sync version that manages the async context, maybe?

e.g. within __init__:
self._loop = asyncio.new_event_loop()
-> public APIs become sync:
def initialize(self):
    return self._run(self.initialize_async())
where _run checks whether there is already a running loop, etc.

If there are truly async methods like add_datum that could benefit from async, I think we can create a sync wrapper for them or expose an async version too.
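A minimal sketch of that sync-facade idea; `initialize_async` and `_run` are hypothetical names (not the actual Datalake implementation), and this ignores loop cleanup:

```python
import asyncio


class Datalake:
    """Sketch: sync public API driving async internals on a private loop."""

    def __init__(self):
        # One private event loop owned by this instance.
        self._loop = asyncio.new_event_loop()

    def _run(self, coro):
        # If a loop is already running, the caller should use the async API
        # directly; otherwise drive the coroutine on our own loop.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return self._loop.run_until_complete(coro)
        raise RuntimeError("already in an async context; use initialize_async()")

    async def initialize_async(self):
        # Placeholder for the real async initialization work.
        return "initialized"

    def initialize(self):
        return self._run(self.initialize_async())


datalake = Datalake()
print(datalake.initialize())  # caller needs no await: prints "initialized"
```

The caller stays fully synchronous, at the cost of the class managing its own loop lifecycle.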

- "bbox": Must be a dict with "bbox" key containing a list of lists of 4 floats
- "default": Any data type
metadata: Metadata dictionary associated with the datum
contract: Optional contract type specifying the data format. If None, defaults to "default".
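For illustration, the "bbox" form above could be checked with something like this hypothetical validate_bbox helper (not part of the PR):

```python
def validate_bbox(data):
    """Check the 'bbox' contract: a dict with a 'bbox' key holding a list
    of lists of 4 floats. Hypothetical helper, for illustration only."""
    if not isinstance(data, dict) or "bbox" not in data:
        return False
    boxes = data["bbox"]
    return isinstance(boxes, list) and all(
        isinstance(box, list)
        and len(box) == 4
        and all(isinstance(v, float) for v in box)
        for box in boxes
    )


print(validate_bbox({"bbox": [[0.1, 0.2, 0.9, 0.8]]}))  # True
print(validate_bbox({"bbox": [[0.1, 0.2]]}))            # False: not 4 floats
```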
Contributor

"type" might be better naming for this usage?

Contributor Author

For me, "type" has a precise meaning in Python which these "contract"s are closely related to, but not exactly, so it's useful to use a different word?

Contributor

Oh OK, I thought contracts were type primitives (classification etc.), hence the suggestion. In that case, does it make sense to couple things under a single contract, or, in cases where a model produces multiple outputs, should we commit multiple datums or aggregate them within a single contract and data field? E.g. if a visual inspection model produces 3 classification fields and 1 severity field, what should we do in general?
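To make the question concrete, here are two hypothetical shapes for that visual inspection example (all field and contract names are illustrative, not a decided format):

```python
# Option A: one datum, one aggregate contract for the whole model output.
aggregate_datum = {
    "contract": "visual_inspection",  # hypothetical aggregate contract
    "data": {
        "classifications": {"scratch": True, "dent": False, "rust": True},
        "severity": 0.7,
    },
}

# Option B: multiple datums, each with its own contract, all linked back
# to the same source image.
linked_datums = [
    {"contract": "classification", "data": {"label": "scratch"}, "links": ["img-1"]},
    {"contract": "classification", "data": {"label": "rust"}, "links": ["img-1"]},
    {"contract": "regression", "data": {"value": 0.7}, "links": ["img-1"]},
]
```

Option A keeps one model run atomic; Option B makes late-arriving or partial labels easier to attach.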

Exception: If database initialization fails
"""
await self.datum_database.initialize()
await self.dataset_database.initialize()
Contributor

[Screenshot 2025-11-24 at 17:34:41] The dataset collection seems not to be initialized here. I think it's because we are trying to create two connections to the same db (datum_db and dataset_db are essentially on the same db, but are different collections). What we should do here, I think, is define a single db with these two docs and create a single connection, something like self.db.initialize([datum_doc, dataset_doc]). However, our usage of the mongo backend assumes a single doc, so we might need to fix that too. @vik-rant fyi https://github.com/Mindtrace/mindtrace/blob/dev/mindtrace/database/mindtrace/database/backends/mongo_odm_backend.py#L110
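As a sketch of that single-connection shape (all names here are hypothetical, not the real backend; if the backend is Beanie-based, Beanie's `init_beanie` already accepts a `document_models` list, which might be the hook for supporting this):

```python
class MongoConnection:
    """Toy stand-in for one db connection that registers multiple document
    collections in a single initialize call (hypothetical shape)."""

    def __init__(self, uri, db_name):
        self.uri = uri
        self.db_name = db_name
        self.document_models = []
        self.initialized = False

    def initialize(self, document_models):
        # Register every collection in one pass, so a second connection to
        # the same db can't race and leave one collection uninitialized.
        self.document_models = list(document_models)
        self.initialized = True


conn = MongoConnection("mongodb://localhost:27017", "test_db")
conn.initialize(["DatumDoc", "DatasetDoc"])
print(conn.initialized)  # True
```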

Contributor Author

Hmm, I'm not super clear on how the Database module works on the backend. Having a single DB with two different docs seems like what we want, but it's not currently doable with the MT Database?

Contributor

cc: @mazen-elabd

Contributor

@canelbirlik Sorry, just seen this comment. The initialize is deprecated now. Still works, but deprecated. Will make sure to look into this and support it.

@canelbirlik (Contributor) commented Nov 24, 2025

I have commented on individual parts. Conceptually I think the core logic is good, but it's a bit too general imo. I think if we put more opinions into certain parts, it could be much easier (for our use cases):
1- I think it shouldn't put data into the db if there is no registry, and we should adopt a single storage strategy and use only that, to prevent mistakes. The main benefit of the datalake imo is to solve data coordination and prevent fragmentation of data across devices. I.e. Vineeth publishes some data, and I'll just use that data by referring to it by name or id. I shouldn't know which registry he uses or whether the data is in the db vs the registry. I think we can unify that once the GCS registry is in. Also, if we are storing the data in the registry, we shouldn't be able to store it in the db too; otherwise we need to check both places to see whether they might have some data.
2- I think if we discriminate between different datums, this could be more expressive:

  • We know that we want to store images in the registry (not the db), but we might want to store classifications, bboxes etc. in the db.
  • We currently link things assuming they are equals, i.e. if a bbox is derived from an image, the bbox belongs to that image. Since we don't discriminate, we allow storing bboxes on their own without links to images, which shouldn't be allowed. If we accept up front something like "there are certain data types (images, ...) and they could have late-arriving data that refers to them", we could enforce that more easily.

Overall looks good; my suggestions are about the overall direction, not this PR specifically. I'll play with it some more and report issues/bugs if I find any related to the current PR. I think once we have the GCS registry in, the easiest acid test would be to field test it with some big internally used dataset and see whether we cover the whole lifecycle, which would be:
1- Push raw images in
2- Push labels in
3- Create a dataset
4- Push some more raw images and then labels in
5- Update the dataset with a new version
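The five steps above could be exercised with a script along these lines; the datalake here is an in-memory stand-in, and every method name (add_datum, create_dataset, update_dataset) is illustrative, not the final API:

```python
import asyncio


class FakeDatalake:
    """In-memory stand-in used only to walk through the lifecycle above."""

    def __init__(self):
        self.datums = {}
        self.datasets = {}
        self._next = 0

    async def add_datum(self, data, contract="default"):
        self._next += 1
        self.datums[self._next] = {"data": data, "contract": contract}
        return self._next

    async def create_dataset(self, name, datum_ids):
        self.datasets[name] = {"version": 1, "datum_ids": list(datum_ids)}

    async def update_dataset(self, name, datum_ids):
        ds = self.datasets[name]
        ds["version"] += 1
        ds["datum_ids"].extend(datum_ids)


async def lifecycle():
    dl = FakeDatalake()
    # 1- push raw images, 2- push labels
    imgs = [await dl.add_datum(f"img{i}.png", contract="image") for i in range(2)]
    labels = [await dl.add_datum({"label": "ok"}, contract="classification") for _ in imgs]
    # 3- create a dataset
    await dl.create_dataset("field-test", imgs + labels)
    # 4- push more images/labels, 5- bump the dataset version
    more = [await dl.add_datum("img2.png", contract="image")]
    await dl.update_dataset("field-test", more)
    return dl.datasets["field-test"]["version"]


print(asyncio.run(lifecycle()))  # 2
```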

Exception: If database or registry operations fail
"""

if contract is None:
Contributor

I think on a datum we could have a required project_id/line_id key to help discriminate between unrelated tasks, so we don't accidentally pick it up while querying, or potentially to speed up queries. Datasets would have them too, I'm assuming.

Contributor Author

Yep, kept meaning to do this; done.

ValueError: If datums in the same column have different contracts
Exception: If data retrieval from the datalake fails
"""
return await asyncio.gather(*[self.load_row(datalake, row) for row in self.datum_ids])
Contributor

This might be too many calls, as load_row calls get_data per row on the datalake, which itself is
return await asyncio.gather(*[self.get_datum(datum_id) for datum_id in datum_ids])
-> I think we essentially need an outer-join equivalent here, which may be possible with a batch get_data?
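One possible shape for that, assuming a hypothetical batched fetch_batch helper (a real backend could back it with a single Mongo `$in` query rather than one get_datum per id):

```python
import asyncio


async def fetch_batch(store, datum_ids):
    """Hypothetical batched lookup: one round trip for all ids. `store`
    stands in for the datum database; a real backend could issue
    collection.find({"_id": {"$in": list(datum_ids)}}) here."""
    return {i: store.get(i) for i in datum_ids}


async def load_rows(store, rows):
    # Deduplicate ids across all rows, fetch once, then reassemble per row.
    unique_ids = {i for row in rows for i in row}
    by_id = await fetch_batch(store, unique_ids)
    # Outer-join behaviour: missing ids come back as None rather than raising.
    return [[by_id.get(i) for i in row] for row in rows]


store = {1: "imgA", 2: "bboxA", 3: "imgB"}
rows = [[1, 2], [3, 4]]  # id 4 is missing from the store
print(asyncio.run(load_rows(store, rows)))  # [['imgA', 'bboxA'], ['imgB', None]]
```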

Base automatically changed from feature/datalake/mongodb-query to dev November 28, 2025 12:55
@christopherfish christopherfish added enhancement New feature or request mindtrace-datalake Issues raised from datalake module in mindtrace package labels Dec 1, 2025