diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/cache/CacheableMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/cache/CacheableMetadataDAO.java index f7ad3d25e0..256512ea44 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/cache/CacheableMetadataDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/cache/CacheableMetadataDAO.java @@ -143,6 +143,11 @@ public List getAllWorkflowDefs() { return cassandraMetadataDAO.getAllWorkflowDefs(); } + @Override + public List getAllWorkflowDefsLatestVersions() { + return cassandraMetadataDAO.getAllWorkflowDefsLatestVersions(); + } + private List refreshTaskDefsCache() { try { Cache taskDefsCache = cacheManager.getCache(TASK_DEF_CACHE); diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java index af846e6a14..90ff3ed4a7 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java @@ -14,9 +14,12 @@ import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.PriorityQueue; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -60,8 +63,10 @@ public class CassandraMetadataDAO extends CassandraBaseDAO implements MetadataDA private final PreparedStatement insertTaskDefStatement; private final PreparedStatement selectWorkflowDefStatement; + private final PreparedStatement selectAllWorkflowDefVersionsByNameStatement; private final PreparedStatement selectAllWorkflowDefsStatement; + private final PreparedStatement selectAllWorkflowDefsLatestVersionsStatement; private final PreparedStatement selectTaskDefStatement; private final PreparedStatement selectAllTaskDefsStatement; @@ -97,6 +102,9 @@ public CassandraMetadataDAO( this.selectAllWorkflowDefsStatement = session.prepare(statements.getSelectAllWorkflowDefsStatement()) .setConsistencyLevel(properties.getReadConsistencyLevel()); + this.selectAllWorkflowDefsLatestVersionsStatement = + session.prepare(statements.getSelectAllWorkflowDefsLatestVersionsStatement()) + .setConsistencyLevel(properties.getReadConsistencyLevel()); this.selectTaskDefStatement = session.prepare(statements.getSelectTaskDefStatement()) .setConsistencyLevel(properties.getReadConsistencyLevel()); @@ -289,6 +297,48 @@ public List getAllWorkflowDefs() { } } + @Override + public List getAllWorkflowDefsLatestVersions() { + try { + ResultSet resultSet = + session.execute( + selectAllWorkflowDefsLatestVersionsStatement.bind( + WORKFLOW_DEF_INDEX_KEY)); + List rows = resultSet.all(); + if (rows.size() == 0) { + LOGGER.info("No workflow definitions were found."); + return Collections.EMPTY_LIST; + } + Map> allWorkflowDefs = new HashMap<>(); + + for (Row row : rows) { + String defNameVersion = row.getString(WORKFLOW_DEF_NAME_VERSION_KEY); + var nameVersion = getWorkflowNameAndVersion(defNameVersion); + WorkflowDef def = + getWorkflowDef(nameVersion.getLeft(), nameVersion.getRight()).orElse(null); + if (def == null) { + continue; + } + if (allWorkflowDefs.get(def.getName()) == null) { + allWorkflowDefs.put( + def.getName(), + new PriorityQueue<>( + (WorkflowDef w1, WorkflowDef w2) -> + Integer.compare(w2.getVersion(), w1.getVersion()))); + } + allWorkflowDefs.get(def.getName()).add(def); + } + return allWorkflowDefs.values().stream() + .map(PriorityQueue::poll) + .collect(Collectors.toList()); + } catch (DriverException e) { + Monitors.error(CLASS_NAME, "getAllWorkflowDefsLatestVersions"); + String errorMsg = "Error retrieving all workflow defs latest versions"; + LOGGER.error(errorMsg, e); + throw new TransientException(errorMsg, e); + } + } + private TaskDef getTaskDefFromDB(String name) { try { ResultSet resultSet = session.execute(selectTaskDefStatement.bind(name)); diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java index c77f2b2939..68fe3b2427 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java @@ -210,6 +210,14 @@ public String getSelectAllWorkflowDefsStatement() { .getQueryString(); } + public String getSelectAllWorkflowDefsLatestVersionsStatement() { + return QueryBuilder.select() + .all() + .from(keyspace, TABLE_WORKFLOW_DEFS_INDEX) + .where(eq(WORKFLOW_DEF_INDEX_KEY, bindMarker())) + .getQueryString(); + } + /** * @return cql query statement to fetch a task definition by name from the "task_definitions" * table diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy index f28216d17f..fd8afacc5b 100644 --- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy +++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy @@ -166,6 +166,36 @@ class CassandraMetadataDAOSpec extends CassandraSpec { } + def "Get All WorkflowDef"() { + when: + metadataDAO.removeWorkflowDef("workflow_def_1", 1) + WorkflowDef workflowDef = new WorkflowDef() + workflowDef.setName("workflow_def_1") + workflowDef.setVersion(1) + workflowDef.setOwnerEmail("test@junit.com") + metadataDAO.createWorkflowDef(workflowDef) + + workflowDef.setName("workflow_def_2") + metadataDAO.createWorkflowDef(workflowDef) + workflowDef.setVersion(2) + metadataDAO.createWorkflowDef(workflowDef) + + workflowDef.setName("workflow_def_3") + workflowDef.setVersion(1) + metadataDAO.createWorkflowDef(workflowDef) + workflowDef.setVersion(2) + metadataDAO.createWorkflowDef(workflowDef) + workflowDef.setVersion(3) + metadataDAO.createWorkflowDef(workflowDef) + + then: // fetch the workflow definition + def allDefsLatestVersions = metadataDAO.getAllWorkflowDefsLatestVersions() + Map allDefsMap = allDefsLatestVersions.collectEntries {wfDef -> [wfDef.getName(), wfDef]} + allDefsMap.get("workflow_def_1").getVersion() == 1 + allDefsMap.get("workflow_def_2").getVersion() == 2 + allDefsMap.get("workflow_def_3").getVersion() == 3 + } + def "parse index string"() { expect: def pair = metadataDAO.getWorkflowNameAndVersion(nameVersionStr) diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java index 1ea4efebaf..8db927b8c5 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java @@ -22,12 +22,16 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.sun.jersey.api.client.ClientHandler; +import com.sun.jersey.api.client.GenericType; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; public class MetadataClient extends ClientBase { + private static final GenericType> workflowDefList = + new GenericType>() {}; + /** Creates a default metadata client */ public MetadataClient() { this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null); @@ -122,6 +126,12 @@ public WorkflowDef getWorkflowDef(String name, Integer version) { name); } + /** */ + public List getAllWorkflowsWithLatestVersions() { + return getForEntity( + "metadata/workflow/latest-versions", null, workflowDefList, (Object) null); + } + /** * Removes the workflow definition of a workflow from the conductor server. It does not remove * associated workflows. Use with caution. diff --git a/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy b/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy index f194f7180a..d82acc5090 100644 --- a/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy +++ b/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy @@ -13,7 +13,9 @@ package com.netflix.conductor.client.http import com.netflix.conductor.client.exception.ConductorClientException +import com.netflix.conductor.common.metadata.workflow.WorkflowDef +import com.sun.jersey.api.client.ClientResponse import spock.lang.Subject class MetadataClientSpec extends ClientSpecification { @@ -75,4 +77,18 @@ class MetadataClientSpec extends ClientSpecification { then: thrown(IllegalArgumentException.class) } + + def "workflow get all definitions latest version"() { + given: + List result = new ArrayList() + URI uri = createURI("metadata/workflow/latest-versions") + + when: + metadataClient.getAllWorkflowsWithLatestVersions() + + then: + 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { + getEntity(_) >> result + } + } } diff --git a/core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java b/core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java index 2d9a79f3a9..b7e39cf3ad 100644 --- a/core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java @@ -81,4 +81,9 @@ public interface MetadataDAO { * @return List of all the workflow definitions */ List getAllWorkflowDefs(); + + /** + * @return List the latest versions of the workflow definitions + */ + List getAllWorkflowDefsLatestVersions(); } diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataService.java b/core/src/main/java/com/netflix/conductor/service/MetadataService.java index 913245c647..701055ef84 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataService.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataService.java @@ -154,4 +154,6 @@ void removeEventHandlerStatus( List getEventHandlersForEvent( @NotEmpty(message = "EventName cannot be null or empty") String event, boolean activeOnly); + + List getWorkflowDefsLatestVersions(); } diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index 5cb0c48092..7326ab62df 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -218,6 +218,11 @@ public List getEventHandlersForEvent(String event, boolean activeO return eventHandlerDAO.getEventHandlersForEvent(event, activeOnly); } + @Override + public List getWorkflowDefsLatestVersions() { + return metadataDAO.getAllWorkflowDefsLatestVersions(); + } + public Map> getWorkflowNamesAndVersions() { List workflowDefs = metadataDAO.getAllWorkflowDefs(); diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java index d8009b6180..f7951c1e32 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java @@ -299,6 +299,27 @@ public List getAllWorkflowDefs() { return workflows; } + @Override + public List getAllWorkflowDefsLatestVersions() { + List workflows = new LinkedList<>(); + + // Get all definitions latest versions from WORKFLOW_DEF_NAMES + recordRedisDaoRequests("getAllWorkflowLatestVersionsDefs"); + Set wfNames = jedisProxy.smembers(nsKey(WORKFLOW_DEF_NAMES)); + int size = 0; + // Place all workflows into the Priority Queue. The PQ will allow us to grab the latest + // version of the workflows. + for (String wfName : wfNames) { + WorkflowDef def = getLatestWorkflowDef(wfName).orElse(null); + if (def != null) { + workflows.add(def); + size += def.toString().length(); + } + } + recordRedisDaoPayloadSize("getAllWorkflowLatestVersionsDefs", size, "n/a", "n/a"); + return workflows; + } + private void _createOrUpdate(WorkflowDef workflowDef) { // First set the workflow def jedisProxy.hset( diff --git a/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java index ef630a0ff4..fd91191629 100644 --- a/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java +++ b/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java @@ -15,9 +15,11 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; import org.junit.Before; @@ -160,6 +162,45 @@ public void testWorkflowDefOperations() { assertEquals(workflow.getVersion(), 3); } + @Test + public void testGetAllWorkflowDefsLatestVersions() { + WorkflowDef def = new WorkflowDef(); + def.setName("test1"); + def.setVersion(1); + def.setDescription("description"); + def.setCreatedBy("unit_test"); + def.setCreateTime(1L); + def.setOwnerApp("ownerApp"); + def.setUpdatedBy("unit_test2"); + def.setUpdateTime(2L); + redisMetadataDAO.createWorkflowDef(def); + + def.setName("test2"); + redisMetadataDAO.createWorkflowDef(def); + def.setVersion(2); + redisMetadataDAO.createWorkflowDef(def); + + def.setName("test3"); + def.setVersion(1); + redisMetadataDAO.createWorkflowDef(def); + def.setVersion(2); + redisMetadataDAO.createWorkflowDef(def); + def.setVersion(3); + redisMetadataDAO.createWorkflowDef(def); + + // Placed the values in a map because they might not be stored in order of defName. + // To test, needed to confirm that the versions are correct for the definitions. + Map allMap = + redisMetadataDAO.getAllWorkflowDefsLatestVersions().stream() + .collect(Collectors.toMap(WorkflowDef::getName, Function.identity())); + + assertNotNull(allMap); + assertEquals(3, allMap.size()); + assertEquals(1, allMap.get("test1").getVersion()); + assertEquals(2, allMap.get("test2").getVersion()); + assertEquals(3, allMap.get("test3").getVersion()); + } + @Test(expected = NotFoundException.class) public void removeInvalidWorkflowDef() { redisMetadataDAO.removeWorkflowDef("hello", 1); diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java index a1062f64be..023ed2b57d 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java @@ -83,6 +83,12 @@ public List getAll() { return metadataService.getWorkflowNamesAndVersions(); } + @Operation(summary = "Returns only the latest version of all workflow definitions") + @GetMapping("/workflow/latest-versions") + public List getAllWorkflowsWithLatestVersions() { + return metadataService.getWorkflowDefsLatestVersions(); + } + @DeleteMapping("/workflow/{name}/{version}") @Operation( summary = diff --git a/rest/src/test/java/com/netflix/conductor/rest/controllers/MetadataResourceTest.java b/rest/src/test/java/com/netflix/conductor/rest/controllers/MetadataResourceTest.java index eadf68e8bf..36d5b6e70c 100644 --- a/rest/src/test/java/com/netflix/conductor/rest/controllers/MetadataResourceTest.java +++ b/rest/src/test/java/com/netflix/conductor/rest/controllers/MetadataResourceTest.java @@ -91,6 +91,20 @@ public void testGetAllWorkflowDef() { assertEquals(listOfWorkflowDef, metadataResource.getAll()); } + @Test + public void testGetAllWorkflowDefLatestVersions() { + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("test"); + workflowDef.setVersion(1); + workflowDef.setDescription("test"); + + List listOfWorkflowDef = new ArrayList<>(); + listOfWorkflowDef.add(workflowDef); + + when(mockMetadataService.getWorkflowDefsLatestVersions()).thenReturn(listOfWorkflowDef); + assertEquals(listOfWorkflowDef, metadataResource.getAllWorkflowsWithLatestVersions()); + } + @Test public void testUnregisterWorkflowDef() throws Exception { metadataResource.unregisterWorkflowDef("test", 1);