[SNAP-2366] row buffer fault-in, forced rollover, merge small batches #1046
base: master
Conversation
- add check for the two cases in table stats service:
  - a "large enough" row buffer (currently "large enough" is anything more than maxDeltaRows/8) that has not seen any updates/deletes since the last check; in this case schedule a task to force rollover the row buffer in column table (a rough sketch of this check follows these notes)
  - a bucket of column table having multiple small batches (non-transactional check); if so then submit a task to merge those after checking for transactional snapshot; merge is done by a locally created ColumnTableScan->ColumnInsertExec plan where the scan uses an iterator only on the small batches
- added a ColumnFormatStatsIterator that can take a bunch of stats rows and create an iterator over just those (as required for batch merge)
- added new scan metrics for disk reads: a) disk rows from row buffer, b) partial column batches on disk, c) full column batches on disk
- extended SQLMetrics types with a new SPLIT_SUM_METRIC that allows displaying multiple metrics against a common name; ColumnTableScan now uses this to combine some metrics, else the display becomes too large (especially with the newly added disk read metrics)
- use hive-metadata (ExternalTableMetaData) to get the number of rows instead of reading it from the row buffer table (which is subject to change in future)
- fixed disk metrics collection added previously; set the metric correctly for both the row buffer iterator (ResultSetTraversal) and ColumnFormatIterator
- added a metric for remote batch fetch
- fixed multiple ColumnTableScans causing split metrics to add up into one ColumnTableScan; now a unique ID is used for the split metrics of each ColumnTableScan instance
- fixed an NPE in SnappyTableStatsProviderService while filling up the result map from members, since CHM cannot hold null values
- use a common entry map in ColumnFormatIterator disk iteration instead of creating a separate one for every column batch
- added implementation of PURGE_CODEGEN_CACHES as StoreCallbacksImpl.clearCodegenCaches
- limited to one task per table for the background rolloverRowBuffer and mergeSmallBatches tasks
- replaced a few usages of Map.put with justPut for koloboke maps
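A rough sketch of the rollover check described in the first item above. BufferStats, maybeScheduleRollover and scheduleRolloverTask are illustrative names, not the actual table stats service code:

```scala
// Hypothetical sketch of the forced-rollover check; names are illustrative.
case class BufferStats(numRows: Long, changedSinceLastCheck: Boolean)

def maybeScheduleRollover(table: String, stats: BufferStats, maxDeltaRows: Long,
    scheduleRolloverTask: String => Unit): Unit = {
  // "large enough" = more than maxDeltaRows/8 rows, and no updates/deletes
  // observed since the last stats check
  if (stats.numRows > maxDeltaRows / 8 && !stats.changedSinceLastCheck) {
    scheduleRolloverTask(table)
  }
}
```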
First set of comments. Can see more after one of the points about updated & deleted columns is clarified.
private def withExceptionHandling(f: => Unit, doFinally: () => Unit = null): Unit = {
This method can be moved to Utils class.
Will do
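For reference, a helper with the signature quoted above presumably just wraps the background task in try/catch/finally. This is a minimal sketch under that assumption (logging and cleanup behavior are guesses, not the actual implementation):

```scala
import org.slf4j.LoggerFactory

object BackgroundTaskUtils {
  private val log = LoggerFactory.getLogger(getClass)

  // Run the task, log (rather than propagate) any failure, always run the cleanup.
  def withExceptionHandling(f: => Unit, doFinally: () => Unit = null): Unit = {
    try {
      f
    } catch {
      case t: Throwable => log.error("background task failed", t)
    } finally {
      if (doFinally ne null) doFinally()
    }
  }
}
```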
logInfo(s"Found small batches in ${pr.getName}: ${smallBatches.map(_.getId)}")
val cache = pr.getGemFireCache
implicit val executionContext = Utils.executionContext(cache)
Future(withExceptionHandling({
There may be a situation where many such future tasks pile up for the same BucketRegion if the future thread does not get a chance to complete before the next stats publish. It would be better to mark the BucketRegion and exclude in-progress buckets from being picked.
That cannot happen because this deliberately uses a "mergeTasks" map with "computeIfAbsent", which will add only one future per PR at a time. The mergeTasks entry for that PR is cleared only at the end of the Future's execution. Same for rolloverTasks.
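A minimal sketch of that guard, assuming a ConcurrentHashMap keyed by table/PR name; MergeTaskGuard and submitMergeTask are illustrative names, only the computeIfAbsent pattern mirrors the actual mergeTasks/rolloverTasks handling:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import scala.concurrent.{ExecutionContext, Future}

object MergeTaskGuard {
  private val mergeTasks = new ConcurrentHashMap[String, Future[Unit]]()

  def submitMergeTask(prName: String, task: () => Unit)
      (implicit ec: ExecutionContext): Unit = {
    // computeIfAbsent ensures at most one in-flight future per table; a second
    // stats publish for the same table while a merge is running becomes a no-op.
    mergeTasks.computeIfAbsent(prName, new JFunction[String, Future[Unit]] {
      override def apply(key: String): Future[Unit] = Future {
        try task() finally mergeTasks.remove(key) // allow a new task only after completion
      }
    })
  }
}
```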
partitionColumnAliases = Nil, baseRelation = null, schema, allFilters = Nil,
  schemaAttrs, caseSensitive = true)
// reduce min delta row size to avoid going through rolloverRowBuffers again
val insertPlan = ColumnInsertExec(tableScan, Nil, Nil,
Can you please explain here, maybe in the comment section, how this will handle updated columns and deleted columns?
The ColumnBatchIterator is passed the stats row entry. All the remaining columns, including delta/delete, are looked up by the iterator (see ColumnFormatStatsIterator.getColumnValue) when the generated code asks for them.
So this will be the same as iterating batches in ColumnTableScan, which returns merged entries with deltas/deletes applied. The ColumnInsert is tied to the output of this, hence it will create a combined, merged batch.
Note that merging of deltas into the main batch (when they become large enough) or deletes into the main batch (when a large number of entries are deleted) will be handled separately. That does not depend on the main batch being small but rather on the deltas being large. That merge needs to be done in the operation thread itself (the one that created the last delta causing it to grow large). Right now only the case where all entries are deleted is handled.
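To make the flow concrete, here is a purely illustrative model of that merge path: the scan side yields the rows of the small batches with deltas applied and deleted rows skipped, and the insert side packs them into one combined batch. Row, ColumnBatch and applyDeltasAndDeletes are hypothetical stand-ins for the real ColumnTableScan / ColumnInsertExec / ColumnFormatStatsIterator machinery:

```scala
case class Row(values: Seq[Any])
case class ColumnBatch(rows: Seq[Row])

def mergeSmallBatches(smallBatches: Seq[ColumnBatch],
    applyDeltasAndDeletes: ColumnBatch => Seq[Row]): ColumnBatch = {
  // scan side: merged entries per small batch (deltas applied, deletes skipped)
  val mergedRows = smallBatches.flatMap(applyDeltasAndDeletes)
  // insert side: one larger batch replaces the many small ones
  ColumnBatch(mergedRows)
}
```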
added an OperationContext in SnappySession that can be used to persist context across multiple plan executions (e.g. caching for putInto, then actual execution)
Changes proposed in this pull request
- add check for the two cases in table stats service:
  - a "large enough" row buffer (currently "large enough" is anything more than maxDeltaRows/8) that has not seen any updates/deletes since the last check; in this case schedule a task to force rollover the row buffer in column table
  - a bucket of column table having multiple small batches (non-transactional check); if so then submit a task to merge those after checking for transactional snapshot; merge is done by a locally created ColumnTableScan->ColumnInsertExec plan where the scan uses an iterator only on the small batches
- added a ColumnFormatStatsIterator that can take a bunch of stats rows and create an iterator over just those (as required for batch merge)
- added new scan metrics for disk reads: a) disk rows from row buffer, b) partial column batches on disk, c) full column batches on disk
- extended SQLMetrics types with a new SPLIT_SUM_METRIC that allows displaying multiple metrics against a common name; ColumnTableScan now uses this to combine some metrics, else the display becomes too large (especially with the newly added disk read metrics)
- use hive-metadata (ExternalTableMetaData) to get the number of rows instead of reading it from the row buffer table (which is subject to change in future)
- fixed an NPE in SnappyTableStatsProviderService while filling up the result map from members, since CHM cannot hold null values (a short illustration follows this list)
- use a common entry map in ColumnFormatIterator disk iteration instead of creating a separate one for every column batch
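A short illustration of the CHM point from the NPE item above; this is standard java.util.concurrent behavior, not SnappyData-specific code:

```scala
import java.util.concurrent.ConcurrentHashMap

object ChmNullValues {
  def main(args: Array[String]): Unit = {
    val resultMap = new ConcurrentHashMap[String, AnyRef]()
    val statsFromMember: AnyRef = null // e.g. a member that reported nothing
    // Unlike HashMap, ConcurrentHashMap rejects null keys and values, so a
    // member with no stats must be skipped (or mapped to a sentinel).
    if (statsFromMember ne null) {
      resultMap.put("member-1", statsFromMember)
    }
    // resultMap.put("member-1", null) would throw NullPointerException
  }
}
```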
Patch testing
precheckin; new unit tests to be added next
ReleaseNotes.txt changes
NA
Other PRs
TIBCOSoftware/snappy-store#391