Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions core/src/main/scala/com/yahoo/maha/core/query/QueryContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import scala.collection.SortedSet
sealed trait QueryContext {
def requestModel: RequestModel
def indexAliasOption: Option[String]
def factGroupByKeys: List[String]
def primaryTableName: String
}

Expand Down Expand Up @@ -59,17 +60,20 @@ case object DimFactOuterGroupByQuery extends QueryType

case class DimQueryContext private[query](dims: SortedSet[DimensionBundle],
requestModel: RequestModel,
indexAliasOption: Option[String],
indexAliasOption: Option[String],
factGroupByKeys: List[String],
queryAttributes: QueryAttributes= QueryAttributes.empty) extends DimensionQueryContext
case class FactQueryContext private[query](factBestCandidate: FactBestCandidate,
requestModel: RequestModel,
indexAliasOption: Option[String],
indexAliasOption: Option[String],
factGroupByKeys: List[String],
queryAttributes: QueryAttributes) extends FactualQueryContext
case class CombinedQueryContext private[query](dims: SortedSet[DimensionBundle],
factBestCandidate: FactBestCandidate,
requestModel: RequestModel,
queryAttributes: QueryAttributes) extends DimensionQueryContext with FactualQueryContext {
val indexAliasOption = None
val factGroupByKeys = List.empty
override def primaryTableName: String = {
if(requestModel.isDimDriven) {
dims.last.dim.name
Expand All @@ -84,6 +88,7 @@ case class DimFactOuterGroupByQueryQueryContext(dims: SortedSet[DimensionBundle]
requestModel: RequestModel,
queryAttributes: QueryAttributes) extends DimensionQueryContext with FactualQueryContext {
override def indexAliasOption: Option[String] = None
override def factGroupByKeys: List[String] = List.empty
override def primaryTableName: String = factBestCandidate.fact.name

lazy val shouldQualifyFactsInPreOuter: Boolean = {
Expand Down Expand Up @@ -143,6 +148,7 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
var dims : SortedSet[DimensionBundle] = SortedSet.empty
var factBestCandidate : Option[FactBestCandidate] = None
var indexAliasOption : Option[String] = None
var factGroupByKeys : List[String] = List.empty
var queryAttributes : QueryAttributes = QueryAttributes.empty

def addDimTable(dimension: DimensionBundle) = {
Expand Down Expand Up @@ -170,6 +176,13 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
this
}

def addFactGroupByKeys(factGroupByKeyList: List[String]) = {
require(queryType != DimFactQuery, "dim fact query doesn't use Group By keys")
require(factGroupByKeys.isEmpty, s"fact group by is already defined : factGroupByKeys=${factGroupByKeys.toString()}, cannot set to ${factGroupByKeyList.toString()}")
this.factGroupByKeys = factGroupByKeyList
this
}

def setQueryAttributes(queryAttributes: QueryAttributes) = {
this.queryAttributes = queryAttributes
this
Expand All @@ -179,10 +192,10 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
queryType match {
case DimOnlyQuery =>
require(dims.nonEmpty, "dim only query should not have dimension empty")
DimQueryContext(dims, requestModel, indexAliasOption, queryAttributes)
DimQueryContext(dims, requestModel, indexAliasOption, factGroupByKeys, queryAttributes)
case FactOnlyQuery =>
require(factBestCandidate.isDefined, "fact only query should have fact defined")
FactQueryContext(factBestCandidate.get, requestModel, indexAliasOption, queryAttributes)
FactQueryContext(factBestCandidate.get, requestModel, indexAliasOption, factGroupByKeys, queryAttributes)
case DimFactQuery =>
require(factBestCandidate.isDefined, "dim fact query should have fact defined")
CombinedQueryContext(dims, factBestCandidate.get, requestModel, queryAttributes)
Expand Down
32 changes: 21 additions & 11 deletions core/src/main/scala/com/yahoo/maha/core/query/QueryPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,16 @@ object QueryPipeline extends Logging {
val asyncDisqualifyingSet: Set[Engine] = Set(DruidEngine)

val completeRowList: Query => RowList = (q) => new CompleteRowList(q)
val dimDrivenPartialRowList: Query => RowList = (q) => new DimDrivenPartialRowList(q.queryContext.indexAliasOption.get, q)
val dimDrivenFactOrderedPartialRowList: Query => RowList = (q) => new DimDrivenFactOrderedPartialRowList(q.queryContext.indexAliasOption.get, q)
val factDrivenPartialRowList: Query => RowList = (q) => new FactDrivenPartialRowList(q.queryContext.indexAliasOption.get, q)
val dimDrivenPartialRowList: Query => RowList = (q) => {
new DimDrivenPartialRowList(RowGrouping(q.queryContext.indexAliasOption.get, List(q.queryContext.indexAliasOption.get)), q)
}
val dimDrivenFactOrderedPartialRowList: Query => RowList = (q) => {
new DimDrivenFactOrderedPartialRowList(RowGrouping(q.queryContext.indexAliasOption.get, List(q.queryContext.indexAliasOption.get)), q)}
val factDrivenPartialRowList: Query => RowList = (q) => {
val indexAlias = q.queryContext.indexAliasOption.get
val groupByKeys: List[String] = q.queryContext.factGroupByKeys
new FactDrivenPartialRowList(RowGrouping(indexAlias, groupByKeys), q)
}

def unionViewPartialRowList(queryList: List[Query]): Query => RowList = {
require(queryList.forall(q => q.queryContext.isInstanceOf[FactualQueryContext] && q.queryContext.asInstanceOf[FactualQueryContext].factBestCandidate.fact.isInstanceOf[ViewBaseTable])
Expand Down Expand Up @@ -86,7 +93,7 @@ object QueryPipeline extends Logging {
case any =>
throw new UnsupportedOperationException(s"Cannot create UnionViewRowList with fact of type : ${any.getClass.getSimpleName}")
}
case DimQueryContext(_, _, _, _) =>
case DimQueryContext(_, _, _, _, _) =>
throw new IllegalArgumentException(s"Requested UnionViewRowList in DimQueryContext")
case _ =>
throw new IllegalArgumentException(s"Requested UnionViewRowList in Unhandled/UnknownQueryContext")
Expand Down Expand Up @@ -675,24 +682,27 @@ class DefaultQueryPipelineFactory(implicit val queryGeneratorRegistry: QueryGene
private[this] def getMultiEngineDimQuery(bestDimCandidates: SortedSet[DimensionBundle],
requestModel: RequestModel,
indexAlias: String,
factGroupByKeys: List[String],
queryAttributes: QueryAttributes,
queryGenVersion: Version): Query = {
val dimOnlyContextBuilder = QueryContext
.newQueryContext(DimOnlyQuery, requestModel)
.addDimTable(bestDimCandidates)
.addIndexAlias(indexAlias)
.addFactGroupByKeys(factGroupByKeys)
.setQueryAttributes(queryAttributes)

require(queryGeneratorRegistry.isEngineRegistered(bestDimCandidates.head.dim.engine, Option(queryGenVersion))
, s"Failed to find query generator for engine : ${bestDimCandidates.head.dim.engine}")
queryGeneratorRegistry.getValidGeneratorForVersion(bestDimCandidates.head.dim.engine, queryGenVersion, Option(requestModel)).get.generate(dimOnlyContextBuilder.build())
}

private[this] def getFactQuery(bestFactCandidate: FactBestCandidate, requestModel: => RequestModel, indexAlias: String, queryGenVersion: Version): Query = {
private[this] def getFactQuery(bestFactCandidate: FactBestCandidate, requestModel: => RequestModel, indexAlias: String, factGroupByKeys: List[String], queryGenVersion: Version): Query = {
val factOnlyContextBuilder = QueryContext
.newQueryContext(FactOnlyQuery, requestModel)
.addFactBestCandidate(bestFactCandidate)
.addIndexAlias(indexAlias)
.addFactGroupByKeys(factGroupByKeys)
require(queryGeneratorRegistry.isEngineRegistered(bestFactCandidate.fact.engine, Some(queryGenVersion))
, s"Failed to find query generator for engine : ${bestFactCandidate.fact.engine}")
queryGeneratorRegistry.getValidGeneratorForVersion(bestFactCandidate.fact.engine, queryGenVersion, Option(requestModel)).get.generate(factOnlyContextBuilder.build())
Expand Down Expand Up @@ -851,14 +861,14 @@ OuterGroupBy operation has to be applied only in the following cases
if (!requestModel.hasFactSortBy && requestModel.forceDimDriven) {
//oracle + druid
requestDebug("dimQueryThenFactQuery")
val dimQuery = getMultiEngineDimQuery(bestDimCandidates, requestModel, indexAlias, queryAttributes, queryGenVersion)
val dimQuery = getMultiEngineDimQuery(bestDimCandidates, requestModel, indexAlias, List(indexAlias), queryAttributes, queryGenVersion)
val subsequentQuery: (IndexedRowList, QueryAttributes) => Query = {
case (irl, subqueryAttributes) =>
val field = irl.indexAlias
val field = irl.rowGrouping.indexAlias
val values = irl.keys.toList.map(_.toString)
val filter = InFilter(field, values)
val injectedFactBestCandidate = factOnlyInjectFilter(factBestCandidateOption.get, filter)
val query = getFactQuery(injectedFactBestCandidate, requestModel, indexAlias, queryGenVersion)
val query = getFactQuery(injectedFactBestCandidate, requestModel, indexAlias, List(indexAlias), queryGenVersion)
irl.addSubQuery(query)
query
}
Expand All @@ -879,10 +889,10 @@ OuterGroupBy operation has to be applied only in the following cases
//since druid + oracle doesn't support row count
requestDebug("factQueryThenDimQuery")
val noRowCountRequestModel = requestModel.copy(includeRowCount = false)
val factQuery = getFactQuery(factBestCandidateOption.get, noRowCountRequestModel, indexAlias, queryGenVersion)
val factQuery = getFactQuery(factBestCandidateOption.get, noRowCountRequestModel, indexAlias, List(indexAlias), queryGenVersion)
val subsequentQuery: (IndexedRowList, QueryAttributes) => Query = {
case (irl, subqueryqueryAttributes) =>
val field = irl.indexAlias
val field = irl.rowGrouping.indexAlias
val values = irl.keys.toList.map(_.toString)
val valuesSize = values.size
if (values.nonEmpty && (valuesSize >= noRowCountRequestModel.maxRows || noRowCountRequestModel.startIndex <= 0)) {
Expand All @@ -896,7 +906,7 @@ OuterGroupBy operation has to be applied only in the following cases
}
queryAttributesBuilder.build
}
getMultiEngineDimQuery(bestDimCandidates, noRowCountRequestModel, indexAlias, injectedAttributes, queryGenVersion)
getMultiEngineDimQuery(bestDimCandidates, noRowCountRequestModel, indexAlias, List(indexAlias), injectedAttributes, queryGenVersion)
} else {
QueryChain.logger.info("No data returned from druid, should run fallback query if there is one")
NoopQuery
Expand Down
Loading