From 9784e6419e74d5642ef99b5981af4f9485b797b3 Mon Sep 17 00:00:00 2001 From: Jialei <3217223+jialeicui@users.noreply.github.com> Date: Wed, 29 Mar 2023 22:02:04 +0800 Subject: [PATCH] chore(controller): datastore query and scan api support revision (#2021) --- .../mlops/api/DataStoreController.java | 4 ++ .../protocol/datastore/QueryTableRequest.java | 1 + .../api/protocol/datastore/TableDesc.java | 1 + .../starwhale/mlops/datastore/DataStore.java | 10 +-- .../datastore/DataStoreQueryRequest.java | 1 + .../mlops/datastore/DataStoreScanRequest.java | 2 + .../mlops/datastore/MemoryTable.java | 4 +- .../mlops/datastore/impl/MemoryTableImpl.java | 13 +++- .../mlops/api/DataStoreControllerTest.java | 66 ++++++++++++++++++- 9 files changed, 90 insertions(+), 12 deletions(-) diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java index 30a2a5cdae..f51bb9c165 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java @@ -54,6 +54,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -154,11 +155,13 @@ public ResponseEntity> scanTable(ScanTableRequest throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE, "table name should not be null or empty: " + x); } + var ts = StringUtils.hasText(x.getRevision()) ? 
Long.parseLong(x.getRevision()) : 0; return DataStoreScanRequest.TableInfo.builder() .tableName(x.getTableName()) .columnPrefix(x.getColumnPrefix()) .columns(DataStoreController.convertColumns(x.getColumns())) .keepNone(x.isKeepNone()) + .timestamp(ts) .build(); }) .collect(Collectors.toList())) @@ -223,6 +226,7 @@ private RecordList queryRecordList(QueryTableRequest request) { .rawResult(request.isRawResult()) .encodeWithType(request.isEncodeWithType()) .ignoreNonExistingTable(request.isIgnoreNonExistingTable()) + .timestamp(StringUtils.hasText(request.getRevision()) ? Long.parseLong(request.getRevision()) : 0) .build()); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/QueryTableRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/QueryTableRequest.java index 5a1a6a8466..13096b92cd 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/QueryTableRequest.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/QueryTableRequest.java @@ -34,4 +34,5 @@ public class QueryTableRequest { private boolean rawResult; private boolean encodeWithType; private boolean ignoreNonExistingTable = true; + private String revision; } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/TableDesc.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/TableDesc.java index ffa8931afe..2dc95fdb08 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/TableDesc.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/TableDesc.java @@ -26,4 +26,5 @@ public class TableDesc { private String columnPrefix; private List columns; private boolean keepNone; + private String revision; } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java 
b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java index c8ca9259bc..d34c1fe0d3 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java @@ -157,9 +157,9 @@ public String update(String tableName, //noinspection ConstantConditions table.lock(); try { - var revision = table.update(schema, records); + var ts = table.update(schema, records); this.dirtyTables.put(table, ""); - return revision; + return Long.toString(ts); } finally { table.unlock(); } @@ -191,7 +191,7 @@ public RecordList query(DataStoreQueryRequest req) { var schema = table.getSchema(); var columns = this.getColumnAliases(schema, req.getColumns()); var results = new ArrayList(); - var timestamp = req.getTimestamp() * 1000; + var timestamp = req.getTimestamp(); if (timestamp == 0) { timestamp = System.currentTimeMillis(); } @@ -291,9 +291,9 @@ class TableMeta { var ret = new TableMeta(); ret.tableName = info.getTableName(); if (info.getTimestamp() > 0) { - ret.timestamp = info.getTimestamp() * 1000; + ret.timestamp = info.getTimestamp(); } else if (req.getTimestamp() > 0) { - ret.timestamp = req.getTimestamp() * 1000; + ret.timestamp = req.getTimestamp(); } else { ret.timestamp = currentTimestamp; } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java index 81f2cadd57..4877bfbf39 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreQueryRequest.java @@ -30,6 +30,7 @@ public class DataStoreQueryRequest { private String tableName; + // timestamp in milliseconds, used to filter out the data that is newer than the timestamp for this table private long timestamp; private Map columns; private List orderBy; diff 
--git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java index 595f6969b8..acdd160aa6 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStoreScanRequest.java @@ -36,6 +36,7 @@ public class DataStoreScanRequest { public static class TableInfo { private String tableName; + // timestamp in milliseconds, used to filter out the data that is newer than the timestamp for this table private long timestamp; private String columnPrefix; private Map columns; @@ -43,6 +44,7 @@ public static class TableInfo { } private List tables; + // timestamp in milliseconds, used to filter out the data that is newer than the timestamp for all tables private long timestamp; private String start; private String startType; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java index 19527eafe1..e443615a0e 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java @@ -27,8 +27,8 @@ public interface MemoryTable { void updateFromWal(Wal.WalEntry entry); - // update records, returns the revision - String update(TableSchemaDesc schema, List> records); + // update records, returns the timestamp in milliseconds + long update(TableSchemaDesc schema, List> records); Iterator query(long timestamp, Map columns, diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java index eae039e0a8..c36858af55 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java +++ 
b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java @@ -255,7 +255,7 @@ public void updateFromWal(Wal.WalEntry entry) { } @Override - public String update(TableSchemaDesc schema, @NonNull List> records) { + public long update(TableSchemaDesc schema, @NonNull List> records) { var decodedRecords = new ArrayList>(); String keyColumn; if (schema.getKeyColumn() == null) { @@ -315,7 +315,7 @@ public String update(TableSchemaDesc schema, @NonNull List> this.insertRecords(timestamp, decodedRecords); } - return Long.toString(timestamp); + return timestamp; } private void insertRecords(long timestamp, List> records) { @@ -360,8 +360,11 @@ private void insertRecords(long timestamp, List> records) private Map getRecordMap(BaseValue key, List versions, long timestamp) { var ret = new HashMap(); boolean deleted = false; + boolean hasVersion = false; for (var record : versions) { if (record.getTimestamp() <= timestamp) { + // record may be empty, use hasVersion to mark if there is a record + hasVersion = true; if (record.isDeleted()) { deleted = true; ret.clear(); @@ -374,7 +377,10 @@ private Map getRecordMap(BaseValue key, List ve if (deleted) { ret.put(DELETED_FLAG_COLUMN_NAME, BoolValue.TRUE); } - ret.put(this.schema.getKeyColumn(), key); + + if (hasVersion) { + ret.put(this.schema.getKeyColumn(), key); + } return ret; } @@ -496,6 +502,7 @@ public Iterator scan( } var iterator = this.recordMap.subMap(startKey, startInclusive, endKey, endInclusive).entrySet().stream() .map(entry -> this.getRecordMap(entry.getKey(), entry.getValue(), timestamp)) + .filter(record -> !record.isEmpty()) .iterator(); return new Iterator<>() { diff --git a/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java index 1c30335fb9..827c8ce314 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java +++ 
b/server/controller/src/test/java/ai/starwhale/mlops/api/DataStoreControllerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; @@ -116,7 +117,7 @@ public void testList() { } @Test - public void testUpdate() { + public void testUpdate() throws InterruptedException { this.controller.updateTable(new UpdateTableRequest() { { setTableName("t1"); @@ -140,7 +141,7 @@ public void testUpdate() { })); } }); - this.controller.updateTable(new UpdateTableRequest() { + var updateResp = this.controller.updateTable(new UpdateTableRequest() { { setTableName("t1"); setTableSchemaDesc(new TableSchemaDesc("k", @@ -163,6 +164,7 @@ public void testUpdate() { })); } }); + Thread.sleep(1); this.controller.updateTable(new UpdateTableRequest() { { setTableName("t2"); @@ -305,6 +307,66 @@ public void testUpdate() { assertThat("t2", Objects.requireNonNull(resp.getBody()).getData().getRecords(), is(List.of(Map.of("k", "00000003", "b", "00000002")))); + + // scan with empty revision string will get the latest revision + resp = this.controller.scanTable(new ScanTableRequest() { + { + setEncodeWithType(true); + setTables(List.of(new TableDesc() { + { + setTableName("t1"); + setRevision(""); + setColumns(List.of(new ColumnDesc() { + { + setColumnName("k"); + } + }, new ColumnDesc() { + { + setColumnName("a"); + } + })); + } + })); + } + }); + assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true)); + assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes()); + assertThat("t1", + Objects.requireNonNull(resp.getBody()).getData().getRecords(), + is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"), + "a", Map.of("type", "STRING", "value", 
"1")), + Map.of("k", Map.of("type", "INT32", "value", "00000001"), + "a", Map.of("type", "INT32", "value", "00000002"))))); + + // scan with revision + resp = this.controller.scanTable(new ScanTableRequest() { + { + setEncodeWithType(true); + setTables(List.of(new TableDesc() { + { + setTableName("t1"); + setRevision(updateResp.getBody().getData()); + setColumns(List.of(new ColumnDesc() { + { + setColumnName("k"); + } + }, new ColumnDesc() { + { + setColumnName("a"); + } + })); + } + })); + } + }); + assertThat("t1", resp.getStatusCode().is2xxSuccessful(), is(true)); + assertNull(Objects.requireNonNull(resp.getBody()).getData().getColumnTypes()); + assertThat("t1", + Objects.requireNonNull(resp.getBody()).getData().getRecords(), + is(List.of(Map.of("k", Map.of("type", "INT32", "value", "00000000"), + "a", Map.of("type", "INT32", "value", "00000001")), + Map.of("k", Map.of("type", "INT32", "value", "00000001"), + "a", Map.of("type", "INT32", "value", "00000002"))))); } @Nested