Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(controller): datastore query and scan api support revision #2021

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

Expand Down Expand Up @@ -154,11 +155,13 @@ public ResponseEntity<ResponseMessage<RecordListVo>> scanTable(ScanTableRequest
throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE,
"table name should not be null or empty: " + x);
}
var ts = StringUtils.hasText(x.getRevision()) ? Long.parseLong(x.getRevision()) : 0;
return DataStoreScanRequest.TableInfo.builder()
.tableName(x.getTableName())
.columnPrefix(x.getColumnPrefix())
.columns(DataStoreController.convertColumns(x.getColumns()))
.keepNone(x.isKeepNone())
.timestamp(ts)
.build();
})
.collect(Collectors.toList()))
Expand Down Expand Up @@ -223,6 +226,7 @@ private RecordList queryRecordList(QueryTableRequest request) {
.rawResult(request.isRawResult())
.encodeWithType(request.isEncodeWithType())
.ignoreNonExistingTable(request.isIgnoreNonExistingTable())
.timestamp(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class QueryTableRequest {
private boolean rawResult;
private boolean encodeWithType;
private boolean ignoreNonExistingTable = true;
private String revision;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public class TableDesc {
private String columnPrefix;
private List<ColumnDesc> columns;
private boolean keepNone;
private String revision;
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ public String update(String tableName,
//noinspection ConstantConditions
table.lock();
try {
var revision = table.update(schema, records);
var ts = table.update(schema, records);
this.dirtyTables.put(table, "");
return revision;
return Long.toString(ts);
} finally {
table.unlock();
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public RecordList query(DataStoreQueryRequest req) {
var schema = table.getSchema();
var columns = this.getColumnAliases(schema, req.getColumns());
var results = new ArrayList<RecordResult>();
var timestamp = req.getTimestamp() * 1000;
var timestamp = req.getTimestamp();
if (timestamp == 0) {
timestamp = System.currentTimeMillis();
}
Expand Down Expand Up @@ -291,9 +291,9 @@ class TableMeta {
var ret = new TableMeta();
ret.tableName = info.getTableName();
if (info.getTimestamp() > 0) {
ret.timestamp = info.getTimestamp() * 1000;
ret.timestamp = info.getTimestamp();
} else if (req.getTimestamp() > 0) {
ret.timestamp = req.getTimestamp() * 1000;
ret.timestamp = req.getTimestamp();
} else {
ret.timestamp = currentTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DataStoreQueryRequest {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private Map<String, String> columns;
private List<OrderByDesc> orderBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ public class DataStoreScanRequest {
public static class TableInfo {

private String tableName;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for this table
private long timestamp;
private String columnPrefix;
private Map<String, String> columns;
private boolean keepNone;
}

private List<TableInfo> tables;
// timestamp in milliseconds, used to filter out the data that is older than the timestamp for all tables
private long timestamp;
private String start;
private String startType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public interface MemoryTable {

void updateFromWal(Wal.WalEntry entry);

// update records, returns the revision
String update(TableSchemaDesc schema, List<Map<String, Object>> records);
// update records, returns the timestamp in milliseconds
long update(TableSchemaDesc schema, List<Map<String, Object>> records);

Iterator<RecordResult> query(long timestamp,
Map<String, String> columns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void updateFromWal(Wal.WalEntry entry) {
}

@Override
public String update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> records) {
public long update(TableSchemaDesc schema, @NonNull List<Map<String, Object>> records) {
var decodedRecords = new ArrayList<Map<String, BaseValue>>();
String keyColumn;
if (schema.getKeyColumn() == null) {
Expand Down Expand Up @@ -315,7 +315,7 @@ public String update(TableSchemaDesc schema, @NonNull List<Map<String, Object>>
this.insertRecords(timestamp, decodedRecords);
}

return Long.toString(timestamp);
return timestamp;
}

private void insertRecords(long timestamp, List<Map<String, BaseValue>> records) {
Expand Down Expand Up @@ -360,8 +360,11 @@ private void insertRecords(long timestamp, List<Map<String, BaseValue>> records)
private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> versions, long timestamp) {
var ret = new HashMap<String, BaseValue>();
boolean deleted = false;
boolean hasVersion = false;
for (var record : versions) {
if (record.getTimestamp() <= timestamp) {
// record may be empty, use hasVersion to mark if there is a record
hasVersion = true;
if (record.isDeleted()) {
deleted = true;
ret.clear();
Expand All @@ -374,7 +377,10 @@ private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> ve
if (deleted) {
ret.put(DELETED_FLAG_COLUMN_NAME, BoolValue.TRUE);
}
ret.put(this.schema.getKeyColumn(), key);

if (hasVersion) {
ret.put(this.schema.getKeyColumn(), key);
}
tianweidut marked this conversation as resolved.
Show resolved Hide resolved
return ret;
}

Expand Down Expand Up @@ -496,6 +502,7 @@ public Iterator<RecordResult> scan(
}
var iterator = this.recordMap.subMap(startKey, startInclusive, endKey, endInclusive).entrySet().stream()
.map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp))
.filter(record -> !record.isEmpty())
.iterator();
return new Iterator<>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testList() {
}

@Test
public void testUpdate() {
public void testUpdate() throws InterruptedException {
this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t1");
Expand All @@ -140,7 +141,7 @@ public void testUpdate() {
}));
}
});
this.controller.updateTable(new UpdateTableRequest() {
var updateResp = this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t1");
setTableSchemaDesc(new TableSchemaDesc("k",
Expand All @@ -163,6 +164,7 @@ public void testUpdate() {
}));
}
});
Thread.sleep(1);
this.controller.updateTable(new UpdateTableRequest() {
{
setTableName("t2");
Expand Down Expand Up @@ -305,6 +307,66 @@ public void testUpdate() {
assertThat("t2",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", "00000003", "b", "00000002"))));

// scan with empty revision string will get the latest revision
resp = this.controller.scanTable(new ScanTableRequest() {
{
setEncodeWithType(true);
setTables(List.of(new TableDesc() {
{
setTableName("t1");
setRevision("");
setColumns(List.of(new ColumnDesc() {
{
setColumnName("k");
}
}, new ColumnDesc() {
{
setColumnName("a");
}
}));
}
}));
}
});
assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true));
assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes());
assertThat("t1",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"),
"a", Map.of("type", "STRING", "value", "1")),
Map.of("k", Map.of("type", "INT32", "value", "00000001"),
"a", Map.of("type", "INT32", "value", "00000002")))));

// scan with revision
resp = this.controller.scanTable(new ScanTableRequest() {
{
setEncodeWithType(true);
setTables(List.of(new TableDesc() {
{
setTableName("t1");
setRevision(updateResp.getBody().getData());
setColumns(List.of(new ColumnDesc() {
{
setColumnName("k");
}
}, new ColumnDesc() {
{
setColumnName("a");
}
}));
}
}));
}
});
assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true));
assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes());
assertThat("t1",
Objects.requireNonNull(resp.getBody()).getData().getRecords(),
is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"),
"a", Map.of("type", "INT32", "value", "00000001")),
Map.of("k", Map.of("type", "INT32", "value", "00000001"),
"a", Map.of("type", "INT32", "value", "00000002")))));
}

@Nested
Expand Down