@@ -265,6 +265,10 @@ impl TableProvider for MemTable {
265265 input : Arc < dyn ExecutionPlan > ,
266266 insert_op : InsertOp ,
267267 ) -> Result < Arc < dyn ExecutionPlan > > {
268+ if self . batches . is_empty ( ) {
269+ return plan_err ! ( "Cannot insert into MemTable with zero partitions." ) ;
270+ }
271+
268272 // If we are inserting into the table, any sort order may be messed up so reset it here
269273 * self . sort_order . lock ( ) = vec ! [ ] ;
270274
@@ -333,7 +337,11 @@ impl DisplayAs for MemSink {
333337}
334338
335339impl MemSink {
340+ /// Creates a new [`MemSink`].
341+ ///
342+ /// The caller is responsible for ensuring that there is at least one partition to insert into.
336343 fn new ( batches : Vec < PartitionData > ) -> Self {
344+ assert ! ( !batches. is_empty( ) ) ;
337345 Self { batches }
338346 }
339347}
@@ -779,4 +787,27 @@ mod tests {
779787 assert_eq ! ( resulting_data_in_table[ 0 ] . len( ) , 2 ) ;
780788 Ok ( ( ) )
781789 }
790+
791+ // Test inserting a batch into a MemTable without any partitions
792+ #[ tokio:: test]
793+ async fn test_insert_into_zero_partition ( ) -> Result < ( ) > {
794+ // Create a new schema with one field called "a" of type Int32
795+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , false ) ] ) ) ;
796+
797+ // Create a new batch of data to insert into the table
798+ let batch = RecordBatch :: try_new (
799+ schema. clone ( ) ,
800+ vec ! [ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ] ,
801+ ) ?;
802+ // Run the experiment and expect an error
803+ let experiment_result = experiment ( schema, vec ! [ ] , vec ! [ vec![ batch. clone( ) ] ] )
804+ . await
805+ . unwrap_err ( ) ;
806+ // Ensure that there is a descriptive error message
807+ assert_eq ! (
808+ "Error during planning: Cannot insert into MemTable with zero partitions." ,
809+ experiment_result. strip_backtrace( )
810+ ) ;
811+ Ok ( ( ) )
812+ }
782813}
0 commit comments