Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TransformValidator(
try {
val issues = mutableListOf<String>()
val concreteIndices =
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), transform.sourceIndex)
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), true, transform.sourceIndex)
if (concreteIndices.isEmpty()) return TransformValidationResult(false, listOf("No specified source index exist in the cluster"))

val request = ClusterHealthRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@

package org.opensearch.indexmanagement.transform

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.opensearch.common.settings.Settings
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Histogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.waitFor
Expand Down Expand Up @@ -343,6 +346,73 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank())
}

fun `test transform with data stream`() {
// Create a data stream.
val dataStreamName = "transform-data-stream"
client().makeRequest(
"PUT",
"/_index_template/transform-data-stream-template",
StringEntity(
"""
{
"data_stream": {"timestamp_field": {"name": "tpep_pickup_datetime"}},
"index_patterns": ["$dataStreamName"],
"template": {
"mappings": {
"properties":{"DOLocationID":{"type":"integer"},"RatecodeID":{"type":"integer"},"fare_amount":{"type":"float"},"tpep_dropoff_datetime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},"congestion_surcharge":{"type":"float"},"VendorID":{"type":"integer"},"passenger_count":{"type":"integer"},"tolls_amount":{"type":"float"},"improvement_surcharge":{"type":"float"},"trip_distance":{"type":"float"},"store_and_fwd_flag":{"type":"keyword"},"payment_type":{"type":"integer"},"total_amount":{"type":"float"},"extra":{"type":"float"},"tip_amount":{"type":"float"},"mta_tax":{"type":"float"},"tpep_pickup_datetime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},"PULocationID":{"type":"integer"}}
}
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
client().makeRequest("PUT", "/_data_stream/$dataStreamName")

// Insert the sample data across multiple backing indices of the data stream.
insertSampleBulkData(dataStreamName, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
client().makeRequest("POST", "/$dataStreamName/_rollover")
insertSampleBulkData(dataStreamName, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
client().makeRequest("POST", "/$dataStreamName/_rollover")

// Create the transform job.
val transform = Transform(
id = "id_7",
schemaVersion = 1L,
enabled = true,
enabledAt = Instant.now(),
updatedAt = Instant.now(),
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
description = "test transform",
metadataId = null,
sourceIndex = dataStreamName,
targetIndex = "transform-target-index",
roles = emptyList(),
pageSize = 100,
groups = listOf(
Terms(sourceField = "store_and_fwd_flag", targetField = "flag")
)
).let { createTransform(it, it.id) }

updateTransformStartTime(transform)

waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) }

val metadata = waitFor {
val job = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", job.metadataId)
val transformMetadata = getTransformMetadata(job.metadataId!!)
assertEquals("Transform has not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)
transformMetadata
}

assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed)
assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed)
assertEquals("More than expected documents processed", 10000L, metadata.stats.documentsProcessed)
assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0)
assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0)
}

private fun getStrictMappings(): String {
return """
"dynamic": "strict",
Expand Down
Loading