Skip to content

Commit

Permalink
refactor code to be reduce code duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan authored and Bryan committed Dec 10, 2021
1 parent 558e21b commit 9c576f6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ CORE_DIR="${CURRENT_DIR}/core"
pushd ${CORE_DIR}
if [[ -z $GITHUB_CI ]];
then
mvn clean package -q -DskipTests
mvn clean package -DskipTests
else
mvn verify -q
mvn verify
fi
popd # core dir

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,21 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
queue: ObjectRefHolder.Queue,
ownerName: String): RecordBatch = {

var objectRef: ObjectRef[Array[Byte]] = null
if (ownerName == "") {
val objectRef = Ray.put(data)

// add the objectRef to the objectRefHolder to avoid reference GC
queue.add(objectRef)
val objectRefImpl = RayDPUtils.convert(objectRef)
val objectId = objectRefImpl.getId
val runtime = Ray.internal.asInstanceOf[RayRuntimeInternal]
val addressInfo = runtime.getObjectStore.getOwnershipInfo(objectId)
RecordBatch(addressInfo, objectId.getBytes, numRecords)
objectRef = Ray.put(data)
} else {
val ns = Ray.getRuntimeContext().getNamespace()
var dataOwner: PyActorHandle = Ray.getActor(ownerName, ns).get()
val objectRef = Ray.put(data, dataOwner) // val objectRef = Ray.put(data)

// add the objectRef to the objectRefHolder to avoid reference GC
queue.add(objectRef)
val objectRefImpl = RayDPUtils.convert(objectRef)
val objectId = objectRefImpl.getId
val runtime = Ray.internal.asInstanceOf[RayRuntimeInternal]
val addressInfo = runtime.getObjectStore.getOwnershipInfo(objectId)
RecordBatch(addressInfo, objectId.getBytes, numRecords)
var dataOwner: PyActorHandle = Ray.getActor(ownerName).get()
objectRef = Ray.put(data, dataOwner)
}

// add the objectRef to the objectRefHolder to avoid reference GC
queue.add(objectRef)
val objectRefImpl = RayDPUtils.convert(objectRef)
val objectId = objectRefImpl.getId
val runtime = Ray.internal.asInstanceOf[RayRuntimeInternal]
val addressInfo = runtime.getObjectStore.getOwnershipInfo(objectId)
RecordBatch(addressInfo, objectId.getBytes, numRecords)
}

/**
Expand Down

0 comments on commit 9c576f6

Please sign in to comment.