Support Calling Java Function in Python Executor and ModelBroadcast in Python #2284
Conversation
private var gatewayServer: py4j.GatewayServer = null

private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
Should it be a singleton here? I.e., createJavaGateway may be invoked multiple times on the same executor.
On the Java side, Engine is already a singleton, so it won't create multiple servers.
pyspark/bigdl/util/common.py
Outdated
@@ -610,9 +634,14 @@ def _java2py(sc, r, encoding="bytes"):

def callJavaFunc(sc, func, *args):
    """ Call Java Function """
    args = [_py2java(sc, a) for a in args]
    if sc is None:
Move this check into a GatewayWrapper? E.g.:
if run_on_driver:
    return get_spark_context()._jvm
else:
    return create_gateway()....
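A minimal sketch of what such a wrapper could look like, assuming the patch's get_spark_context and _get_port helpers and PySpark's SparkFiles._is_running_on_worker flag; the name _get_gateway is hypothetical:

from pyspark import SparkFiles
from py4j.java_gateway import JavaGateway, GatewayParameters

def _get_gateway():
    # Hypothetical single entry point for resolving the JVM gateway,
    # instead of checking `sc is None` at every call site.
    if SparkFiles._is_running_on_worker:
        # On an executor there is no SparkContext; connect to the
        # executor-local GatewayServer via the port it wrote to disk.
        return JavaGateway(
            gateway_parameters=GatewayParameters(port=_get_port(),
                                                 auto_convert=True))
    # On the driver, reuse the gateway PySpark already started.
    return get_spark_context()._gateway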
pyspark/bigdl/util/common.py
Outdated
@@ -533,6 +541,8 @@ def get_spark_context(conf=None):
    :param conf: combining bigdl configs into spark conf
    :return: SparkContext
    """
    if SparkFiles._is_running_on_worker:
remove this?
}
val thread = new Thread(new Runnable() {
  override def run(): Unit = try {
    gatewayServer.start()
Should we add a shutdown hook to stop the gatewayServer once the thread exits? I.e., Runtime.getRuntime().addShutdownHook.
private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
  if (SparkUtils.isDriver) {
    val file = new java.io.File(SparkFiles.getRootDirectory(), "gateway_port")
Should we make sure checking and writing the file is thread safe?
Done
if (file.exists()) {
  file.delete()
}
file.createNewFile()
Please add some logging when initializing the gateway server; it makes monitoring easier.
Done
  file.delete()
}
file.createNewFile()
val out = new PrintWriter(file)
Should we catch the file-creation exception and throw a more meaningful error message?
done
}
val thread = new Thread(new Runnable() {
  override def run(): Unit = try {
    gatewayServer.start()
If the gatewayServer crashes, will it output any log, and where can we find it?
I turned on logging; the logs can be found in the Spark executor's log file.
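As an aside, py4j's client-side logging can also be enabled from Python through the standard logging module (py4j's documented logger name), which helps debug connection problems from the worker:

import logging

# Enable py4j client-side logging; server-side logs still go to the
# executor's log file as described above.
logger = logging.getLogger("py4j")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())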
case ct: ControlThrowable =>
  throw ct
case t: Throwable =>
  throw new Exception(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
Please replace this with a more meaningful error message.
done
  }
})

val file = new java.io.File(SparkFiles.getRootDirectory(), "gateway_port")
should it be thread safe?
done
if (file.exists()) {
  file.delete()
}
file.createNewFile()
try to catch the exceptions and throw a more meaningful error
done
if (gatewayServer != null) return
gatewayServer = new py4j.GatewayServer(null, 0)
}
val thread = new Thread(new Runnable() {
What if another thread tries to use this gateway before it has been initialized?
Only one thread can reach this part of code.
@@ -2940,6 +2935,11 @@ class PythonBigDL[T: ClassTag](implicit ev: TensorNumeric[T]) extends Serializab
  maxIteration: Int): SequentialSchedule = {
  seq.add(scheduler, maxIteration)
}

def initExecutorGateway(sc: JavaSparkContext, driverPort: Int): Unit = {
Make it private[bigdl]?
done
@@ -438,6 +443,9 @@ def uniform(self, a, b, size):
def init_engine(bigdl_type="float"):
    callBigDlFunc(bigdl_type, "initEngine")

def init_executor_gateway(sc, bigdl_type="float"):
When should this method be called?
In each iteration, between the env and the agent, there will be a tensor serialization and deserialization, right?
@yiheng yes, but I can't think of any way around this.
@@ -533,7 +533,7 @@ def test_save_jtensor_dict(self):
    tensors["tensor1"] = JTensor.from_ndarray(np.random.rand(3, 2))
    tensors["tensor2"] = JTensor.from_ndarray(np.random.rand(3, 2))
    # in old impl, this will throw an exception
-   _py2java(self.sc, tensors)
+   _py2java(self.sc._gateway, tensors)
Add a unit test for the newly added ModelBroadcast?
added
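For reference, a hedged sketch of what such a test could look like; the broadcast_model helper's import path and exact API are assumptions, not confirmed by this conversation:

from bigdl.nn.layer import Linear

def test_model_broadcast(sc):
    # Hypothetical: broadcast a small model and read it back inside
    # executor-side closures, which exercises the executor gateway.
    from bigdl.models.utils.model_broadcast import broadcast_model
    model = Linear(3, 2)
    b = broadcast_model(sc, model)
    counts = sc.parallelize(range(2)) \
        .map(lambda _: len(b.value.get_weights())).collect()
    assert all(c > 0 for c in counts)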
from bigdl.nn.layer import Model

def _from_id_and_type(bid, bigdl_type):
    from pyspark.broadcast import _broadcastRegistry
Call the method _from_id directly to avoid duplicating the code here?
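A sketch of that refactor, assuming pyspark's private _from_id helper keeps its current behavior of validating the id and returning the entry from _broadcastRegistry; the _bigdl_type attribute is a hypothetical detail:

from pyspark.broadcast import _from_id

def _from_id_and_type(bid, bigdl_type):
    # Delegate the registry lookup to pyspark instead of re-implementing it.
    broadcast = _from_id(bid)
    # Hypothetical: remember the BigDL type so the value can later be
    # rebuilt as a Model on the Python side.
    broadcast._bigdl_type = bigdl_type
    return broadcast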
LGTM
PR Validation pass
def _get_port():
    root_dir = SparkFiles.getRootDirectory()
    path = os.path.join(root_dir, "gateway_port")
    f = open(path)
what if this fails?
I'll submit another PR and make it report a meaningful error message.
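A sketch of what that follow-up might look like, wrapping the read in a try block with an actionable message; the exact wording and exception types are assumptions:

import os
from pyspark import SparkFiles

def _get_port():
    path = os.path.join(SparkFiles.getRootDirectory(), "gateway_port")
    try:
        # The executor-side GatewayServer writes its listening port here.
        with open(path) as f:
            return int(f.read().strip())
    except (IOError, ValueError):
        raise RuntimeError(
            "Could not read the executor gateway port from %s. "
            "Was init_executor_gateway(sc) called on the driver?" % path)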
private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
  if (SparkUtils.isDriver) {
    if (driverPortFileCreated.compareAndSet(false, true)) {
Why is it needed on the driver?
In Spark local mode, the Python worker is a different process from the Python driver, so it cannot get the SparkContext object and needs this file to learn the driver gateway's listening port.
Support Calling Java Function in Python Executor and ModelBroadcast in Python (intel-analytics#2284)
* support call java function in executor
* fix test
* fix style
* address comments
* add parallelism
* address comments
* fix partition num
* address comments
* fix typo
* fix typo
* add integration test
* address comments
What changes were proposed in this pull request?
Support Calling Java Function in Python Executor and ModelBroadcast in Python
To enable this feature, the user should explicitly call init_executor_gateway(sc) before calling Java functions on the executor side.
The approach we took is to start a py4j GatewayServer on each executor, write its listening port to a gateway_port file under the SparkFiles root directory, and have the Python worker read that file to connect to the executor-local JVM, as sketched below.
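A minimal usage sketch; only init_engine, init_executor_gateway, and callBigDlFunc appear in this PR, and the executor-side call shown is illustrative rather than prescribed:

from pyspark import SparkContext
from bigdl.util.common import init_engine, init_executor_gateway, callBigDlFunc

sc = SparkContext.getOrCreate()
init_engine()
init_executor_gateway(sc)  # start per-executor gateways before using them

# Java functions can now be called from closures running on executors;
# "initEngine" is the only Java method named in this PR, used as the example.
sc.parallelize(range(4)).foreach(
    lambda _: callBigDlFunc("float", "initEngine"))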
How was this patch tested?
Added a unit test for Spark local mode.
Manually tested on a Spark standalone cluster.