Kotlin coroutines adapter for the Neo4j Java driver (async)
// Connect to Neo4j
val neo4j = DispatchedNeo4jOperations(
driver = driver,
dispatcher = Dispatchers.IO.limitedParallelism(90)
)
// Write data
neo4j.write { tx ->
tx.run("CREATE (p:Person {name: 'Alice', age: 30})")
}
// Read data
val name = neo4j.read { tx ->
tx.run(
"MATCH (p:Person) RETURN p.name AS name"
).single()["name"].asString()
}
println(name) // "Alice"See ReadmeExamples.kt for all runnable examples.
For a complete example of building Neo4j-backed REST APIs with Ktor, see the xemantic-neo4j-demo project. It demonstrates:
- Fully non-blocking database operations using Kotlin coroutines
- Safe session management and connection pooling
- Memory-efficient streaming with Kotlin Flow
- Test-driven development with embedded Neo4j
- Neo4jOperations - Simplified coroutine-friendly interface for common Neo4j operations
- Automatic session management - No need to manually manage session lifecycle for simple operations
- Safe concurrency - IO dispatcher with limited parallelism (default 90) prevents exhausting the driver's 100 session limit
- Kotlinx.serialization integration - Map
@Serializableclasses directly to/from Neo4j properties - Type-safe conversions -
toProperties()andtoObject<T>()extension functions - Instant support - Automatic conversion between
kotlin.time.Instantand Neo4jDateTime
- Structured concurrency - All operations use
suspendfunctions instead ofCompletionStage - Flow-based streaming - Stream large result sets efficiently with Kotlin Flow
- Non-blocking - Built on Neo4j's async driver, never blocks threads
- Multi-dollar string interpolation (
$$"""...""") - Include$in Cypher queries without escaping - IntelliJ IDEA integration -
@Language("cypher")annotations enable syntax highlighting with the Graph Database plugin - Flexible configuration - Builder DSL for session and transaction configs with sensible defaults
populate()utility - Quickly insert test data without boilerplate- Resource cleanup - Automatic cleanup when Flow completes (normally or exceptionally)
Add to your build.gradle.kts:
dependencies {
implementation("com.xemantic.neo4j:xemantic-neo4j-kotlin-driver:1.0.0")
}// URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
val dbUri = "<database-uri>"
val dbUser = "<username>"
val dbPassword = "<password>"
val driver = GraphDatabase.driver(
dbUri,
AuthTokens.basic(dbUser, dbPassword)
).use { driver ->
driver.verifyConnectivity()
println("Connection established.")
// use the driver
}Warning
Always close Driver objects to free up allocated resources. Use Kotlin's .use { } function or call driver.close() explicitly.
For advanced connection options, see the Neo4j Java driver documentation.
The recommended way to use this library is through the Neo4jOperations interface:
val neo4j = DispatchedNeo4jOperations(
driver = driver,
dispatcher = Dispatchers.IO.limitedParallelism(90),
defaultSessionConfig = {
database = "neo4j"
},
defaultTransactionConfig = {
timeout = 30.seconds
}
)Note
The IO dispatcher ensures optimal performance for non-blocking operations. Limiting parallelism to 90 (default driver limit is 100) means operations will suspend instead of throwing exceptions when no free sessions are available.
val peopleCount = neo4j.read { tx ->
tx.run(
"MATCH (p:Person) RETURN count(p) AS count"
).single()["count"].asInt()
}val summary = neo4j.write { tx ->
tx.run(
query = $$"""
CREATE (a:Person {name: $name})
CREATE (b:Person {name: $friendName})
CREATE (a)-[:KNOWS]->(b)
""".trimIndent(),
parameters = mapOf(
"name" to "Alice",
"friendName" to "David"
)
).consume()
}
println(summary)Notes:
- The
queryandparameterscan be optionally named for clarity - Multi-dollar interpolation (
$$"""...""") allows$in queries without escaping - The final
.consume()call can be omitted if you don't need the summary
neo4j.write { tx ->
tx.run(
query = $$"""
CREATE (a:Person {name: $name})
CREATE (b:Person {name: $friendName})
CREATE (a)-[:KNOWS]->(b)
""".trimIndent(),
parameters = mapOf(
"name" to "Alice",
"friendName" to "David"
)
)
}Map between Neo4j properties and Kotlin data classes using kotlinx.serialization:
@Serializable
data class Person(
val name: String,
val email: String,
val age: Int,
val city: String,
val skills: List<String>,
val active: Boolean,
val createdAt: Instant = Clock.System.now()
)
val person = Person(
name = "Alice Johnson",
email = "alice.johnson@email.com",
age = 28,
city = "New York",
skills = listOf("Python", "JavaScript", "SQL"),
active = true
)val createdPerson = neo4j.write { tx ->
tx.run(
query = $$"""
CREATE (person:Person $props)
SET person.createdAt = datetime()
RETURN person
""",
parameters = mapOf(
"props" to person.toProperties()
)
).single()["person"].toObject<Person>()
}Note
The createdAt set by the Neo4j server will hold server time. Assuming synchronized clocks, createdPerson.createdAt will be greater than or equal to person.createdAt.
The toProperties() extension function converts any @Serializable class to a map of Neo4j-compatible properties.
val storedPerson = neo4j.read { tx ->
tx.run(
"MATCH (p:Person) RETURN p"
).single()["p"].toObject<Person>()
}
println(storedPerson)The toObject<T>() extension function converts Neo4j nodes, relationships, and records to Kotlin objects.
Supported sources:
- Nodes -
record["p"].toObject<Person>() - Relationships -
record["r"].toObject<Knows>() - Records with map projections -
record.toObject<Person>()when usingRETURN p.name AS name, p.age AS age
Map projections allow you to create nested structures in query results:
@Serializable
data class Address(val street: String, val city: String, val zipCode: String)
@Serializable
data class PersonWithAddress(val name: String, val age: Int, val address: Address)
val person = neo4j.read { tx ->
tx.run("""
MATCH (p:Person)
RETURN p.name AS name, p.age AS age,
{street: p.street, city: p.city, zipCode: p.zipCode} AS address
""".trimIndent()).single().toObject<PersonWithAddress>()
}Note
Only flat properties are supported in node/relationship storage (primitives, lists of primitives, enums). Nested objects require separate nodes connected by relationships, but can be retrieved using map projections in Cypher queries.
Stream large result sets efficiently using Kotlin Flow:
neo4j.flow(
"MATCH (p:Person) RETURN p ORDER BY p.name"
).collect {
println(it["p"]["name"].asString())
}
// Prints: Alice, Bob, Charlie...
// Session is automatically closed after flow collectionval names = neo4j.flow(
query = $$"MATCH (p:Person) WHERE p.age > $minAge RETURN p.name AS name ORDER BY p.name",
parameters = mapOf("minAge" to 28)
).map {
it["name"].asString()
}.toList()
println(names) // [Alice, Charlie, ...]For complex scenarios requiring multiple transactions on the same session, you can use neo4j.withSession { } which provides session lifecycle management:
neo4j.withSession { session ->
// First: write operation
session.executeWrite { tx ->
tx.run("CREATE (p:Person {name: 'Alice'})")
}
// Then: read operation on the same session
val count = session.executeRead { tx ->
tx.run(
"MATCH (p:Person) RETURN count(p) as count"
).single()["count"].asInt()
}
println("Created and counted: $count")
}For even more control, you can use driver.coroutineSession() directly:
Write with driver session:
val summary = driver.coroutineSession().use { session ->
session.executeWrite { tx ->
tx.run(
query = $$"""
CREATE (a:Person {name: $name})
CREATE (b:Person {name: $friendName})
CREATE (a)-[:KNOWS]->(b)
""".trimIndent(),
parameters = mapOf(
"name" to "Alice",
"friendName" to "David"
)
).consume()
}
}
println(
"Created ${summary.counters().nodesCreated()} nodes " +
"in ${summary.resultAvailableAfter(TimeUnit.MILLISECONDS)} ms."
)Read with driver session:
val (names, readSummary) = driver.coroutineSession().use { session ->
session.executeRead { tx ->
val result = tx.run(
"MATCH (p:Person)-[:KNOWS]->(:Person) RETURN p.name AS name"
)
val names = result.records().toList().map {
it["name"].asString()
}
val summary = result.consume()
names to summary
}
}
println(
"The query ${readSummary.query().text()} " +
"returned ${names.size} records " +
"in ${readSummary.resultAvailableAfter(TimeUnit.MILLISECONDS)} ms."
)
println("Returned names: $names")Session and transaction configuration:
driver.coroutineSession { // session config
database = "neo4j"
}.use { session ->
session.executeRead({ // transaction config
timeout = 5.seconds
metadata = mapOf("appName" to "peopleTracker")
}) { tx ->
tx.run("MATCH (p:Person) RETURN p").records().collect {
println(it)
}
}
}Quickly set up test data:
neo4j.populate("""
CREATE (p1:Person {name: 'Alice', age: 30})
CREATE (p2:Person {name: 'Bob', age: 25})
CREATE (p1)-[:KNOWS]->(p2)
""".trimIndent())The populate() function handles session and transaction management automatically.
// ✅ Recommended for simple single-transaction operations
val count = neo4j.read { tx ->
tx.run(
"MATCH (p:Person) RETURN count(p) as count"
).single()["count"].asInt()
}// ✅ Recommended when you need multiple transactions on the same session
neo4j.withSession { session ->
session.executeWrite { tx ->
tx.run("CREATE (p:Person {name: 'Alice'})")
}
val count = session.executeRead { tx ->
tx.run(
"MATCH (p:Person) RETURN count(p) as count"
).single()["count"].asInt()
}
println("Created and counted: $count")
}// ✅ Use when you need maximum control over session lifecycle
driver.coroutineSession().use { session ->
// Full control over session configuration and lifecycle
val summary = session.executeWrite { tx ->
tx.run("CREATE (p:Person {name: 'Bob'})").consume()
}
println("Nodes created: ${summary.counters().nodesCreated()}")
}- Sessions: Always use
.use { }to ensure proper cleanup - Results: Must be fully consumed (Flow collected or
consume()called) - Transactions: Managed transactions auto-commit/rollback; unmanaged require explicit
commit()/rollback()
records()Flow can only be collected once- After
consume()is called,records()cannot be collected - Results are automatically consumed when Flow collection completes
- Nested objects: Not supported in property mapping - use separate nodes with relationships
- Multi-collector Flows: Each result's
records()can only be collected once
See CLAUDE.md for development guidelines and architecture overview.
Apache License 2.0 - see LICENSE