Skip to content

Commit e83257e

Browse files
committed
FINERACT-2386: Journal entry aggregation capabilities
1 parent 963f855 commit e83257e

File tree

35 files changed

+2531
-3
lines changed

35 files changed

+2531
-3
lines changed

fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,16 @@ public static class FineractJobProperties {
417417

418418
private int stuckRetryThreshold;
419419
private boolean loanCobEnabled;
420+
private FineractJournalEntryAggregationProperties journalEntryAggregation;
421+
}
422+
423+
@Getter
424+
@Setter
425+
public static class FineractJournalEntryAggregationProperties {
426+
427+
private Integer excludeRecentNDays;
428+
private boolean enabled;
429+
private Integer chunkSize;
420430
}
421431

422432
@Getter

fineract-core/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ public enum JobName {
5858
PURGE_EXTERNAL_EVENTS("Purge External Events"), //
5959
PURGE_PROCESSED_COMMANDS("Purge Processed Commands"), //
6060
ACCRUAL_ACTIVITY_POSTING("Accrual Activity Posting"), //
61-
ADD_PERIODIC_ACCRUAL_ENTRIES_FOR_SAVINGS_WITH_INCOME_POSTED_AS_TRANSACTIONS("Add Accrual Transactions For Savings"); //
61+
ADD_PERIODIC_ACCRUAL_ENTRIES_FOR_SAVINGS_WITH_INCOME_POSTED_AS_TRANSACTIONS("Add Accrual Transactions For Savings"), //
62+
JOURNAL_ENTRY_AGGREGATION("Journal Entry Aggregation"), //
63+
; //
6264

6365
private final String name;
6466

fineract-doc/src/docs/en/chapters/features/index.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ include::approved-amount-modification.adoc[leveloffset=+1]
88
include::backdated-interest-modification.adoc[leveloffset=+1]
99
include::interest-rate-change-progressive-loans.adoc[leveloffset=+1]
1010
include::contract-termination.adoc[leveloffset=+1]
11-
include::loan-charges.adoc[leveloffset=+1]
11+
include::loan-charges.adoc[leveloffset=+1]
12+
include::journal-entry-aggregation.adoc[leveloffset=+1]
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
= Journal Entry Aggregation
2+
:experimental:
3+
:source-highlighter: highlightjs
4+
:toc: left
5+
:toclevels: 3
6+
:icons: font
7+
:sectlinks:
8+
:sectnums:
9+
10+
== Overview
11+
The Journal Entry Aggregation Job is a Spring Batch-based solution designed to efficiently aggregate journal entries in the Fineract system. This job processes journal entries in configurable chunks, improving performance and resource utilization when dealing with large volumes of financial transactions.
12+
13+
== Key Features
14+
15+
=== Chunk-based Processing
16+
* Processes journal entries in configurable batch sizes
17+
* Reduces memory footprint by working with manageable data subsets
18+
* Improves performance through efficient batch processing
19+
20+
=== Tracking and Deduplication
21+
* Tracks processed date ranges to prevent duplicate aggregations
22+
* Uses `JournalEntryAggregationTracking` to maintain execution history
23+
* Skips already processed date ranges in subsequent runs
24+
25+
=== Configurable Exclude Recent N Days
26+
* Excludes the last N days (from business date) from processing
27+
* Default `Exclude Recent N Days` can be customized via application properties
28+
29+
== How It Works
30+
31+
=== Job Flow
32+
33+
==== Job Initialization
34+
* Determines the date range to process based on last execution
35+
* Sets up execution context with date boundaries
36+
37+
==== Data Reading
38+
* Fetches unaggregated journal entries within the target date range
39+
* Groups entries by GL account, product, office, and other dimensions
40+
41+
==== Processing
42+
* Aggregates debit and credit amounts for each group
43+
* Handles external asset owner mappings
44+
* Processes data in configurable chunk sizes
45+
46+
==== Tracking
47+
* Records successful aggregation runs
48+
* Maintains execution history for future reference
49+
50+
== Configuration
51+
52+
=== Job Parameters
53+
* `aggregatedOnDate`: (Optional) Specific date to process (defaults to business date)
54+
* `chunkSize`: (Optional) Number of records to process in each chunk
55+
56+
=== Application Properties
57+
[source,properties]
58+
----
59+
# Exclude Recent N days from aggregation
60+
fineract.job.journal-entry-aggregation.exclude-recent-N-days=1
61+
62+
# Chunk size for batch processing
63+
fineract.job.journal-entry-aggregation.chunk-size=1000
64+
----
65+
66+
== Usage
67+
68+
=== Manual Execution
69+
Trigger the job manually through the Fineract API:
70+
71+
[source,http]
72+
----
73+
POST /jobs/short-name/JRNL_AGG
74+
Content-Type: application/json
75+
76+
{
77+
}
78+
----
79+
80+
=== Scheduled Execution
81+
Configure the job to run on a schedule by adding to your scheduler configuration.
82+
83+
=== Monitoring
84+
Monitor job execution through:
85+
86+
* Job execution logs
87+
* `JOURNAL_ENTRY_AGGREGATION_TRACKING` table
88+
* Spring Batch job execution tables
89+
90+
== Best Practices
91+
92+
=== Chunk Size Tuning
93+
* Larger chunks improve throughput but increase memory usage
94+
* Monitor memory usage and adjust chunk size accordingly
95+
96+
=== Scheduling
97+
* Schedule during off-peak hours for large datasets
98+
* Consider running more frequently with smaller `Exclude Recent N Days` values
99+
100+
=== Error Handling
101+
* Failed jobs can be restarted from the last successful chunk
102+
* Review job execution logs for any processing issues
103+
104+
== Performance Considerations
105+
106+
* *Indexing*: Ensure proper indexes exist on `aggregated_on_date`, `office_id`, and other filtering columns
107+
* *Partitioning*: Consider partitioning large journal entry tables by date for better performance
108+
* *Batch Window*: Allocate sufficient time for the job to complete during maintenance windows
109+
110+
== Database Schema
111+
112+
=== m_journal_entry_aggregation_summary Table
113+
This table stores the aggregated journal entry amounts, grouped by various dimensions for efficient reporting and analysis.
114+
115+
[cols="1,2,2,2", options="header"]
116+
|===
117+
| Column | Type | Nullable | Description
118+
| id | BIGINT | No | Primary key
119+
| gl_account_id | BIGINT | No | Reference to `acc_gl_account`
120+
| product_id | BIGINT | Yes | Reference to the product (if applicable)
121+
| office_id | BIGINT | No | Reference to `m_office`
122+
| entity_type_enum | SMALLINT | No | Type of entity (e.g., loan, savings)
123+
| submitted_on_date | DATE | No | The date of the business date when entry was submitted
124+
| aggregated_on_date | DATE | No | The date when aggregation was performed
125+
| debit_amount | DECIMAL(19,6) | No | Sum of debit amounts
126+
| credit_amount | DECIMAL(19,6) | No | Sum of credit amounts
127+
| external_owner_id | BIGINT | Yes | Reference to external owner (if applicable)
128+
| job_execution_id | BIGINT | No | Reference to batch job execution
129+
| created_date | TIMESTAMP | No | Record creation timestamp
130+
| last_modified_date | TIMESTAMP | Yes | Last modification timestamp
131+
|===
132+
133+
The table is designed to support efficient querying of aggregated financial data by:
134+
* Date ranges (using `submitted_on_date` and `aggregated_on_date`)
135+
* Organizational structure (using `office_id`)
136+
* Financial dimensions (using `gl_account_id` and `product_id`)
137+
* Entity types (using `entity_type_enum`)
138+
139+
=== m_journal_entry_aggregation_tracking Table
140+
This table maintains a history of aggregation job executions, tracking which date ranges have been processed to prevent duplicate aggregations.
141+
142+
[cols="1,2,2,2", options="header"]
143+
|===
144+
| Column | Type | Nullable | Description
145+
| id | BIGINT | No | Primary key
146+
| job_execution_id | BIGINT | No | Reference to Spring Batch job execution
147+
| aggregated_on_date_from | DATE | No | Start date of the aggregation period
148+
| aggregated_on_date_to | DATE | No | End date of the aggregation period
149+
| submitted_on_date | DATE | No | The date of the business date when entry was submitted
150+
| status | VARCHAR(20) | No | Status of the aggregation (e.g., COMPLETED, FAILED)
151+
| error_message | TEXT | Yes | Error details if the job failed
152+
| start_time | TIMESTAMP | No | When the aggregation started
153+
| end_time | TIMESTAMP | Yes | When the aggregation completed
154+
| records_processed | INT | Yes | Number of records processed
155+
| created_date | TIMESTAMP | No | Record creation timestamp
156+
| last_modified_date | TIMESTAMP | Yes | Last modification timestamp
157+
|===
158+
159+
Key aspects of the tracking table:
160+
* Tracks the exact date ranges processed in each job execution
161+
* Maintains job status and error information for debugging
162+
* Records performance metrics (processing time, record counts)
163+
* Used by the job to determine which date ranges need processing in subsequent runs
164+
165+
Indexes are created on frequently queried columns to ensure optimal performance for reporting and analysis.
166+
167+
This aggregation job provides a robust, scalable solution for processing journal entries while maintaining data integrity and providing clear audit trails of all aggregation activities.

fineract-investor/src/main/resources/db/changelog/tenant/module/investor/module-changelog-master.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@
4242
<include relativeToChangelogFile="true" file="parts/0018_add_external_asset_owner_transfer_outstanding_interest_strategy.xml"/>
4343
<include relativeToChangelogFile="true" file="parts/0019_add_configurable_allowed_loan_statuses.xml"/>
4444
<include relativeToChangelogFile="true" file="parts/0020_add_previous_owner_reference.xml"/>
45+
<include relativeToChangelogFile="true" file="parts/0021_external_owner_reference_in_journal_entry_aggregation.xml"/>
4546
</databaseChangeLog>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one
5+
or more contributor license agreements. See the NOTICE file
6+
distributed with this work for additional information
7+
regarding copyright ownership. The ASF licenses this file
8+
to you under the Apache License, Version 2.0 (the
9+
"License"); you may not use this file except in compliance
10+
with the License. You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing,
15+
software distributed under the License is distributed on an
16+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
KIND, either express or implied. See the License for the
18+
specific language governing permissions and limitations
19+
under the License.
20+
21+
-->
22+
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
23+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
24+
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
25+
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
26+
<changeSet author="fineract" id="1">
27+
<preConditions onFail="MARK_RAN">
28+
<tableExists tableName="m_journal_entry_aggregation_summary"/>
29+
</preConditions>
30+
<addForeignKeyConstraint baseColumnNames="external_owner_id" baseTableName="m_journal_entry_aggregation_summary" constraintName="FK_GL_JOURNAL_ENTRY_AGGREGATION_SUMMARY_ON_EXTERNAL_OWNER_ID" referencedColumnNames="id" referencedTableName="m_external_asset_owner"/>
31+
</changeSet>
32+
</databaseChangeLog>

fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public void scheduleJob(final ScheduledJobDetail scheduledJobDetails) {
266266
scheduledJobDetails.setNextRunTime(null);
267267
final String stackTrace = getStackTraceAsString(throwable);
268268
scheduledJobDetails.setErrorLog(stackTrace);
269-
log.error("Could not schedule job: {}", scheduledJobDetails.getJobName(), throwable);
269+
log.warn("Could not schedule job: {}", scheduledJobDetails.getJobName(), throwable);
270270
}
271271
scheduledJobDetails.setCurrentlyRunning(false);
272272
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.fineract.infrastructure.jobs.service.aggregationjob;
20+
21+
import static org.apache.fineract.infrastructure.jobs.service.aggregationjob.JournalEntryAggregationJobConstant.JOB_SUMMARY_STEP_NAME;
22+
import static org.apache.fineract.infrastructure.jobs.service.aggregationjob.JournalEntryAggregationJobConstant.JOB_TRACKING_STEP_NAME;
23+
24+
import org.apache.fineract.infrastructure.core.config.FineractProperties;
25+
import org.apache.fineract.infrastructure.core.service.migration.TenantDataSourceFactory;
26+
import org.apache.fineract.infrastructure.jobs.service.JobName;
27+
import org.apache.fineract.infrastructure.jobs.service.aggregationjob.data.JournalEntryAggregationSummaryData;
28+
import org.apache.fineract.infrastructure.jobs.service.aggregationjob.listener.JournalEntryAggregationJobListener;
29+
import org.apache.fineract.infrastructure.jobs.service.aggregationjob.tasklet.JournalEntryAggregationTrackingTasklet;
30+
import org.springframework.batch.core.Job;
31+
import org.springframework.batch.core.Step;
32+
import org.springframework.batch.core.job.builder.JobBuilder;
33+
import org.springframework.batch.core.launch.support.RunIdIncrementer;
34+
import org.springframework.batch.core.repository.JobRepository;
35+
import org.springframework.batch.core.step.builder.StepBuilder;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.transaction.PlatformTransactionManager;
41+
42+
@Configuration
43+
@ConditionalOnProperty(value = "fineract.job.journal-entry-aggregation.enabled", havingValue = "true")
44+
public class JournalEntryAggregationJobConfiguration {
45+
46+
@Autowired
47+
private JournalEntryAggregationJobListener journalEntryAggregationJobListener;
48+
@Autowired
49+
private JobRepository jobRepository;
50+
@Autowired
51+
private JournalEntryAggregationJobExecutionDecider journalEntryAggregationJobExecutionDecider;
52+
@Autowired
53+
private JournalEntryAggregationJobWriter aggregationItemWriter;
54+
@Autowired
55+
private PlatformTransactionManager transactionManager;
56+
@Autowired
57+
private FineractProperties fineractProperties;
58+
@Autowired
59+
private JournalEntryAggregationTrackingTasklet journalEntryAggregationTrackingTasklet;
60+
@Autowired
61+
private TenantDataSourceFactory tenantDataSourceFactory;
62+
63+
@Bean
64+
public Step journalEntryAggregationSummaryStep() {
65+
return new StepBuilder(JOB_SUMMARY_STEP_NAME, jobRepository)
66+
.<JournalEntryAggregationSummaryData, JournalEntryAggregationSummaryData>chunk(
67+
fineractProperties.getJob().getJournalEntryAggregation().getChunkSize(), transactionManager)
68+
.reader(journalEntryAggregationJobReader()).writer(aggregationItemWriter).allowStartIfComplete(true).build();
69+
}
70+
71+
@Bean
72+
public JournalEntryAggregationJobReader journalEntryAggregationJobReader() {
73+
return new JournalEntryAggregationJobReader(tenantDataSourceFactory);
74+
}
75+
76+
@Bean
77+
protected Step journalEntryAggregationTrackingStep() {
78+
return new StepBuilder(JOB_TRACKING_STEP_NAME, jobRepository).tasklet(journalEntryAggregationTrackingTasklet, transactionManager)
79+
.build();
80+
}
81+
82+
@Bean(name = "journalEntryAggregation")
83+
public Job journalEntryAggregation() {
84+
return new JobBuilder(JobName.JOURNAL_ENTRY_AGGREGATION.name(), jobRepository).listener(journalEntryAggregationJobListener)
85+
.start(journalEntryAggregationJobExecutionDecider).on(JournalEntryAggregationJobConstant.NO_OP_EXECUTION).end()
86+
.from(journalEntryAggregationJobExecutionDecider).on(JournalEntryAggregationJobConstant.CONTINUE_JOB_EXECUTION)
87+
.to(journalEntryAggregationSummaryStep()).next(journalEntryAggregationTrackingStep()).end()
88+
.incrementer(new RunIdIncrementer()).build();
89+
}
90+
91+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.fineract.infrastructure.jobs.service.aggregationjob;
20+
21+
public final class JournalEntryAggregationJobConstant {
22+
23+
public static final String CONTINUE_JOB_EXECUTION = "CONTINUE_JOB_EXECUTION";
24+
public static final String NO_OP_EXECUTION = "NO_OP_EXECUTION";
25+
public static final String JOURNAL_ENTRY_AGGREGATION_JOB_NAME = "JOURNAL_ENTRY_AGGREGATION";
26+
public static final String JOB_SUMMARY_STEP_NAME = "JournalEntryAggregation Summary Insert - Step";
27+
public static final String JOB_TRACKING_STEP_NAME = "JournalEntryAggregation Tracking Insert - Step";
28+
public static final String AGGREGATED_ON_DATE = "aggregatedOnDate";
29+
public static final String AGGREGATED_ON_DATE_FROM = "aggregatedOnDateFrom";
30+
public static final String AGGREGATED_ON_DATE_TO = "aggregatedOnDateTo";
31+
public static final String LAST_AGGREGATED_ON_DATE = "lastAggregatedOnDate";
32+
33+
private JournalEntryAggregationJobConstant() {}
34+
}

0 commit comments

Comments
 (0)