Skip to content

Commit

Permalink
272 - Data and file storage improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ivicac committed Sep 7, 2023
1 parent 4349c57 commit 35b423a
Show file tree
Hide file tree
Showing 37 changed files with 301 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import com.bytechef.hermes.execution.converter.StringToWorkflowExecutionIdConverter;
import com.bytechef.hermes.execution.converter.TriggerStateValueToStringConverter;
import com.bytechef.hermes.execution.converter.WorkflowExecutionIdToStringConverter;
import com.bytechef.data.storage.converter.DataStorageValueToStringConverter;
import com.bytechef.data.storage.converter.StringToDataStorageValueConverter;
import com.bytechef.data.storage.converter.DataWrapperToStringConverter;
import com.bytechef.data.storage.converter.StringToDataWrapperConverter;
import com.bytechef.file.storage.converter.FileEntryToStringConverter;
import com.bytechef.file.storage.converter.StringToFileEntryConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -83,13 +83,13 @@ public DateTimeProvider auditingDateTimeProvider() {
@Override
protected List<?> userConverters() {
return Arrays.asList(
new DataStorageValueToStringConverter(objectMapper),
new DataWrapperToStringConverter(objectMapper),
new EncryptedMapWrapperToStringConverter(encryption, objectMapper),
new EncryptedStringToMapWrapperConverter(encryption, objectMapper),
new ExecutionErrorToStringConverter(objectMapper),
new FileEntryToStringConverter(objectMapper),
new MapWrapperToStringConverter(objectMapper),
new StringToDataStorageValueConverter(objectMapper),
new StringToDataWrapperConverter(objectMapper),
new StringToExecutionErrorConverter(objectMapper),
new StringToFileEntryConverter(objectMapper),
new StringToMapWrapperConverter(objectMapper),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.bytechef.atlas.execution.converter.WebhooksToStringConverter;
import com.bytechef.commons.data.jdbc.converter.MapWrapperToStringConverter;
import com.bytechef.commons.data.jdbc.converter.StringToMapWrapperConverter;
import com.bytechef.data.storage.converter.DataStorageValueToStringConverter;
import com.bytechef.data.storage.converter.StringToDataStorageValueConverter;
import com.bytechef.data.storage.converter.DataWrapperToStringConverter;
import com.bytechef.data.storage.converter.StringToDataWrapperConverter;
import com.bytechef.hermes.execution.converter.StringToTriggerStateValueConverter;
import com.bytechef.hermes.execution.converter.StringToWorkflowExecutionIdConverter;
import com.bytechef.hermes.configuration.converter.StringToWorkflowTriggerConverter;
Expand Down Expand Up @@ -78,11 +78,11 @@ public DateTimeProvider auditingDateTimeProvider() {
@Override
protected List<?> userConverters() {
return Arrays.asList(
new DataStorageValueToStringConverter(objectMapper),
new DataWrapperToStringConverter(objectMapper),
new ExecutionErrorToStringConverter(objectMapper),
new FileEntryToStringConverter(objectMapper),
new MapWrapperToStringConverter(objectMapper),
new StringToDataStorageValueConverter(objectMapper),
new StringToDataWrapperConverter(objectMapper),
new StringToExecutionErrorConverter(objectMapper),
new StringToFileEntryConverter(objectMapper),
new StringToMapWrapperConverter(objectMapper),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.bytechef.data.storage.remote.client.service;

import com.bytechef.commons.webclient.LoadBalancedWebClient;
import com.bytechef.hermes.component.definition.Context.DataStorageScope;
import com.bytechef.data.storage.service.DataStorageService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.springframework.core.ParameterizedTypeReference;
Expand All @@ -40,7 +39,7 @@ public DataStorageServiceClient(LoadBalancedWebClient loadBalancedWebClient) {
}

@Override
public <T> Optional<T> fetchValue(DataStorageScope scope, long scopeId, String key) {
public <T> Optional<T> fetchData(String context, int scope, long scopeId, String key) {
return Optional.ofNullable(
loadBalancedWebClient.get(
uriBuilder -> uriBuilder
Expand All @@ -51,12 +50,12 @@ public <T> Optional<T> fetchValue(DataStorageScope scope, long scopeId, String k
}

@Override
public void save(DataStorageScope scope, long scopeId, String key, Object value) {
public void save(String context, int scope, long scopeId, String key, Object data) {
loadBalancedWebClient.put(
uriBuilder -> uriBuilder
.host("execution-service-app")
.path("/internal/data-storage-service/save/{scope}/{scopeId}/{key}")
.build(scope, scope, key),
value);
.path("/internal/data-storage-service/save/{context}/{scope}/{scopeId}/{key}")
.build(context, scope, scope, key),
data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.bytechef.data.storage.remote.web.rest.service;

import com.bytechef.hermes.component.definition.Context.DataStorageScope;
import com.bytechef.commons.util.OptionalUtils;
import com.bytechef.data.storage.service.DataStorageService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.swagger.v3.oas.annotations.Hidden;
Expand Down Expand Up @@ -45,29 +45,29 @@ public DataStorageServiceController(DataStorageService dataStorageService) {

@RequestMapping(
method = RequestMethod.GET,
value = "/fetch-value/{scope}/{scopeId}/{key}",
value = "/fetch-value/{context}/{scope}/{scopeId}/{key}",
consumes = {
"application/json"
})
public ResponseEntity<Object> fetchValue(
@PathVariable DataStorageScope scope, @PathVariable long scopeId, @PathVariable String key) {
@PathVariable String context, @PathVariable int scope, @PathVariable long scopeId,
@PathVariable String key) {

return ResponseEntity.ok(
dataStorageService.fetchValue(scope, scopeId, key)
.orElse(null));
OptionalUtils.orElse(dataStorageService.fetchData(context, scope, scopeId, key), null));
}

@RequestMapping(
method = RequestMethod.PUT,
value = "/save/{scope}/{scopeId}/{key}",
value = "/save/{context}/{scope}/{scopeId}/{key}",
consumes = {
"application/json"
})
public ResponseEntity<Void> save(
@PathVariable DataStorageScope scope, @PathVariable long scopeId, @PathVariable String key,
@RequestBody Object value) {
@PathVariable String context, @PathVariable int scope, @PathVariable long scopeId,
@PathVariable String key, @RequestBody Object data) {

dataStorageService.save(scope, scopeId, key, value);
dataStorageService.save(context, scope, scopeId, key, data);

return ResponseEntity.noContent()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package com.bytechef.atlas.configuration;
package com.bytechef.atlas.configuration.workflow.contributor;

import java.util.List;

Expand All @@ -24,5 +24,5 @@
*/
public interface WorkflowReservedWordContributor {

List<String> getReservedWord();
List<String> getReservedWords();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.bytechef.atlas.configuration.workflow.mapper;

import com.bytechef.atlas.configuration.WorkflowReservedWordContributor;
import com.bytechef.atlas.configuration.workflow.contributor.WorkflowReservedWordContributor;
import com.bytechef.atlas.configuration.constant.WorkflowConstants;
import com.bytechef.atlas.configuration.domain.Workflow;
import com.bytechef.commons.util.CollectionUtils;
Expand Down Expand Up @@ -57,7 +57,7 @@ abstract class AbstractWorkflowMapper implements WorkflowMapper {
WorkflowReservedWordContributor.class);

for (WorkflowReservedWordContributor workflowReservedWordContributor : serviceLoader) {
additionalWorkflowReservedWords.addAll(workflowReservedWordContributor.getReservedWord());
additionalWorkflowReservedWords.addAll(workflowReservedWordContributor.getReservedWords());
}
} catch (ServiceConfigurationError e) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
*/
public interface WorkflowFileStorage {

InputStream getFileStream(FileEntry fileEntry);
InputStream getFileStream(String filename, String url);

Map<String, ?> readContextValue(FileEntry fileEntry);

String readFileToString(FileEntry fileEntry);
String readFileToString(String filename, String url);

Map<String, ?> readJobOutputs(FileEntry fileEntry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@

public class WorkflowFileStorageImpl implements WorkflowFileStorage {

private static final String CONTEXTS = "contexts";
public static final String DATA = "data";
public static final String JOBS = "jobs";
public static final String TASK_EXECUTIONS = "task_executions";
private static final String WORKFLOWS_CONTEXTS = "workflows/contexts";
private static final String WORKFLOWS_DATA = "workflows/data";
private static final String WORKFLOWS_JOBS = "workflows/jobs";
private static final String WORKFLOWS_TASK_EXECUTIONS = "workflows/task_executions";

private final FileStorageService fileStorageService;
private final ObjectMapper objectMapper;
Expand All @@ -45,66 +45,67 @@ public WorkflowFileStorageImpl(FileStorageService fileStorageService, ObjectMapp
}

@Override
public InputStream getFileStream(FileEntry fileEntry) {
return fileStorageService.getFileStream(DATA, fileEntry);
public InputStream getFileStream(String filename, String url) {
return fileStorageService.getFileStream(WORKFLOWS_DATA, new FileEntry(WORKFLOWS_DATA, filename, url));
}

@Override
public Map<String, ?> readContextValue(FileEntry fileEntry) {
return JsonUtils.read(
fileStorageService.readFileToString(CONTEXTS, fileEntry), new TypeReference<>() {}, objectMapper);
fileStorageService.readFileToString(WORKFLOWS_CONTEXTS, fileEntry), new TypeReference<>() {}, objectMapper);
}

@Override
public String readFileToString(FileEntry fileEntry) {
return fileStorageService.readFileToString(DATA, fileEntry);
public String readFileToString(String filename, String url) {
return fileStorageService.readFileToString(WORKFLOWS_DATA, new FileEntry(WORKFLOWS_DATA, filename, url));
}

@Override
public Map<String, ?> readJobOutputs(FileEntry fileEntry) {
return JsonUtils.read(
fileStorageService.readFileToString(JOBS, fileEntry), new TypeReference<>() {}, objectMapper);
fileStorageService.readFileToString(WORKFLOWS_JOBS, fileEntry), new TypeReference<>() {}, objectMapper);
}

@Override
public Object readTaskExecutionOutput(FileEntry fileEntry) {
return JsonUtils.read(
fileStorageService.readFileToString(TASK_EXECUTIONS, fileEntry), Object.class, objectMapper);
fileStorageService.readFileToString(WORKFLOWS_TASK_EXECUTIONS, fileEntry), Object.class, objectMapper);
}

@Override
public FileEntry storeContextValue(long stackId, Context.Classname classname, Map<String, ?> value) {
return fileStorageService.storeFileContent(
CONTEXTS, classname + "_" + stackId + ".json", JsonUtils.write(value, objectMapper));
WORKFLOWS_CONTEXTS, classname + "_" + stackId + ".json", JsonUtils.write(value, objectMapper));
}

@Override
public FileEntry storeContextValue(
long stackId, int subStackId, Context.Classname classname, Map<String, ?> value) {

return fileStorageService.storeFileContent(
CONTEXTS, classname + "_" + stackId + "_" + subStackId + ".json",
WORKFLOWS_CONTEXTS, classname + "_" + stackId + "_" + subStackId + ".json",
JsonUtils.write(value, objectMapper));
}

@Override
public FileEntry storeFileContent(String fileName, String data) {
return fileStorageService.storeFileContent(DATA, fileName, data);
return fileStorageService.storeFileContent(WORKFLOWS_DATA, fileName, data);
}

@Override
public FileEntry storeFileContent(String fileName, InputStream inputStream) {
return fileStorageService.storeFileContent(DATA, fileName, inputStream);
return fileStorageService.storeFileContent(WORKFLOWS_DATA, fileName, inputStream);
}

@Override
public FileEntry storeJobOutputs(long jobId, Map<String, ?> outputs) {
return fileStorageService.storeFileContent(JOBS, jobId + ".json", JsonUtils.write(outputs, objectMapper));
return fileStorageService.storeFileContent(WORKFLOWS_JOBS, jobId + ".json",
JsonUtils.write(outputs, objectMapper));
}

@Override
public FileEntry storeTaskExecutionOutput(Long taskExecutionId, Object output) {
return fileStorageService.storeFileContent(
TASK_EXECUTIONS, taskExecutionId + ".json", JsonUtils.write(output, objectMapper));
WORKFLOWS_TASK_EXECUTIONS, taskExecutionId + ".json", JsonUtils.write(output, objectMapper));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<includeAll path="classpath:config/liquibase/changelog/configuration/" relativeToChangelogFile="false" errorIfMissingOrEmpty="false" contextFilter="mono or configuration" />

<!-- data storage -->
<includeAll path="classpath:config/liquibase/changelog/data_storage" relativeToChangelogFile="false" errorIfMissingOrEmpty="false" contextFilter="mono or data-storage" />
<includeAll path="classpath:config/liquibase/changelog/data_entry" relativeToChangelogFile="false" errorIfMissingOrEmpty="false" contextFilter="mono or data-storage" />

<!-- execution -->
<includeAll path="classpath:config/liquibase/changelog/execution/" relativeToChangelogFile="false" errorIfMissingOrEmpty="false" contextFilter="mono or execution" />
Expand Down
Loading

0 comments on commit 35b423a

Please sign in to comment.