fix: table with fs connector cannot work with rate limit #19338

Merged
tabVersion merged 2 commits into main from tab/fix-fs-throttle on Nov 12, 2024

Conversation

tabVersion
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Resolves #19296.

This PR fixes two issues:

  • The plan for the fs fetch executor does not read the rate limit from the catalog (src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs); see the sketch after this list.
  • The FsFetch executor is not assigned the correct source_id when building fragments (src/meta/src/stream/stream_graph/fragment.rs).
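For orientation, here is a minimal sketch of the shape of the first fix. All names below (SourceCatalog, rate_limit, StreamFsFetch::new) are illustrative assumptions, not RisingWave's actual API and not the PR diff:

// Sketch only: names are assumed for illustration, not RisingWave's real types.
// The bug: the StreamFsFetch plan node was built without consulting the rate
// limit stored in the source catalog, so `source_rate_limit` in the WITH clause
// never reached the fetch executor.

#[derive(Clone, Copy)]
struct SourceCatalog {
    // Messages per second; in this thread `source_rate_limit = 0` is used,
    // which should pause ingestion entirely.
    rate_limit: Option<u32>,
}

struct StreamFsFetch {
    rate_limit: Option<u32>,
    // ...other plan-node fields elided
}

impl StreamFsFetch {
    fn new(catalog: SourceCatalog) -> Self {
        // The conceptual fix: propagate the catalog's rate limit into the
        // plan node instead of leaving it unset.
        Self { rate_limit: catalog.rate_limit }
    }
}

fn main() {
    let catalog = SourceCatalog { rate_limit: Some(0) };
    let fetch = StreamFsFetch::new(catalog);
    assert_eq!(fetch.rate_limit, Some(0));
}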

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@tabVersion
Contributor Author

Another question about the generated plan.


For a table with a connector:

dev=> explain  CREATE table diamonds (
    carat FLOAT,
    cut TEXT,
    color TEXT,
    depth FLOAT,
) WITH (
    connector = 'posix_fs',
    match_pattern = 'data*.csv',
    posix_fs.root = 'e2e_test/source_inline/fs/data',
    source_rate_limit = 0
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');
                                                                    QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [carat, cut, color, depth, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
 └─StreamRowIdGen { row_id_index: 4 }
   └─StreamUnion { all: true }
     ├─StreamExchange [no_shuffle] { dist: SomeShard }
     │ └─StreamFsFetch { columns: [carat, cut, color, depth, _row_id] }
     │   └─StreamAppendOnlyDedup { dedup_cols: [filename] }
     │     └─StreamExchange { dist: HashShard(filename) }
     │       └─StreamSource { source: diamonds, columns: [filename, last_edit_time, file_size] }
     └─StreamExchange { dist: HashShard(_row_id) }
       └─StreamDml { columns: [carat, cut, color, depth, _row_id] }
         └─StreamSource
(11 rows)

and I get the following 3 fragments

Materialize(MaterializeNode { table_id: 6, ... }), mask: 2
StreamFsFetch(StreamFsFetchNode { node_inner: Some(StreamFsFetch { source_id: 7, ... ) }), mask: 0
Source(SourceNode { source_inner: Some(StreamSource { source_id: 7,  ... }), mask: 129

But for creating a source and then building an MV on the source:

dev=>  CREATE source diamonds1 (
    carat FLOAT,
    cut TEXT,
    color TEXT,
    depth FLOAT,
) WITH (
    connector = 'posix_fs',
    match_pattern = 'data*.csv',
    posix_fs.root = 'e2e_test/source_inline/fs/data',
    source_rate_limit = 0
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');
CREATE_SOURCE
dev=> explain create materialized view mv as select * from diamonds1 ;
                                                                   QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [carat, cut, color, depth, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
 └─StreamProject { exprs: [carat, cut, color, depth, _row_id] }
   └─StreamRowIdGen { row_id_index: 4 }
     └─StreamFsFetch { columns: [carat, cut, color, depth, _row_id] }
       └─StreamAppendOnlyDedup { dedup_cols: [filename] }
         └─StreamExchange { dist: HashShard(filename) }
           └─StreamSource { source: diamonds1, columns: [filename, last_edit_time, file_size] }
(7 rows)

I get only two fragments:

Materialize(MaterializeNode { table_id: 12, ... }), mask: 2
Source(SourceNode { source_inner: Some(StreamSource { source_id: 11, ...  }), mask: 1

Is it expected that the 2nd approach has no fragment for StreamFsFetch while the plan contains it? cc @st1page

@st1page
Contributor

st1page commented Nov 12, 2024

> Is it expected that the 2nd approach has no fragment for StreamFsFetch while the plan contains it?

That's expected. The first plan's extra fragments exist because a table can ingest from multiple data sources, such as sink-into-table and DML statements.
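
For readers puzzling over the mask values in the fragment listings above: fragment types are bit flags OR-ed into a single integer, so mask: 129 means two flags (1 and 128) are set, while mask: 0 means no type was assigned at all (FragmentUnspecified), which is precisely the buggy case this PR targets. A minimal sketch of the mechanism, with assumed flag names and values rather than RisingWave's actual PbFragmentTypeFlag proto definitions:

// Illustrative sketch; flag names/values are assumptions, not the real proto enum.
const FRAGMENT_UNSPECIFIED: i32 = 0;
const SOURCE: i32 = 1;
const MVIEW: i32 = 2;
const SOME_OTHER_FLAG: i32 = 128; // stand-in for whichever flag yields 129 above

fn has_flag(mask: i32, flag: i32) -> bool {
    mask & flag != 0
}

fn main() {
    // The Materialize fragments above report `mask: 2`.
    let materialize_mask = MVIEW;
    assert!(has_flag(materialize_mask, MVIEW));

    // The first listing's Source fragment reports `mask: 129`, i.e. two flags.
    let source_fragment_mask = SOURCE | SOME_OTHER_FLAG;
    assert_eq!(source_fragment_mask, 129);

    // The bug: the standalone FsFetch fragment carried no flags at all.
    let fs_fetch_fragment_mask = FRAGMENT_UNSPECIFIED; // `mask: 0`
    assert_eq!(fs_fetch_fragment_mask, 0);
}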

@tabVersion
Contributor Author

Found a new issue: #19346. Let's merge this one first.

@tabVersion tabVersion added this pull request to the merge queue Nov 12, 2024
Merged via the queue into main with commit 4dea583 Nov 12, 2024
31 of 32 checks passed
@tabVersion tabVersion deleted the tab/fix-fs-throttle branch November 12, 2024 07:01
tabVersion added a commit that referenced this pull request Nov 12, 2024
Co-authored-by: tabversion <tabversion@bupt.icu>
github-merge-queue bot pushed a commit that referenced this pull request Nov 12, 2024
Comment on lines +1335 to +1337
if is_fs_source && *fragment_type_mask == PbFragmentTypeFlag::FragmentUnspecified as i32
{
    // When creating a table with an fs connector, the fragment type is unspecified.
Member

This seems still incomplete to me: I guess MV on fs source still won't work?
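
For context on the snippet under review, here is a hedged sketch of what such a guard plausibly does. The FS_FETCH flag value and the assignment are assumptions based on this thread, not a copy of the PR diff:

// Sketch only -- the actual diff lives in src/meta/src/stream/stream_graph/fragment.rs.
// Observed in this thread: `CREATE TABLE ... WITH (connector = 'posix_fs', ...)`
// produced an FsFetch fragment with `mask: 0` (FragmentUnspecified), so any logic
// that locates fragments by type (e.g. applying a source rate limit) never found it.

const FRAGMENT_UNSPECIFIED: i32 = 0;
const FS_FETCH: i32 = 128; // hypothetical flag value

fn patch_fs_fetch_fragment(is_fs_source: bool, fragment_type_mask: &mut i32) {
    if is_fs_source && *fragment_type_mask == FRAGMENT_UNSPECIFIED {
        // Assumed fix shape: tag the fragment so it can be located later.
        *fragment_type_mask |= FS_FETCH;
    }
}

fn main() {
    let mut mask = FRAGMENT_UNSPECIFIED;
    patch_fs_fetch_fragment(true, &mut mask);
    assert_eq!(mask, FS_FETCH);
}

Note that a guard keyed on FragmentUnspecified would not fire in the MV-on-source case, where the second listing above shows the StreamFsFetch node merged into a fragment whose mask is already 1 (Source) — which appears to be what the reviewer's question is getting at.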

Development

Successfully merging this pull request may close these issues.

source_rate_limit doesn't work for file source (#19296)