Skip to content

Commit 2227f9f

Browse files
author
Prateek Maheshwari
committed
SAMZA-1736: Add counters and timers for batch get/put/delete operations in KeyValueStorageEngine
Author: Prateek Maheshwari <pmaheshwari@linkedin.com> Reviewers: Cameron Lee <calee@linkedin.com>, Shanthoosh Venkatraman <svenkatr@linkedin.com> Closes apache#539 from prateekm/store-metrics
1 parent ae056ff commit 2227f9f

File tree

4 files changed

+47
-27
lines changed

4 files changed

+47
-27
lines changed

samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,28 +154,30 @@ class RocksDbKeyValueStore(
154154
}
155155

156156
def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen {
157-
metrics.puts.inc
158157
require(key != null, "Null key not allowed.")
159158
if (value == null) {
159+
metrics.deletes.inc
160160
db.delete(writeOptions, key)
161161
} else {
162+
metrics.puts.inc
162163
metrics.bytesWritten.inc(key.length + value.length)
163164
db.put(writeOptions, key, value)
164165
}
165166
}
166167

167168
// Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
168169
def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
170+
metrics.putAlls.inc()
169171
val iter = entries.iterator
170172
var wrote = 0
171173
var deletes = 0
172174
while (iter.hasNext) {
173-
wrote += 1
174175
val curr = iter.next()
175176
if (curr.getValue == null) {
176177
deletes += 1
177178
db.delete(writeOptions, curr.getKey)
178179
} else {
180+
wrote += 1
179181
val key = curr.getKey
180182
val value = curr.getValue
181183
metrics.bytesWritten.inc(key.length + value.length)
@@ -187,7 +189,6 @@ class RocksDbKeyValueStore(
187189
}
188190

189191
def delete(key: Array[Byte]): Unit = ifOpen {
190-
metrics.deletes.inc
191192
put(key, null)
192193
}
193194

samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.samza.storage.kv
2121

2222
import org.apache.samza.util.Logging
23-
import org.apache.samza.storage.{StoreProperties, StorageEngine}
23+
import org.apache.samza.storage.{StorageEngine, StoreProperties}
2424
import org.apache.samza.system.IncomingMessageEnvelope
2525
import org.apache.samza.util.TimerUtil
2626

@@ -52,8 +52,11 @@ class KeyValueStorageEngine[K, V](
5252
}
5353

5454
override def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
55-
metrics.gets.inc(keys.size)
56-
wrapperStore.getAll(keys)
55+
updateTimer(metrics.getAllNs) {
56+
metrics.getAlls.inc()
57+
metrics.gets.inc(keys.size)
58+
wrapperStore.getAll(keys)
59+
}
5760
}
5861

5962
def put(key: K, value: V) = {
@@ -64,8 +67,7 @@ class KeyValueStorageEngine[K, V](
6467
}
6568

6669
def putAll(entries: java.util.List[Entry[K, V]]) = {
67-
metrics.puts.inc(entries.size)
68-
wrapperStore.putAll(entries)
70+
doPutAll(wrapperStore, entries)
6971
}
7072

7173
def delete(key: K) = {
@@ -76,8 +78,11 @@ class KeyValueStorageEngine[K, V](
7678
}
7779

7880
override def deleteAll(keys: java.util.List[K]) = {
79-
metrics.deletes.inc(keys.size)
80-
wrapperStore.deleteAll(keys)
81+
updateTimer(metrics.deleteAllNs) {
82+
metrics.deleteAlls.inc()
83+
metrics.deletes.inc(keys.size)
84+
wrapperStore.deleteAll(keys)
85+
}
8186
}
8287

8388
def range(from: K, to: K) = {
@@ -110,17 +115,17 @@ class KeyValueStorageEngine[K, V](
110115
batch.add(new Entry(keyBytes, valBytes))
111116

112117
if (batch.size >= batchSize) {
113-
rawStore.putAll(batch)
118+
doPutAll(rawStore, batch)
114119
batch.clear()
115120
}
116121

117122
if (valBytes != null) {
118-
metrics.restoredBytes.inc(valBytes.size)
119-
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.size)
123+
metrics.restoredBytes.inc(valBytes.length)
124+
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
120125
}
121126

122-
metrics.restoredBytes.inc(keyBytes.size)
123-
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.size)
127+
metrics.restoredBytes.inc(keyBytes.length)
128+
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
124129

125130
metrics.restoredMessages.inc()
126131
metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
@@ -134,7 +139,7 @@ class KeyValueStorageEngine[K, V](
134139
info(count + " total entries restored.")
135140

136141
if (batch.size > 0) {
137-
rawStore.putAll(batch)
142+
doPutAll(rawStore, batch)
138143
}
139144
}
140145

@@ -159,6 +164,14 @@ class KeyValueStorageEngine[K, V](
159164
wrapperStore.close()
160165
}
161166

167+
private def doPutAll[Key, Value](store: KeyValueStore[Key, Value], entries: java.util.List[Entry[Key, Value]]) = {
168+
updateTimer(metrics.putAllNs) {
169+
metrics.putAlls.inc()
170+
metrics.puts.inc(entries.size)
171+
store.putAll(entries)
172+
}
173+
}
174+
162175
override def getStoreProperties: StoreProperties = storeProperties
163176

164177
override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {

samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,32 @@ class KeyValueStorageEngineMetrics(
2828
val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
2929

3030
val gets = newCounter("gets")
31-
val ranges = newCounter("ranges")
32-
val alls = newCounter("alls")
31+
val getAlls = newCounter("get-alls")
3332
val puts = newCounter("puts")
33+
val putAlls = newCounter("put-alls")
3434
val deletes = newCounter("deletes")
35+
val deleteAlls = newCounter("delete-alls")
3536
val flushes = newCounter("flushes")
37+
val alls = newCounter("alls")
38+
val ranges = newCounter("ranges")
3639
val snapshots = newCounter("snapshots")
3740

38-
val restoredMessages = newCounter("messages-restored") //Deprecated
39-
val restoredMessagesGauge = newGauge("restored-messages", 0)
40-
41-
val restoredBytes = newCounter("messages-bytes") //Deprecated
42-
val restoredBytesGauge = newGauge("restored-bytes", 0)
43-
44-
4541
val getNs = newTimer("get-ns")
42+
val getAllNs = newTimer("get-all-ns")
4643
val putNs = newTimer("put-ns")
44+
val putAllNs = newTimer("put-all-ns")
4745
val deleteNs = newTimer("delete-ns")
46+
val deleteAllNs = newTimer("delete-all-ns")
4847
val flushNs = newTimer("flush-ns")
4948
val allNs = newTimer("all-ns")
5049
val rangeNs = newTimer("range-ns")
5150
val snapshotNs = newTimer("snapshot-ns")
5251

52+
val restoredMessages = newCounter("messages-restored") //Deprecated
53+
val restoredMessagesGauge = newGauge("restored-messages", 0)
54+
55+
val restoredBytes = newCounter("messages-bytes") //Deprecated
56+
val restoredBytesGauge = newGauge("restored-bytes", 0)
57+
5358
override def getPrefix = storeName + "-"
5459
}

samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ class KeyValueStoreMetrics(
2727

2828
val gets = newCounter("gets")
2929
val getAlls = newCounter("getAlls")
30-
val ranges = newCounter("ranges")
31-
val alls = newCounter("alls")
3230
val puts = newCounter("puts")
31+
val putAlls = newCounter("putAlls")
3332
val deletes = newCounter("deletes")
3433
val deleteAlls = newCounter("deleteAlls")
34+
val alls = newCounter("alls")
35+
val ranges = newCounter("ranges")
3536
val flushes = newCounter("flushes")
3637
val bytesWritten = newCounter("bytes-written")
3738
val bytesRead = newCounter("bytes-read")

0 commit comments

Comments
 (0)