
Support Calling Java Function in Python Executor and ModelBroadcast in Python #2284

Merged
merged 12 commits into from
Feb 9, 2018

Conversation

yangw1234
Contributor

@yangw1234 yangw1234 commented Feb 7, 2018

What changes were proposed in this pull request?

Support Calling Java Function in Python Executor and ModelBroadcast in Python

To enable this feature, the user should explicitly call

init_executor_gateway(sc)

The approach we took is:

  1. Launch a JavaGateway server on each executor and write the listening port to a file in the executor's working directory (typically ${SPARK_HOME}/work/app-id/executor-id).
  2. In Python, if running on an executor, read the port from the above file and create a gateway client to connect to the executor JVM.
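The executor-side lookup in step 2 can be sketched as follows (a minimal illustration with a hypothetical helper name, not the PR's exact code):

```python
import os

def read_executor_gateway_port(work_dir):
    # Hypothetical sketch: the executor JVM writes its gateway's listening
    # port to a plain-text "gateway_port" file in the executor's working
    # directory; the Python worker reads it back to connect a gateway client.
    path = os.path.join(work_dir, "gateway_port")
    with open(path) as f:
        return int(f.read().strip())
```

A py4j `GatewayClient` would then be created against `localhost` on the returned port.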

How was this patch tested?

Added a unit test for Spark local mode.
Manually tested on a Spark standalone cluster.

@yiheng yiheng self-requested a review February 7, 2018 14:09
private var gatewayServer: py4j.GatewayServer = null


private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
Contributor

Should this be a singleton? I.e., what if createJavaGateway is invoked multiple times on the same executor?

Contributor Author

On the Java side, Engine is already a singleton, so it won't create multiple servers.

@@ -610,9 +634,14 @@ def _java2py(sc, r, encoding="bytes"):

def callJavaFunc(sc, func, *args):
""" Call Java Function """
args = [_py2java(sc, a) for a in args]
if sc is None:
Contributor

Move this check into GatewayWrapper?

if run_on_driver:
    return get_spark_context()._jvm
else:
   return create_gateway()....
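The suggested refactor could be fleshed out as follows (a sketch only; all names are hypothetical):

```python
class GatewayWrapper:
    # Hypothetical sketch of the suggested refactor: hide the
    # driver-vs-executor check behind a single accessor so callers
    # never branch on where they are running.
    def __init__(self, on_driver, driver_jvm_factory, executor_gateway_factory):
        self._on_driver = on_driver
        self._driver_jvm_factory = driver_jvm_factory
        self._executor_gateway_factory = executor_gateway_factory
        self._gateway = None

    def jvm(self):
        if self._on_driver:
            # e.g. get_spark_context()._jvm on the driver
            return self._driver_jvm_factory()
        if self._gateway is None:
            # create (and cache) the executor gateway once per worker
            self._gateway = self._executor_gateway_factory()
        return self._gateway
```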

@@ -533,6 +541,8 @@ def get_spark_context(conf=None):
:param conf: combining bigdl configs into spark conf
:return: SparkContext
"""
if SparkFiles._is_running_on_worker:
Contributor

remove this?

}
val thread = new Thread(new Runnable() {
override def run(): Unit = try {
gatewayServer.start()
Contributor

Should we add a hook to stop the gatewayServer once the thread is closed? E.g.:
Runtime.getRuntime().addShutdownHook


private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
if (SparkUtils.isDriver) {
val file = new java.io.File(SparkFiles.getRootDirectory(), "gateway_port")
Contributor

Should we make sure it's thread safe when checking and writing the file?

Contributor Author

Done
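One common way to make the port-file write race-free (a sketch of the general technique, not necessarily the PR's actual fix) is to write to a temporary file and atomically rename it into place:

```python
import os
import tempfile

def write_port_file_atomically(directory, port, name="gateway_port"):
    # Write the port to a temp file first, then rename it into place.
    # os.replace is atomic on POSIX, so a concurrent reader either sees
    # the old file or the complete new one, never a partial write.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(port))
        os.replace(tmp_path, os.path.join(directory, name))
    except OSError:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```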

if (file.exists()) {
file.delete()
}
file.createNewFile()
Contributor

Please add some logs when initializing the gateway server; it makes it easier to monitor.

Contributor Author

Done

file.delete()
}
file.createNewFile()
val out = new PrintWriter(file)
Contributor

Should we catch the file-creation exception and throw a more meaningful error message?

Contributor Author

done

}
val thread = new Thread(new Runnable() {
override def run(): Unit = try {
gatewayServer.start()
Contributor

If the gatewayServer crashes, will it output a log, and where can we find it?

Contributor Author

I turned on logging; logs can be found in the Spark executor's log file.

case ct: ControlThrowable =>
throw ct
case t: Throwable =>
throw new Exception(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
Contributor

Please replace this with a more meaningful error message.

Contributor Author

done

}
})

val file = new java.io.File(SparkFiles.getRootDirectory(), "gateway_port")
Contributor

should it be thread safe?

Contributor Author

done

if (file.exists()) {
file.delete()
}
file.createNewFile()
Contributor

try to catch the exceptions and throw a more meaningful error

Contributor Author

done

if (gatewayServer != null) return
gatewayServer = new py4j.GatewayServer(null, 0)
}
val thread = new Thread(new Runnable() {
Contributor

What if another thread tries to use this gateway while it hasn't been initialized?

Contributor Author

Only one thread can reach this part of the code.
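For reference, the standard double-checked guard that would keep the initialization safe even if multiple threads could reach it (a Python sketch of the pattern; the actual code here is Scala):

```python
import threading

_gateway_lock = threading.Lock()
_gateway = None

def get_or_create_gateway(factory):
    # Double-checked initialization: the unlocked fast path avoids the
    # lock once the gateway exists; the locked re-check guarantees the
    # factory runs at most once even under contention.
    global _gateway
    if _gateway is None:
        with _gateway_lock:
            if _gateway is None:
                _gateway = factory()
    return _gateway
```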

@@ -2940,6 +2935,11 @@ class PythonBigDL[T: ClassTag](implicit ev: TensorNumeric[T]) extends Serializab
maxIteration: Int): SequentialSchedule = {
seq.add(scheduler, maxIteration)
}

def initExecutorGateway(sc: JavaSparkContext, driverPort: Int): Unit = {
Contributor

Should this be private[bigdl]?

Contributor Author

done

@@ -438,6 +443,9 @@ def uniform(self, a, b, size):
def init_engine(bigdl_type="float"):
callBigDlFunc(bigdl_type, "initEngine")

def init_executor_gateway(sc, bigdl_type="float"):
Contributor

When should this method be called?

@yiheng
Contributor

yiheng commented Feb 8, 2018

In each iteration, between the env and agent, there will be a tensor serialization and deserialization, right?

@yangw1234
Contributor Author

@yiheng yes, but I can't think of any way around this.

@@ -533,7 +533,7 @@ def test_save_jtensor_dict(self):
tensors["tensor1"] = JTensor.from_ndarray(np.random.rand(3, 2))
tensors["tensor2"] = JTensor.from_ndarray(np.random.rand(3, 2))
# in old impl, this will throw an exception
_py2java(self.sc, tensors)
_py2java(self.sc._gateway, tensors)
Contributor

Add a unit test for the newly added ModelBroadcast?

Contributor Author

added

from bigdl.nn.layer import Model

def _from_id_and_type(bid, bigdl_type):
from pyspark.broadcast import _broadcastRegistry
Contributor

Call the method _from_id directly to avoid duplicating the code here?

@zhichao-li
Contributor

LGTM

@yangw1234
Contributor Author

PR Validation pass

@yangw1234 yangw1234 merged commit cd8db33 into intel-analytics:master Feb 9, 2018
def _get_port():
root_dir = SparkFiles.getRootDirectory()
path = os.path.join(root_dir, "gateway_port")
f = open(path)
Contributor

what if this fails?

Contributor Author

I'll submit another PR and make it report a meaningful error message.
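The follow-up error reporting could look like this (a hedged sketch; the wording and helper name are hypothetical, not the message that actually landed):

```python
import os

def get_gateway_port(root_dir):
    # Hypothetical sketch: wrap the file read so a missing or malformed
    # port file produces an actionable error instead of a bare IOError.
    path = os.path.join(root_dir, "gateway_port")
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError) as e:
        raise RuntimeError(
            "Cannot read executor gateway port from %s. "
            "Did you call init_executor_gateway(sc) on the driver?" % path
        ) from e
```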


private[bigdl] def createJavaGateway(driverPort: Int): Unit = {
if (SparkUtils.isDriver) {
if (driverPortFileCreated.compareAndSet(false, true)) {
Contributor

Why is this needed on the driver?

Contributor Author

In Spark local mode, the Python worker is a different process from the Python driver, so it cannot get the SparkContext object and needs this file to know the driver gateway's listening port.

psyyz10 pushed a commit to psyyz10/BigDL that referenced this pull request Mar 8, 2018
…n Python (intel-analytics#2284)

* support call java function in executor

* fix test

* fix style

* address comments

* add parallelism

* address comments

* fix partition num

* address comments

* fix typo

* fix typo

* add integration test

* address comments
Le-Zheng added a commit to Le-Zheng/BigDL that referenced this pull request Oct 20, 2021
dding3 pushed a commit to dding3/BigDL that referenced this pull request Nov 17, 2021
dding3 pushed a commit to dding3/BigDL that referenced this pull request Nov 17, 2021
* add hyperzoo for k8s support (intel-analytics#2140)

* add hyperzoo for k8s support

* format

* format

* format

* format

* run examples on k8s readme (intel-analytics#2163)

* k8s  readme

* fix jdk download issue (intel-analytics#2219)

* add doc for submit jupyter notebook and cluster serving to k8s (intel-analytics#2221)

* add hyperzoo doc

* add hyperzoo doc

* add hyperzoo doc

* add hyperzoo doc

* fix jdk download issue (intel-analytics#2223)

* bump to 0.9s (intel-analytics#2227)

* update jdk download url (intel-analytics#2259)

* update some previous docs (intel-analytics#2284)

* K8docsupdate (intel-analytics#2306)

* Update README.md

* Update s3 related links in readme  and documents (intel-analytics#2489)

* Update s3 related links in readme  and documents

* Update s3 related links in readme and documents

* Update s3 related links in readme and documents

* Update s3 related links in readme and documents

* Update s3 related links in readme and documents

* Update s3 related links in readme and documents

* update

* update

* modify line length limit

* update

* Update mxnet-mkl version in hyper-zoo dockerfile (intel-analytics#2720)

Co-authored-by: gaoping <pingx.gao@intel.com>

* update bigdl version (intel-analytics#2743)

* update bigdl version

* hyperzoo dockerfile add cluster-serving (intel-analytics#2731)

* hyperzoo dockerfile add cluster-serving

* update

* update

* update

* update jdk url

* update jdk url

* update

Co-authored-by: gaoping <pingx.gao@intel.com>

* Support init_spark_on_k8s (intel-analytics#2813)

* initial

* fix

* code refactor

* bug fix

* update docker

* style

* add conda to docker image (intel-analytics#2894)

* add conda to docker image

* Update Dockerfile

* Update Dockerfile

Co-authored-by: glorysdj <glorysdj@gmail.com>

* Fix code blocks indents in .md files (intel-analytics#2978)

* Fix code blocks indents in .md files

Previously a lot of the code blocks in markdown files were horribly indented with bad white spaces in the beginning of lines. Users can't just select, copy, paste, and run (in the case of python). I have fixed all these, so there is no longer any code block with bad white space at the beginning of the lines.
It would be nice if you could try to make sure in future commits that all code blocks are properly indented inside and have the right amount of white space in the beginning!

* Fix small style issue

* Fix indents

* Fix indent and add \ for multiline commands

Change indent from 3 spaces to 4, and add "\" for multiline bash commands

Co-authored-by: Yifan Zhu <fanzhuyifan@gmail.com>

* enable bigdl 0.12 (intel-analytics#3101)

* switch to bigdl 0.12

* Hyperzoo example ref (intel-analytics#3143)

* specify pip version to fix oserror 0 of proxy (intel-analytics#3165)

* Bigdl0.12.1 (intel-analytics#3155)

* bigdl 0.12.1

* bump 0.10.0-Snapshot (intel-analytics#3237)

* update runtime image name (intel-analytics#3250)

* update jdk download url (intel-analytics#3316)

* update jdk8 url (intel-analytics#3411)

Co-authored-by: ardaci <dongjie.shi@intel.com>

* update hyperzoo docker image (intel-analytics#3429)

* update hyperzoo image (intel-analytics#3457)

* fix jdk in az docker (intel-analytics#3478)

* fix jdk in az docker

* fix jdk for hyperzoo

* fix jdk in jenkins docker

* fix jdk in cluster serving docker

* fix jdk

* fix readme

* update python dep to fit cnvrg (intel-analytics#3486)

* update ray version doc (intel-analytics#3568)

* fix deploy hyperzoo issue (intel-analytics#3574)

Co-authored-by: gaoping <pingx.gao@intel.com>

* add spark fix and net-tools and status check (intel-analytics#3742)

* intsall netstat and add check status

* add spark fix for graphene

* bigdl 0.12.2 (intel-analytics#3780)

* bump to 0.11-S and fix version issues except ipynb

* add multi-stage build Dockerfile (intel-analytics#3916)

* add multi-stage build Dockerfile

* multi-stage build dockerfile

* multi-stage build dockerfile

* Rename Dockerfile.multi to Dockerfile

* delete Dockerfile.multi

* remove comments, add TINI_VERSION to common arg, remove Dockerfile.multi

* multi-stage add tf_slim

Co-authored-by: shaojie <shaojiex.bai@intel.com>

* update hyperzoo doc and k8s doc (intel-analytics#3959)

* update userguide of k8s

* update k8s guide

* update hyperzoo doc

* Update k8s.md

add note

* Update k8s.md

add note

* Update k8s.md

update notes

* fix 4087 issue (intel-analytics#4097)

Co-authored-by: shaojie <shaojiex.bai@intel.com>

* fixed 4086 and 4083 issues (intel-analytics#4098)

Co-authored-by: shaojie <shaojiex.bai@intel.com>

* Reduce image size (intel-analytics#4132)

* Reduce Dockerfile size
1. del redis stage
2. del flink stage
3. del conda & exclude some python packages
4. add copies layer stage

* update numpy version to 1.18.1

Co-authored-by: zzti-bsj <shaojiex.bai@intel.com>

* update hyperzoo image (intel-analytics#4250)

Co-authored-by: Adria777 <Adria777@github.com>

* bigdl 0.13 (intel-analytics#4210)

* bigdl 0.13

* update

* print exception

* pyspark2.4.6

* update release PyPI script

* update

* flip snapshot-0.12.0 and spark2.4.6 (intel-analytics#4254)

* s-0.12.0 master

* Update __init__.py

* Update python.md

* fix docker issues due to version update (intel-analytics#4280)

* fix docker issues

* fix docker issues

* update Dockerfile to support spark 3.1.2 && 2.4.6 (intel-analytics#4436)

Co-authored-by: shaojie <otnw_bsj@163.com>

* update hyperzoo, add lib for tf2 (intel-analytics#4614)

* delete tf 1.15.0 (intel-analytics#4719)

Co-authored-by: Le-Zheng <30695225+Le-Zheng@users.noreply.github.com>
Co-authored-by: pinggao18 <44043817+pinggao18@users.noreply.github.com>
Co-authored-by: pinggao187 <44044110+pinggao187@users.noreply.github.com>
Co-authored-by: gaoping <pingx.gao@intel.com>
Co-authored-by: Kai Huang <huangkaivision@gmail.com>
Co-authored-by: GavinGu07 <55721214+GavinGu07@users.noreply.github.com>
Co-authored-by: Yifan Zhu <zhuyifan@stanford.edu>
Co-authored-by: Yifan Zhu <fanzhuyifan@gmail.com>
Co-authored-by: Song Jiaming <litchy233@gmail.com>
Co-authored-by: ardaci <dongjie.shi@intel.com>
Co-authored-by: Yang Wang <yang3.wang@intel.com>
Co-authored-by: zzti-bsj <2779090360@qq.com>
Co-authored-by: shaojie <shaojiex.bai@intel.com>
Co-authored-by: Lingqi Su <33695124+Adria777@users.noreply.github.com>
Co-authored-by: Adria777 <Adria777@github.com>
Co-authored-by: shaojie <otnw_bsj@163.com>
@yangw1234 yangw1234 deleted the py2java branch July 20, 2023 17:36