[FSTORE-1617] Integrate delta-rs HopsFS writer to hsfs python client #536

Open: davitbzh wants to merge 2 commits into main

Conversation

davitbzh (Contributor)

This PR adds/fixes/changes...

  • Integrates the delta-rs HopsFS writer into the hsfs Python client (FSTORE-1617); an illustrative usage sketch follows the metadata below.

JIRA Issue: FSTORE-1617

Priority for Review: -

Related PRs: -
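For reviewers unfamiliar with delta-rs: the sketch below shows roughly what a delta-rs write looks like from Python. This is a sketch under assumptions, not this PR's actual API; the `deltalake` package is the delta-rs Python binding, and the table path and schema here are made up.

```python
# Illustrative sketch only, not the PR's API: assumes the `deltalake`
# (delta-rs) Python package and an HDFS/HopsFS-compatible location.
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Hypothetical feature data as an Arrow table.
data = pa.table({"id": [1, 2, 3], "amount": [10.0, 20.5, 7.25]})

# Hypothetical Delta table location on HopsFS.
location = "hdfs://namenode:8020/apps/hive/warehouse/fg_delta"

# Append to the Delta table; the table is created on the first write.
write_deltalake(location, data, mode="append")

# Read back the current snapshot as a sanity check.
print(DeltaTable(location).to_pyarrow_table().num_rows)
```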

How Has This Been Tested?

  • Unit Tests
  • Integration Tests
  • Manual Tests on VM

Checklist For The Assigned Reviewer:

- [ ] Checked if merge conflicts with master exist
- [ ] Checked if stylechecks for Java and Python pass
- [ ] Checked if all docstrings were added and/or updated appropriately
- [ ] Ran spellcheck on docstrings
- [ ] Checked if guides & concepts need to be updated
- [ ] Checked if naming conventions for parameters and variables were followed
- [ ] Checked if private methods are properly declared and used
- [ ] Checked if hard-to-understand areas of code are commented
- [ ] Checked if tests are effective
- [ ] Built and deployed changes on dev VM and tested manually
- [x] Checked if all type annotations were added and/or updated appropriately

@davitbzh davitbzh requested a review from bubriks April 17, 2025 08:15
@davitbzh (Contributor, Author)

Just for visibility, don't merge yet. There is an incompatibility with Spark Delta: tables written by delta-rs use a Delta protocol version newer than what the cluster's Spark Delta build supports (see the InvalidProtocolVersionException in the driver logs below). This will most probably be solved by an upgrade.
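One way to confirm the mismatch is to inspect the protocol versions delta-rs recorded on the table. This is a diagnostic sketch assuming the `deltalake` package and a hypothetical table URI:

```python
# Diagnostic sketch: assumes the `deltalake` (delta-rs) package;
# the table URI below is hypothetical.
from deltalake import DeltaTable

dt = DeltaTable("hdfs://namenode:8020/apps/hive/warehouse/fg_delta")

# delta-rs exposes the table's minimum reader/writer protocol versions.
proto = dt.protocol()
print(proto.min_reader_version, proto.min_writer_version)
# If min_reader_version is higher than the Spark Delta build supports,
# Spark fails with InvalidProtocolVersionException on load, as below.
```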

==================================
Logs for EXECUTOR on node davit.Executor Id: 1
============================================================================================== 


==================================
Logs for DRIVER on node davit.
============================================================================================== 
Picked up _JAVA_OPTIONS:  -Duser.home=/srv/hops/anaconda
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/srv/hops/hadoop-3.2.0.15-EE-RC0/share/hadoop/common/lib/log4j-slf4j-impl-2.19.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/srv/hops/hadoop-3.2.0.15-EE-RC0/share/hadoop/hdfs/lib/log4j-slf4j-impl-2.19.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Logged in to project, explore it here https://hopsworks.ai.local/p/119
2025-04-16 17:42:37,129 INFO aaa,fraud_snowfake_fv_deltars_1_create_fv_td_16042025174156,37,**APPID Version: Elasticsearch Hadoop v7.13.0-SNAPSHOT [ba4a70edaf]
Traceback (most recent call last):
  File "/srv/hops/artifacts/hsfs_utils.py", line 425, in <module>
    create_fv_td(job_conf)
  File "/srv/hops/artifacts/hsfs_utils.py", line 122, in create_fv_td
    fv_engine.compute_training_dataset(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/core/feature_view_engine.py", line 796, in compute_training_dataset
    td_job = engine.get_instance().write_training_dataset(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/engine/spark.py", line 792, in write_training_dataset
    split_dataset = self._split_df(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/engine/spark.py", line 857, in _split_df
    query_obj.read(read_options=read_options), training_dataset
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/constructor/query.py", line 295, in read
    sql_query, online_conn = self._prep_read(online, read_options)
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/constructor/query.py", line 131, in _prep_read
    fs_query.register_delta_tables(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/constructor/fs_query.py", line 140, in register_delta_tables
    engine.get_instance().register_delta_temporary_table(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/engine/spark.py", line 239, in register_delta_temporary_table
    delta_engine_instance.register_temporary_table(
  File "/srv/hops/anaconda/envs/hopsworks_environment/lib/python3.10/site-packages/hsfs/core/delta_engine.py", line 60, in register_temporary_table
    ).load(location).createOrReplaceTempView(delta_fg_alias.alias)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 204, in load
    return self._df(self._jreader.load(path))
  File "/srv/hops/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/srv/hops/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o167.load.
: org.apache.spark.sql.delta.actions.InvalidProtocolVersionException: Delta protocol version is too new for this version of the Databricks Runtime. Please upgrade to a newer release.
	at org.apache.spark.sql.delta.DeltaLog.protocolRead(DeltaLog.scala:257)
	at org.apache.spark.sql.delta.Snapshot.init(Snapshot.scala:82)
	at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:294)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:223)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:211)
	at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:59)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:195)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:186)
	at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:59)
	at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:49)
	at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:63)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:467)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:467)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
	at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:367)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:106)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
	at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:367)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$1(DeltaLog.scala:466)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3952)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:464)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:401)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:73)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:73)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:139)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:177)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:306)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)


Connection closed.


==================================
Logs for EXECUTOR on node davit.Executor Id: 2
============================================================================================== 
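The failing call in delta_engine.py is a plain Spark Delta read (spark.read.format("delta").load(location)) followed by createOrReplaceTempView, so the exception comes from the cluster's Spark Delta build rather than from hsfs itself. Until that dependency is upgraded, one possible workaround (an untested assumption, not verified on this cluster) would be to pin the table to an older protocol via table properties when delta-rs creates it:

```python
# Possible workaround sketch, NOT verified on this cluster: pin the table
# to an older Delta protocol at creation time so the bundled Spark Delta
# reader can open it. Assumes the `deltalake` package honors these
# properties; the path and data are hypothetical.
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"id": [1, 2, 3]})
write_deltalake(
    "hdfs://namenode:8020/apps/hive/warehouse/fg_delta",  # hypothetical
    data,
    mode="overwrite",
    configuration={
        "delta.minReaderVersion": "1",
        "delta.minWriterVersion": "2",
    },
)
```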
