Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CitusDB for analytics tables Merge with 2.42/master (#14945) [DHIS2-15548] #17099

Draft
wants to merge 50 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
c088dc0
fix: Merge with 2.42/master [DHIS2-15548]
maikelarabori Apr 9, 2024
c271dd2
fix: Missing javadocs + clean-up [DHIS2-15548]
maikelarabori Apr 9, 2024
36555d5
fix: Table inherits condition [DHIS2-15548]
maikelarabori Apr 9, 2024
f9b2378
refactor: Code clean-up + revert method extraction [DHIS2-15548]
maikelarabori Apr 9, 2024
2570148
fix: Code formatting [DHIS2-15548]
maikelarabori Apr 9, 2024
2c5ea6f
refactor: Remove cistus flag from params [DHIS2-15548]
maikelarabori Apr 10, 2024
59c18e0
refactor: Remove distributed flag get/setter [DHIS2-15548]
maikelarabori Apr 10, 2024
38512b3
refactor: False is set by default [DHIS2-15548]
maikelarabori Apr 10, 2024
94bfdef
fix: Code formatting [DHIS2-15548]
maikelarabori Apr 10, 2024
5e88e74
fix: Merge conflicts with master [DHIS2-15548]
maikelarabori Apr 10, 2024
79c012b
fix: Unit test [DHIS2-15548]
maikelarabori Apr 10, 2024
93b6f1f
fix: Code formatting [DHIS2-15548]
maikelarabori Apr 10, 2024
4554321
Merge branch 'master' into DHIS2-15548_2.42
maikelarabori Apr 10, 2024
774b73d
refactor: Use enum instead of boolean flag [DHIS2-15548]
maikelarabori Apr 11, 2024
32c6651
refactor: Use text blocks [DHIS2-15548]
maikelarabori Apr 11, 2024
9ddd12e
Merge branch 'master' of github.com:dhis2/dhis2-core into DHIS2-15548…
maikelarabori Apr 11, 2024
cd993aa
refactor: Small clean-up [DHIS2-15548]
maikelarabori Apr 11, 2024
6fa335c
fix: Unit test [DHIS2-15548]
maikelarabori Apr 11, 2024
a605daa
Merge branch 'master' into DHIS2-15548_2.42
maikelarabori Apr 15, 2024
ffcceb3
Merge branch 'master' into DHIS2-15548_2.42
maikelarabori Apr 24, 2024
afa8d9a
fix: Conflicts with master [DHIS2-15548]
maikelarabori May 31, 2024
55d16a0
fix: Code formatting [DHIS2-15548]
maikelarabori May 31, 2024
b2498d6
Merge remote-tracking branch 'refs/remotes/origin/master' into DHIS2-…
gnespolino Jul 15, 2024
1840cf9
fix: fixing conflict when merging master [DHIS2-15548]
gnespolino Jul 15, 2024
3caa711
fix: renaming citus property [DHIS2-15548]
gnespolino Jul 15, 2024
f321eaf
fix: fix skip partitions when citus is enabled [DHIS2-15548]
gnespolino Jul 17, 2024
573fe21
fix: unit test [DHIS2-15548]
gnespolino Jul 17, 2024
095f38f
Merge remote-tracking branch 'refs/remotes/origin/master' into DHIS2-…
gnespolino Jul 26, 2024
02abff2
fix: tei table column name
gnespolino Jul 29, 2024
67621d9
Merge branch 'master' into DHIS2-15548_2.42
gnespolino Jul 31, 2024
3395ef4
fix: enrollment and event table column name [DHIS2-15548]
gnespolino Jul 31, 2024
e1c742b
fix: fixing TE queries which failed when citus is enabled [DHIS2-15548]
gnespolino Aug 1, 2024
0bd08be
fix: failing unit tests [DHIS2-15548]
gnespolino Aug 2, 2024
5ebbca5
Merge branch 'master' into DHIS2-15548_2.42
gnespolino Aug 2, 2024
92ac602
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Sep 24, 2024
88cfc1d
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Sep 30, 2024
96c2e1c
fix: added distribution to Enrollment/Event/Analytics [DHIS2-15548]
gnespolino Sep 30, 2024
7d2af77
fix: added distribution to Enrollment/Event/Analytics [DHIS2-15548]
gnespolino Sep 30, 2024
bf7d2b8
fix: added distribution to Enrollment/Event/Analytics [DHIS2-15548]
gnespolino Sep 30, 2024
608f68e
fix: tests [DHIS2-15548]
gnespolino Sep 30, 2024
f8ed267
feat: skip citus dynamically [DHIS2-18134] (#18720)
gnespolino Oct 3, 2024
651f3be
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Oct 8, 2024
46cdaff
chore: fixing merge conflicts [DHIS2-15548]
gnespolino Oct 8, 2024
24d0b0e
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Oct 11, 2024
4619398
chore: fixing unit tests [DHIS2-15548]
gnespolino Oct 11, 2024
ad1f41d
test: tagging slow analytics query
gnespolino Nov 5, 2024
2ddd1fe
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Nov 5, 2024
b1ca045
perf: performance analysis
gnespolino Nov 19, 2024
014b2a8
Merge remote-tracking branch 'origin/master' into DHIS2-15548_2.42
gnespolino Nov 19, 2024
6c29732
style: format
gnespolino Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package org.hisp.dhis.analytics;

import lombok.Getter;
import org.apache.commons.lang3.StringUtils;

/**
* @author Lars Helge Overland
Expand All @@ -38,15 +39,18 @@ public enum AnalyticsTableType {
COMPLETENESS("analytics_completeness", true, true),
COMPLETENESS_TARGET("analytics_completenesstarget", false, false),
ORG_UNIT_TARGET("analytics_orgunittarget", false, false),
EVENT("analytics_event", false, true),
ENROLLMENT("analytics_enrollment", false, false),
EVENT("analytics_event", false, true, "psi"),
ENROLLMENT("analytics_enrollment", false, false, "pi"),
OWNERSHIP("analytics_ownership", false, false),
VALIDATION_RESULT("analytics_validationresult", true, false),
TRACKED_ENTITY_INSTANCE_EVENTS("analytics_tei_events", false, true),
TRACKED_ENTITY_INSTANCE_ENROLLMENTS("analytics_tei_enrollments", false, false),
TRACKED_ENTITY_INSTANCE("analytics_tei", false, false);
TRACKED_ENTITY_INSTANCE_EVENTS("analytics_tei_events", false, true, "trackedentityinstanceuid"),
TRACKED_ENTITY_INSTANCE_ENROLLMENTS(
"analytics_tei_enrollments", false, false, "trackedentityinstanceuid"),
TRACKED_ENTITY_INSTANCE("analytics_tei", false, false, "trackedentityinstanceuid");

private final String tableName;
@Getter private String tableName;

@Getter private String distributionColumn;

private final boolean periodDimension;

Expand All @@ -57,4 +61,17 @@ public enum AnalyticsTableType {
this.periodDimension = periodDimension;
this.latestPartition = latestPartition;
}

AnalyticsTableType(
String tableName,
boolean periodDimension,
boolean latestPartition,
String distributionColumn) {
this(tableName, periodDimension, latestPartition);
this.distributionColumn = distributionColumn;
}

public boolean isDistributed() {
return StringUtils.isNotBlank(distributionColumn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public class AnalyticsTableUpdateParams {
/** Current date, only used for testing */
private Date today;

/** Is Citus enabled? */
@Getter private boolean citusExtensionEnabled = false;

private final Map<String, Object> extraParameters = new HashMap<>();

public void addExtraParam(String prefix, String key, Object value) {
Expand Down Expand Up @@ -171,6 +174,7 @@ public AnalyticsTableUpdateParams instance() {
params.jobId = this.jobId;
params.startTime = this.startTime;
params.lastSuccessfulUpdate = this.lastSuccessfulUpdate;
params.citusExtensionEnabled = this.citusExtensionEnabled;

return this;
}
Expand Down Expand Up @@ -255,5 +259,10 @@ public AnalyticsTableUpdateParams build() {
checkNotNull(this.params.startTime);
return this.params;
}

public Builder withCitusEnabled(boolean citusExtensionEnabled) {
this.params.citusExtensionEnabled = citusExtensionEnabled;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,42 @@ public void removeUpdatedData(List<AnalyticsTable> tables) {}
public void createTable(AnalyticsTable table) {
createAnalyticsTable(table);
createAnalyticsTablePartitions(table);

if (analyticsTableSettings.isCitusExtensionEnabled()) {
createDistributedCitusTable(table);
}
}

/**
* Create a distributed Citus table.
*
* @param table the {@link AnalyticsTable}.
*/
private void createDistributedCitusTable(AnalyticsTable table) {
if (!table.isTableTypeDistributed()) {
log.warn(
"No distribution column defined for table "
maikelarabori marked this conversation as resolved.
Show resolved Hide resolved
+ table.getMainName()
+ " so it won't be distributed");
return;
}

String tableName = table.getName();
String distributionColumn = table.getTableType().getDistributionColumn();

try {
jdbcTemplate.query(
"select create_distributed_table( :1, :2 )",
ps -> {
ps.setString(1, tableName);
ps.setString(2, distributionColumn);
},
rs -> {});
log.info("Successfully distributed table " + tableName + " on column " + distributionColumn);
} catch (Exception e) {
log.warn(
"Failed to distribute table " + table.getName() + " on column " + distributionColumn, e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.hisp.dhis.analytics.AnalyticsTableUpdateParams;
import org.hisp.dhis.analytics.cache.AnalyticsCache;
import org.hisp.dhis.analytics.cache.OutliersCache;
import org.hisp.dhis.analytics.table.setting.AnalyticsTableSettings;
import org.hisp.dhis.resourcetable.ResourceTableService;
import org.hisp.dhis.scheduling.JobProgress;
import org.hisp.dhis.setting.SettingKey;
Expand All @@ -69,6 +70,8 @@ public class DefaultAnalyticsTableGenerator implements AnalyticsTableGenerator {

private final OutliersCache outliersCache;

private final AnalyticsTableSettings analyticsTableSettings;

// TODO introduce last successful timestamps per table type

@Override
Expand All @@ -85,6 +88,7 @@ public void generateAnalyticsTables(AnalyticsTableUpdateParams params0, JobProgr
AnalyticsTableUpdateParams params =
AnalyticsTableUpdateParams.newBuilder(params0)
.withLastSuccessfulUpdate(lastSuccessfulUpdate)
.withCitusEnabled(analyticsTableSettings.isCitusExtensionEnabled())
.build();

log.info("Found {} analytics table types: {}", availableTypes.size(), availableTypes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void create(AnalyticsTableUpdateParams params, JobProgress progress) {
return;
}

List<AnalyticsTable> tables = tableManager.getAnalyticsTables(params);
List<AnalyticsTable> tables = getAnalyticsTables(params);

if (tables.isEmpty()) {
clock.logTime(
Expand Down Expand Up @@ -172,6 +172,19 @@ public void create(AnalyticsTableUpdateParams params, JobProgress progress) {
clock.logTime("Table update done: '{}'", tableType.getTableName());
}

private List<AnalyticsTable> getAnalyticsTables(AnalyticsTableUpdateParams params) {
return tableManager.getAnalyticsTables(params).stream()
.map(analyticsTable -> withDistributedFlag(analyticsTable, params))
.toList();
}

private AnalyticsTable withDistributedFlag(
AnalyticsTable analyticsTable, AnalyticsTableUpdateParams params) {
analyticsTable.setTableDistributed(
params.isCitusExtensionEnabled() && analyticsTable.isTableTypeDistributed());
maikelarabori marked this conversation as resolved.
Show resolved Hide resolved
return analyticsTable;
}

@Override
public void dropTables() {
Set<String> tables = tableManager.getExistingDatabaseTables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,15 @@ public AnalyticsTableType getAnalyticsTableType() {
@Override
@Transactional
public List<AnalyticsTable> getAnalyticsTables(AnalyticsTableUpdateParams params) {
List<AnalyticsTable> tables = new ArrayList<>();

// Skipping if the latest partition is requested since it's not supported.
if (params.isLatestUpdate()) {
return tables;
}

Calendar calendar = PeriodType.getCalendar();
List<TrackedEntityType> trackedEntityTypes = trackedEntityTypeService.getAllTrackedEntityType();
List<AnalyticsTable> tables = new ArrayList<>();
Logged logged = analyticsTableSettings.getTableLogged();

for (TrackedEntityType tet : trackedEntityTypes) {
Expand All @@ -189,7 +195,7 @@ public List<AnalyticsTable> getAnalyticsTables(AnalyticsTableUpdateParams params
List.of(), year, getStartDate(calendar, year), getEndDate(calendar, year));
}

if (table.hasTablePartitions()) {
if (table.hasTablePartitions() || params.isCitusExtensionEnabled()) {
tables.add(table);
}
}
Expand Down Expand Up @@ -266,6 +272,10 @@ public void populateTable(AnalyticsTableUpdateParams params, AnalyticsTableParti
List<AnalyticsTableColumn> columns = partition.getMasterTable().getAnalyticsTableColumns();
String partitionClause = getPartitionClause(partition);

if (params.isCitusExtensionEnabled()) {
partitionClause = eventDateExpression + " is not null";
}

StringBuilder sql = new StringBuilder("insert into " + tableName + " (");

for (AnalyticsTableColumn col : columns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.hisp.dhis.analytics.AnalyticsTableType;
import org.hisp.dhis.commons.collection.UniqueArrayList;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class AnalyticsTable extends Table {
/** Tracked entity type of enrollments in analytics table. */
private TrackedEntityType trackedEntityType;

@Getter @Setter private boolean tableDistributed = false;
maikelarabori marked this conversation as resolved.
Show resolved Hide resolved

maikelarabori marked this conversation as resolved.
Show resolved Hide resolved
/** Analytics table partitions for this base analytics table. */
private List<AnalyticsTablePartition> tablePartitions = new UniqueArrayList<>();

Expand Down Expand Up @@ -241,6 +244,10 @@ public boolean hasTablePartitions() {
return !tablePartitions.isEmpty();
}

public boolean isTableTypeDistributed() {
return tableType.isDistributed();
}

/**
* Returns the latest partition, or null if no latest partition exists.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import static org.hisp.dhis.util.ObjectUtils.isNull;

import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.hisp.dhis.configuration.CitusSettings;
import org.hisp.dhis.db.model.Database;
import org.hisp.dhis.db.model.Logged;
import org.hisp.dhis.external.conf.DhisConfigurationProvider;
Expand All @@ -57,6 +59,8 @@ public class AnalyticsTableSettings {

private final SystemSettingManager systemSettings;

@Delegate private final CitusSettings citusSettings;

/**
* Returns the setting indicating whether resource and analytics tables should be logged or
* unlogged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

import static org.hisp.dhis.commons.util.TextUtils.removeLastComma;

import java.util.List;
import java.util.stream.Collectors;
import org.hisp.dhis.analytics.table.model.AnalyticsTable;
import org.hisp.dhis.db.model.Collation;
import org.hisp.dhis.db.model.Column;
import org.hisp.dhis.db.model.Index;
Expand Down Expand Up @@ -239,52 +241,91 @@ public String createTable(Table table) {
.append(" (");

// Columns

if (table.hasColumns()) {
for (Column column : table.getColumns()) {
String dataType = getDataTypeName(column.getDataType());
String nullable = column.getNullable() == Nullable.NOT_NULL ? " not null" : " null";
String collation = column.getCollation() == Collation.C ? (" collate " + quote("C")) : "";

sql.append(quote(column.getName()) + " ")
.append(dataType)
.append(nullable)
.append(collation)
.append(COMMA);
}
sql.append(columnsStatement(table.getColumns()));
}

// Primary key

if (table.hasPrimaryKey()) {
sql.append("primary key (");

for (String columnName : table.getPrimaryKey()) {
sql.append(quote(columnName)).append(COMMA);
}

removeLastComma(sql).append(")").append(COMMA);
sql.append(primaryKeysStatement(table.getPrimaryKey()));
}

// Checks

if (table.hasChecks()) {
for (String check : table.getChecks()) {
sql.append("check(" + check + ")").append(COMMA);
}
sql.append(checksStatement(table.getChecks()));
}

removeLastComma(sql).append(")");

// Parent

if (table.hasParent()) {
// Only use partitioned (inherited) tables when we should not distribute the table.
if (table instanceof AnalyticsTable analyticsTable
&& !analyticsTable.isTableDistributed()
maikelarabori marked this conversation as resolved.
Show resolved Hide resolved
&& table.hasParent()) {
sql.append(" inherits (").append(quote(table.getParent().getName())).append(")");
}

return sql.append(";").toString();
}

/**
* Returns the SQL statement for the given list of checks.
*
* @param checks the list of checks to append to the SQL statement.
* @return the SQL statement.
*/
private static String checksStatement(List<String> checks) {
StringBuilder sql = new StringBuilder();

for (String check : checks) {
sql.append("check(" + check + ")").append(COMMA);
}

return sql.toString();
}

/**
* Returns the SQL statement for the given list of primaryKeys.
*
* @param primaryKeys the list of primaryKeys to append to the SQL statement.
* @return the SQL statement.
*/
private String primaryKeysStatement(List<String> primaryKeys) {
maikelarabori marked this conversation as resolved.
Show resolved Hide resolved
StringBuilder sql = new StringBuilder("primary key (");

for (String columnName : primaryKeys) {
sql.append(quote(columnName)).append(COMMA);
}

removeLastComma(sql).append(")").append(COMMA);

return sql.toString();
}

/**
* Returns the SQL statement for the given list of columns.
*
* @param columns the list of columns to append to the SQL statement.
* @return the SQL statement.
*/
private String columnsStatement(List<Column> columns) {
StringBuilder sql = new StringBuilder();

for (Column column : columns) {
String dataType = getDataTypeName(column.getDataType());
String nullable = column.getNullable() == Nullable.NOT_NULL ? " not null" : " null";
String collation = column.getCollation() == Collation.C ? (" collate " + quote("C")) : "";

sql.append(quote(column.getName()) + " ")
.append(dataType)
.append(nullable)
.append(collation)
.append(COMMA);
}

return sql.toString();
}

@Override
public String analyzeTable(String name) {
return String.format("analyze %s;", quote(name));
Expand Down
Loading
Loading