2020package org .apache .samza .test .performance
2121
2222import java .io .File
23+ import java .util
2324import java .util .concurrent .TimeUnit
24- import java .util .{Collections , UUID }
25+ import java .util .Collections
26+ import java .util .UUID
2527
2628import com .google .common .base .Stopwatch
29+ import com .google .common .collect .ImmutableList
30+ import com .google .common .collect .ImmutableMap
31+ import org .apache .commons .lang .RandomStringUtils
2732import org .apache .samza .config .Config
33+ import org .apache .samza .config .JobConfig
34+ import org .apache .samza .config .MapConfig
2835import org .apache .samza .config .StorageConfig ._
2936import org .apache .samza .container .TaskName
30- import org .apache .samza .context .{ContainerContextImpl , JobContextImpl }
31- import org .apache .samza .job .model .{ContainerModel , TaskModel }
37+ import org .apache .samza .context .ContainerContextImpl
38+ import org .apache .samza .context .JobContextImpl
39+ import org .apache .samza .job .model .ContainerModel
40+ import org .apache .samza .job .model .TaskModel
3241import org .apache .samza .metrics .MetricsRegistryMap
33- import org .apache .samza .serializers .{ByteSerde , SerdeManager , UUIDSerde }
42+ import org .apache .samza .serializers .ByteSerde
43+ import org .apache .samza .serializers .SerdeManager
44+ import org .apache .samza .serializers .UUIDSerde
3445import org .apache .samza .storage .StorageEngineFactory
3546import org .apache .samza .storage .StorageEngineFactory .StoreMode
36- import org .apache .samza .storage .kv .{KeyValueStorageEngine , KeyValueStore }
37- import org .apache .samza .system .{SystemProducer , SystemProducers , SystemStreamPartition }
47+ import org .apache .samza .storage .kv .KeyValueStorageEngine
48+ import org .apache .samza .storage .kv .KeyValueStore
49+ import org .apache .samza .system .SystemProducer
50+ import org .apache .samza .system .SystemProducers
51+ import org .apache .samza .system .SystemStreamPartition
3852import org .apache .samza .task .TaskInstanceCollector
39- import org .apache .samza .util .{CommandLine , FileUtil , Logging , Util }
40- import org .apache .samza .{Partition , SamzaException }
53+ import org .apache .samza .util .CommandLine
54+ import org .apache .samza .util .FileUtil
55+ import org .apache .samza .util .Logging
56+ import org .apache .samza .util . Util
57+ import org .apache .samza .Partition
58+ import org .apache .samza .SamzaException
4159
4260import scala .collection .JavaConverters ._
4361import scala .util .Random
@@ -60,6 +78,7 @@ import scala.util.Random
6078
6179object TestKeyValuePerformance extends Logging {
6280 val Encoding = " UTF-8"
81+ val JobId = RandomStringUtils .random(10 )
6382
6483 val testMethods : Map [String , (KeyValueStorageEngine [Array [Byte ], Array [Byte ]], Config ) => Unit ] = Map (
6584 " all-with-deletes" -> runTestAllWithDeletes,
@@ -77,7 +96,10 @@ object TestKeyValuePerformance extends Logging {
7796 tests.foreach{ test =>
7897 info(" Running test: %s" format test)
7998 if (testMethods.contains(test)) {
80- invokeTest(test, testMethods(test), config.subset(" test." + test + " ." , true ))
99+ val testConfig : util.Map [String , String ] = new MapConfig (config.subset(" test." + test + " ." , true ))
100+ val jobConfig : util.Map [String , String ] = ImmutableMap .of(JobConfig .JOB_NAME , test, JobConfig .JOB_ID , JobId )
101+ val combinedConfig : Config = new MapConfig (ImmutableList .of(testConfig, jobConfig))
102+ invokeTest(test, testMethods(test), combinedConfig)
81103 } else {
82104 error(" Invalid test method. valid methods are: %s" format testMethods.keys)
83105 throw new SamzaException (" Unknown test method: %s" format test)
0 commit comments