Write metadata cache data to mappings _meta with refresh time update #805

Open · wants to merge 25 commits into main

Commits (25)
2f48bbf  [0.5-nexus] Write mock metadata cache data to mappings _meta (#744) (seankao-az, Oct 8, 2024)
38570ce  default metadata cache write disabled (seankao-az, Oct 10, 2024)
3cea7ea  remove string literal "external" in index builder (seankao-az, Oct 10, 2024)
3d2d095  track refreshInterval and lastRefreshTime (seankao-az, Oct 11, 2024)
3e76497  add last refresh timestamps to metadata log entry (seankao-az, Oct 22, 2024)
9acf7e5  update metadata cache test case: should pass (seankao-az, Oct 22, 2024)
830cc2b  move to spark package; get refresh interval (seankao-az, Oct 22, 2024)
7354128  parse refresh interval (seankao-az, Oct 22, 2024)
cac0af7  Merge branch 'main' into write-metadata-cache (seankao-az, Oct 22, 2024)
83fbe5e  minor syntax fix on FlintSpark.createIndex (seankao-az, Oct 22, 2024)
8873189  strategize cache writer interface (seankao-az, Oct 23, 2024)
8e86912  update refresh timestamps in FlintSpark (seankao-az, Oct 23, 2024)
5b67e96  add test cases (seankao-az, Oct 23, 2024)
77321fd  IT test for refresh timestamp update (seankao-az, Oct 24, 2024)
69e675b  add doc for spark conf (seankao-az, Oct 24, 2024)
671d5f6  change mock table name (seankao-az, Oct 24, 2024)
681067b  add IT test at FlintSpark level (seankao-az, Oct 24, 2024)
5ade12e  Merge branch 'main' into write-metadata-cache (seankao-az, Oct 24, 2024)
4823544  test with external scheduler (seankao-az, Oct 24, 2024)
79b7b17  refactor refreshIndex method; add test for modes (seankao-az, Oct 25, 2024)
941d08c  Merge branch 'main' into write-metadata-cache (seankao-az, Oct 25, 2024)
b4a9b53  fix typo (seankao-az, Oct 25, 2024)
2f58f56  fix failed test caused by refactoring (seankao-az, Oct 25, 2024)
7a8e1f3  Merge branch 'main' into write-metadata-cache (seankao-az, Oct 25, 2024)
d80d34d  rename method; add comment (seankao-az, Oct 25, 2024)
Files changed
1 change: 1 addition & 0 deletions docs/index.md
@@ -549,6 +549,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
- `spark.flint.metadataCacheWrite.enabled`: Enable writing metadata to index mappings `_meta` as a read cache for frontend access. Default value is false. Do not use in production; this setting will be removed in a later version.
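
A minimal usage sketch, assuming only that the key above is read as a standard Spark conf (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: opt in to Flint metadata cache write for this session.
// Only the config key comes from this PR; the rest is illustrative.
val spark = SparkSession
  .builder()
  .appName("flint-metadata-cache-demo")
  .config("spark.flint.metadataCacheWrite.enabled", "true")
  .getOrCreate()
```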

#### Data Type Mapping

@@ -18,6 +18,10 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
* log entry id
* @param state
* Flint index state
* @param lastRefreshStartTime
* timestamp when last refresh started for manual or external scheduler refresh
* @param lastRefreshCompleteTime
* timestamp when last refresh completed for manual or external scheduler refresh
* @param entryVersion
* entry version fields for consistency control
* @param error
@@ -28,10 +32,12 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
case class FlintMetadataLogEntry(
id: String,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
* This is currently used as streaming job start time for internal scheduler. In future, this
* should represent the create timestamp of the log entry
*/
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: Map[String, Any],
error: String,
@@ -40,26 +46,48 @@ case class FlintMetadataLogEntry(
def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties.asScala.toMap)
}

def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties)
}
}

object FlintMetadataLogEntry {

val EMPTY_TIMESTAMP = 0L

/**
* Flint index state enum.
*/
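
For illustration, an entry for an index that has never been refreshed could be built as follows; a sketch where the id, state, version, and property values are illustrative, while the field order follows the case class in this diff:

```scala
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState

// Sketch: both refresh timestamps start at the EMPTY_TIMESTAMP sentinel
// introduced above, since no manual or external refresh has run yet.
val entry = FlintMetadataLogEntry(
  id = "someLatestId",                                    // illustrative
  createTime = System.currentTimeMillis(),
  lastRefreshStartTime = FlintMetadataLogEntry.EMPTY_TIMESTAMP,
  lastRefreshCompleteTime = FlintMetadataLogEntry.EMPTY_TIMESTAMP,
  state = IndexState.ACTIVE,                              // illustrative
  entryVersion = Map("seqNo" -> 0L, "primaryTerm" -> 1L), // illustrative
  error = "",
  properties = Map("dataSourceName" -> "myDataSource")    // illustrative
)
```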
@@ -86,6 +86,8 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
initialLog = initialLog.copy(
initialLog.id(),
initialLog.createTime(),
initialLog.lastRefreshStartTime(),
initialLog.lastRefreshCompleteTime(),
initialLog.state(),
latest.entryVersion(),
initialLog.error(),
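
This Java call must restate every field, which is why the two new timestamps are threaded through explicitly. A Scala caller could instead rely on named copy; a sketch assuming the same initialLog and latest values as in the snippet above:

```scala
// Sketch: copy() carries lastRefreshStartTime and lastRefreshCompleteTime
// along implicitly, so only the changed field is named.
val merged = initialLog.copy(entryVersion = latest.entryVersion)
```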
@@ -101,14 +101,16 @@ public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessingException
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();

json.put("version", "1.0");
json.put("version", "1.1");
json.put("latestId", logEntry.id());
json.put("type", "flintindexstate");
json.put("state", logEntry.state().toString());
json.put("applicationId", applicationId);
json.put("jobId", jobId);
json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString());
json.put("jobStartTime", logEntry.createTime());
json.put("lastRefreshStartTime", logEntry.lastRefreshStartTime());
json.put("lastRefreshCompleteTime", logEntry.lastRefreshCompleteTime());
json.put("lastUpdateTime", lastUpdateTime);
json.put("error", logEntry.error());

@@ -138,6 +140,8 @@ public static FlintMetadataLogEntry constructLogEntry(
id,
/* sourceMap may use Integer or Long even though it's always long in index mapping */
((Number) sourceMap.get("jobStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshCompleteTime")).longValue(),
FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")),
Map.of("seqNo", seqNo, "primaryTerm", primaryTerm),
(String) sourceMap.get("error"),
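
The Number casts above exist because OpenSearch may deserialize small JSON integers as Integer rather than Long, as the inline comment notes. A sketch of the same defensive read in Scala, with a helper name of my own:

```scala
// Sketch: widen any numeric JSON value (Integer or Long) to Long.
// readTimestamp is a hypothetical helper, not part of this PR.
def readTimestamp(sourceMap: java.util.Map[String, AnyRef], key: String): Long =
  sourceMap.get(key).asInstanceOf[Number].longValue()
```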
@@ -132,7 +132,9 @@ public void purge() {
public FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
0L,
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),
Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM),
"",
@@ -146,6 +148,8 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
logEntry.copy(
latestId,
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
logEntry.entryVersion(),
logEntry.error(),
@@ -184,6 +188,8 @@ private FlintMetadataLogEntry writeLogEntry(
logEntry = new FlintMetadataLogEntry(
logEntry.id(),
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()),
logEntry.error(),
@@ -25,6 +25,10 @@ class FlintMetadataLogEntryOpenSearchConverterTest
val sourceMap = JMap.of(
"jobStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890123L.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
@@ -36,6 +40,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest
when(mockLogEntry.id).thenReturn("id")
when(mockLogEntry.state).thenReturn(FlintMetadataLogEntry.IndexState.ACTIVE)
when(mockLogEntry.createTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshStartTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshCompleteTime).thenReturn(1234567890123L)
when(mockLogEntry.error).thenReturn("")
when(mockLogEntry.properties).thenReturn(Map("dataSourceName" -> "testDataSource"))
}
@@ -45,14 +51,16 @@ class FlintMetadataLogEntryOpenSearchConverterTest
val expectedJsonWithoutLastUpdateTime =
s"""
|{
| "version": "1.0",
| "version": "1.1",
| "latestId": "id",
| "type": "flintindexstate",
| "state": "active",
| "applicationId": "unknown",
| "jobId": "unknown",
| "dataSourceName": "testDataSource",
| "jobStartTime": 1234567890123,
| "lastRefreshStartTime": 1234567890123,
| "lastRefreshCompleteTime": 1234567890123,
| "error": ""
|}
|""".stripMargin
@@ -67,15 +75,22 @@ class FlintMetadataLogEntryOpenSearchConverterTest
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890123L
logEntry.lastRefreshStartTime shouldBe 1234567890123L
logEntry.lastRefreshCompleteTime shouldBe 1234567890123L
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
}

it should "construct log entry with integer jobStartTime value" in {
it should "construct log entry with integer timestamp value" in {
// Use Integer instead of Long for timestamps
val testSourceMap = JMap.of(
"jobStartTime",
1234567890.asInstanceOf[Object], // Integer instead of Long
1234567890.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
@@ -87,6 +102,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890
logEntry.lastRefreshStartTime shouldBe 1234567890
logEntry.lastRefreshCompleteTime shouldBe 1234567890
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
@@ -253,6 +253,10 @@ object FlintSparkConf {
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for metadata access permission")
.createOptional()
val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
.doc("Enable Flint metadata cache write to Flint index mappings")
.createWithDefault("false")

val CUSTOM_SESSION_MANAGER =
FlintConfig("spark.flint.job.customSessionManager")
.createOptional()
@@ -309,6 +313,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
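
Downstream code can then branch on the new accessor; a minimal sketch, assuming a FlintSparkConf instance in scope and a hypothetical cache writer:

```scala
// Sketch: write metadata cache into index mappings _meta only when enabled.
// cacheWriter and its write(...) signature are hypothetical.
if (flintSparkConf.isMetadataCacheWriteEnabled) {
  cacheWriter.write(indexName, metadataCache)
}
```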