Skip to content
This repository was archived by the owner on Oct 21, 2024. It is now read-only.

Commit 1337b98

Browse files
committed
parse recordbatch
1 parent 459bef3 commit 1337b98

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

rust/arrow/src/ipc/reader.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,16 @@ pub fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
669669
ipc.header_as_schema().map(|schema| fb_to_schema(schema))
670670
}
671671

672+
pub fn recordbatch_from_bytes(
673+
bytes: &[u8],
674+
schema: Arc<Schema>,
675+
) -> Result<Option<RecordBatch>> {
676+
let ipc = ipc::get_root_as_message(&bytes[..]);
677+
match ipc.header_as_record_batch() {
678+
Some(batch) => read_record_batch(&bytes[..].to_vec(), batch, schema),
679+
None => Ok(None),
680+
}
681+
}
672682
#[cfg(test)]
673683
mod tests {
674684
use super::*;

rust/datafusion/examples/flight-client.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::datatypes::Schema;
1819
use arrow::ipc::reader;
1920
use flight::flight_service_client::FlightServiceClient;
2021
use flight::Ticket;
22+
use std::sync::Arc;
2123

2224
#[tokio::main]
2325
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -28,13 +30,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2830
});
2931

3032
let mut stream = client.do_get(request).await?.into_inner();
31-
32-
while let Some(batch) = stream.message().await? {
33-
println!("BATCH = {:?}", batch);
34-
35-
let schema = reader::schema_from_bytes(&batch.data_header);
36-
37-
println!("SCHEMA = {:?}", schema);
33+
let mut batch_schema: Option<Arc<Schema>> = None;
34+
35+
while let Some(flight_data) = stream.message().await? {
36+
println!("FlightData = {:?}", flight_data);
37+
38+
if let Some(schema) = reader::schema_from_bytes(&flight_data.data_header) {
39+
println!("Schema = {:?}", schema);
40+
batch_schema = Some(Arc::new(schema.clone()));
41+
}
42+
43+
match batch_schema {
44+
Some(ref schema) => {
45+
if let Some(record_batch) = reader::recordbatch_from_bytes(
46+
&flight_data.data_header,
47+
schema.clone(),
48+
)? {
49+
println!(
50+
"record_batch has {} columns and {} rows",
51+
record_batch.num_columns(),
52+
record_batch.num_rows()
53+
);
54+
}
55+
}
56+
None => {}
57+
}
3858
}
3959

4060
Ok(())

0 commit comments

Comments
 (0)