Commit

insertinto for spark2
QiangCai authored and jackylk committed Dec 3, 2016
1 parent d5f4098 commit c1882f2
Showing 5 changed files with 109 additions and 54 deletions.
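In short, this commit wires Spark 2's INSERT INTO ... SELECT path into CarbonData: CarbonDatasourceHadoopRelation now mixes in InsertableRelation, a new LoadTableByInsert command turns the incoming query plan into a regular Carbon load, and the spark2 example is reworked to load through a plain csv_table plus an INSERT instead of TableLoader. A minimal sketch of the flow this enables, assuming a SparkSession named spark configured as in the example further down:

// Gist of what the reworked example exercises: populate a Carbon table
// from an ordinary Hive table with INSERT INTO ... SELECT.
spark.sql(
  """
    | INSERT INTO TABLE carbon_table
    | SELECT shortField, intField, bigintField, doubleField, stringField,
    | from_unixtime(unix_timestamp(timestampField, 'yyyy/M/dd')) timestampField
    | FROM csv_table
  """.stripMargin)
spark.sql("SELECT count(*) FROM carbon_table").show()
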
3 changes: 1 addition & 2 deletions examples/spark2/src/main/resources/data.csv
@@ -1,4 +1,3 @@
shortField,intField,bigintField,doubleField,stringField,timestampField
1,10,100,48.4,spark,2015/4/23
5,17,140,43.4,spark,2015/7/27
1,11,100,44.4,flink,2015/5/23
@@ -8,4 +7,4 @@ shortField,intField,bigintField,doubleField,stringField,timestampField
2,10,100,43.4,impala,2015/7/23
1,10,100,43.4,spark,2015/5/23
4,16,130,42.4,impala,2015/7/23
1,10,100,43.4,spark,2015/7/23
1,10,100,43.4,spark,2015/7/23
examples/spark2/src/main/scala/org/apache/spark/sql/examples/CarbonExample.scala
@@ -17,79 +17,97 @@

package org.apache.spark.sql.examples

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.TableLoader

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

object CarbonExample {

def main(args: Array[String]): Unit = {
// to run the example, please change this path to your local machine path
val rootPath = "/home/david/Documents/incubator-carbondata"
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val storeLocation = s"$rootPath/examples/spark2/target/store"
val warehouse = s"$rootPath/examples/spark2/target/warehouse"
val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"

// clean data folder
if (true) {
val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
clean(storeLocation)
clean(warehouse)
clean(metastoredb)
}

val spark = SparkSession
.builder()
.master("local")
.appName("CarbonExample")
.enableHiveSupport()
.config(CarbonCommonConstants.STORE_LOCATION,
s"$rootPath/examples/spark2/target/store")
.config("carbon.kettle.home",
s"$rootPath/processing/carbonplugins")
.config("carbon.storelocation", storeLocation)
.config("spark.sql.warehouse.dir", warehouse)
.config("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastoredb;create=true")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

// Drop table
// spark.sql("DROP TABLE IF EXISTS carbon_table")
// spark.sql("DROP TABLE IF EXISTS csv_table")
//
// // Create table
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

// Create table
spark.sql(
s"""
| CREATE TABLE carbon_table(
| shortField short,
| intField int,
| bigintField long,
| doubleField double,
| stringField string
| stringField string,
| timestampField timestamp
| )
| USING org.apache.spark.sql.CarbonSource
""".stripMargin)

val prop = s"$rootPath/conf/dataload.properties.template"
val tableName = "carbon_table"
// val prop = s"$rootPath/conf/dataload.properties.template"
// val tableName = "carbon_table"
val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
TableLoader.main(Array[String](prop, tableName, path))

// spark.sql(
// s"""
// | CREATE TABLE csv_table
// | (ID int,
// | date timestamp,
// | country string,
// | name string,
// | phonetype string,
// | serialname string,
// | salary int)
// """.stripMargin)
//
// spark.sql(
// s"""
// | LOAD DATA LOCAL INPATH '$csvPath'
// | INTO TABLE csv_table
// """.stripMargin)

// spark.sql(
// s"""
// | INSERT INTO TABLE carbon_table
// | SELECT * FROM csv_table
// """.stripMargin)

// Perform a query
// spark.sql("""
// SELECT country, count(salary) AS amount
// FROM carbon_table
// WHERE country IN ('china','france')
// GROUP BY country
// """).show()
// TableLoader.main(Array[String](prop, tableName, path))

spark.sql(
s"""
| CREATE TABLE csv_table
| ( shortField short,
| intField int,
| bigintField long,
| doubleField double,
| stringField string,
| timestampField string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""".stripMargin)

spark.sql(
s"""
| LOAD DATA LOCAL INPATH '$path'
| INTO TABLE csv_table
""".stripMargin)

spark.sql("""
SELECT *
FROM csv_table
""").show

spark.sql(
s"""
| INSERT INTO TABLE carbon_table
| SELECT shortField, intField, bigintField, doubleField, stringField,
| from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
| FROM csv_table
""".stripMargin)

spark.sql("""
SELECT *
@@ -115,7 +133,7 @@ object CarbonExample {
""".stripMargin).show

// Drop table
// spark.sql("DROP TABLE IF EXISTS carbon_table")
// spark.sql("DROP TABLE IF EXISTS csv_table")
spark.sql("DROP TABLE IF EXISTS carbon_table")
spark.sql("DROP TABLE IF EXISTS csv_table")
}
}
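The example drives the new insert path through SQL. Assuming the InsertableRelation hookup is also reached from the DataFrame writer (not something this commit demonstrates), roughly the same load could be expressed without the intermediate csv_table; spark, rootPath and carbon_table are reused from the example above, and the schema and cast choices below are illustrative:

import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp}
import org.apache.spark.sql.types._

// Hedged sketch: read the (header-less) sample CSV directly and push it through
// insertInto, converting the yyyy/M/dd strings to timestamps as the SQL version does.
val csvSchema = StructType(Seq(
  StructField("shortField", ShortType),
  StructField("intField", IntegerType),
  StructField("bigintField", LongType),
  StructField("doubleField", DoubleType),
  StructField("stringField", StringType),
  StructField("timestampField", StringType)))

val csv = spark.read
  .schema(csvSchema)
  .csv(s"$rootPath/examples/spark2/src/main/resources/data.csv")

csv.select(
    csv("shortField"),
    csv("intField"),
    csv("bigintField"),
    csv("doubleField"),
    csv("stringField"),
    from_unixtime(unix_timestamp(csv("timestampField"), "yyyy/M/dd"))
      .cast("timestamp").as("timestampField"))
  .write
  .insertInto("carbon_table")
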
CarbonDatasourceHadoopRelation.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, LoadTableByInsert}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.scan.expression.Expression
@@ -41,7 +43,7 @@ case class CarbonDatasourceHadoopRelation(
paths: Array[String],
parameters: Map[String, String],
tableSchema: Option[StructType])
extends BaseRelation {
extends BaseRelation with InsertableRelation {

lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -75,4 +77,16 @@ case class CarbonDatasourceHadoopRelation(
absIdentifier, carbonTable)
}
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
if (carbonRelation.output.size > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
sys.error("Maximum supported column by carbon is:" +
CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
}
if(data.logicalPlan.output.size >= carbonRelation.output.size) {
LoadTableByInsert(this, data.logicalPlan).run(sparkSession)
} else {
sys.error("Cannot insert into target table because column number are different")
}
}
}
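For context, InsertableRelation is the single-method hook Spark uses when it plans an INSERT INTO against a data source relation: Spark hands over the query result as a DataFrame plus an overwrite flag. A toy relation (names illustrative, not part of this commit) shows the shape of the contract that CarbonDatasourceHadoopRelation now fulfils by delegating to LoadTableByInsert:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.StructType

// Toy example of the InsertableRelation contract: a real implementation,
// like the Carbon one above, would persist `data` instead of counting it.
class CountingRelation(val sqlContext: SQLContext, val schema: StructType)
  extends BaseRelation with InsertableRelation {

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    println(s"would write ${data.count()} rows (overwrite = $overwrite)")
  }
}
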
carbonTableSchema.scala
@@ -25,6 +25,7 @@ import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.TimestampType
@@ -47,7 +48,8 @@ import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}


/**
* Command for the compaction in alter table command
@@ -290,6 +292,28 @@ case class DeleteLoadsByLoadDate(

}

case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation,
child: LogicalPlan) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sparkSession: SparkSession): Seq[Row] = {
val df = Dataset.ofRows(sparkSession, child)
val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
val load = LoadTable(
Some(relation.carbonRelation.databaseName),
relation.carbonRelation.tableName,
null,
Seq(),
scala.collection.immutable.Map(("fileheader" -> header)),
false,
null,
Some(df)).run(sparkSession)
// updating relation metadata. This is in case of auto detect high cardinality
relation.carbonRelation.metaData =
CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
load
}
}
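LoadTableByInsert reuses the existing LoadTable command: the incoming logical plan is materialized with Dataset.ofRows, and the CSV-style "fileheader" load option is synthesized from the target table's schema. A small, self-contained illustration of that header derivation (the schema here is illustrative, standing in for relation.tableSchema.get):

import org.apache.spark.sql.types._

// Stand-in for relation.tableSchema.get in the command above.
val tableSchema = StructType(Seq(
  StructField("shortField", ShortType),
  StructField("intField", IntegerType),
  StructField("stringField", StringType)))

// Same expression as in LoadTableByInsert: column names joined by commas,
// handed to LoadTable as the "fileheader" option.
val header = tableSchema.fields.map(_.name).mkString(",")
// header == "shortField,intField,stringField"
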

case class LoadTable(
databaseNameOp: Option[String],
tableName: String,
@@ -665,7 +665,7 @@ object CarbonMetastoreTypes extends RegexParsers {
case class CarbonRelation(
databaseName: String,
tableName: String,
metaData: CarbonMetaData,
var metaData: CarbonMetaData,
tableMeta: TableMeta,
alias: Option[String])
extends LeafNode with MultiInstanceRelation {
