Skip to content

Commit 410ce78

Browse files
committed
Merge branch 'master' of https://github.com/apache/samza
2 parents a31a7aa + 343712e commit 410ce78

File tree

3 files changed

+66
-12
lines changed

3 files changed

+66
-12
lines changed

docs/learn/documentation/versioned/jobs/configuration-table.html

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,6 +1780,14 @@ <h1>Samza Configuration Reference</h1>
17801780
</td>
17811781
</tr>
17821782

1783+
<tr>
1784+
<td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
1785+
<td class="default"></td>
1786+
<td class="description">
1787+
A list of RocksDB <a href="https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409">properties</a> to expose as metrics (gauges).
1788+
</td>
1789+
</tr>
1790+
17831791
<tr>
17841792
<th colspan="3" class="section" id="cluster-manager">
17851793
Running Samza with a cluster manager<br>

samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.rocksdb.TtlDB
2828

2929
object RocksDbKeyValueStore extends Logging {
3030

31-
def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String): RocksDB = {
31+
def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
3232
var ttl = 0L
3333
var useTTL = false
3434

@@ -68,15 +68,29 @@ object RocksDbKeyValueStore extends Logging {
6868

6969
try
7070
{
71-
if (useTTL)
72-
{
73-
info("Opening RocksDB store with TTL value: %s" format ttl)
74-
TtlDB.open(options, dir.toString, ttl.toInt, false)
75-
}
76-
else
71+
val rocksDb =
72+
if (useTTL)
73+
{
74+
info("Opening RocksDB store with TTL value: %s" format ttl)
75+
TtlDB.open(options, dir.toString, ttl.toInt, false)
76+
}
77+
else
78+
{
79+
RocksDB.open(options, dir.toString)
80+
}
81+
82+
if (storeConfig.containsKey("rocksdb.metrics.list"))
7783
{
78-
RocksDB.open(options, dir.toString)
84+
storeConfig
85+
.get("rocksdb.metrics.list")
86+
.split(",")
87+
.map(property => property.trim)
88+
.foreach(property =>
89+
metrics.newGauge(property, () => rocksDb.getProperty(property))
90+
)
7991
}
92+
93+
rocksDb
8094
}
8195
catch
8296
{
@@ -104,7 +118,7 @@ class RocksDbKeyValueStore(
104118

105119
// lazy val here is important because the store directories do not exist yet, it can only be opened
106120
// after the directories are created, which happens much later from now.
107-
private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName)
121+
private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics)
108122
private val lexicographic = new LexicographicComparator()
109123

110124
def get(key: Array[Byte]): Array[Byte] = {

samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.io.File
2424
import java.util
2525

2626
import org.apache.samza.config.MapConfig
27+
import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
2728
import org.apache.samza.util.ExponentialSleepStrategy
2829
import org.junit.{Assert, Test}
2930
import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options}
@@ -41,7 +42,8 @@ class TestRocksDbKeyValueStore
4142
options,
4243
config,
4344
false,
44-
"someStore")
45+
"someStore",
46+
null)
4547
val key = "test".getBytes("UTF-8")
4648
rocksDB.put(key, "val".getBytes("UTF-8"))
4749
Assert.assertNotNull(rocksDB.get(key))
@@ -72,7 +74,8 @@ class TestRocksDbKeyValueStore
7274
options,
7375
config,
7476
false,
75-
"dbStore")
77+
"dbStore",
78+
null)
7679
val key = "key".getBytes("UTF-8")
7780
rocksDB.put(key, "val".getBytes("UTF-8"))
7881
// SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
@@ -98,7 +101,8 @@ class TestRocksDbKeyValueStore
98101
options,
99102
config,
100103
false,
101-
"dbStore")
104+
"dbStore",
105+
null)
102106

103107
val key = "key".getBytes("UTF-8")
104108
val key1 = "key1".getBytes("UTF-8")
@@ -142,4 +146,32 @@ class TestRocksDbKeyValueStore
142146
rocksDB.close()
143147
rocksDBReadOnly.close()
144148
}
149+
150+
@Test
151+
def testMetricsConfig(): Unit = {
152+
val registry = new MetricsRegistryMap("registrymap")
153+
val metrics = new KeyValueStoreMetrics("dbstore", registry)
154+
155+
val map = new util.HashMap[String, String]()
156+
map.put("rocksdb.metrics.list", "rocksdb.estimate-num-keys, rocksdb.estimate-live-data-size")
157+
val config = new MapConfig(map)
158+
val options = new Options()
159+
options.setCreateIfMissing(true)
160+
val rocksDB = RocksDbKeyValueStore.openDB(
161+
new File(System.getProperty("java.io.tmpdir")),
162+
options,
163+
config,
164+
false,
165+
"dbstore",
166+
metrics)
167+
168+
val metricsGroup = registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics")
169+
assert(metricsGroup != null)
170+
171+
val estimateNumKeysMetric = metricsGroup.get("dbstore-rocksdb.estimate-num-keys")
172+
assert(estimateNumKeysMetric.isInstanceOf[Gauge[String]])
173+
174+
val estimateLiveDataSizeMetric = metricsGroup.get("dbstore-rocksdb.estimate-live-data-size")
175+
assert(estimateLiveDataSizeMetric.isInstanceOf[Gauge[String]])
176+
}
145177
}

0 commit comments

Comments
 (0)