use modin in remote context via rpyc #1659
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
For the last few days, I have been working on launching the mortgage benchmark on a remote modin context.

    pd.read_csv(csv_path, names = ["seller_name", "new"], delimiter = "|")

caused the error:

    pickle protocol must be <= 4

    kw = {'names': ['loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', 'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', 'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state', 'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', 'relocation_mortgage_indicator', 'year_quarter_ignore'], 'index_col': False}
    print(pd.read_csv(acq_path,
        dtype={'loan_id': 'int64', 'orig_channel': 'category', 'seller_name': 'category', 'orig_interest_rate': 'float64', 'orig_upb': 'int64', 'orig_loan_term': 'int64', 'orig_ltv': 'float64', 'orig_cltv': 'float64', 'num_borrowers': 'float64', 'dti': 'float64', 'borrower_credit_score': 'float64', 'first_home_buyer': 'category', 'loan_purpose': 'category', 'property_type': 'category', 'num_units': 'int64', 'occupancy_status': 'category', 'property_state': 'category', 'zip': 'int64', 'mortgage_insurance_percent': 'float64', 'product_type': 'category', 'coborrow_credit_score': 'float64', 'mortgage_insurance_type': 'float64', 'relocation_mortgage_indicator': 'category'},
        parse_dates=['orig_date', 'first_pay_date'], skiprows=1, delimiter="|", **kw))

caused the error:

    TypeError: Cannot interpret '{'loan_id': 'int64', 'orig_channel': 'category', 'seller_name': 'category', 'orig_interest_rate': 'float64', 'orig_upb': 'int64', 'orig_loan_term': 'int64', 'orig_ltv': 'float64', 'orig_cltv': 'float64', 'num_borrowers': 'float64', 'dti': 'float64', 'borrower_credit_score': 'float64', 'first_home_buyer': 'category', 'loan_purpose': 'category', 'property_type': 'category', 'num_units': 'int64', 'occupancy_status': 'category', 'property_state': 'category', 'zip': 'int64', 'mortgage_insurance_percent': 'float64', 'product_type': 'category', 'coborrow_credit_score': 'float64', 'mortgage_insurance_type': 'float64', 'relocation_mortgage_indicator': 'category'}' as a data type

In the case of […]

The following modification was used as a quick solution to this problem:

    # read_csv = _make_parser_func(sep=",")
    def read_csv(*args, **kwargs):
        from .. import execution_engine, _create_cloud_conn
        conn = _create_cloud_conn()
        import rpyc
        # send the arguments to the remote side by value instead of as proxies
        args = rpyc.classic.deliver(conn, args)
        kwargs = rpyc.classic.deliver(conn, kwargs)
        if execution_engine.get() == "Cloudray":
            # call the remote modin.pandas.read_csv
            read_csv = _create_cloud_conn().modules["modin.pandas"].read_csv
            return read_csv(*args, **kwargs)
        else:
            return _make_parser_func(sep=",")(*args, **kwargs)
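To illustrate why delivering the arguments by value helps, here is a minimal, self-contained sketch that does not use rpyc at all. `FakeNetref`, `interpret_dtype` and `deliver_by_value` are hypothetical names made up for this example; the pickle round trip only imitates what a by-value delivery does, and capping the protocol at 4 is an assumption about why `pickle protocol must be <= 4` may appear (for instance, if the two sides run Python versions with different highest pickle protocols).

```python
import pickle


class FakeNetref:
    """Hypothetical stand-in for a remote proxy: wraps an object without being one."""

    def __init__(self, target):
        self._target = target

    def __getitem__(self, key):
        return self._target[key]


def interpret_dtype(spec):
    """Toy consumer that, like much library code, insists on a real dict."""
    if isinstance(spec, dict):
        return {column: str(kind) for column, kind in spec.items()}
    raise TypeError(f"Cannot interpret {type(spec).__name__} as a data type")


def deliver_by_value(obj, peer_highest_protocol=4):
    """Copy obj through pickle, capped at a protocol both sides can read (assumed: 4)."""
    protocol = min(pickle.HIGHEST_PROTOCOL, peer_highest_protocol)
    return pickle.loads(pickle.dumps(obj, protocol=protocol))


dtypes = {"loan_id": "int64", "orig_channel": "category"}

# A proxy fails the isinstance check even though it wraps a dict.
try:
    interpret_dtype(FakeNetref(dtypes))
except TypeError as exc:
    proxy_error = str(exc)

# A by-value copy is a plain dict, so the consumer accepts it.
delivered = deliver_by_value(dtypes)
result = interpret_dtype(delivered)
```

In this sketch the consumer rejects the proxy but accepts the delivered copy, which is the same shape as the `dtype` failure above; whether this is exactly what happens inside the remote `read_csv` is not confirmed here.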
Other problems arose when calling some methods of the […]

    joined_df.loc[:, loc_list]

caused the error:

    TypeError: 'list' object is not callable

This workaround allows me to get around this problem, but only for research purposes.

    from modin import _create_cloud_conn
    conn = _create_cloud_conn()
    loc_list = [
        "loan_id",
        "monthly_reporting_period",
        "current_loan_delinquency_status",
        "current_actual_upb",
    ]
    import rpyc
    loc_list = rpyc.classic.deliver(conn, loc_list)
    test = gdf.loc[:,loc_list]

UPD: Adding next lines into […]

    if name_pack == "builtins.list" and name == "__call__":
        continue

class_factory

    def class_factory(id_pack, methods):
        """Creates a netref class proxying the given class
        :param id_pack: the id pack used for proxy communication
        :param methods: a list of ``(method name, docstring)`` tuples, of the methods that the class defines
        :returns: a netref class
        """
        ns = {"__slots__": (), "__class__": None}
        name_pack = id_pack[0]
        class_descriptor = None
        if name_pack is not None:
            # attempt to resolve __class__ using sys.modules (i.e. builtins and imported modules)
            _module = None
            cursor = len(name_pack)
            while cursor != -1:
                _module = sys.modules.get(name_pack[:cursor])
                if _module is None:
                    cursor = name_pack[:cursor].rfind('.')
                    continue
                _class_name = name_pack[cursor + 1:]
                _class = getattr(_module, _class_name, None)
                if _class is not None and hasattr(_class, '__class__'):
                    class_descriptor = NetrefClass(_class)
                break
        ns['__class__'] = class_descriptor
        netref_name = class_descriptor.owner.__name__ if class_descriptor is not None else name_pack
        # create methods that must perform a syncreq
        for name, doc in methods:
            name = str(name)  # IronPython issue #10
            # only create methods that wont shadow BaseNetref during merge for mro
            if name not in LOCAL_ATTRS:  # i.e. `name != __class__`
                if name_pack == "builtins.list" and name == "__call__":
                    continue
                ns[name] = _make_method(name, doc)
        return type(netref_name, (BaseNetref,), ns)
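A small sketch of why skipping `__call__` can matter, written without rpyc or pandas. It assumes (this is not confirmed above) that the indexer treats any key for which `callable(key)` is true as a function and invokes it, which pandas indexers are documented to do for callable keys. `make_proxy_class` and `resolve_key` are hypothetical helpers for illustration only; they mimic the idea of `class_factory` building a class with `type()` from a list of method names.

```python
def make_proxy_class(name, method_names, skip=frozenset()):
    """Build a proxy class with type(), one forwarding method per listed name."""

    def forward(method_name):
        def method(self, *args, **kwargs):
            if method_name == "__call__":
                # Forwarding a call to a list raises "'list' object is not callable".
                return self._target(*args, **kwargs)
            return getattr(self._target, method_name)(*args, **kwargs)

        return method

    def init(self, target):
        self._target = target

    namespace = {"__init__": init}
    for method_name in method_names:
        if method_name in skip:
            continue  # same idea as the two lines added to class_factory
        namespace[method_name] = forward(method_name)
    return type(name, (object,), namespace)


def resolve_key(key, obj):
    """Toy indexer helper: a callable key is invoked, anything else is used as is."""
    return key(obj) if callable(key) else key


methods = ["__getitem__", "__len__", "__call__"]
columns = ["loan_id", "current_actual_upb"]

# Proxy class that exposes __call__: it looks callable, but calling it fails.
NaiveListProxy = make_proxy_class("list", methods)
naive = NaiveListProxy(columns)
try:
    resolve_key(naive, None)
except TypeError as exc:
    naive_error = str(exc)

# Proxy class built without __call__: it is treated as a plain key.
PatchedListProxy = make_proxy_class("list", methods, skip={"__call__"})
patched = PatchedListProxy(columns)
resolved = resolve_key(patched, None)
```

Because `callable()` looks for `__call__` on the class, leaving that method out of the generated namespace is enough for the proxy to be treated as an ordinary list-like key in this sketch.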
At the moment, part of the mortgage benchmark with […]. I’m currently investigating errors that occur when […].
Calling this:

    tmpdf = tmpdf.groupby(["loan_id", "josh_mody_n"], as_index=False).agg({"delinquency_12": "max", "upb_12": "min"})

caused the error:

    TypeError: __init__() missing 2 required positional arguments: 'conn' and 'id_pack'

This happens because the rpyc wrapper over the dict class replaces its attributes (when the proxy object is created), so that when […]
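The explanation above is cut off, so the following is only one plausible way such an error can arise, sketched without rpyc or pandas. It assumes that some library code rebuilds a container with `type(obj)()`, which for a proxy class ends up calling a constructor that expects connection details. `FakeNetrefDict` and `rebuild_same_type` are hypothetical names used purely for illustration.

```python
class FakeNetrefDict:
    """Hypothetical proxy whose constructor needs connection details, like a netref."""

    def __init__(self, conn, id_pack):
        self._conn = conn
        self._id_pack = id_pack
        self._data = {"delinquency_12": "max", "upb_12": "min"}

    def items(self):
        return self._data.items()


def rebuild_same_type(mapping):
    """Toy library helper that creates an empty container of the caller's type."""
    fresh = type(mapping)()  # fine for dict, fails for the proxy class
    for key, value in mapping.items():
        fresh[key] = value
    return fresh


proxy = FakeNetrefDict(conn=None, id_pack=("builtins.dict", 0, 0))

# Passing the proxy: the helper tries to call FakeNetrefDict() with no arguments.
try:
    rebuild_same_type(proxy)
except TypeError as exc:
    proxy_error = str(exc)

# Converting to a plain dict first avoids the proxy constructor entirely.
plain = dict(proxy.items())
rebuilt = rebuild_same_type(plain)
```

In this sketch, converting the proxy to a plain `dict` before handing it to the library sidesteps the failing constructor, which is in the same spirit as delivering arguments by value.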
Another problem: `EOFError: connection closed by peer`. One of the reasons is that the timeouts set for the `rpyc.zerodeploy.DeployedServer` object are exceeded. Since setting timeouts for this object is not provided for, we will start the server during execution […]
What do these changes do?
- `flake8 modin`
- `black --check modin`
- `git commit -s`