Skip to content

Commit

Permalink
Merge pull request #1376 from twitter/ianoc/fixBug
Browse files Browse the repository at this point in the history
Fixes a serialization error we were seeing in REPL usage.
  • Loading branch information
johnynek committed Jul 21, 2015
2 parents 329d277 + 282ced2 commit 4b4bcf8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class RichFlowDef(val fd: FlowDef) {
.foreach { oFS =>
FlowStateMap.mutate(fd) { current =>
// overwrite the items from o with current
(current.copy(sourceMap = oFS.sourceMap ++ current.sourceMap), ())
(current.copy(sourceMap = oFS.sourceMap ++ current.sourceMap, flowConfigUpdates = oFS.flowConfigUpdates ++ current.flowConfigUpdates), ())
}
}
}
Expand Down Expand Up @@ -147,7 +147,7 @@ class RichFlowDef(val fd: FlowDef) {
if (headNames(name)) newfs + kv
else newfs
}
FlowStateMap.mutate(newFd) { oldFS => (oldFS.copy(sourceMap = subFlowState), ()) }
FlowStateMap.mutate(newFd) { oldFS => (oldFS.copy(sourceMap = subFlowState, flowConfigUpdates = thisFS.flowConfigUpdates ++ oldFS.flowConfigUpdates), ()) }
}
newFd
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ trait TypedPipe[+T] extends Serializable {
/**
* Provide the internal implementation to get from a typed pipe to a cascading Pipe
*/
protected def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe
private[typed] def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe

/////////////////////////////////////////////
//
Expand Down Expand Up @@ -866,7 +866,7 @@ class TypedPipeFactory[T] private (@transient val next: NoStackAndThen[(FlowDef,

override def asPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) =
// unwrap in a loop, without recursing
unwrap(this).toPipe[U](fieldNames)(flowDef, mode, setter)
unwrap(this).asPipe[U](fieldNames)(flowDef, mode, setter)

override def toIterableExecution: Execution[Iterable[T]] = Execution.getConfigMode.flatMap {
case (conf, mode) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ class ComplexJob(input: List[NestedCaseClass], args: Args) extends Job(args) {
.write(TypedTsv[String](args("output2")))
}

// Regression job for a serialization error seen in REPL usage (see commit message):
// joins two pipes on a macro-serialized key, then groupAll's the result.
// NOTE(review): this Job eagerly runs an Execution in its constructor (waitFor)
// before declaring the usual write sinks — presumably intentional so the failure
// mode reproduces during job construction; confirm against the platform test.
class ComplexJob2(input: List[NestedCaseClass], args: Args) extends Job(args) {
  // Materializes an OrderedSerialization[T] at compile time via the scalding
  // serialization macro for every key type used in the joins/groupings below.
  implicit def primitiveOrderedBufferSupplier[T]: OrderedSerialization[T] = macro com.twitter.scalding.serialization.macros.impl.OrderedSerializationProviderImpl[T]

  // Two pipes keyed on the same NestedCaseClass values, with different payloads.
  val ds1 = TypedPipe.from(input).map(_ -> (1L, "asfg"))

  val ds2 = TypedPipe.from(input).map(_ -> (1L, "sdf"))

  // join + groupAll + size: the shape of pipeline that triggered the original bug.
  val execution = ds1.join(ds2).groupAll.size.values.toIterableExecution
  val r = Config.tryFrom(config).get
  // Runs the execution synchronously; .get rethrows if the run failed.
  execution.waitFor(r, mode).get

  ds1.map(_.toString).write(TypedTsv[String](args("output1")))
  ds2.map(_.toString).write(TypedTsv[String](args("output2")))
}

class CheckFlowProcessJoiner(uniqueID: UniqueID) extends InnerJoin {
override def getIterator(joinerClosure: JoinerClosure): JIterator[Tuple] = {
println("CheckFlowProcessJoiner.getItertor")
Expand Down Expand Up @@ -401,6 +416,19 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest
.sink[String](TypedTsv[String]("output1")) { x => () }
.run
}

"A test job with that joins then groupAll's should have its boxes setup correctly." in {
val fn = (arg: Args) => new ComplexJob2(data, arg)
HadoopPlatformJobTest(fn, cluster)
.arg("output1", "output1")
.arg("output2", "output2")
// Here we are just testing that we hit no exceptions in the course of this run
// the previous issue would have caused OOM or other exceptions. If we get to the end
// then we are good.
.sink[String](TypedTsv[String]("output2")) { x => () }
.sink[String](TypedTsv[String]("output1")) { x => () }
.run
}
}

"Methods called from a Joiner" should {
Expand Down

0 comments on commit 4b4bcf8

Please sign in to comment.