Implement rest data loader (nv-morpheus#986)
Implement RESTDataLoader.

Authors:
  - https://github.com/yuchenz427
  - Devin Robison (https://github.com/drobison00)

Approvers:
  - Devin Robison (https://github.com/drobison00)
  - Bhargav Suryadevara (https://github.com/bsuryadevara)

URL: nv-morpheus#986
yczhang-nv authored Aug 3, 2023
1 parent b228389 commit 347ce65
Showing 9 changed files with 1,035 additions and 40 deletions.
10 changes: 9 additions & 1 deletion ci/iwyu/mappings.imp
@@ -10,11 +10,19 @@
# stdlib
{ "include": [ "<bits/cxxabi_forced.h>", private, "<mutex>", "public" ] },
{ "include": [ "<bits/cxxabi_forced.h>", private, "<vector>", "public" ] },
{ "include": [ "<bits/this_thread_sleep.h>", private, "<thread>", "public" ] },
{ "include": [ "<bits/types/siginfo_t.h>", private, "<csignal>", "public" ] },

# boost
-{ "include": ["@<boost/fiber/future/detail/.*>", "private", "<boost/fiber/future/future.hpp>", "public"] },
+{ "include": ["@<boost/algorithm/string/.*>", "private", "<boost/algorithm/string.hpp>", "public" ] },
+{ "include": ["@<boost/asio/detail/.*>", "private", "<boost/asio.hpp>", "public" ] },
+{ "include": ["@<boost/asio/impl/.*>", "private", "<boost/asio.hpp>", "public" ] },
+{ "include": ["@<boost/asio/ip/impl/.*>", "private", "<boost/asio.hpp>", "public" ] },
+{ "include": ["@<boost/beast/core/detail/.*>", "private", "<boost/beast/core.hpp>", "public" ] },
+{ "include": ["@<boost/beast/core/impl/.*>", "private", "<boost/beast/core.hpp>", "public" ] },
+{ "include": ["@<boost/beast/http/detail/.*>", "private", "<boost/beast/http.hpp>", "public" ] },
+{ "include": ["@<boost/beast/http/impl/.*>", "private", "<boost/beast/http.hpp>", "public" ] },
+{ "include": ["@<boost/fiber/future/detail/.*>", "private", "<boost/fiber/future/future.hpp>", "public"] },

# cuda
{ "include": ["<cuda_runtime_api.h>", "private", "<cuda_runtime.h>", "public"] },
99 changes: 99 additions & 0 deletions docs/source/loaders/core/rest_to_df_loader.md
@@ -0,0 +1,99 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

## REST to DataFrame Loader

The [DataLoader](../../modules/core/data_loader.md) module loads data into a DataFrame using a custom loader function. The `rest` loader function can be configured to send REST requests with customized parameters to retrieve data from endpoints. See below for the specific configuration format.

### Example Loader Configuration

Passing the configuration below when loading the DataLoader module specifies that the module should use the `rest` loader when loading data into a DataFrame.

```json
{
  "loaders": [{
    "id": "rest"
  }]
}
```

**Note**: Loaders can receive configuration from the `load` task via a [control message] at runtime.

### Task Configurable Parameters

The following parameters can be configured for this loader at the load task level:

| Parameter | Type | Description | Example Value | Default Value |
| ----------- | ------ | ----------------------------------- | ------------- | ------------- |
| `loader_id` | string | Unique identifier for the loader | "rest" | `[Required]` |
| `strategy` | string | Strategy for constructing dataframe | "aggregate" | `[Required]` |
| `queries` | array | Parameters of REST queries | See below | `[Required]` |


### `queries`

| Key | Type | Description | Example Value | Default Value |
| -------------- | ---------- | ----------------------------------------------------------------- | ------------------------------------------------------------------------------ | ------------- |
| `method` | string | Method of request | "GET" | `"GET"` |
| `endpoint` | string | Endpoint of request | "0.0.0.0/path/to/target?param1=true" | `[Required]` |
| `port` | string | Target port of request | "80" | `"80"` |
| `http_version` | string | HTTP version of request | "1.1" | `"1.1"` |
| `content_type` | string | Content type of request body in a POST request | "text/plain" | `-` |
| `body` | string | Request body in a POST request | "param1=true&param2=false" | `-` |
| `X-Headers` | dictionary | Customized X-Headers of request | `{"X-Header1":"header1", "X-Header2":"header2"}` | `-` |
| `params` | array | Parameters of the requested URL; these override values included in `endpoint` | `[{"param1": "true", "param2":"false"}, {"param1": "false", "param2":"true"}]` | `-` |

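To make the `params` row concrete, here is a minimal Python sketch of one plausible reading of "override values included in endpoint": each entry in `params` produces one URL, and its keys replace any matching keys already present in the endpoint's query string. This is an assumption for illustration only. The helper name `expand_endpoint` is hypothetical and is not part of the loader, and the loader's actual merge logic is not shown in this commit excerpt.

```python
from urllib.parse import parse_qsl, urlencode


def expand_endpoint(endpoint, params=None):
    """Hypothetical helper: build one URL per `params` entry.

    Assumes that keys in a `params` entry override keys of the same
    name in the endpoint's own query string.
    """
    if not params:
        return [endpoint]

    # Split "host/path?query" into the part before "?" and the query string.
    base, _, query = endpoint.partition("?")
    defaults = dict(parse_qsl(query))

    urls = []
    for overrides in params:
        # Later keys win, so the entry's values replace the defaults.
        merged = {**defaults, **overrides}
        urls.append(f"{base}?{urlencode(merged)}")
    return urls


urls = expand_endpoint(
    "0.0.0.0/?param1=false&param2=true",
    [{"param1": "true"}, {"param1": "false", "param2": "true"}],
)
for url in urls:
    print(url)
```

Under this reading, the first entry only changes `param1` and inherits `param2` from the endpoint, while the second entry sets both values explicitly.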

### Example Load Task Configuration

The JSON configuration below shows how to pass additional configuration to the loader through a control message task at runtime.

```json
{
  "type":"load",
  "properties":
  {
    "loader_id":"rest",
    "strategy":"aggregate",
    "queries":[
      {
        "method":"<GET/POST>",
        "endpoint":"0.0.0.0/?param1=false&param2=true",
        "port": "80",
        "http_version": "1.1",
        "content_type":"text/plain",
        "body":"http POST body",
        "x-headers":
        {
          "X-Header1":"header1",
          "X-Header2":"header2"
        },
        "params":
        [
          {
            "param1":"true"
          },
          {
            "param1":"false",
            "param2":"true"
          }
        ]
      }
    ]
  }
}
```
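
As a rough illustration of how such a task might be assembled programmatically, the following Python sketch builds the same structure as a plain dictionary and serializes it to JSON. It deliberately uses only the standard library. The helper name `build_rest_load_task` is hypothetical, and how the resulting task is attached to a control message is outside the scope of this sketch.

```python
import json


def build_rest_load_task(endpoint, params=None, method="GET", port="80"):
    """Hypothetical helper: assemble a `load` task for the `rest` loader."""
    query = {
        "method": method,
        "endpoint": endpoint,
        "port": port,
        "http_version": "1.1",
    }
    if params:
        # Optional list of parameter dictionaries, as in the table above.
        query["params"] = params

    return {
        "type": "load",
        "properties": {
            "loader_id": "rest",
            "strategy": "aggregate",
            "queries": [query],
        },
    }


task = build_rest_load_task(
    "0.0.0.0/path/to/target",
    params=[{"param1": "true"}, {"param1": "false", "param2": "true"}],
)
print(json.dumps(task, indent=2))
```

Keeping the task as a plain dictionary makes it easy to check the required keys (`loader_id`, `strategy`, `queries`) before it is sent.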
1 change: 1 addition & 0 deletions docs/source/loaders/index.md
@@ -31,5 +31,6 @@ accessible to the DataLoader module during module loading.
./core/file_to_df_loader.md
./core/fsspec_loader.md
./core/sql_loader.md
./core/rest_to_df_loader.md
```
6 changes: 6 additions & 0 deletions morpheus/_lib/include/morpheus/io/data_loader.hpp
@@ -97,6 +97,11 @@ class DataLoader
     */
    DataLoader() = default;

    /**
     * @brief Constructor with config for the DataLoader class.
     */
    DataLoader(nlohmann::json config);

    /**
     * @brief Destructor for the DataLoader class.
     */
@@ -128,6 +133,7 @@ class DataLoader

  private:
    std::map<std::string, std::shared_ptr<Loader>> m_loaders{};  // Map of registered loader instances.
    nlohmann::json m_config;
};

#pragma GCC visibility pop
37 changes: 35 additions & 2 deletions morpheus/_lib/src/io/data_loader.cpp
@@ -21,10 +21,25 @@

#include <glog/logging.h>

#include <exception>
#include <iostream>
#include <stdexcept>
#include <utility>

namespace {
void process_failures(const std::string& error_msg,
                      std::shared_ptr<morpheus::ControlMessage> message,
                      bool processes_failures_as_errors)
{
    if (processes_failures_as_errors)
    {
        throw std::runtime_error(error_msg);
    }
    message->set_metadata("cm_failed", "true");
    message->set_metadata("cm_failed_reason", error_msg);
}
}  // namespace

namespace morpheus {

Loader::Loader(nlohmann::json config) : m_config(std::move(config)) {}
@@ -44,8 +59,18 @@ std::shared_ptr<ControlMessage> Loader::load(std::shared_ptr<ControlMessage> mes
    return std::move(message);
}

DataLoader::DataLoader(nlohmann::json config) : m_config(std::move(config)) {}

std::shared_ptr<ControlMessage> DataLoader::load(std::shared_ptr<ControlMessage> control_message)
{
    // If false, any exception thrown while running a load task is caught and the ControlMessage metadata is set to
    // record the reason for the failure; otherwise, the failure is thrown as a std::runtime_error.
    bool processes_failures_as_errors = false;
    if (!m_config.empty())
    {
        processes_failures_as_errors = m_config.value("processes_failures_as_errors", false);
    }

    while (control_message->has_task("load"))
    {
        auto task = control_message->remove_task("load");
Expand All @@ -56,8 +81,16 @@ std::shared_ptr<ControlMessage> DataLoader::load(std::shared_ptr<ControlMessage>
{
VLOG(5) << "Loading data using loader: " << loader_id
<< " for message: " << control_message->config().dump() << std::endl;

loader->second->load(control_message, task);
try
{
loader->second->load(control_message, task);
} catch (std::exception& e)
{
process_failures(e.what(), control_message, processes_failures_as_errors);
} catch (...)
{
process_failures("Unknown error", control_message, processes_failures_as_errors);
}
}
else
{
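
The failure handling added to `DataLoader::load` in this hunk can be sketched in Python to show the two outcomes side by side. This is an illustrative analogue, not Morpheus code: `StubMessage` is a hypothetical stand-in for `ControlMessage` that only stores metadata, and `run_loader` is a hypothetical wrapper that mirrors the `try`/`catch` plus `process_failures` logic. Only the `processes_failures_as_errors` flag and the `cm_failed` / `cm_failed_reason` metadata keys come from the diff.

```python
class StubMessage:
    """Stand-in for ControlMessage (assumption); it only stores metadata."""

    def __init__(self):
        self.metadata = {}

    def set_metadata(self, key, value):
        self.metadata[key] = value


def run_loader(loader, message, processes_failures_as_errors=False):
    """Hypothetical wrapper mirroring the try/catch around the loader call."""
    try:
        loader(message)
    except Exception as exc:
        if processes_failures_as_errors:
            # Surface the failure to the caller as a runtime error.
            raise RuntimeError(str(exc)) from exc
        # Otherwise record the failure on the message and keep going.
        message.set_metadata("cm_failed", "true")
        message.set_metadata("cm_failed_reason", str(exc))
    return message


def failing_loader(message):
    """Example loader that always fails."""
    raise ValueError("endpoint unreachable")


msg = run_loader(failing_loader, StubMessage())
print(msg.metadata)
```

With the flag left at its default of `false`, a failed load marks the message rather than stopping the caller, so downstream stages can inspect `cm_failed` and decide what to do.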