When using the query_stream method, the error "bytes remaining on stream" occurs. #265

Open
artorias1024 opened this issue Nov 15, 2023 · 1 comment


artorias1024 commented Nov 15, 2023

image

This is the logic of my code.

    async fn create_data_events(&self, captured_tables: HashSet<TableId>) -> core::result::Result<(), MySqlConnectorError> {
        let pool = &self.context.pool;
        let sender = &self.sender;

        let max_concurrent_tasks = 2;
        let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));

        let mut handlers = Vec::new();
        for table_id in captured_tables {
            let table = self.context.schema.table_for(&table_id).unwrap().clone();
            let connection = pool.get_conn().await?;
            let sender_clone = sender.clone();
            // Self::export_data(connection, table_id, table, sender.clone()).await?;
            let semaphore_clone = semaphore.clone();
            let join_handler = tokio::spawn(async move {
                let _permit = semaphore_clone.acquire().await.unwrap();
                Self::export_data(connection, table_id, table, sender_clone).await
            });
            handlers.push(join_handler);
        }

        for handle in handlers {
            if let Ok(snapshot_result) = handle.await {}
        }

        Ok(())
    }

    async fn export_data(
        mut connection: Conn,
        table_id: TableId,
        table: Arc<Table>,
        sender: Sender<FlatMessage>,
    ) -> core::result::Result<(), MySqlConnectorError> {
        {
            info!("Exporting data from table '{}'", table_id);
            let export_start_time = Instant::now();
            let mut stream: ResultSetStream<Row, TextProtocol> = connection.query_stream(format!("select * from {}", table_id)).await?;
            let mut rows_count = 0;
            while let Some(next) = stream.next().await {
                match next {
                    Ok(row) => {
                        let message = table.generate_snapshot_record(row);
                        match sender.send(message).await {
                            Ok(_) => {}
                            Err(err) => error!("{}", err),
                        }
                    }
                    Err(err) => {
                        error!("Snapshotting of table {} failed : {:#?}", table_id, err);
                    }
                }

                rows_count += 1;
            }
            drop(stream);
            info!(
                "Finished exporting {} records for table'{}';total duration {:#?}",
                rows_count,
                table_id,
                export_start_time.elapsed()
            );
        }

        let _ = connection.disconnect().await?;
        Ok(())
    }

artorias1024 (Author) commented

Strangely, when I give the Semaphore a single permit, so that the export tasks and their connections never run concurrently, no error occurs.
