Hadoop Database Connectors

The Hadoop Database Connectors are connection objects used by the ParquetDataCommitter object and are a Waimak abstraction over a Hadoop-based Metastore database connection (e.g. Hive or Impala). They are used to generate the correct SQL schema DDLs and to manage the schema information in the Metastore.

Note: If you are creating tables on a filesystem other than the one specified in fs.defaultFS, you will need to set the spark.waimak.fs.defaultFS parameter (listed in the Configuration Parameters page).

For all Hadoop Database Connectors, the spark.waimak.metastore.forceRecreateTables parameter (listed in the Configuration Parameters page) is available. If this flag is set to true, all tables committed using Hadoop Database Connectors will be dropped and recreated; this flag is useful when the schema of tables may have changed (e.g. new columns have been added).
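Both of these parameters can be set like any other Spark configuration value, for example on the SparkSession builder before the flow is executed (the filesystem URI and values below are illustrative only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.waimak.fs.defaultFS", "hdfs://other-cluster-nameservice")
  .config("spark.waimak.metastore.forceRecreateTables", "true")
  .getOrCreate()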

Impala Connections

These connection objects are used to connect to an Impala service and are available in the waimak-impala package. The Impala SQL dialect is implemented by the ImpalaDBConnector trait, which can be extended to provide a custom Impala connection implementation.

Impala connection over JDBC

The class ImpalaJDBCConnector can be used to create a connection to an Impala service using JDBC. It has the following definition:

case class ImpalaJDBCConnector(sparkFlowContext: SparkFlowContext,
                               jdbcString: String,
                               properties: java.util.Properties = new java.util.Properties(),
                               secureProperties: Map[String, String] = Map.empty) extends ImpalaDBConnector

The constructor takes a SparkFlowContext object (available on a SparkDataFlow as flow.flowContext) and a JDBC connection string to the Impala service.

The constructor also takes an optional Java Properties object as properties and an optional Map[String, String] as secureProperties. The properties object is passed to the JDBC connection to provide extra connection parameters (e.g. a username). The secureProperties map is used to provide additional properties to the JDBC connection by reading them securely from a JCEKS file configured with hadoop.security.credential.provider.path. Each key in the map is the key of the entry in the JCEKS file, and the corresponding value is the name of the property to set on the JDBC connection.
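The JCEKS file must be reachable through the Hadoop configuration used by the flow. As a minimal sketch (the provider path below is an illustrative placeholder), the credential provider can be set on the Spark session's underlying Hadoop configuration:

spark.sparkContext.hadoopConfiguration.set(
  "hadoop.security.credential.provider.path",
  "jceks://hdfs/secure/impala-credentials.jceks")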

For example, to connect to an Impala service at address test.host:21050 with database test_db, with SSL enabled, and with a username and password stored under the keys test.username and test.password in a JCEKS file, you would construct an Impala JDBC connector as:

import java.util.Properties

val prop = new Properties()
prop.setProperty("ssl", "true")

ImpalaJDBCConnector(flow.flowContext,
                    "jdbc:hive2://test.host:21050/test_db",
                    properties = prop,
                    secureProperties = Map("test.username" -> "user", "test.password" -> "password"))

Hive Connections

These connection objects are used to connect to a Hive service and are available in the waimak-hive package. The Hive SQL dialect is implemented by the HiveDBConnector trait, which can be extended to provide a custom Hive connection implementation.

Hive connection via Spark SQL

The class HiveSparkSQLConnector can be used to connect to a Hive service using Spark SQL. Hive support must be enabled on the Spark Session to use this connector (see Hive Tables in the Spark documentation), and all metadata queries will be submitted via the sparkSession.sql(..) function.
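For example, a Hive-enabled session can be created as follows (the application name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("waimak-hive-example")
  .enableHiveSupport()
  .getOrCreate()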

The Hive Spark SQL connector has the following definition:

case class HiveSparkSQLConnector(sparkFlowContext: SparkFlowContext,
                                 database: String,
                                 createDatabaseIfNotExists: Boolean = false)

The constructor takes a SparkFlowContext object (available on a SparkDataFlow as flow.flowContext), a Hive database name, and an optional createDatabaseIfNotExists boolean that tells the connector to create the database if it does not already exist.
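For example, to commit tables into a Hive database called test_db (an illustrative name), creating the database if it does not already exist:

val hiveConnector = HiveSparkSQLConnector(flow.flowContext,
                                          "test_db",
                                          createDatabaseIfNotExists = true)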