Skip to content

Commit 1eb4c26

Browse files
committed
SAMZA-1948: Updated hybrid table descriptors to take underlying table descriptors
Hybrid Table Descriptors, e.g. CachingTableDescriptor, should take both tables as a constructor param instead of using withXYZ methods. This is better because they are required for the hybrid descriptor to work, and it helps with type inference for the descriptor. Author: Wei Song <wsong@linkedin.com> Reviewers: Peng Du <pdu@linkedin.com> Closes apache#706 from weisong44/SAMZA-1948 and squashes the following commits: 53444419 [Wei Song] Updated based on review comments 39d9ab00 [Wei Song] SAMZA-1948 Updated hybrid table descriptors to take underlying table descriptors a56c28d [Wei Song] Merge remote-tracking branch 'upstream/master' 097958c [Wei Song] Merge remote-tracking branch 'upstream/master' 05822f0 [Wei Song] Merge remote-tracking branch 'upstream/master' f748050 [Wei Song] Merge remote-tracking branch 'upstream/master' 7706ab1 [Wei Song] Merge remote-tracking branch 'upstream/master' f5731b1 [Wei Song] Merge remote-tracking branch 'upstream/master' 1e5de45 [Wei Song] Merge remote-tracking branch 'upstream/master' c85604e [Wei Song] Merge remote-tracking branch 'upstream/master' 242d844 [Wei Song] Merge remote-tracking branch 'upstream/master' ec7d840 [Wei Song] Merge remote-tracking branch 'upstream/master' e19b4dc [Wei Song] Merge remote-tracking branch 'upstream/master' 8ee7844 [Wei Song] Merge remote-tracking branch 'upstream/master' 1c6a2ea [Wei Song] Merge remote-tracking branch 'upstream/master' a6c94ad [Wei Song] Merge remote-tracking branch 'upstream/master' 41299b5 [Wei Song] Merge remote-tracking branch 'upstream/master' 239a095 [Wei Song] Merge remote-tracking branch 'upstream/master' eca0020 [Wei Song] Merge remote-tracking branch 'upstream/master' 5156239 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5 [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7 [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491 [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61 [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75 [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f38 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9 [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71 [Wei Song] Added self to committer list
1 parent 63d33fa commit 1eb4c26

File tree

4 files changed

+33
-35
lines changed

4 files changed

+33
-35
lines changed

samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
public interface TableDescriptorsProvider {
9292
/**
9393
* Constructs instances of the table descriptors
94-
* @param config
94+
* @param config the job config
9595
* @return list of table descriptors
9696
*/
9797
List<TableDescriptor> getTableDescriptors(Config config);

samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,30 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
4747
private boolean isWriteAround;
4848

4949
/**
50-
* {@inheritDoc}
50+
* Constructs a table descriptor instance with internal cache
51+
*
52+
* @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
53+
* @param table target table descriptor
5154
*/
52-
public CachingTableDescriptor(String tableId) {
55+
public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
5356
super(tableId);
57+
this.table = table;
58+
}
59+
60+
/**
61+
* Constructs a table descriptor instance and specify a cache (as Table descriptor)
62+
* to be used for caching. Cache get is not synchronized with put for better parallelism
63+
* in the read path of {@link CachingTable}. As such, cache table implementation is
64+
* expected to be thread-safe for concurrent accesses.
65+
*
66+
* @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
67+
* @param table target table descriptor
68+
* @param cache cache table descriptor
69+
*/
70+
public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
71+
TableDescriptor<K, V, ?> cache) {
72+
this(tableId, table);
73+
this.cache = cache;
5474
}
5575

5676
@Override
@@ -87,29 +107,6 @@ public TableSpec getTableSpec() {
87107
return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
88108
}
89109

90-
/**
91-
* Specify a cache (as Table descriptor) to be used for caching.
92-
* Cache get is not synchronized with put for better parallelism in the read path
93-
* of {@link CachingTable}. As such, cache table implementation is expected to be
94-
* thread-safe for concurrent accesses.
95-
* @param cache cache table descriptor
96-
* @return this descriptor
97-
*/
98-
public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) {
99-
this.cache = cache;
100-
return this;
101-
}
102-
103-
/**
104-
* Specify the target table descriptor for the actual table input/output.
105-
* @param table the target table descriptor
106-
* @return this descriptor
107-
*/
108-
public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) {
109-
this.table = table;
110-
return this;
111-
}
112-
113110
/**
114111
* Specify the TTL for each read access, ie. record is expired after
115112
* the TTL duration since last read access of each key.

samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,15 @@ public void testSerializeWithCacheInstance() {
8282
}
8383

8484
private void doTestSerialize(TableDescriptor cache) {
85-
CachingTableDescriptor desc = new CachingTableDescriptor("1");
86-
desc.withTable(createDummyTableDescriptor("2"));
85+
CachingTableDescriptor desc;
86+
TableDescriptor table = createDummyTableDescriptor("2");
8787
if (cache == null) {
88+
desc = new CachingTableDescriptor("1", table);
8889
desc.withReadTtl(Duration.ofMinutes(3));
8990
desc.withWriteTtl(Duration.ofMinutes(3));
9091
desc.withCacheSize(1000);
9192
} else {
92-
desc.withCache(cache);
93+
desc = new CachingTableDescriptor("1", table, cache);
9394
}
9495

9596
desc.withWriteAround();
@@ -150,9 +151,9 @@ private void initTables(ReadableTable ... tables) {
150151
}
151152

152153
private void doTestCacheOps(boolean isWriteAround) {
153-
CachingTableDescriptor desc = new CachingTableDescriptor("1");
154-
desc.withTable(createDummyTableDescriptor("realTable"));
155-
desc.withCache(createDummyTableDescriptor("cacheTable"));
154+
CachingTableDescriptor desc = new CachingTableDescriptor("1",
155+
createDummyTableDescriptor("realTable"),
156+
createDummyTableDescriptor("cacheTable"));
156157
if (isWriteAround) {
157158
desc.withWriteAround();
158159
}

samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,17 +144,17 @@ public boolean isRetriable(Throwable exception) {
144144
}
145145

146146
private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
147-
CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id);
147+
CachingTableDescriptor<K, V> cachingDesc;
148148
if (defaultCache) {
149+
cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
149150
cachingDesc.withReadTtl(Duration.ofMinutes(5));
150151
cachingDesc.withWriteTtl(Duration.ofMinutes(5));
151152
} else {
152153
GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
153154
guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
154-
cachingDesc.withCache(guavaTableDesc);
155+
cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
155156
}
156157

157-
cachingDesc.withTable(actualTableDesc);
158158
return appDesc.getTable(cachingDesc);
159159
}
160160

0 commit comments

Comments
 (0)