
[FEATURE] Add flatten Command to PPL #669

Open
YANG-DB opened this issue Sep 16, 2024 · 13 comments
Labels
0.6 enhancement New feature or request Lang:PPL Pipe Processing Language support

Comments

@YANG-DB
Member

YANG-DB commented Sep 16, 2024

Is your feature request related to a problem?
OpenSearch Piped Processing Language (PPL) currently lacks a native command to flatten nested objects or arrays in documents. Many datasets, especially those containing JSON objects, have deeply nested fields that are difficult to work with in their raw form. The flatten command will simplify these structures and make it easier to analyze and extract data.

What solution would you like?
Introduce a flatten command in PPL that can handle arrays or nested fields, producing a flattened result that contains all the nested elements at the top level.

Syntax:

source=<data_source> | flatten <nested_field>  | fields <fields_to_select>
  • The flatten command takes a nested array or object field and returns each element as part of a flat structure.

Example Use Cases

  1. Flattening an Array Field
source=my-index  | flatten bridges | fields _time, bridges, city, country

This query flattens the bridges array field.

Example Input:

{
  "_time": "2024-09-13T12:00:00",
  "bridges": [
    {"name": "Tower Bridge", "length": 801},
    {"name": "London Bridge", "length": 928}
  ],
  "city": "London",
  "country": "England"
}

Expected Output:

[
  {
    "_time": "2024-09-13T12:00:00",
    "name": "Tower Bridge",
    "length": 801,
    "city": "London",
    "country": "England"
  },
  {
    "_time": "2024-09-13T12:00:00",
    "name": "London Bridge",
    "length": 928,
    "city": "London",
    "country": "England"
  }
]
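The array-flattening semantics above can be sketched in plain Python (an illustration of the intended behavior only, not the PPL implementation; `flatten_array` is a hypothetical helper name):

```python
def flatten_array(doc, field):
    """Return one flat row per element of doc[field] (a list of objects).

    Each element's keys are promoted to the top level; all other
    top-level fields of the document are copied into every row.
    """
    rows = []
    for element in doc.get(field) or []:
        row = {k: v for k, v in doc.items() if k != field}
        row.update(element)  # promote the nested keys
        rows.append(row)
    return rows

doc = {
    "_time": "2024-09-13T12:00:00",
    "bridges": [
        {"name": "Tower Bridge", "length": 801},
        {"name": "London Bridge", "length": 928},
    ],
    "city": "London",
    "country": "England",
}
rows = flatten_array(doc, "bridges")  # two rows, matching the expected output
```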
  2. Flattening a Nested Object
source=my-index | flatten details | fields _time, details 

This query flattens the details object field.

Example Input:

{
  "_time": "2024-09-13T12:00:00",
  "details": {
    "name": "Alice",
    "age": 30,
    "address": {
      "street": "Main St",
      "city": "New York"
    }
  }
}

Expected Output:

{
  "_time": "2024-09-13T12:00:00",
  "name": "Alice",
  "age": 30,
  "street": "Main St",
  "city": "New York"
}
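The object case, including the recursive descent into address, can likewise be sketched in plain Python (illustrative only; `flatten_object` is a hypothetical helper name):

```python
def flatten_object(doc, field):
    """Flatten doc[field] (a nested object) into the top level.

    Nested objects are walked recursively, so address.street becomes
    a top-level street field, matching the expected output above.
    """
    def walk(obj):
        flat = {}
        for key, value in obj.items():
            if isinstance(value, dict):
                flat.update(walk(value))  # recurse into deeper objects
            else:
                flat[key] = value
        return flat

    row = {k: v for k, v in doc.items() if k != field}
    row.update(walk(doc[field]))
    return row

doc = {
    "_time": "2024-09-13T12:00:00",
    "details": {
        "name": "Alice",
        "age": 30,
        "address": {"street": "Main St", "city": "New York"},
    },
}
row = flatten_object(doc, "details")
```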

Additional Considerations

  • The flatten command should work efficiently with large arrays or deeply nested structures.
  • It must handle complex JSON objects where multiple levels of nesting exist.
  • Consider supporting multi-level flattening for more deeply nested fields (e.g., flatten details.address).
@YANG-DB YANG-DB added enhancement New feature or request untriaged Lang:PPL Pipe Processing Language support labels Sep 16, 2024
@vamsi-amazon
Member

Shouldn't it be bridges.name in the flattened object?
What if multiple object fields have the same key inside them?

@YANG-DB YANG-DB removed the untriaged label Sep 16, 2024
@salyh
Contributor

salyh commented Oct 1, 2024

Shouldn't it be bridges.name in the flattened object? What if multiple object fields have the same key inside them?

As mentioned above, "Consider supporting multi-level flattening for more deeply nested fields (e.g., flatten details.address)." I read it as: yes, we support it. The question is what the default should be and whether it should be configurable.

When dealing with nested fields, see #565

@salyh
Contributor

salyh commented Oct 5, 2024

@YANG-DB @vamsi-amazon

cc @dr-lilienthal

Before opening a PR, a few design questions and requirement refinements need to be discussed.

  1. Do the terms "nested objects or arrays" and nested_field refer to a) the datatype "Nested Field" in OpenSearch as described here, OR b) just a field in a JSON document whose value is another JSON array or object? In the first case, it is relevant to point out that the OpenSearch "Nested Field" datatype is only for arrays, not objects. So the assumption is that b) applies.

  2. If b) applies, it appears that nested arrays in OpenSearch are always NULL when queried via Spark SQL or Spark PPL. Nested objects, however, can be queried as expected. This is possibly a bug and needs to be addressed before this issue can be solved.

  3. It would help to add the expected input and output of the examples in a table structure, because it is not yet clear whether the flattened object should be added as separate fields to the data row.

  4. Clarify the relation between flatten and expand_field as proposed in [FEATURE] New expand_field PPL Command #657

@YANG-DB
Member Author

YANG-DB commented Oct 8, 2024

@salyh

  1. IMO, a general nested field (not specifically an OpenSearch mapping)
  2. Yes, we need to solve this specific OpenSearch PPL issue separately
  3. Examples:
source=employees 
| flatten contact 
| fields name, age, contact.phone as phone, contact.address.city as city, contact.address.zipcode as zipcode

results using alias:

| Name | Age | Phone | City | Zipcode |
|------|-----|-------|------|---------|
| Alice | 30 | 123-4567 | New York | 10001 |
| Alice | 30 | 789-0123 | New York | 10001 |
| Bob | 25 | 234-5678 | Los Angeles | 90001 |
| Bob | 25 | 890-1234 | Los Angeles | 90001 |
source=employees 
| flatten contact 
| fields name, age, contact.phone, contact.address.city, contact.address.zipcode

results:

| Name | Age | contact.phone | contact.address.city | contact.address.zipcode |
|------|-----|---------------|----------------------|-------------------------|
| Alice | 30 | 123-4567 | New York | 10001 |
| Alice | 30 | 789-0123 | New York | 10001 |
| Bob | 25 | 234-5678 | Los Angeles | 90001 |
| Bob | 25 | 890-1234 | Los Angeles | 90001 |
  4. IMO we can merge both - let's discuss this more

@YANG-DB YANG-DB added the 0.6 label Oct 9, 2024
@lukasz-soszynski-eliatra
Contributor

I have a question related to the implementation details of the flatten command. Nested JSON objects are represented in Spark by two data types:

  • Structs
  • Arrays

The flatten command should work correctly with both types. It is relatively simple to flatten a Struct: for example, to flatten the coor object from the document below, it is enough to use the * operator (select coor.* from table1).

{
    "_time": "2024-09-13T12:00:00",
    "bridges": [
      {"name": "Rialto Bridge", "length": 48},
      {"name": "Bridge of Sighs", "length": 11}
    ],
    "city": "Venice",
    "country": "Italy",
    "coor": {
      "lat": 45.4408,
      "long": 12.3155,
      "alt": 2
    }
  }

On the other hand, flattening arrays requires the use of generator functions, that is, functions that create new rows. Unfortunately, the use of generator functions comes with quite severe restrictions:

  • a generator function can't be used together with case, if, etc.
  • only one generator function can appear in the select clause
  • etc.

Therefore, it is impossible to implement a condition that, depending on the field type, uses a generator function to flatten arrays or the * operator to flatten structs.

Avoiding condition functions together with generator functions, as in the example below, also results in a type-related validation error (null of type array cannot be cast to a struct):

select inline_outer(array), struct.* 
from (
    select 
        if(startswith(typeof(bridges), 'array'), bridges, null) as array, 
        if(startswith(typeof(bridges), 'struct'), bridges, null) as struct 
    from table1 c);

line 1:7 no viable alternative at input 'select inline_outer'
Can only star expand struct data types. Attribute: `ArrayBuffer(struct)`.; line 1 pos 28

The approach also has another disadvantage: it makes it impossible to freely manipulate the names of the flattened columns.

To solve the problems described above, we can use two approaches.

  1. Change the flatten command syntax, e.g.
... flatten array my_field_with_array 
| flatten struct my_field_with_struct

However, this option makes it harder to freely manipulate flattened column names.

  2. Another option is to implement a custom generator function that uses org.apache.spark.sql.catalyst.expressions.GeneratorOuter or org.apache.spark.sql.catalyst.expressions.ExplodeBase as the superclass. However, this requires some additional investigation and therefore might be more expensive.
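For reference, the per-row behavior such a type dispatch needs is trivial outside Spark. The plain-Python sketch below (hypothetical `flatten_field` helper, not Catalyst code) shows arrays producing extra rows like a generator function, structs producing a single widened row like struct.* expansion, and NULL values passing through with outer semantics:

```python
def flatten_field(doc, field):
    """Dispatch on the runtime type of doc[field].

    Lists behave like a generator function (one row per element),
    dicts behave like struct.* expansion (one widened row), and
    None keeps the row with the field dropped (outer semantics).
    """
    value = doc.get(field)
    base = {k: v for k, v in doc.items() if k != field}
    if isinstance(value, list):
        return [{**base, **element} for element in value]
    if isinstance(value, dict):
        return [{**base, **value}]
    return [base]  # NULL / missing field

venice = {"city": "Venice", "coor": {"lat": 45.4408, "long": 12.3155, "alt": 2}}
london = {"city": "London", "bridges": [{"name": "Tower Bridge"}, {"name": "London Bridge"}]}
warsaw = {"city": "Warsaw", "bridges": None}

struct_rows = flatten_field(venice, "coor")    # one widened row
array_rows = flatten_field(london, "bridges")  # two rows
null_rows = flatten_field(warsaw, "bridges")   # row survives without the field
```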

@YANG-DB Could you please tell me what your preference is and how we should implement the flatten command?

@lukasz-soszynski-eliatra
Contributor

Currently, I am using the below JSON file to test my implementation

[{"_time":"2024-09-13T12:00:00","bridges":[{"name":"Tower Bridge","length":801},{"name":"London Bridge","length":928}],"city":"London","country":"England","coor":{"lat":51.5074,"long":-0.1278,"alt":35}},{"_time":"2024-09-13T12:00:00","bridges":[{"name":"Pont Neuf","length":232},{"name":"Pont Alexandre III","length":160}],"city":"Paris","country":"France","coor":{"lat":48.8566,"long":2.3522,"alt":35}},{"_time":"2024-09-13T12:00:00","bridges":[{"name":"Rialto Bridge","length":48},{"name":"Bridge of Sighs","length":11}],"city":"Venice","country":"Italy","coor":{"lat":45.4408,"long":12.3155,"alt":2}},{"_time":"2024-09-13T12:00:00","bridges":[{"name":"Charles Bridge","length":516},{"name":"Legion Bridge","length":343}],"city":"Prague","country":"Czech Republic","coor":{"lat":50.0755,"long":14.4378,"alt":200}},{"_time":"2024-09-13T12:00:00","bridges":[{"name":"Chain Bridge","length":375},{"name":"Liberty Bridge","length":333}],"city":"Budapest","country":"Hungary","coor":{"lat":47.4979,"long":19.0402,"alt":96}},{"_time":"1990-09-13T12:00:00","bridges":null,"city":"Warsaw","country":"Poland","coor":null}]

The file is loaded using PySpark and the following commands

df_coor = spark.read.option("multiline","true").json('/home/lukasz/projects/aws/bridges_coordinates.json')
df_coor.write.saveAsTable("bridges_coor")

I can use the following SQL statement to load data in integration tests.

CREATE TEMPORARY VIEW bridges_coor
USING org.apache.spark.sql.json
OPTIONS (
  path "/home/lukasz/projects/aws/bridges_coordinates.json",
  multiLine true
);

@YANG-DB Is this testing method appropriate from your point of view, or do you have other recommendations (S3 tables)?

@YANG-DB
Member Author

YANG-DB commented Oct 24, 2024


@lukasz-soszynski-eliatra Thanks for the analysis - IMO separating flatten into two distinct use cases makes sense:

... flatten array my_field_with_array 
| flatten struct my_field_with_struct

@LantaoJin @penghuo WDYT ?

@YANG-DB
Member Author

YANG-DB commented Oct 24, 2024


please review #780 to get a few ideas

@YANG-DB
Member Author

YANG-DB commented Oct 24, 2024

@salyh @lukasz-soszynski-eliatra please review #780 to get a few ideas

@lukasz-soszynski-eliatra
Contributor

lukasz-soszynski-eliatra commented Oct 24, 2024

@YANG-DB
Many thanks for your suggestions related to testing. A syntax extension to the flatten command might not be required. The current implementation of the command works correctly with:

  • structs
  • arrays of structs
  • maps should also be supported, although I did not test this.

The implementation in its current shape does not support arrays of simple types like [0,1,2,3].
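One possible semantics for simple-type arrays (not part of the current implementation; a plain-Python sketch with a hypothetical helper name) would keep the field's own name and emit one row per scalar:

```python
def flatten_simple_array(doc, field):
    """One row per scalar element; the scalar keeps the array field's name."""
    rows = []
    for value in doc.get(field) or []:
        row = dict(doc)
        row[field] = value  # replace the array with one of its elements
        rows.append(row)
    return rows

rows = flatten_simple_array({"id": 1, "xs": [0, 1, 2, 3]}, "xs")
```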

Here are some examples of the flatten command.

The table coor below contains two interesting columns:

  • bridges - array<struct<length:bigint,name:string>>
  • coor - struct<alt:bigint,lat:double,long:double>
+-------------------+--------------------+--------+--------------------+--------------+
|              _time|             bridges|    city|                coor|       country|
+-------------------+--------------------+--------+--------------------+--------------+
|2024-09-13T12:00:00|[{801, Tower Brid...|  London|{35, 51.5074, -0....|       England|
|2024-09-13T12:00:00|[{232, Pont Neuf}...|   Paris|{35, 48.8566, 2.3...|        France|
|2024-09-13T12:00:00|[{48, Rialto Brid...|  Venice|{2, 45.4408, 12.3...|         Italy|
|2024-09-13T12:00:00|[{516, Charles Br...|  Prague|{200, 50.0755, 14...|Czech Republic|
|2024-09-13T12:00:00|[{375, Chain Brid...|Budapest|{96, 47.4979, 19....|       Hungary|
|1990-09-13T12:00:00|                NULL|  Warsaw|                NULL|        Poland|
+-------------------+--------------------+--------+--------------------+--------------+

The columns bridges and coor can be flattened in the following way

source=coor | flatten bridges | flatten coor;

The result of the above command is

| _time | city | country | length | name | alt | lat | long |
|-------|------|---------|--------|------|-----|-----|------|
| 2024-09-13T12:00:00 | London | England | 801 | Tower Bridge | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | London | England | 928 | London Bridge | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | Paris | France | 232 | Pont Neuf | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | Paris | France | 160 | Pont Alexandre III | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | Venice | Italy | 48 | Rialto Bridge | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | Venice | Italy | 11 | Bridge of Sighs | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | Prague | Czech Republic | 516 | Charles Bridge | 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | Prague | Czech Republic | 343 | Legion Bridge | 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | Budapest | Hungary | 375 | Chain Bridge | 96 | 47.4979 | 19.0402 |
| 2024-09-13T12:00:00 | Budapest | Hungary | 333 | Liberty Bridge | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |
Time taken: 0.169 seconds, Fetched 11 row(s)

Please note that the result above contains additional rows created when the array bridges is flattened.

@YANG-DB
Member Author

YANG-DB commented Oct 24, 2024


@lukasz-soszynski-eliatra ok - let's complete this PR and figure out simple-type array support afterwards - maybe with a different syntax?

@lukasz-soszynski-eliatra
Contributor

Sounds good. This means that only tests and documentation still need to be added.

@lukasz-soszynski-eliatra
Contributor

Supporting arrays of simple types should be trivial if a dedicated syntax is used. It might still be possible to support such arrays with the current syntax, but that approach is probably a bit more challenging. Either way, I will not include support for such arrays in the current PR.
