Skip to content

Commit

Permalink
chore(controller): datastore query and scan api support revision
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui committed Mar 28, 2023
1 parent 88b6045 commit b155ee9
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

Expand Down Expand Up @@ -154,11 +155,13 @@ public ResponseEntity<ResponseMessage<RecordListVo>> scanTable(ScanTableRequest
throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE,
"table name should not be null or empty: " + x);
}
var ts = StringUtils.hasText(x.getRevision()) ? Long.parseLong(x.getRevision()) : 0;
return DataStoreScanRequest.TableInfo.builder()
.tableName(x.getTableName())
.columnPrefix(x.getColumnPrefix())
.columns(DataStoreController.convertColumns(x.getColumns()))
.keepNone(x.isKeepNone())
.timestamp(ts)
.build();
})
.collect(Collectors.toList()))
Expand Down Expand Up @@ -223,6 +226,7 @@ private RecordList queryRecordList(QueryTableRequest request) {
.rawResult(request.isRawResult())
.encodeWithType(request.isEncodeWithType())
.ignoreNonExistingTable(request.isIgnoreNonExistingTable())
.timestamp(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class QueryTableRequest {
private boolean rawResult;
private boolean encodeWithType;
private boolean ignoreNonExistingTable = true;
private String revision;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public class TableDesc {
private String columnPrefix;
private List<ColumnDesc> columns;
private boolean keepNone;
private String revision;
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ public String update(String tableName,
//noinspection ConstantConditions
table.lock();
try {
var revision = table.update(schema, records);
var ts = table.update(schema, records);
this.dirtyTables.put(table, "");
return revision;
return Long.toString(ts);
} finally {
table.unlock();
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public RecordList query(DataStoreQueryRequest req) {
var schema = table.getSchema();
var columns = this.getColumnAliases(schema, req.getColumns());
var results = new ArrayList<RecordResult>();
var timestamp = req.getTimestamp() * 1000;
var timestamp = req.getTimestamp();
if (timestamp == 0) {
timestamp = System.currentTimeMillis();
}
Expand Down Expand Up @@ -291,9 +291,9 @@ class TableMeta {
var ret = new TableMeta();
ret.tableName = info.getTableName();
if (info.getTimestamp() > 0) {
ret.timestamp = info.getTimestamp() * 1000;
ret.timestamp = info.getTimestamp();
} else if (req.getTimestamp() > 0) {
ret.timestamp = req.getTimestamp() * 1000;
ret.timestamp = req.getTimestamp();
} else {
ret.timestamp = currentTimestamp;
}
Expand Down Expand Up @@ -401,7 +401,7 @@ class TableRecords {
Map<String, Object> record = null;
for (var r : records) {
if (r.record.getKey().equals(lastKey)) {
if (r.record.isDeleted()) {
if (r.record.isDeleted() || r.record.getValues().isEmpty()) {
record = null;
} else {
if (record == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DataStoreQueryRequest {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private Map<String, String> columns;
private List<OrderByDesc> orderBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ public class DataStoreScanRequest {
public static class TableInfo {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private String columnPrefix;
private Map<String, String> columns;
private boolean keepNone;
}

private List<TableInfo> tables;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for all tables
private long timestamp;
private String start;
private String startType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public interface MemoryTable {

void updateFromWal(Wal.WalEntry entry);

// update records, returns the revision
String update(TableSchemaDesc schema, List<Map<String, Object>> records);
// update records, returns the timestamp in milliseconds
long update(TableSchemaDesc schema, List<Map<String, Object>> records);

Iterator<RecordResult> query(long timestamp,
Map<String, String> columns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void updateFromWal(Wal.WalEntry entry) {
}

@Override
public String update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> records) {
public long update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> records) {
var decodedRecords = new ArrayList<Map<String, BaseValue>>();
String keyColumn;
if (schema.getKeyColumn() == null) {
Expand Down Expand Up @@ -315,7 +315,7 @@ public String update(TableSchemaDesc schema, @NonNull List<Map<String, Object>>
this.insertRecords(timestamp, decodedRecords);
}

return Long.toString(timestamp);
return timestamp;
}

private void insertRecords(long timestamp, List<Map<String, BaseValue>> records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testList() {
}

@Test
public void testUpdate() {
public void testUpdate() throws InterruptedException {
this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t1");
Expand All @@ -140,7 +141,7 @@ public void testUpdate() {
}));
}
});
this.controller.updateTable(new UpdateTableRequest() {
var updateResp = this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t1");
setTableSchemaDesc(new TableSchemaDesc("k",
Expand All @@ -163,6 +164,7 @@ public void testUpdate() {
}));
}
});
Thread.sleep(1);
this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t2");
Expand Down Expand Up @@ -305,6 +307,66 @@ public void testUpdate() {
assertThat("t2",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", "00000003", "b", "00000002"))));

// scan with empty revision string will get the latest revision
resp = this.controller.scanTable(new ScanTableRequest() {
{
setEncodeWithType(true);
setTables(List.of(new TableDesc() {
{
setTableName("t1");
setRevision("");
setColumns(List.of(new ColumnDesc() {
{
setColumnName("k");
}
}, new ColumnDesc() {
{
setColumnName("a");
}
}));
}
}));
}
});
assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true));
assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes());
assertThat("t1",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"),
"a", Map.of("type", "STRING", "value", "1")),
Map.of("k", Map.of("type", "INT32", "value", "00000001"),
"a", Map.of("type", "INT32", "value", "00000002")))));

// scan with revision
resp = this.controller.scanTable(new ScanTableRequest() {
{
setEncodeWithType(true);
setTables(List.of(new TableDesc() {
{
setTableName("t1");
setRevision(updateResp.getBody().getData());
setColumns(List.of(new ColumnDesc() {
{
setColumnName("k");
}
}, new ColumnDesc() {
{
setColumnName("a");
}
}));
}
}));
}
});
assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true));
assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes());
assertThat("t1",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"),
"a", Map.of("type", "INT32", "value", "00000001")),
Map.of("k", Map.of("type", "INT32", "value", "00000001"),
"a", Map.of("type", "INT32", "value", "00000002")))));
}

@Nested
Expand Down

0 comments on commit b155ee9

Please sign in to comment.