Description
Environment:
- Python version [3.6]
- Spark version [ 2.3.1]
- TensorFlow version [1.13.1]
- TensorFlowOnSpark version [e.g. 1.4]
- Cluster version [Standalone, Hadoop 2.7]
Code
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from tensorflowonspark import TFCluster
from tensorflowonspark import TFNode
from datetime import datetime
import tensorflow as tf
import argparse
def main_fun(argv, ctx):
    """Per-executor entry point invoked by TFCluster.run on each Spark task.

    Args:
        argv: command-line arguments forwarded from the driver (sys.argv).
        ctx: TFNode context object carrying cluster metadata
            (worker_num, job_name, task_index, ...).
    """
    worker_num = ctx.worker_num
    job_name = ctx.job_name
    # Bug fix: original printed undefined name `job_nameame` (typo),
    # which would raise NameError on every executor.
    print(job_name)
    task_index = ctx.task_index
    # Start (or join) the distributed TF server for this node.
    cluster_spec, server = TFNode.start_cluster_server(ctx)
    hello = tf.constant('Hello, TensorFlow!')
    sess = tf.Session()
    print(sess.run(hello))
import sys

# Bug fix: original read `if name == "main":` — the dunder underscores were
# lost (likely by markdown rendering). Without the real __main__ guard the
# driver code re-executes when Windows `spawn` re-imports the module, which
# contributes to the multiprocessing pickling failure shown in the logs.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
    # Bug fix: the parser was built but never used; parse the flag instead of
    # hard-coding tensorboard=True below.
    args = parser.parse_args()

    conf = SparkConf().setAppName("test").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    print(sc)

    # Fall back to 2 executors when running locally (property is unset).
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 2
    num_ps = 1

    cluster = TFCluster.run(sc, main_fun, sys.argv, num_executors, num_ps,
                            args.tensorboard, TFCluster.InputMode.TENSORFLOW)
    cluster.shutdown()
    sc.stop()
Describe the bug:
Running `TFCluster.run` in Spark local mode on Windows fails immediately: `TFSparkNode._mapfn` calls `TFManager.start`, whose `multiprocessing` manager process cannot be started because Windows uses the `spawn` start method, which pickles the process object — and the locally defined callables registered inside `TFManager.start` are not picklable, raising `AttributeError: Can't pickle local object`.
Logs:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 2) / 2]19/04/14 03:09:21 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark-2.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 196, in main
File "D:\spark-2.2.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 191, in process
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\pyspark\rdd.py", line 2410, in pipeline_func
return func(split, prev_func(split, iterator))
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\pyspark\rdd.py", line 2410, in pipeline_func
return func(split, prev_func(split, iterator))
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\pyspark\rdd.py", line 2410, in pipeline_func
return func(split, prev_func(split, iterator))
[Previous line repeated 1 more time]
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\pyspark\rdd.py", line 333, in func
return f(iterator)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\pyspark\rdd.py", line 781, in func
r = f(it)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\tensorflowonspark\TFSparkNode.py", line 183, in _mapfn
TFSparkNode.mgr = TFManager.start(authkey, queues)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\tensorflowonspark\TFManager.py", line 64, in start
mgr.start()
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\multiprocessing\managers.py", line 513, in start
self._process.start()
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "D:\ProgramData\Anaconda3\envs\bigdata\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'start.&lt;locals&gt;.&lt;lambda&gt;'
SystemExit: 1
D:\ProgramData\Anaconda3\envs\bigdata\lib\site-packages\IPython\core\interactiveshell.py:3299: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.
warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
Spark Submit Command Line: