From 282ced21f69c6651af656033c047c6e9f5017165 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 20 Jul 2015 16:55:21 -0700 Subject: [PATCH] Fix for test Two things: we only expect to use toPipe internally, marking it package private helps with where it gets used. Also the error stems from merging of flowdefs which need to copy the Boxed info between them --- .../src/main/scala/com/twitter/scalding/RichFlowDef.scala | 4 ++-- .../src/main/scala/com/twitter/scalding/typed/TypedPipe.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/RichFlowDef.scala b/scalding-core/src/main/scala/com/twitter/scalding/RichFlowDef.scala index 4a1f99ef5d..6d507d9a59 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/RichFlowDef.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/RichFlowDef.scala @@ -84,7 +84,7 @@ class RichFlowDef(val fd: FlowDef) { .foreach { oFS => FlowStateMap.mutate(fd) { current => // overwrite the items from o with current - (current.copy(sourceMap = oFS.sourceMap ++ current.sourceMap), ()) + (current.copy(sourceMap = oFS.sourceMap ++ current.sourceMap, flowConfigUpdates = oFS.flowConfigUpdates ++ current.flowConfigUpdates), ()) } } } @@ -147,7 +147,7 @@ class RichFlowDef(val fd: FlowDef) { if (headNames(name)) newfs + kv else newfs } - FlowStateMap.mutate(newFd) { oldFS => (oldFS.copy(sourceMap = subFlowState), ()) } + FlowStateMap.mutate(newFd) { oldFS => (oldFS.copy(sourceMap = subFlowState, flowConfigUpdates = thisFS.flowConfigUpdates ++ oldFS.flowConfigUpdates), ()) } } newFd } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 6dd4e12fdd..91ad849dd5 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -149,7 +149,7 @@ trait TypedPipe[+T] extends Serializable { /** * Provide the internal implementation to get from a typed pipe to a cascading Pipe */ - protected def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe + private[typed] def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe ///////////////////////////////////////////// // @@ -866,7 +866,7 @@ class TypedPipeFactory[T] private (@transient val next: NoStackAndThen[(FlowDef, override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = // unwrap in a loop, without recursing - unwrap(this).toPipe[U](fieldNames)(flowDef, mode, setter) + unwrap(this).asPipe[U](fieldNames)(flowDef, mode, setter) override def toIterableExecution: Execution[Iterable[T]] = Execution.getConfigMode.flatMap { case (conf, mode) =>