
Commit d4620d6

Merge branch 'master' of https://github.com/apache/samza

2 parents: 410ce78 + 958edc4

File tree: 7 files changed, +93 -187 lines

samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
18 additions, 51 deletions

@@ -46,7 +46,19 @@ public interface KeyValueStore<K, V> {
    * @return a map of the keys that were found and their respective values.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  Map<K, V> getAll(List<K> keys);
+  default Map<K, V> getAll(List<K> keys) {
+    Map<K, V> map = new HashMap<>(keys.size());
+
+    for (K key : keys) {
+      V value = get(key);
+
+      if (value != null) {
+        map.put(key, value);
+      }
+    }
+
+    return map;
+  }

   /**
    * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
@@ -79,7 +91,11 @@ public interface KeyValueStore<K, V> {
    * @param keys the keys for which the mappings are to be deleted.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  void deleteAll(List<K> keys);
+  default void deleteAll(List<K> keys) {
+    for (K key : keys) {
+      delete(key);
+    }
+  }

   /**
    * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
@@ -111,53 +127,4 @@ public interface KeyValueStore<K, V> {
    * Flushes this key-value store, if applicable.
    */
   void flush();
-
-  /**
-   * Represents an extension for classes that implement {@link KeyValueStore}.
-   */
-  // TODO replace with default interface methods when we can use Java 8 features.
-  class Extension {
-    private Extension() {
-      // This class cannot be instantiated
-    }
-
-    /**
-     * Gets the values with which the specified {@code keys} are associated.
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys with which the associated values are to be fetched.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @return a map of the keys that were found and their respective values.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      final Map<K, V> map = new HashMap<>(keys.size());
-
-      for (final K key : keys) {
-        final V value = store.get(key);
-
-        if (value != null) {
-          map.put(key, value);
-        }
-      }
-
-      return map;
-    }
-
-    /**
-     * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys for which the mappings are to be deleted.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      for (final K key : keys) {
-        store.delete(key);
-      }
-    }
-  }
 }
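With getAll and deleteAll now declared as Java 8 default methods, a store implementation only has to provide the single-key primitives and inherits the bulk operations for free; this is also why the Extension delegations in InMemoryKeyValueStore and RocksDbKeyValueStore further down can simply be deleted. A minimal, self-contained sketch of the pattern (MiniKeyValueStore, MapBackedStore, and DefaultMethodDemo are hypothetical names for illustration; the real interface additionally declares putAll, range, all, flush, and close):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Trimmed-down stand-in for org.apache.samza.storage.kv.KeyValueStore,
// showing only the methods relevant to this change.
interface MiniKeyValueStore<K, V> {
  V get(K key);
  void put(K key, V value);
  void delete(K key);

  // Same default bodies as in the patch above.
  default Map<K, V> getAll(List<K> keys) {
    Map<K, V> map = new HashMap<>(keys.size());
    for (K key : keys) {
      V value = get(key);
      if (value != null) {
        map.put(key, value);
      }
    }
    return map;
  }

  default void deleteAll(List<K> keys) {
    for (K key : keys) {
      delete(key);
    }
  }
}

// An implementation no longer overrides the bulk operations;
// it inherits the default loops above.
class MapBackedStore<K, V> implements MiniKeyValueStore<K, V> {
  private final Map<K, V> map = new HashMap<>();
  @Override public V get(K key) { return map.get(key); }
  @Override public void put(K key, V value) { map.put(key, value); }
  @Override public void delete(K key) { map.remove(key); }
}

public class DefaultMethodDemo {
  public static void main(String[] args) {
    MiniKeyValueStore<String, Integer> store = new MapBackedStore<>();
    store.put("a", 1);
    store.put("b", 2);
    System.out.println(store.getAll(Arrays.asList("a", "b", "c"))); // only found keys, e.g. {a=1, b=2}
    store.deleteAll(Arrays.asList("a", "b"));
    System.out.println(store.getAll(Arrays.asList("a", "b")));      // {}
  }
}

Note that the default getAll skips keys whose value is null, matching the "keys that were found" contract in the javadoc.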

samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
39 additions, 64 deletions

@@ -43,6 +43,9 @@ import scala.collection.mutable
  * keyed to that taskName. If there is no such message, no checkpoint data
  * exists. The underlying log has a single partition into which all
  * checkpoints and TaskName to changelog partition mappings are written.
+ *
+ * This class is thread safe for writing but not for reading checkpoints.
+ * This is currently OK since checkpoints are only read on the main thread.
  */
 class KafkaCheckpointManager(
   clientId: String,
@@ -64,14 +67,14 @@ class KafkaCheckpointManager(
   checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {

   var taskNames = Set[TaskName]()
-  @volatile var systemProducer: SystemProducer = null
+  @volatile var systemProducer: SystemProducer = null
+  var systemConsumer: SystemConsumer = null
   var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
   val systemAdmin = getSystemAdmin()

   val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)

-
   KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)

   info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
@@ -181,73 +184,40 @@ class KafkaCheckpointManager(
    */
   private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
                       handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-
-    val UNKNOWN_OFFSET = "-1"
-    var attempts = 10
-    val POLL_TIMEOUT = 1000L
+    info("Reading from checkpoint system:%s topic:%s" format(systemName, checkpointTopic))

     val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
-    val systemConsumer = getSystemConsumer()
-    val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
-    // offsets returned are strings
-    val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
-    val oldestOffset = partitionMetadata.getOldestOffset
-    systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
-    systemConsumer.start()

-    var msgCount = 0
-    try {
-      val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
-      // convert offsets to long
-      var currentOffset = UNKNOWN_OFFSET.toLong
-      val newestOffsetLong = newestOffset.toLong
-      val sspToPoll = Collections.singleton(ssp)
-      while (currentOffset < newestOffsetLong) {
-
-        val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
-          try {
-            systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
-          } catch {
-            case e: Exception => {
-              // these exceptions are most likely intermediate
-              warn("Got %s exception while polling the consumer for checkpoints." format e)
-              if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
-              attempts -= 1
-              emptyEnvelopes
-            }
-          }
+    if (systemConsumer == null) {
+      val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+      val oldestOffset = partitionMetadata.getOldestOffset

-        val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
-        val messagesNum = if (messages != null) messages.size else 0
-        info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
-          format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
-        if (envelopes.isEmpty || messagesNum <= 0) {
-          info("Got empty/null list of messages")
-        } else {
-          msgCount += messages.size()
-          // check the key
-          for (msg: IncomingMessageEnvelope <- messages) {
-            val key = msg.getKey.asInstanceOf[Array[Byte]]
-            currentOffset = msg.getOffset().toLong
-            if (key == null) {
-              throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
-                format currentOffset)
-            }
+      systemConsumer = getSystemConsumer()
+      systemConsumer.register(ssp, oldestOffset)
+      systemConsumer.start()
+    }

-            val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+    val iterator = new SystemStreamPartitionIterator(systemConsumer, ssp);
+    var msgCount = 0
+    while (iterator.hasNext) {
+      val msg = iterator.next
+      msgCount += 1
+
+      val offset = msg.getOffset
+      val key = msg.getKey.asInstanceOf[Array[Byte]]
+      if (key == null) {
+        throw new KafkaUtilException(
+          "While reading checkpoint (currentOffset=%s) stream encountered message without key." format offset)
+      }

-            if (!shouldHandleEntry(checkpointKey)) {
-              info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
-            } else {
-              // handleEntry requires ByteBuffer
-              val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
-              handleEntry(checkpointPayload, checkpointKey)
-            }
-          }
-        }
+      val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+
+      if (!shouldHandleEntry(checkpointKey)) {
+        info("Skipping checkpoint log entry at offset %s with key %s." format(offset, checkpointKey))
+      } else {
+        val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+        handleEntry(checkpointPayload, checkpointKey)
       }
-    } finally {
-      systemConsumer.stop()
     }
     info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
   }
@@ -282,12 +252,17 @@ class KafkaCheckpointManager(


   def stop = {
-    synchronized (
+    synchronized {
       if (systemProducer != null) {
         systemProducer.stop
         systemProducer = null
       }
-    )
+
+      if (systemConsumer != null) {
+        systemConsumer.stop
+        systemConsumer = null
+      }
+    }

   }
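The essence of the readLog rewrite is a lazily initialized, long-lived consumer drained through a plain iterator, replacing the manual offset bookkeeping and bounded retry loop of the old poll-based code. A rough sketch of that shape under stand-in types (FakeConsumer, ConsumerIterator, and ReadLogSketch are hypothetical; the real code uses Samza's SystemConsumer and SystemStreamPartitionIterator, registers from the oldest offset, and stops the consumer in stop()):

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for a consumer: poll() returns the next available
// message, or null once the log is exhausted for now.
class FakeConsumer {
  private final Queue<String> log = new ArrayDeque<>();
  FakeConsumer(String... messages) { for (String m : messages) log.add(m); }
  String poll() { return log.poll(); }
}

// Mirrors the SystemStreamPartitionIterator idea: wrap the consumer in a
// plain Iterator so callers drain whatever is currently available, with no
// offset arithmetic or retry counters.
class ConsumerIterator implements Iterator<String> {
  private final FakeConsumer consumer;
  private String peeked;
  ConsumerIterator(FakeConsumer consumer) { this.consumer = consumer; }
  public boolean hasNext() {
    if (peeked == null) peeked = consumer.poll();
    return peeked != null;
  }
  public String next() {
    if (!hasNext()) throw new NoSuchElementException();
    String msg = peeked;
    peeked = null;
    return msg;
  }
}

public class ReadLogSketch {
  // Kept open across reads, like the new systemConsumer field.
  private static FakeConsumer systemConsumer;

  static void readLog(FakeConsumer freshConsumer) {
    if (systemConsumer == null) {  // lazy init; real code calls getSystemConsumer(), register(), start()
      systemConsumer = freshConsumer;
    }
    Iterator<String> it = new ConsumerIterator(systemConsumer);
    int msgCount = 0;
    while (it.hasNext()) {
      System.out.println("checkpoint entry: " + it.next());
      msgCount++;
    }
    System.out.println("Done reading " + msgCount + " messages");
  }

  public static void main(String[] args) {
    readLog(new FakeConsumer("cp-1", "cp-2", "cp-3"));
  }
}

Because the consumer now survives across readLog calls rather than being stopped in a finally block, stop() gains the matching systemConsumer branch seen in the last hunk above.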

samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
0 additions, 8 deletions

@@ -81,10 +81,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     put(key, null)
   }

-  override def deleteAll(keys: java.util.List[Array[Byte]]) = {
-    KeyValueStore.Extension.deleteAll(this, keys)
-  }
-
   override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
     // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
     // to use it, in order to putAll here. Therefore, just iterate here.
@@ -116,8 +112,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
     found
   }
-
-  override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
-    KeyValueStore.Extension.getAll(this, keys);
-  }
 }

samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
32 additions, 54 deletions

@@ -20,89 +20,71 @@
 package org.apache.samza.storage.kv

 import java.io.File
+import java.util.concurrent.TimeUnit
+
 import org.apache.samza.SamzaException
-import org.apache.samza.util.{ LexicographicComparator, Logging }
 import org.apache.samza.config.Config
-import org.rocksdb._
-import org.rocksdb.TtlDB
+import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.rocksdb.{TtlDB, _}

 object RocksDbKeyValueStore extends Logging {

-  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
+  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
+             storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
     var ttl = 0L
     var useTTL = false

-    if (storeConfig.containsKey("rocksdb.ttl.ms"))
-    {
-      try
-      {
+    if (storeConfig.containsKey("rocksdb.ttl.ms")) {
+      try {
         ttl = storeConfig.getLong("rocksdb.ttl.ms")

-        // RocksDB accepts TTL in seconds, convert ms to seconds
-        if(ttl > 0) {
-          if (ttl < 1000)
-          {
-            warn("The ttl values requested for %s is %d, which is less than 1000 (minimum), using 1000 instead",
-              storeName,
-              ttl)
+        if (ttl > 0) {
+          if (ttl < 1000) {
+            warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
+              "Using 1000 ms instead.", storeName, ttl)
             ttl = 1000
           }
-          ttl = ttl / 1000
-        }
-        else {
-          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. More Info -https://github.com/facebook/rocksdb/wiki/Time-to-Live")
+          ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
+        } else {
+          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
+            "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
         }

         useTTL = true
-        if (isLoggedStore)
-        {
-          warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
+        if (isLoggedStore) {
+          warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
+            "Use at your own discretion." format storeName)
         }
+      } catch {
+        case nfe: NumberFormatException =>
+          throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
+            format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
       }
-      catch
-      {
-        case nfe: NumberFormatException => throw new SamzaException("rocksdb.ttl.ms configuration is not a number, " + "value found %s" format storeConfig.get(
-          "rocksdb.ttl.ms"))
-      }
     }

-    try
-    {
+    try {
       val rocksDb =
-        if (useTTL)
-        {
+        if (useTTL) {
           info("Opening RocksDB store with TTL value: %s" format ttl)
           TtlDB.open(options, dir.toString, ttl.toInt, false)
-        }
-        else
-        {
+        } else {
           RocksDB.open(options, dir.toString)
         }

-      if (storeConfig.containsKey("rocksdb.metrics.list"))
-      {
+      if (storeConfig.containsKey("rocksdb.metrics.list")) {
         storeConfig
           .get("rocksdb.metrics.list")
           .split(",")
           .map(property => property.trim)
-          .foreach(property =>
-            metrics.newGauge(property, () => rocksDb.getProperty(property))
-          )
+          .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
       }

       rocksDb
+    } catch {
+      case rocksDBException: RocksDBException =>
+        throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
+          rocksDBException)
     }
-    catch
-    {
-      case rocksDBException: RocksDBException =>
-      {
-        throw new SamzaException(
-          "Error opening RocksDB store %s at location %s, received the following exception from RocksDB %s".format(
-            storeName,
-            dir.toString,
-            rocksDBException))
-      }
-    }
   }
 }

@@ -187,10 +169,6 @@ class RocksDbKeyValueStore(
     put(key, null)
   }

-  def deleteAll(keys: java.util.List[Array[Byte]]) = {
-    KeyValueStore.Extension.deleteAll(this, keys)
-  }
-
   def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")
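To make the TTL unit handling concrete: RocksDB's TtlDB takes a TTL in seconds, while the rocksdb.ttl.ms config is in milliseconds, so the patch clamps sub-second values up to 1000 ms and converts with TimeUnit instead of a bare division by 1000 (non-positive values pass through, meaning infinite TTL). A small sketch of that normalization (normalizeTtlSeconds and TtlConversionDemo are hypothetical names for illustration):

import java.util.concurrent.TimeUnit;

public class TtlConversionDemo {
  static long normalizeTtlSeconds(long ttlMs) {
    if (ttlMs <= 0) {
      // Non-positive TTL implies infinite TTL for the data; pass through unchanged.
      return ttlMs;
    }
    if (ttlMs < 1000) {
      ttlMs = 1000; // minimum supported granularity is one second
    }
    // Same conversion the patch uses in place of ttl / 1000.
    return TimeUnit.MILLISECONDS.toSeconds(ttlMs);
  }

  public static void main(String[] args) {
    System.out.println(normalizeTtlSeconds(500));    // 1  (clamped to 1000 ms first)
    System.out.println(normalizeTtlSeconds(30000));  // 30
    System.out.println(normalizeTtlSeconds(-1));     // -1 (infinite TTL)
  }
}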
