[feature] Add a data source demo; does not actually read from the data source yet #11

Open
wants to merge 1 commit into master
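This PR wires a demo RocksDB-backed data source into DataFusion: a RocksdbFormat file format, a RocksdbExec physical operator with a stub RocksdbReader, and an ExecutionContext::register_rocksdb helper. For context, a minimal usage sketch mirroring the test_register_rocksdb test added below (the datafusion::prelude import path and the tokio runtime are assumptions about how the crate is consumed; nothing is read from RocksDB yet):

use datafusion::error::Result;
use datafusion::prelude::ExecutionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Register the demo table; the schema is hard-coded and no real RocksDB reads happen yet.
    ctx.register_rocksdb("tab").await?;

    // Query it like any other registered table.
    let df = ctx.sql("select boolvalue from tab").await?;
    for batch in df.collect().await? {
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}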
1 change: 1 addition & 0 deletions datafusion/src/datasource/file_format/mod.rs
@@ -21,6 +21,7 @@ pub mod avro;
pub mod csv;
pub mod json;
pub mod parquet;
pub mod rocksdb;

use std::any::Any;
use std::fmt;
54 changes: 54 additions & 0 deletions datafusion/src/datasource/file_format/rocksdb.rs
@@ -0,0 +1,54 @@
use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;

use super::FileFormat;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{RocksdbExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use std::collections::HashMap;
use crate::arrow::datatypes::{DataType, Field};

#[derive(Debug)]
pub struct RocksdbFormat {
schema: Schema,
}

impl RocksdbFormat {
pub fn new(schema: Schema) -> Self {
Self {
schema
}
}
}

#[async_trait]
impl FileFormat for RocksdbFormat {
fn as_any(&self) -> &dyn Any {
self
}

async fn infer_schema(&self, _readers: ObjectReaderStream) -> Result<SchemaRef> {
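// The schema is supplied at construction time, so the object readers are not consulted in this demo.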
Ok(Arc::new(self.schema.to_owned()))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
Ok(Statistics::default())
}

async fn create_physical_plan(
&self,
conf: FileScanConfig,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = RocksdbExec::new(conf);
Ok(Arc::new(exec))
}
}
75 changes: 75 additions & 0 deletions datafusion/src/execution/context.rs
@@ -27,6 +27,7 @@ use crate::{
avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
rocksdb::RocksdbFormat,
FileFormat,
},
MemTable,
@@ -101,6 +102,7 @@ use super::{
options::{AvroReadOptions, CsvReadOptions},
DiskManager, MemoryManager,
};
use crate::arrow::datatypes::{Field, DataType, Schema};

/// ExecutionContext is the main interface for executing queries with DataFusion. The context
/// provides the following functionality:
@@ -464,6 +466,41 @@ impl ExecutionContext {
Ok(())
}

/// Registers a RocksDB data source so that it can be referenced from SQL statements
/// executed against this context.
pub async fn register_rocksdb(
&mut self,
name: &str,
) -> Result<()> {
// TODO: fetch the metadata from a metadata service at runtime instead of hard-coding it here.
let field = Field::new("boolvalue", DataType::Boolean, false);
let field1 = Field::new("intvalue", DataType::Boolean, false);
let schema = Schema::new_with_metadata(vec![field, field1], HashMap::new());
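// Note: both demo fields are declared Boolean because the demo RocksdbReader only emits Boolean arrays.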

let file_format = RocksdbFormat::new(schema.clone());

// Scan/listing options for the table
let listing_options = ListingOptions {
format: Arc::new(file_format),
collect_stat: false,
file_extension: String::new(),
target_partitions: self.state.lock().config.target_partitions,
table_partition_cols: vec![],
};

// A data location (the table directory) is required; an existing source file is used as a placeholder here.
let mut uri = String::from(env!("CARGO_MANIFEST_DIR"));
uri.push_str("/src/execution/mod.rs");
self.register_listing_table(
name,
uri.as_str(),
listing_options,
Some(Arc::new(schema.to_owned())),
).await?;

Ok(())
}

/// Registers a CSV data source so that it can be referenced from SQL statements
/// executed against this context.
pub async fn register_csv(
@@ -1317,6 +1354,7 @@ mod tests {
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use tempfile::TempDir;
use crate::arrow::util::display::array_value_to_string;

#[tokio::test]
async fn shared_memory_and_disk_manager() {
@@ -3360,4 +3398,41 @@ mod tests {
ctx.read_parquet("dummy").await.unwrap()
}
}

#[tokio::test]
async fn test_register_rocksdb() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_rocksdb("tab").await?;

let df = ctx.sql("select boolvalue from tab").await?;

let res = df.collect().await;
let vec = res.expect("query failed");
let vec1 = collect_answer(&vec);
for x in vec1 {
println!("{}", x);
}

Ok(())
}

pub fn collect_answer(records: &[RecordBatch]) -> Vec<String> {
let mut result = Vec::new();

if !records.is_empty() {
for batch in records {
for row in 0..batch.num_rows() {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(array_value_to_string(&column, row).unwrap());
}

result.push(cells.join("|"));
}
}
}

result
}
}
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -22,6 +22,7 @@ mod csv;
mod file_stream;
mod json;
mod parquet;
mod rocksdb;

pub use self::parquet::ParquetExec;
use arrow::{
@@ -34,6 +35,7 @@ use arrow::{
pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
pub use rocksdb::RocksdbExec;

use crate::error::DataFusionError;
use crate::{
206 changes: 206 additions & 0 deletions datafusion/src/physical_plan/file_format/rocksdb.rs
@@ -0,0 +1,206 @@
use crate::physical_plan::{ExecutionPlan, Partitioning, Distribution, SendableRecordBatchStream, DisplayFormatType, Statistics};
use std::any::Any;
use std::sync::Arc;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::MetricsSet;
use std::fmt::{Formatter, Debug};
use futures::future::OkInto;
use async_trait::async_trait;
use crate::physical_plan::file_format::FileScanConfig;
use crate::error::DataFusionError;
use super::file_stream::{BatchIter, FileStream};
use std::io::Read;
use arrow::record_batch::RecordBatch;
use arrow::error::Result;
use arrow::datatypes::*;
use crate::arrow::array::{ArrayRef, BooleanBuilder, Array};
use crate::physical_plan::expressions::PhysicalSortExpr;

/// Execution plan for scanning one or more RocksDB partitions
#[derive(Debug, Clone)]
pub struct RocksdbExec {
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
}

impl RocksdbExec {
/// Create a new RocksDB scan execution plan from the provided file scan config.
pub fn new(base_config: FileScanConfig) -> Self {
let (projected_schema, projected_statistics) = base_config.project();
Self {
base_config,
projected_statistics,
projected_schema,
}
}
}

#[async_trait]
impl ExecutionPlan for RocksdbExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.projected_schema)
}

fn output_partitioning(&self) -> Partitioning {
// Number of output partitions (one per file group)
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// Leaf node: no children
vec![]
}

fn with_new_children(&self, children: Vec<Arc<dyn ExecutionPlan>>) -> crate::error::Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(Arc::new(self.clone()))
} else {
Err(DataFusionError::Internal(format!(
"Children cannot be replaced in {:?}",
self
)))
}
}

async fn execute(&self, partition: usize, runtime: Arc<RuntimeEnv>) -> crate::error::Result<SendableRecordBatchStream> {

let batch_size = runtime.batch_size;
let file_schema = Arc::clone(&self.base_config.file_schema);
let projection = self.base_config.file_column_projection_indices();
let fun = move |_file, _remaining: &Option<usize>| {
Box::new(RocksdbReader::new(
Arc::clone(&file_schema),
batch_size,
projection.clone(),
)) as BatchIter
};

Ok(Box::pin(FileStream::new(
Arc::clone(&self.base_config.object_store),
self.base_config.file_groups[partition].clone(),
fun,
Arc::clone(&self.projected_schema),
self.base_config.limit,
self.base_config.table_partition_cols.clone(),
)))
}


fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"RocksdbExec: limit={:?}, files={}",
self.base_config.limit,
super::FileGroupsDisplay(&self.base_config.file_groups),
)
}
}
}

fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
}

pub struct RocksdbReader {
/// Explicit schema for the RocksDB source
schema: SchemaRef,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// Number of records per batch
batch_size: usize,
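/// Marker for the last read position (demo: i64::MIN before the first batch, i64::MAX once the single demo batch has been emitted)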
last_id: Option<i64>,
}

impl RocksdbReader {
pub fn new(
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<usize>>,
) -> Self {
Self {
schema,
projection,
batch_size,
last_id: Option::Some(std::i64::MIN),
}
}

/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
match &self.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> =
projection.iter().map(|i| fields[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
None => self.schema.clone(),
}
}
}

impl Iterator for RocksdbReader {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {

if self.last_id == Some(std::i64::MAX) {
return None;
}

// Build and return the demo data
let fields = self.schema.fields();
let projection: Vec<usize> = match self.projection {
Some(ref v) => v.clone(),
None => fields.iter().enumerate().map(|(i, _)| i).collect(),
};

let projected_fields: Vec<Field> = projection
.iter()
.map(|i| fields[*i].clone())
.collect();
let metadata = self.schema.metadata().clone();
let projected_schema = Arc::new(Schema::new_with_metadata(projected_fields, metadata));


let mut builder = BooleanBuilder::new(10);
builder.append_value(true);
builder.append_value(true);
builder.append_value(true);
builder.append_value(true);
builder.append_value(true);
builder.append_value(false);
builder.append_value(false);
builder.append_value(false);
builder.append_value(false);
builder.append_value(false);

let arc: ArrayRef = Arc::new(builder.finish());

let arr = vec![arc];

// The demo does not actually read from RocksDB yet; the batch above is hard-coded.
let result = RecordBatch::try_new(projected_schema, arr);

// Advance last_id to mark the read position; with real batched reads this would track progress between batches.
self.last_id = Option::Some(std::i64::MAX);
Some(result)
}
}
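For reference (not part of this PR), a minimal sketch of driving the demo RocksdbReader directly, e.g. from a test in the same module. A single-column schema is used because the reader currently emits exactly one hard-coded Boolean array per batch:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

fn read_demo_batch() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "boolvalue",
        DataType::Boolean,
        false,
    )]));
    // batch_size is currently ignored; the reader always yields ten Boolean values.
    let mut reader = RocksdbReader::new(schema, 1024, None);

    let batch = reader.next().expect("one demo batch").expect("valid batch");
    assert_eq!(batch.num_rows(), 10);

    // last_id was advanced to i64::MAX, so the iterator is now exhausted.
    assert!(reader.next().is_none());
}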