Project fhir-remote-index is a stand-alone application to support asynchronous storage of FHIR search parameters published by the LinuxForHealth FHIR Server. When configured to do so, the LinuxForHealth FHIR Server extracts search parameters from the incoming resource and instead of storing the parameters as part of the create/update transaction, it packages the parameters into a message which is then published to Kafka. The fhir-remote-index application consumes the messages from Kafka and stores the value in the resource parameter tables using efficient batch-based inserts.
This pattern supports higher ingestion rates because:
- Any locking related to inserting normalized common parameter values is decoupled from the locking on logical resources (
logical_resource_ident
table) used to ensure correct versioning of the resource. Create and Update interactions should never see any contention unless the same logical resource is updated in parallel requests; - If the same logical resource is updated in parallel, the contention will be reduced compared to synchronous parameter value storage because less work is performed while the lock is held, allowing the transaction to be completed sooner;
- The remote index consumer can build larger batches, with transactions not tied to the semantics of the original FHIR interaction;
- The remote index consumer uses batches for all the search parameter value inserts, making it more efficient than the current JDBC persistence layer implementation;
- The remote index consumer handles normalized value cache-miss lookups using bulk queries. This reduces the total number of database round-trips required to find the required foreign key values.
In addition, this implementation eliminates any possibility of deadlocks occurring during the Insert/Update interaction. Deadlocks may still occur when processing the asynchronous remote index messages. As these will only occur in a backend process they will not be visible to IBM FHIR Server clients. Deadlocks are handle automatically using a rollback and retry mechanism. Care is taken to reduce the likelihood of deadlocks from occurring in the first place by sorting all record lists before they are processed.
It is worth noting that using multiple Kafka topic partitions can increase throughput by allowing more resource parameter messages to be processed in parallel. If sufficient threads are allocated across all the fhir-remote-index consumer instances, each thread will read data from a single partition. There is no point allocating more total threads than the number of configured partitions. The partition key is a function of the {resource-type, logical-id} tuple which guarantees that changes related to a particular logical resource will be pushed to the same partition. This guarantees that these changes will be processed in order.
Old search parameters are deleted whenever a resource is updated. When remote indexing is enabled, this means that a resource will not be searchable until the remote index service has received and processed the message. Carefully examine your interaction scenarios with the LinuxForHealth FHIR Server to determine if this behavior is suitable. In particular, conditional updates are unlikely to work as expected because the search may not return the expected value, depending on timing.
At this time, fhir-remote-index should be considered experimental.
To build fhir-remote-index, clone the git repository and build as follows:
git clone git@github.com:IBM/FHIR.git
cd FHIR
mvn clean install -f fhir-examples
mvn clean install -f fhir-parent
mvn clean install -f fhir-remote-index
Note that at this time, fhir-remote-index is not built by default so its project must be built explicitly as shown above.
To enable remote indexing of search parameters, add the following remoteIndexService
entry to the default FHIR server configuration file config/default/fhir-server-config.json
. This entry identifies the Kafka service and topic to use for sending remote index messages. When this entry is present, the JDBC persistence layer will skip storing the parameters and instead will package the parameters into a message and publish it to Kafka.
{
"__comment": "FHIR Server configuration",
"fhirServer": {
...
"remoteIndexService": {
"type": "kafka",
"instanceIdentifier": "a-random-uuid-value",
"kafka": {
"mode": "ACTIVE",
"topicName": "FHIR_REMOTE_INDEX",
"connectionProperties": {
"bootstrap.servers": "broker-0:9093, broker-1:9093, broker-2:9093, broker-3:9093, broker-4:9093, broker-5:9093",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"change-password\";",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.protocol": "TLSv1.2",
"ssl.enabled.protocols": "TLSv1.2",
"ssl.endpoint.identification.algorithm": "HTTPS"
}
}
},
...
The fhir-remote-index application accepts two properties files defining access to Kafka and the target database:
--kafka-properties
the properties describing connection information for the upstream Kafka topic containing the resource search parameter messages sent by the upstream IBM FHIR Server;--database-properties
the properties describing the location of the target database to which we will insert the search parameter records generated by the upstream IBM FHIR Server.
java -Djava.util.logging.config.file=logging.properties \
-jar /path/to/git/FHIR/fhir-remote-index/target/fhir-remote-index-*-cli.jar \
--db-type postgresql \
--database-properties database.properties \
--kafka-properties kafka.properties \
--topic-name FHIR_REMOTE_INDEX \
--consumer-count 3 \
--instance-identifier "a-random-uuid-value"
Logging uses standard java.util.logging
(JUL) and can be configured as follows:
handlers=java.util.logging.ConsoleHandler,java.util.logging.FileHandler
.level=INFO
# Console output
java.util.logging.ConsoleHandler.level = INFO
java.util.logging.ConsoleHandler.formatter=org.linuxforhealth.fhir.database.utils.common.LogFormatter
# What level do we want to see in the log file
java.util.logging.FileHandler.level=INFO
# Log retention: 50MB * 20 files ~= 1GB
java.util.logging.FileHandler.formatter=org.linuxforhealth.fhir.database.utils.common.LogFormatter
java.util.logging.FileHandler.limit=50000000
java.util.logging.FileHandler.count=20
java.util.logging.FileHandler.pattern=remoteindexservice-%u-%g.log
To configure the Kafka service, use a properties file as described by the Kafka documentation:
bootstrap.servers=broker-0:9093, broker-1:9093, broker-2:9093, broker-3:9093, broker-4:9093, broker-5:9093
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="change-password";
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
The database properties file:
db.host=your-db-host
db.port=5432
db.database=your-db-name
user=fhirserver
password=change-password
currentSchema=fhirdata
ssl=true
sslmode=require
sslcert=
sslrootcert=postgres.crt
sslkey=
Property | DB Type | Description |
---|---|---|
db.host | All | The host name of the database containing the LinuxForHealth FHIR Server schema |
db.port | All | The database port number |
db.database | All | The database name |
user | All | The database credential user |
password | All | The database credential password |
currentSchema | All | The database schema name |
ssl | PostgreSQL | For PostgreSQL, use SSL (TLS) for the database connection |
sslmode | PostgreSQL | The PostgreSQL SSL connection mode |
sslcert | PostgreSQL | The PostgreSQL SSL certificate |
sslrootcert | PostgreSQL | The PostgreSQL SSL root certificate |
sslkey | PostgreSQL | The PostgreSQL SSL key |
Note: Citus configuration is the same as PostgreSQL.
Option | Description |
---|---|
--consumer-count {n} | The number of Kafka consumer threads to start in this instance. Multiple instances of this service can be started. The total number of consumer threads across all instances should equal the number to the number of Kafka partitions on the topic. This should maximize throughput. |
--kafka-properties {properties-file} | A Java properties file containing connection details for the upstream Kafka service. |
--db-type {type} | The type of database. One of postgresql , derby , or citus . |
--database-properties {properties-file} | A Java properties file containing connection details for the downstream IBM FHIR Server database. |
--topic-name {topic} | The name of the Kafka topic to consume. Default FHIR_REMOTE_INDEX . |
--instance-identifier {uuid} | Each IBM FHIR Server cluster should be allocated a unique instance identifier. This identifier is added to each message sent over Kafka. The consumer will ignore messages unless they include the same instance identifier value. This helps to ensure that messages are processed from only intended sources. |
--consumer-group {grp} | Override the default Kafka consumer group (group.id value) for this application. Default remote-index-service-cg . |
--schema-type {type} | Set the schema type. One of PLAIN or DISTRIBUTED . Default is PLAIN . The schema type DISTRIBUTED is for use with Citus databases. |
--max-ready-time-ms {milliseconds} | The maximum number of milliseconds to wait for the database to contain the correct data for a particular set of consumed messages. Should be slightly longer than the configured Liberty transaction timeout value. |
To guarantee delivery, the search parameter messages are posted to Kafka by the LinuxForHealth FHIR Server before the transaction commits. The transaction will only be committed once all messages sent to Kafka have been acknowledged. This is important, because if the message were to be sent after the transaction, we could lose messages if a failure occured immediately after the transaction but before they were received by Kafka.
Because messages are sent to Kafka before the transaction is committed, it is possible that a fhir-remote-index consumer may receive a search parameter message before the corresponding resource version record is visible in the database. The consumer therefore runs a query at the start of a batch to determine if the current resource version record matches the message content. The following logic is then applied:
- If the resource version doesn't yet exist in the database, the consumer will pause and wait for the transaction to be committed. The consumer will only wait up to the maximum transaction timeout window, at which point it will assume the transaction has failed and the message will be discarded.
- If the resource version matches, but the lastUpdated time does not match, it assumes the message came from an IBM FHIR Server which failed before the transaction was committed, but the request was processed successfully by another server. The message will be discarded because there will be another message waiting in the queue from the second attempt.
- If the resource version in the database already exceeds the version in the message, the message will be discarded because the information is already out of date. There will be another message waiting in the queue containing the search parameter values from the most recent resource.