
Commit ae056ff

pdu-mn1 authored and jagadish-v0 committed
SAMZA-1719: Add caching support to table-api
This change adds caching support for Samza tables. It is especially useful for remote tables, where accesses can have high latency, in applications that can tolerate staleness. Caching is added in the form of a composite table that wraps the actual table and a cache. We reuse the ReadWriteTable interface for the cache. A simple Guava cache-based table is provided in this change.

Original PR was inadvertently closed: apache/samza#524

Author: Peng Du <pdu@linkedin.com>
Reviewers: Jagadish <jagadish@apache.org>, Wei <wsong@linkedin.com>

Closes apache#531 from pdu-mn1/table-cache
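For orientation, the sketch below shows how an application might wire a caching table over a high-latency remote table using the descriptor added in this change. This is a hedged illustration, not part of the diff: StreamGraph.getTable, the Profile value type, and remoteTableDesc are assumptions for this sketch.

// Hypothetical wiring inside a StreamApplication's init(StreamGraph graph, Config config);
// remoteTableDesc is assumed to describe some high-latency remote table.
Table<KV<String, Profile>> remoteTable = graph.getTable(remoteTableDesc);

// Wrap the remote table with a read-through cache: entries expire 5 minutes
// after their last read, and at most 10,000 entries are retained.
CachingTableDescriptor<String, Profile> cacheDesc =
    new CachingTableDescriptor<>("profile-cache");
cacheDesc.withTable(remoteTable)
    .withReadTtl(Duration.ofMinutes(5))
    .withCacheSize(10000);

// Reads go to the cache first and fall through to the remote table on a miss.
Table<KV<String, Profile>> cachedTable = graph.getTable(cacheDesc);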
1 parent 4d7482d commit ae056ff

14 files changed: 1,337 additions, 51 deletions
org/apache/samza/table/caching/CachingTable.java (new file): 199 additions, 0 deletions
@@ -0,0 +1,199 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.table.caching;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.task.TaskContext;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Striped;


/**
 * A composite table incorporating a cache with a Samza table. The cache is
 * represented as a {@link ReadWriteTable}.
 *
 * The intended use case is to optimize the latency of accessing the actual table, e.g.
 * remote tables, when eventual consistency between cache and table is acceptable.
 * The cache is expected to support TTL such that the values can be refreshed at some
 * point.
 *
 * If the actual table is a read-write table, CachingTable supports both write-through
 * and write-around (writes bypassing the cache) policies. The write-through policy
 * provides read-after-write semantics because the value is cached after being written
 * to the table.
 *
 * Table and cache are updated (put/delete) atomically, so the composite table is
 * thread-safe for concurrent accesses. Striped locks are used for fine-grained
 * synchronization, and the number of stripes is configurable.
 *
 * NOTE: Cache get is not synchronized with put for better parallelism in the read path.
 * As such, the cache table implementation is expected to be thread-safe for concurrent
 * accesses.
 *
 * @param <K> type of the table key
 * @param <V> type of the table value
 */
public class CachingTable<K, V> implements ReadWriteTable<K, V> {
  private static final String GROUP_NAME = CachingTable.class.getSimpleName();

  private final String tableId;
  private final ReadableTable<K, V> rdTable;
  private final ReadWriteTable<K, V> rwTable;
  private final ReadWriteTable<K, V> cache;
  private final boolean isWriteAround;

  // Use stripe-based locking to allow parallelism on disjoint keys.
  private final Striped<Lock> stripedLocks;

  // Common caching stats
  private AtomicLong hitCount = new AtomicLong();
  private AtomicLong missCount = new AtomicLong();

  public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, int stripes, boolean isWriteAround) {
    this.tableId = tableId;
    this.rdTable = table;
    this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
    this.cache = cache;
    this.isWriteAround = isWriteAround;
    this.stripedLocks = Striped.lazyWeakLock(stripes);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
    MetricsRegistry metricsRegistry = taskContext.getMetricsRegistry();
    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-hit-rate", () -> hitRate()));
    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-miss-rate", () -> missRate()));
    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-req-count", () -> requestCount()));
  }

  @Override
  public V get(K key) {
    V value = cache.get(key);
    if (value == null) {
      missCount.incrementAndGet();
      Lock lock = stripedLocks.get(key);
      try {
        lock.lock();
        if (cache.get(key) == null) {
          // Due to the lack of a contains() API in ReadableTable, there is
          // no way to tell whether a null return by cache.get(key) means
          // a cache miss or that the value is actually null. As such, we
          // cannot support negative cache semantics.
          value = rdTable.get(key);
          if (value != null) {
            cache.put(key, value);
          }
        }
      } finally {
        lock.unlock();
      }
    } else {
      hitCount.incrementAndGet();
    }
    return value;
  }

  @Override
  public Map<K, V> getAll(List<K> keys) {
    Map<K, V> getAllResult = new HashMap<>();
    keys.stream().forEach(k -> getAllResult.put(k, get(k)));
    return getAllResult;
  }

  @Override
  public void put(K key, V value) {
    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
    Lock lock = stripedLocks.get(key);
    try {
      lock.lock();
      rwTable.put(key, value);
      if (!isWriteAround) {
        cache.put(key, value);
      }
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void putAll(List<Entry<K, V>> entries) {
    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
    entries.forEach(e -> put(e.getKey(), e.getValue()));
  }

  @Override
  public void delete(K key) {
    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
    Lock lock = stripedLocks.get(key);
    try {
      lock.lock();
      rwTable.delete(key);
      cache.delete(key);
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void deleteAll(List<K> keys) {
    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
    keys.stream().forEach(k -> delete(k));
  }

  @Override
  public synchronized void flush() {
    Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
    rwTable.flush();
  }

  @Override
  public void close() {
    this.cache.close();
    this.rdTable.close();
  }

  double hitRate() {
    long reqs = requestCount();
    return reqs == 0 ? 1.0 : (double) hitCount.get() / reqs;
  }

  double missRate() {
    long reqs = requestCount();
    return reqs == 0 ? 1.0 : (double) missCount.get() / reqs;
  }

  long requestCount() {
    return hitCount.get() + missCount.get();
  }
}
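The simple Guava cache-based table mentioned in the commit message is part of this change but not shown in this excerpt. Below is a minimal sketch of what such an adapter could look like; the class name, package, and constructor are assumptions, and only the ReadWriteTable methods mirror the interface used above. Guava's Cache is itself thread-safe, which satisfies the concurrency requirement stated in the CachingTable javadoc.

package org.apache.samza.table.caching.guava;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.TaskContext;

import com.google.common.cache.Cache;

// Hypothetical adapter exposing a Guava Cache through ReadWriteTable so that
// CachingTable can use it as its cache side.
public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
  private final Cache<K, V> cache;

  public GuavaCacheTable(Cache<K, V> cache) {
    this.cache = cache;
  }

  @Override
  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
    // No initialization needed; the Guava cache is supplied fully built.
  }

  @Override
  public V get(K key) {
    return cache.getIfPresent(key);
  }

  @Override
  public Map<K, V> getAll(List<K> keys) {
    Map<K, V> result = new HashMap<>();
    keys.forEach(k -> result.put(k, cache.getIfPresent(k)));
    return result;
  }

  @Override
  public void put(K key, V value) {
    cache.put(key, value);
  }

  @Override
  public void putAll(List<Entry<K, V>> entries) {
    entries.forEach(e -> cache.put(e.getKey(), e.getValue()));
  }

  @Override
  public void delete(K key) {
    cache.invalidate(key);
  }

  @Override
  public void deleteAll(List<K> keys) {
    cache.invalidateAll(keys);
  }

  @Override
  public void flush() {
    // No-op: the cache holds no pending writes of its own.
  }

  @Override
  public void close() {
    cache.invalidateAll();
  }
}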
org/apache/samza/table/caching/CachingTableDescriptor.java (new file): 173 additions, 0 deletions
@@ -0,0 +1,173 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.table.caching;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableImpl;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;

import com.google.common.base.Preconditions;

/**
 * Table descriptor for {@link CachingTable}.
 * @param <K> type of the key in the cache
 * @param <V> type of the value in the cache
 */
public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
  private Duration readTtl;
  private Duration writeTtl;
  private long cacheSize;
  private Table<KV<K, V>> cache;
  private Table<KV<K, V>> table;
  private int stripes = 16;
  private boolean isWriteAround;

  /**
   * Constructs a table descriptor instance
   * @param tableId Id of the table
   */
  public CachingTableDescriptor(String tableId) {
    super(tableId);
  }

  @Override
  public TableSpec getTableSpec() {
    validate();

    Map<String, String> tableSpecConfig = new HashMap<>();
    generateTableSpecConfig(tableSpecConfig);

    if (cache != null) {
      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((TableImpl) cache).getTableSpec().getId());
    } else {
      if (readTtl != null) {
        tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
      }
      if (writeTtl != null) {
        tableSpecConfig.put(CachingTableProvider.WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
      }
      if (cacheSize > 0) {
        tableSpecConfig.put(CachingTableProvider.CACHE_SIZE, String.valueOf(cacheSize));
      }
    }

    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((TableImpl) table).getTableSpec().getId());
    tableSpecConfig.put(CachingTableProvider.LOCK_STRIPES, String.valueOf(stripes));
    tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));

    return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
  }

  /**
   * Specify a cache instance (as a Table abstraction) to be used for caching.
   * Cache get is not synchronized with put for better parallelism in the read path
   * of {@link CachingTable}. As such, the cache table implementation is expected to be
   * thread-safe for concurrent accesses.
   * @param cache cache instance
   * @return this descriptor
   */
  public CachingTableDescriptor withCache(Table<KV<K, V>> cache) {
    this.cache = cache;
    return this;
  }

  /**
   * Specify the table instance for the actual table input/output.
   * @param table table instance
   * @return this descriptor
   */
  public CachingTableDescriptor withTable(Table<KV<K, V>> table) {
    this.table = table;
    return this;
  }

  /**
   * Specify the TTL for each read access, i.e. a record expires after
   * the TTL duration has elapsed since the last read access of its key.
   * @param readTtl read TTL
   * @return this descriptor
   */
  public CachingTableDescriptor withReadTtl(Duration readTtl) {
    this.readTtl = readTtl;
    return this;
  }

  /**
   * Specify the TTL for each write access, i.e. a record expires after
   * the TTL duration has elapsed since the last write access of its key.
   * @param writeTtl write TTL
   * @return this descriptor
   */
  public CachingTableDescriptor withWriteTtl(Duration writeTtl) {
    this.writeTtl = writeTtl;
    return this;
  }

  /**
   * Specify the max cache size for size-based eviction.
   * @param cacheSize max size of the cache
   * @return this descriptor
   */
  public CachingTableDescriptor withCacheSize(long cacheSize) {
    this.cacheSize = cacheSize;
    return this;
  }

  /**
   * Specify the number of stripes for striped locking used to atomically update
   * the cache and the actual table. The default number of stripes is 16.
   * @param stripes number of stripes for locking
   * @return this descriptor
   */
  public CachingTableDescriptor withStripes(int stripes) {
    this.stripes = stripes;
    return this;
  }

  /**
   * Specify whether the write-around policy should be used to bypass writing
   * to the cache for put operations. This is useful when put() is the
   * dominant operation and get() has no locality with recent puts.
   * @return this descriptor
   */
  public CachingTableDescriptor withWriteAround() {
    this.isWriteAround = true;
    return this;
  }

  @Override
  protected void validate() {
    super.validate();
    Preconditions.checkNotNull(table, "Actual table is required.");
    if (cache == null) {
      Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
    } else {
      Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
          "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
    }
    Preconditions.checkArgument(stripes > 0, "Number of cache stripes must be positive.");
  }
}
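As validate() above enforces, the descriptor accepts exactly one of two cache configurations: TTL/size settings for a default cache built by the provider, or an explicit cache table via withCache, in which case readTtl, writeTtl, and cacheSize must be left unset. Below is a hedged sketch of the explicit-cache form combined with write-around; actualTable and cacheTable are placeholders assumed to be obtained elsewhere (e.g. from their own descriptors), as are the key/value types.

// The supplied cache table must be thread-safe, since CachingTable does not
// synchronize cache gets with puts.
CachingTableDescriptor<String, Long> desc =
    new CachingTableDescriptor<>("counter-cache");
desc.withTable(actualTable)
    .withCache(cacheTable)   // mixing this with readTtl/writeTtl/cacheSize fails validate()
    .withStripes(32)         // finer-grained locking for higher write concurrency
    .withWriteAround();      // puts bypass the cache and go straight to the actual table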
