Skip to content

Commit

Permalink
[PIO-203] Fixes pio status warnings in ES storage (apache#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
shimamoto authored Jan 21, 2019
1 parent aa79f7c commit 52eb306
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
private val estype = "accesskeys"
private val internalIndex = index + "_" + estype

ESUtils.createIndex(client, internalIndex,
ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("key" -> ("type" -> "keyword")) ~
("events" -> ("type" -> "keyword"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype

private val seq = new ESSequences(client, config, internalIndex)

ESUtils.createIndex(client, internalIndex,
ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("id" -> ("type" -> "keyword")) ~
("name" -> ("type" -> "keyword"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
private val seq = new ESSequences(client, config, internalIndex)
private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype

ESUtils.createIndex(client, internalIndex,
ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword"))))
ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"

ESUtils.createIndex(client, index,
ESUtils.getNumberOfShards(config, index.toUpperCase),
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
private val internalIndex = index + "_" + estype

ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
Expand All @@ -61,7 +59,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
("algorithmsParams" -> ("type" -> "keyword")) ~
("servingParams" -> ("type" -> "keyword"))
))
ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))

def insert(i: EngineInstance): String = {
val id = i.id match {
Expand All @@ -86,7 +84,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
s"/$index/$estype/",
s"/$internalIndex/$estype/",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
Expand All @@ -95,12 +93,12 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
case "created" =>
Some((jsonResponse \ "_id").extract[String])
case _ =>
error(s"[$result] Failed to create $index/$estype")
error(s"[$result] Failed to create $internalIndex/$estype")
None
}
} catch {
case e: IOException =>
error(s"Failed to create $index/$estype", e)
error(s"Failed to create $internalIndex/$estype", e)
None
}
}
Expand All @@ -109,7 +107,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
try {
val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
s"/$internalIndex/$estype/$id",
Map.empty[String, String].asJava)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
(jsonResponse \ "found").extract[Boolean] match {
Expand All @@ -123,11 +121,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
e.getResponse.getStatusLine.getStatusCode match {
case 404 => None
case _ =>
error(s"Failed to access to /$index/$estype/$id", e)
error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
}
Expand All @@ -137,10 +135,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
val json =
("query" ->
("match_all" -> List.empty))
ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
Expand All @@ -165,10 +163,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
("sort" -> List(
("startTime" ->
("order" -> "desc"))))
ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
} catch {
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
Expand All @@ -188,7 +186,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
Expand All @@ -197,30 +195,30 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
case "created" =>
case "updated" =>
case _ =>
error(s"[$result] Failed to update $index/$estype/$id")
error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
error(s"Failed to update $internalIndex/$estype/$id", e)
}
}

def delete(id: String): Unit = {
try {
val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava)
val json = parse(EntityUtils.toString(response.getEntity))
val result = (json \ "result").extract[String]
result match {
case "deleted" =>
case _ =>
error(s"[$result] Failed to update $index/$estype/$id")
error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
error(s"Failed to update $internalIndex/$estype/$id", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.predictionio.data.storage.EvaluationInstance
import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
import org.apache.predictionio.data.storage.EvaluationInstances
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.predictionio.data.storage.StorageClientException
import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
Expand All @@ -41,15 +40,12 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
private val seq = new ESSequences(client, config, internalIndex)
private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype

ESUtils.createIndex(client, internalIndex,
ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,9 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
val index = baseIndex + "_" + estype
ESUtils.createIndex(client, index,
ESUtils.getNumberOfShards(config, index.toUpperCase),
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
ESUtils.createIndex(client, index)
val json =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword")) ~
("eventId" -> ("type" -> "keyword")) ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.IOException

import scala.collection.JavaConverters._

import org.apache.http.Header
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
Expand All @@ -40,12 +39,9 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String
private val estype = "sequences"
private val internalIndex = index + "_" + estype

ESUtils.createIndex(client, internalIndex,
ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
("_all" -> ("enabled" -> false)) ~
("properties" ->
("n" -> ("enabled" -> false))))
ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.apache.http.nio.entity.NStringEntity
import org.elasticsearch.client.RestClient
import org.json4s._
Expand Down Expand Up @@ -165,23 +164,16 @@ object ESUtils {

def createIndex(
client: RestClient,
index: String,
numberOfShards: Option[Int],
numberOfReplicas: Option[Int]): Unit = {
index: String): Unit = {
client.performRequest(
"HEAD",
s"/$index",
Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
case 404 =>
val json = ("settings" ->
("number_of_shards" -> numberOfShards) ~
("number_of_replicas" -> numberOfReplicas))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
client.performRequest(
"PUT",
s"/$index",
Map.empty[String, String].asJava,
entity)
Map.empty[String, String].asJava)
case 200 =>
case _ =>
throw new IllegalStateException(s"/$index is invalid.")
Expand Down Expand Up @@ -269,14 +261,6 @@ object ESUtils {
(hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
}

def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = {
config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
}

def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = {
config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
}

def getEventDataRefresh(config: StorageClientConfig): String = {
config.properties.getOrElse("EVENTDATA_REFRESH", "true")
}
Expand Down

0 comments on commit 52eb306

Please sign in to comment.