Commit b584344

[#54] Create HBase2 connector (#77)

To avoid duplicating HBase code as much as possible, all the common code lives in a hbase folder shared by two separate sbt modules, hbase1 and hbase2. Each module compiles the code and resources placed under its own hbase1 or hbase2 folder plus the code placed inside the hbase folder. The older connector keeps its name, while the new one is called "darwin-hbase2-connector". This required changes to make.sh and publish.sh, because sbt cannot run two HBase mini-clusters in the same JVM without throwing weird errors.
2 parents 9a82a66 + 9c38035 commit b584344
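
The sharing mechanism the message describes can be sketched as follows; project and folder names here are illustrative, not the commit's exact build definition. Both connector modules add the common hbase folder to their unmanaged source directories, so the shared sources are compiled twice, once against each HBase major version:

// build.sbt -- minimal sketch of two sbt modules compiling one shared
// source folder; names are illustrative.
lazy val connectorV1 = Project("connector-v1", file("hbase1"))
  .settings(
    // also compile the common sources kept under ../hbase
    Compile / unmanagedSourceDirectories +=
      baseDirectory.value / ".." / "hbase" / "src" / "main" / "scala"
  )

lazy val connectorV2 = Project("connector-v2", file("hbase2"))
  .settings(
    Compile / unmanagedSourceDirectories +=
      baseDirectory.value / ".." / "hbase" / "src" / "main" / "scala"
  )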

File tree: 11 files changed, +129 −49 lines


build.sbt

Lines changed: 19 additions & 1 deletion
@@ -1,10 +1,13 @@
+import sbt.Keys.baseDirectory
+
 /*
  * Main build definition.
  *
  * See project/Settings.scala for the settings definitions.
  * See project/Dependencies.scala for the dependencies definitions.
  * See project/Versions.scala for the versions definitions.
  */
+
 lazy val Settings.pgpPass = Option(System.getenv().get("PGP_PASS")).map(_.toArray)
 lazy val root = Project("darwin", file("."))
   .settings(Settings.commonSettings: _*)
@@ -29,15 +32,30 @@ lazy val coreCommon = Project("darwin-core-common", file("common"))
   .settings(crossScalaVersions := Versions.crossScalaVersions)
   .enablePlugins(JavaAppPackaging)
 
-lazy val hbaseConnector = Project("darwin-hbase-connector", file("hbase"))
+lazy val hbaseConnector = Project("darwin-hbase-connector", file("hbase1"))
   .settings(Settings.commonSettings: _*)
   .dependsOn(coreCommon)
   .settings(pgpPassphrase := Settings.pgpPass)
   .settings(libraryDependencies ++= Dependencies.hbase_conn_dep)
   .settings(crossScalaVersions := Versions.crossScalaVersions)
+  .settings(Compile / unmanagedSourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "main" / "scala")
+  .settings(Test / unmanagedSourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "test" / "scala")
+  .settings(Test / unmanagedResourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "test" / "resources")
   .settings(Settings.hbaseTestSettings)
   .enablePlugins(JavaAppPackaging)
 
+lazy val hbaseConnector2 = Project("darwin-hbase2-connector", file("hbase2"))
+  .settings(Settings.commonSettings: _*)
+  .dependsOn(coreCommon)
+  .settings(pgpPassphrase := Settings.pgpPass)
+  .settings(libraryDependencies ++= Dependencies.hbase2_conn_dep)
+  .settings(crossScalaVersions := Versions.crossScalaVersions)
+  .settings(Compile / unmanagedSourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "main" / "scala")
+  .settings(Test / unmanagedSourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "test" / "scala")
+  .settings(Test / unmanagedResourceDirectories += baseDirectory.value / ".." / "hbase" / "src" / "test" / "resources")
+  .settings(Settings.hbase2TestSettings)
+  .enablePlugins(JavaAppPackaging)
+
 lazy val postgresConnector = Project("darwin-postgres-connector", file("postgres"))
   .settings(Settings.commonSettings: _*)
   .dependsOn(coreCommon)

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 9 additions & 6 deletions
@@ -1,18 +1,18 @@
 package it.agilelab.darwin.connector.hbase
 
 import com.typesafe.config.Config
-import it.agilelab.darwin.common.{using, Connector, Logging}
+import it.agilelab.darwin.common.compat._
+import it.agilelab.darwin.common.{Connector, Logging, using}
 import org.apache.avro.Schema
 import org.apache.avro.Schema.Parser
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
+import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.security.User
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.security.UserGroupInformation
-import it.agilelab.darwin.common.compat._
 
 object HBaseConnector extends Logging {
 
@@ -49,7 +49,8 @@ case class HBaseConnector(config: Config) extends Connector with Logging {
 
   lazy val TABLE_NAME: TableName = TableName.valueOf(Bytes.toBytes(NAMESPACE_STRING), Bytes.toBytes(TABLE_NAME_STRING))
 
-  val CF: Array[Byte] = Bytes.toBytes("0")
+  val CF_STRING = "0"
+  val CF: Array[Byte] = Bytes.toBytes(CF_STRING)
   val QUALIFIER_SCHEMA: Array[Byte] = Bytes.toBytes("schema")
   val QUALIFIER_NAME: Array[Byte] = Bytes.toBytes("name")
   val QUALIFIER_NAMESPACE: Array[Byte] = Bytes.toBytes("namespace")
@@ -141,11 +142,13 @@ case class HBaseConnector(config: Config) extends Connector with Logging {
       }
       if (!tableExists()) {
         log.info(s"Table $TABLE_NAME does not exists, creating it")
-        admin.createTable(new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(CF)))
+        HBaseUtils.createTable(admin, TABLE_NAME, CF)
       }
     }
   }
 
+
+
   override def tableExists(): Boolean = {
     using(connection.getAdmin) { admin =>
       admin.tableExists(TABLE_NAME)
@@ -155,7 +158,7 @@ case class HBaseConnector(config: Config) extends Connector with Logging {
   override def tableCreationHint(): String = {
     s"""To create namespace and table from an HBase shell issue:
        | create_namespace '$NAMESPACE_STRING'
-       | create '$NAMESPACE_STRING:$TABLE_NAME_STRING', '0'""".stripMargin
+       | create '$NAMESPACE_STRING:$TABLE_NAME_STRING', '$CF_STRING'""".stripMargin
   }
 
   override def findSchema(id: Long): Option[Schema] = {
Lines changed: 25 additions & 25 deletions
@@ -1,19 +1,21 @@
 package it.agilelab.darwin.connector.hbase
 
 import java.nio.file.Files
+import java.util.UUID
 
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
 import it.agilelab.darwin.common.Connector
 import org.apache.avro.reflect.ReflectData
 import org.apache.avro.{Schema, SchemaNormalization}
-import org.apache.hadoop.hbase.HBaseTestingUtility
+import org.apache.hadoop.hbase.{HBaseConfiguration, HBaseTestingUtility, MiniHBaseCluster}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 
 class HBaseConnectorSuite extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
 
-  var connector: Connector = _
+  private var connector: Connector = _
+  private var minicluster: MiniHBaseCluster = _
 
   "HBaseConnector" should "load all existing schemas" in {
     connector.fullLoad()
@@ -45,34 +47,32 @@ class HBaseConnectorSuite extends AnyFlatSpec with Matchers with BeforeAndAfterA
   }
 
   override def beforeAll(): Unit = {
-
-    connector = new HBaseConnectorCreator().create(HBaseConnectorSuite.config)
-
-    connector.createTable()
-  }
-
-
-}
-
-object HBaseConnectorSuite {
-  private lazy val config = {
-    val util = new HBaseTestingUtility()
-    val minicluster = util.startMiniCluster()
-
-    //Hbase connector can only load configurations from a file path so we need to render the hadoop conf
-    val confFile = Files.createTempFile("prefix", "suffix")
+    val testUUID = UUID.randomUUID().toString
+    val hConf = HBaseConfiguration.create()
+    hConf.set("test.build.data.basedirectory", s"./target/hbase-test-data-$testUUID")
+    val util = new HBaseTestingUtility(hConf)
+    minicluster = util.startMiniCluster(1, true)
+    val confFile = Files.createTempFile(testUUID, ".xml")
+    // Hbase connector can only load configurations from a file path so we need to render the hadoop conf
     val stream = Files.newOutputStream(confFile)
+    // mc.getConfiguration.writeXml(System.out)
     minicluster.getConfiguration.writeXml(stream)
     stream.flush()
    stream.close()
-    val hbaseConfigPath = ConfigValueFactory.fromAnyRef(confFile.toAbsolutePath.toString)
-
-    //HbaseConnector will only load conf if hbase-site and core-site are given,
-    //we give the same file to each.
+    // HbaseConnector will only load conf if hbase-site and core-site are given,
+    // we give the same file to each.
     sys.addShutdownHook(minicluster.shutdown())
-    ConfigFactory.load()
-      .withValue(ConfigurationKeys.HBASE_SITE, hbaseConfigPath)
-      .withValue(ConfigurationKeys.CORE_SITE, hbaseConfigPath)
+    val config = ConfigFactory.load()
+      .withValue(ConfigurationKeys.HBASE_SITE, ConfigValueFactory.fromAnyRef(confFile.toAbsolutePath.toString))
+      .withValue(ConfigurationKeys.CORE_SITE, ConfigValueFactory.fromAnyRef(confFile.toAbsolutePath.toString))
+    connector = new HBaseConnectorCreator().create(config)
+    connector.createTable()
   }
 
+  override def afterAll(): Unit = {
+    minicluster.shutdown()
+    minicluster.waitUntilShutDown()
+  }
+
+
 }
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+package it.agilelab.darwin.connector.hbase
+
+import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
+import org.apache.hadoop.hbase.client.Admin
+
+object HBaseUtils {
+  def createTable(admin: Admin, tableName: TableName, columnFamily: Array[Byte]): Unit = {
+    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(columnFamily)))
+  }
+}
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+it.agilelab.darwin.connector.hbase.HBaseConnectorCreator
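
This one-line file registers the connector creator for discovery at runtime. Assuming it is a standard java.util.ServiceLoader registration (the file would live under META-INF/services, named after Darwin's creator interface), lookup works roughly as in this sketch; the trait is a simplified stand-in, not Darwin's actual API:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

object ConnectorDiscovery {
  // Simplified stand-in for the real creator interface (assumption).
  trait ConnectorCreator { def name: String }

  // Collects every implementation listed in a META-INF/services file on
  // the classpath, e.g. the HBaseConnectorCreator registered above.
  def loadCreators(): List[ConnectorCreator] =
    ServiceLoader.load(classOf[ConnectorCreator]).iterator().asScala.toList
}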
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package it.agilelab.darwin.connector.hbase
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Admin, ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}
+
+object HBaseUtils {
+  def createTable(admin: Admin, tableName: TableName, columnFamily: Array[Byte]): Unit = {
+    admin.createTable(
+      TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(
+          ColumnFamilyDescriptorBuilder.newBuilder(columnFamily).build()
+        ).build())
+  }
+}
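
The two HBaseUtils objects above are the version seam: the shared connector code calls a single stable signature (HBaseUtils.createTable, as in the HBaseConnector diff above), and each module compiles its own implementation against its HBase major version, so the HBase 1 descriptor API and the HBase 2 builder API never need to coexist on one classpath. A minimal sketch of the pattern under hypothetical names, not the actual Darwin sources:

// shared/Store.scala -- included in both modules via unmanagedSourceDirectories
object Store {
  // Shared code depends only on this signature, never on classes that
  // changed between the two library versions.
  def createTable(name: String): Unit = VersionShim.createTable(name)
}

// v1/VersionShim.scala -- only on the first module's classpath
object VersionShim {
  def createTable(name: String): Unit =
    println(s"create table '$name' with the v1 descriptor API")
}

// v2/VersionShim.scala -- only on the second module's classpath (same
// signature, different body); shown commented out so this sketch compiles:
// object VersionShim {
//   def createTable(name: String): Unit =
//     println(s"create table '$name' with the v2 builder API")
// }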

make.sh

Lines changed: 3 additions & 1 deletion
@@ -1,2 +1,4 @@
 #!/bin/bash
-sbt clean scalastyle +test +doc
+set -ex
+sbt clean scalastyle +test +doc
+sbt darwin-hbase2-connector/clean darwin-hbase2-connector/scalastyle +darwin-hbase2-connector/test +darwin-hbase2-connector/doc

project/Dependencies.scala

Lines changed: 36 additions & 16 deletions
@@ -3,8 +3,8 @@ import Keys._
 
 
 /**
-  * @author andreaL
-  */
+ * @author andreaL
+ */
 object Dependencies {
 
   lazy val scalatest = "org.scalatest" %% "scalatest" % "3.1.1" % "test"
@@ -13,7 +13,9 @@ object Dependencies {
   lazy val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"
   lazy val hbase_server = "org.apache.hbase" % "hbase-server" % "1.2.2" % "provided"
   lazy val hbase_common = "org.apache.hbase" % "hbase-common" % "1.2.2" % "provided"
-  lazy val hadoop_common = "org.apache.hadoop" % "hadoop-common" % "2.6.0" % "provided"
+  lazy val hadoop_common = "org.apache.hadoop" % "hadoop-common" % "2.7.7" % "provided"
+  lazy val hbase2_server = "org.apache.hbase" % "hbase-server" % "2.1.10" % "provided"
+  lazy val hbase2_common = "org.apache.hbase" % "hbase-common" % "2.1.10" % "provided"
   lazy val reflections = "org.reflections" % "reflections" % "0.9.11" % Test
   lazy val spark_core = "org.apache.spark" %% "spark-core" % "2.4.5" % "provided"
   lazy val spark_sql = "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided"
@@ -40,30 +42,48 @@ object Dependencies {
   //these jars are not resolved with default ivy behavior, also we need to enable in settings
   //the resolution of transitive dependencies for jars in test scope
   lazy val hbaseTestDependencies = Seq(
-    ("org.apache.hbase" % "hbase-testing-util" % "1.2.2").classifier("tests") % "test",
-    ("org.apache.hadoop" % "hadoop-common" % "2.6.0").classifier("tests") % "test" ,
-    ("org.apache.hbase" % "hbase-server" % "1.2.2").classifier("tests") % "test",
-    ("org.apache.hbase" % "hbase" % "1.2.2") % "test",
-    ("org.apache.hbase" % "hbase-hadoop-compat" % "1.2.2") % "test",
-    ("org.apache.hbase" % "hbase-hadoop-compat" % "1.2.2").classifier("tests") % "test",
-    ("org.apache.hbase" % "hbase-hadoop2-compat" % "1.2.2") % "test",
-    ("org.apache.hbase" % "hbase-hadoop2-compat" % "1.2.2").classifier("tests") % "test",
-    ("org.apache.hbase" % "hbase-common" % "1.2.2").classifier("tests") % "test",
-    ("org.apache.hbase" % "hbase" % "1.2.2").classifier("tests") % "test" exclude("org.apache.hbase", "hbase"),
-    ("org.apache.hadoop" % "hadoop-hdfs" % "2.6.0").classifier("tests") % "test",
-    "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"% "test")
+    ("org.apache.hbase" % "hbase-testing-util" % "1.2.2").classifier("tests") % Test,
+    ("org.apache.hadoop" % "hadoop-common" % "2.7.7").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-server" % "1.2.2").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase" % "1.2.2") % Test,
+    ("org.apache.hbase" % "hbase-hadoop-compat" % "1.2.2") % Test,
+    ("org.apache.hbase" % "hbase-hadoop-compat" % "1.2.2").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-hadoop2-compat" % "1.2.2") % Test,
+    ("org.apache.hbase" % "hbase-hadoop2-compat" % "1.2.2").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-common" % "1.2.2").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase" % "1.2.2").classifier("tests") % Test exclude("org.apache.hbase", "hbase"),
+    ("org.apache.hadoop" % "hadoop-hdfs" % "2.7.7").classifier("tests") % Test,
+    ("org.apache.hadoop" % "hadoop-hdfs" % "2.7.7") % Test)
+
+  lazy val hbase2TestDependencies = Seq(
+    ("org.apache.hbase" % "hbase-testing-util" % "2.1.10").classifier("tests") % Test,
+    ("org.apache.hadoop" % "hadoop-common" % "2.7.7").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-server" % "2.1.10").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-hadoop-compat" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-hadoop-compat" % "2.1.10").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-hadoop2-compat" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-hadoop2-compat" % "2.1.10").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase-metrics" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-metrics-api" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-http" % "2.1.10") % Test,
+    ("org.apache.hbase" % "hbase-common" % "2.1.10").classifier("tests") % Test,
+    ("org.apache.hbase" % "hbase" % "2.1.10").classifier("tests") % Test exclude("org.apache.hbase", "hbase"),
+    ("org.apache.hadoop" % "hadoop-hdfs" % "2.7.7").classifier("tests") % Test,
+    ("org.apache.hadoop" % "hadoop-hdfs" % "2.7.7") % Test)
 
   lazy val httpClient = "org.scalaj" %% "scalaj-http" % "2.4.2"
 
   lazy val wireMock = Seq("com.github.tomakehurst" % "wiremock-jre8" % "2.21.0" % Test,
-    "xmlunit" % "xmlunit" % "1.6" % Test)
+                          "xmlunit" % "xmlunit" % "1.6" % Test)
 
 
   lazy val restServer = core_deps ++ Seq(logback) ++ akka
   lazy val core_deps = Seq(scalatest, avro, typesafe_config, junit)
   lazy val mock_app_dep = core_deps ++ Seq(reflections, hbase_common)
   lazy val mock_conn = core_deps ++ Seq(reflections)
   lazy val hbase_conn_dep = core_deps ++ Seq(hbase_common, hbase_server, hadoop_common)
+  lazy val hbase2_conn_dep = core_deps ++ Seq(hbase2_common, hbase2_server, hadoop_common)
   lazy val postgres_conn_dep = core_deps :+ postgres_conn :+ postgres_embedded
   lazy val spark_app = mock_app_dep ++ Seq(spark_core, spark_sql, hbase_common)
   lazy val mongo_conn = core_deps ++ Seq(mongo, mongoTest)

project/Settings.scala

Lines changed: 10 additions & 0 deletions
@@ -95,6 +95,14 @@ object Settings {
     libraryDependencies ++= Dependencies.hbaseTestDependencies
   }
 
+  lazy val hbase2TestSettings: SettingsDefinition = {
+    //enable resolution of transitive dependencies of jars containing tests
+    //needed to run tests over hbase minicluster
+    transitiveClassifiers in Test := Seq(Artifact.TestsClassifier, Artifact.SourceClassifier)
+    libraryDependencies ++= Dependencies.hbase2TestDependencies
+  }
+
+
   lazy val notPublishSettings = Seq(skip in publish := true)
 
   lazy val myCredentials = Credentials(
@@ -116,6 +124,8 @@ object Settings {
 
   lazy val scalastyleSettings = Seq(scalastyleFailOnWarning := true)
 
+  // lazy val testSettings = Seq(parallelExecution in Test := false)
+
   lazy val publishSettings = Seq(
     publishTo := Some("bintray" at "https://api.bintray.com/maven/agile-lab-dev/Darwin/darwin/;publish=1"),
     credentials += myCredentials,
