Skip to content

Commit

Permalink
Merge branch '2.x' into parallel-test-run
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored Oct 3, 2023
2 parents b660dc0 + c18b0f8 commit 8e40ac7
Show file tree
Hide file tree
Showing 37 changed files with 218 additions and 1,460 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/docker-security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ name: Docker Security Test Workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
- "**"

jobs:
test:
docker-test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -81,3 +81,15 @@ jobs:
with:
name: logs
path: build/testclusters/integTest-*/logs/*
- name: Collect docker logs on failure
uses: jwalton/gh-docker-logs@v2
with:
dest: './logs'
- name: Tar logs
run: tar cvzf ./logs.tgz ./logs
- name: Upload logs to GitHub
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs.tgz
path: ./logs.tgz
4 changes: 2 additions & 2 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name: Multi node test workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
- "**"

jobs:
multi-node-test:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ name: Security test workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
- "**"

jobs:
test:
security-test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ name: Test and Build Workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
- "**"

jobs:
build:
test-and-build:
env:
BUILD_ARGS: ${{ matrix.os_build_args }}
WORKING_DIR: ${{ matrix.working_directory }}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object SMRunner :
)

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
log.debug("Snapshot management running job: $job")
log.debug("Snapshot management running job: {}", job)

if (job !is SMPolicy) {
throw IllegalArgumentException("Received invalid job type [${job.javaClass.simpleName}] with id [${context.jobId}].")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SMStateMachine(
val indicesManager: IndexManagementIndices,
) {

val log: Logger = LogManager.getLogger("$javaClass [${job.policyName}]")
val log: Logger = LogManager.getLogger(javaClass)

lateinit var currentState: SMState
fun currentState(currentState: SMState): SMStateMachine {
Expand All @@ -62,7 +62,7 @@ class SMStateMachine(
val prevState = currentState
for (nextState in nextStates) {
currentState = nextState
log.debug("Start executing $currentState.")
log.debug("Start executing {}.", currentState)
log.debug(
"User and roles string from thread context: ${threadPool.threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
Expand Down Expand Up @@ -99,7 +99,7 @@ class SMStateMachine(
break
}
is SMResult.Stay -> {
log.debug("State [$currentState] has not finished.")
log.debug("State [{}] has not finished.", currentState)
updateMetadata(
result.metadataToSave
.setCurrentState(prevState)
Expand Down Expand Up @@ -200,7 +200,7 @@ class SMStateMachine(
suspend fun updateMetadata(md: SMMetadata) {
indicesManager.checkAndUpdateIMConfigIndex(log)
try {
log.debug("Update metadata: $md")
log.debug("Update metadata: {}", md)
if (md == metadata) {
log.debug("Metadata not change, so don't need to update")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.MediaType
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import java.io.IOException
import java.nio.file.Files
import java.util.Date
import java.time.Duration
import java.time.Instant
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
Expand Down Expand Up @@ -65,6 +70,24 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

@Before
fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity(
"""
{
"transient": {
"logger.org.opensearch.indexmanagement":"DEBUG",
"logger.org.opensearch.jobscheduler":"DEBUG"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
}

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()

Expand Down Expand Up @@ -172,6 +195,41 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis)
val waitForActiveShards = if (isMultiNode) "all" else "1"
// TODO flaky: Add this log to confirm this update is missed by job scheduler
// This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node
// However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node
// During this period, this update got missed
// Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this.
logger.info("Update rollup start time to $startTimeMillis")
val response = client().makeRequest(
"POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true",
StringEntity(
"{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
ContentType.APPLICATION_JSON
)
)

assertEquals("Request failed", RestStatus.OK, response.restStatus())
}

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
)
}

private fun assertIndexRolledUp(indexName: String, policyId: String, rollup: ISMRollup) {
val rollupId = rollup.toRollup(indexName).id
private fun assertIndexRolledUp(indexName: String, policyId: String, ismRollup: ISMRollup) {
val rollup = ismRollup.toRollup(indexName)
val rollupId = rollup.id
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so that the policy will be initialized.
Expand All @@ -290,21 +291,20 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
)
}

Thread.sleep(60000)
updateRollupStartTime(rollup)
waitFor(timeout = Instant.ofEpochSecond(60)) {
val rollupJob = getRollup(rollupId = rollupId)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

// Change the start time so that the rollup action will be attempted.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
assertEquals(
WaitForRollupCompletionStep.getJobCompletionMessage(rollupId, indexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
val rollupJob = getRollup(rollupId = rollupId)
waitFor {
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {
client: RestClient
) = super.createRollup(rollup, rollupId, refresh, client)

fun updateRollupStartTimeExt(update: Rollup, desiredStartTimeMillis: Long? = null) =
super.updateRollupStartTime(update, desiredStartTimeMillis)

fun getRollupMetadataExt(
metadataId: String,
refresh: Boolean = true,
Expand Down Expand Up @@ -165,10 +162,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() {
return executeRequest(request, expectedStatus, client)
}

protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) {
RollupRestTestCaseSecurityExtension.updateRollupStartTimeExt(update, desiredStartTimeMillis)
}

protected fun getRollupMetadata(
metadataId: String,
refresh: Boolean = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.waitFor
import org.opensearch.search.builder.SearchSourceBuilder

class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
Expand Down Expand Up @@ -60,7 +61,9 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
createBasicPolicy()

verifyPolicyExists(LEGACY_POLICY_BASE_URI)
verifyPolicyOnIndex(LEGACY_ISM_BASE_URI)
waitFor {
verifyPolicyOnIndex(LEGACY_ISM_BASE_URI)
}
}
ClusterType.MIXED -> {
assertTrue(pluginNames.contains("opensearch-index-management"))
Expand Down Expand Up @@ -137,7 +140,6 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
val createdVersion = responseBody["_version"] as Int
assertNotEquals("Create policy response is missing id", NO_ID, createdId)
assertTrue("Create policy response has incorrect version", createdVersion > 0)
Thread.sleep(10000)
}

@Throws(Exception::class)
Expand All @@ -164,6 +166,7 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {

assertEquals("Explain Index failed", RestStatus.OK, getResponse.restStatus())
val responseBody = getResponse.asMap()
logger.info("Response body: $responseBody")
assertTrue("Test index does not exist", responseBody.containsKey(INDEX_NAME))
val responsePolicy = responseBody[INDEX_NAME] as Map<String, String>
val responsePolicyId = responsePolicy["policy_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.opensearch.core.rest.RestStatus
abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
@Before
fun prepareForIT() {
setDebugLogLevel()
/* init cluster node ids in integ test */
initNodeIdsInRestIT(client())
}
Expand All @@ -49,22 +48,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
return client().makeRequest("POST", IndexManagementPlugin.LRON_BASE_URI, emptyMap(), lronConfig.toHttpEntity())
}

private fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity(
"""
{
"transient": {
"logger.org.opensearch.indexmanagement.controlcenter.notification":"DEBUG"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
}

protected fun LRONConfig.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,17 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return myIndex["state"] as String
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexNamesOfPattern(pattern: String): Set<String> {
val request = Request("GET", "/_cluster/state")
val response = client().performRequest(request)

val responseMap = response.asMap()
val metadata = responseMap["metadata"] as Map<String, Any>
val indexMetaData = metadata["indices"] as Map<String, Any>
return indexMetaData.filter { it.key.startsWith(pattern) }.keys
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexBlocksWriteSetting(indexName: String): String {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
* We are forcing RollOver to fail in this Integ test.
*/
fun `test failed action`() {
disableValidationService()
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Ingest","states":[
{"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]},
Expand Down Expand Up @@ -96,7 +95,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
}

fun `test exponential backoff`() {
disableValidationService()
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Ingest","states":[
{"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"exponential","delay":"1m"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]},
Expand Down
Loading

0 comments on commit 8e40ac7

Please sign in to comment.