Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 15 additions & 76 deletions src/main/scala/services/mssql/MsSqlConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ package services.mssql
import logging.ZIOLogAnnotations.zlogStream
import models.schemas.{ArcaneSchema, DataRow, given_CanAdd_ArcaneSchema}
import services.base.SchemaProvider
import services.mssql.MsSqlConnection.{VersionedBatch, closeSafe, executeQuerySafe}
import services.mssql.MsSqlConnection.VersionedBatch
import services.mssql.QueryProvider.{getBackfillQuery, getChangesQuery, getSchemaQuery}
import services.mssql.SqlSchema.toSchema
import services.mssql.base.{CanPeekHead, MsSqlServerFieldsFilteringService, QueryResult}
import services.mssql.query.LazyQueryResult.toDataRow
import services.mssql.query.{LazyQueryResult, ScalarQueryResult}
import services.mssql.base.{MsSqlServerFieldsFilteringService, QueryResult}
import services.mssql.query.{ResultSetZIO, ScalarQueryResultZIO}
import services.mssql.query.MsSqlResultSet.*

import com.microsoft.sqlserver.jdbc.SQLServerDriver
import zio.stream.ZStream
import zio.{Scope, Task, UIO, ZIO, ZLayer}
import zio.*

import java.sql.{Connection, ResultSet, Statement}
import java.time.Duration
Expand Down Expand Up @@ -82,15 +82,7 @@ class MsSqlConnection(
_ <- zlogStream("Acquired result set with fetch size %s", resultSet.getFetchSize.toString)
_ <- ZStream.succeed(resultSet.setFetchSize(connectionOptions.fetchSize.getOrElse(1000)))
_ <- zlogStream("Updated result set fetch size to %s", resultSet.getFetchSize.toString)
stream <- ZStream.unfoldZIO(resultSet.next()) { hasNext =>
if hasNext then
for
columns <- ZIO.attemptBlockingInterrupt(resultSet.getMetaData.getColumnCount)
row <- ZIO.fromTry(toDataRow(resultSet, columns, List.empty))
hasNextRow <- ZIO.attemptBlockingInterrupt(resultSet.next())
yield Some((row, hasNextRow))
else ZIO.succeed(None)
}
stream <- resultSet.toZStream.map(implicitly)
yield stream

/** Gets the changes in the database since the given version.
Expand All @@ -105,16 +97,14 @@ class MsSqlConnection(
val query = QueryProvider.getChangeTrackingVersionQuery(maybeLatestVersion, lookBackInterval)
ZIO.scoped {
for
versionResult <- ZIO.fromAutoCloseable(
executeQuery(query, connection, (st, rs) => ScalarQueryResult.apply(st, rs, readChangeTrackingVersion))
)
version = versionResult.read.getOrElse(Long.MaxValue)
versionResult <- executeQuery(query, connection)
version <- ScalarQueryResultZIO(versionResult, readChangeTrackingVersion).read.map(_.getOrElse(Long.MaxValue))
changesQuery <- this.getChangesQuery(version - 1)

// We don't need to close the statement/result set here, since the ownership is passed to the LazyQueryResult
// And the LazyQueryResult will close the statement/result set when it is closed.
result <- executeQuery(changesQuery, connection, LazyQueryResult.apply)
yield MsSqlConnection.ensureHead((result, maybeLatestVersion.getOrElse(0)))
result <- executeQuery(changesQuery, connection)
yield (ResultSetZIO(result), maybeLatestVersion.getOrElse(0))
}

private def readChangeTrackingVersion(resultSet: ResultSet): Option[Long] =
Expand Down Expand Up @@ -178,17 +168,14 @@ class MsSqlConnection(
if !hasNext then return result
readColumns(resultSet, result ++ List((resultSet.getString(1), resultSet.getInt(2) == 1)))

private type ResultFactory[QueryResultType] = (Statement, ResultSet) => QueryResultType

private def executeQuery[QueryResultType](
private def executeQuery(
query: MsSqlQuery,
connection: Connection,
resultFactory: ResultFactory[QueryResultType]
): Task[QueryResultType] =
): Task[ResultSet] =
for
statement <- ZIO.attemptBlocking(connection.createStatement())
resultSet <- ZIO.attemptBlocking(statement.executeQuery(query))
yield resultFactory(statement, resultSet)
resultSet <- ZIO.acquireReleaseWith(ZIO.attempt(statement.executeQuery(query)))(rs => rs.closeSafe(statement)) { rs => ZIO.succeed(rs) }
yield resultSet

object MsSqlConnection:

Expand Down Expand Up @@ -223,56 +210,8 @@ object MsSqlConnection:

/** Represents a batch of data.
*/
type DataBatch = QueryResult[LazyQueryResult.OutputType] & CanPeekHead[LazyQueryResult.OutputType]

/** Represents a batch of data that can be used to backfill the data. Since the data is not versioned, the version is
* always 0, and we don't need to be able to peek the head of the result.
*/
type BackfillBatch = QueryResult[LazyQueryResult.OutputType]
type DataBatch = QueryResult[ZStream[Any, Throwable, DataRow]]

/** Represents a versioned batch of data.
*/
type VersionedBatch = (DataBatch, Long)

/** Executes the query and closes the result set in a safe way when the enclosing scope ends. The MS SQL JDBC driver
* forces the result set to iterate over all the rows returned by the query if the result set is closed without
* cancelling the statement first. See https://github.com/microsoft/mssql-jdbc/issues/877 for details. ALL RESULT
* SETS CREATED FROM AN MS SQL CONNECTION MUST BE CLOSED THIS WAY.
* @param query
* The query to execute.
* @param statement
* The statement that executes the query and is cancelled before the result set is closed.
* @return
* Scoped effect that tracks the result set and closes it when the effect is completed.
*/
extension (statement: Statement)
def executeQuerySafe(query: String): ZIO[Scope, Throwable, ResultSet] =
for resultSet <- ZIO.acquireRelease(ZIO.attemptBlocking(statement.executeQuery(query)))(rs =>
rs.closeSafe(statement)
)
yield resultSet

/** Closes the result in a safe way. The MS SQL JDBC driver forces the result set to iterate over all the rows returned
* by the query if the result set is closed without cancelling the statement first. See
* https://github.com/microsoft/mssql-jdbc/issues/877 for details. ALL RESULT SETS CREATED FROM AN MS SQL CONNECTION
* MUST BE CLOSED THIS WAY.
* @param resultSet
* The result set to close.
* @param statement
* The statement to close.
* @return
* UIO[Unit] that completes when the result set is closed.
*/
extension (resultSet: ResultSet)
def closeSafe(statement: Statement): UIO[Unit] =
for
_ <- ZIO.succeed(statement.cancel())
_ <- ZIO.succeed(resultSet.close())
yield ()

/** Ensures that the head of the result (if any) is saved and cannot be lost. This is required to let the head function
* work properly.
*/
private def ensureHead(result: VersionedBatch): VersionedBatch =
val (queryResult, version) = result
(queryResult.peekHead, version)
26 changes: 11 additions & 15 deletions src/main/scala/services/mssql/MsSqlDataProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,29 @@ package services.mssql
import models.schemas.DataRow
import services.mssql.MsSqlConnection.VersionedBatch
import services.mssql.base.MssqlVersionedDataProvider
import services.streaming.base.HasVersion

import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.zlog
import zio.stream.ZStream
import zio.{Task, ZIO, ZLayer}

import java.time.Duration
import scala.util.{Failure, Try}

given HasVersion[VersionedBatch] with
type VersionType = Option[Long]

private val partial: PartialFunction[VersionedBatch, Option[Long]] =
extension (result: VersionedBatch)
def getLatestVersion: ZIO[Any, Throwable, Option[Long]] = result match
case (queryResult, version: Long) =>
// If the database response is empty, we can't extract the version from it and return the old version.
queryResult.read.headOption match
case None => Some(version)
queryResult.read.runHead.flatMap {
// If the database response is empty, we can't extract the version from it and return the old version.
case None => ZIO.succeed(Some(version))
case Some(row) =>
// If the database response is not empty, we can extract the version from any row of the response.
// Let's take the first row and try to extract the version from it.
val dataVersion = row.filter(_.name == "ChangeTrackingVersion") match
// For logging purposes will be used in the future.
case Nil => Failure(new UnsupportedOperationException("No ChangeTrackingVersion found in row."))
case version :: _ => Try(version.value.asInstanceOf[Long])
dataVersion.toOption

extension (result: VersionedBatch)
def getLatestVersion: this.VersionType = partial.applyOrElse(result, (_: VersionedBatch) => None)
// Hard fail when ChangeTrackingVersion is not found
case Nil => ZIO.die(new Throwable("No ChangeTrackingVersion found in row."))
case version :: _ => ZIO.attempt(version.value.asInstanceOf[Long]).map(Option.apply).orElse(ZIO.succeed(Option.empty[Long]))
Copy link
Contributor

@s-vitaliy s-vitaliy May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but worth mentioning:

Suggested change
case version :: _ => ZIO.attempt(version.value.asInstanceOf[Long]).map(Option.apply).orElse(ZIO.succeed(Option.empty[Long]))
case version :: _ => ZIO.attempt(version.value.asInstanceOf[Long]).map(Option.apply).orElse(ZIO.succeed(None))

None is better here because the type can be inferred.

dataVersion
}

/** A data provider that reads the changes from the Microsoft SQL Server.
* @param msSqlConnection
Expand Down
31 changes: 10 additions & 21 deletions src/main/scala/services/mssql/MsSqlStreamingDataProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MsSqlStreamingDataProvider(
override def stream: ZStream[Any, Throwable, DataRow] =
val stream =
if streamContext.IsBackfilling then dataProvider.requestBackfill
else ZStream.unfoldZIO(None)(v => continueStream(v)).flatMap(readDataBatch)
else ZStream.unfoldZIO(None)(v => continueStream(v)).flatMap(b => b.read)
stream
.map(row =>
row.map {
Expand All @@ -58,34 +58,23 @@ class MsSqlStreamingDataProvider(
}
)

private def readDataBatch[T <: AutoCloseable & QueryResult[LazyList[DataRow]]](
batch: T
): ZStream[Any, Throwable, DataRow] =
for
data <- ZStream.acquireReleaseWith(ZIO.succeed(batch))(b => ZIO.succeed(b.close()))
rowsList <- ZStream.fromZIO(ZIO.attemptBlocking(data.read))
row <- ZStream.fromIterable(rowsList, 1)
yield row

private def continueStream(previousVersion: Option[Long]): ZIO[Any, Throwable, Some[(DataBatch, Option[Long])]] =
for
versionedBatch <- dataProvider.requestChanges(previousVersion, settings.lookBackInterval)
_ <- zlog(s"Received versioned batch: ${versionedBatch.getLatestVersion}")
_ <- maybeSleep(versionedBatch)
latestVersion = versionedBatch.getLatestVersion
latestVersion <- versionedBatch.getLatestVersion
(queryResult, _) = versionedBatch
_ <- zlog(s"Latest version: ${versionedBatch.getLatestVersion}")
_ <- zlog(s"Latest version: $latestVersion")
yield Some(queryResult, latestVersion)

private def maybeSleep(versionedBatch: VersionedBatch): ZIO[Any, Nothing, Unit] =
versionedBatch match
case (queryResult, _) =>
val headOption = queryResult.read.headOption
if headOption.isEmpty then
zlog("No data in the batch, sleeping for the configured interval.") *> ZIO.sleep(
settings.changeCaptureInterval
)
else zlog("Data found in the batch, continuing without sleep.") *> ZIO.unit
private def maybeSleep(versionedBatch: VersionedBatch): ZIO[Any, Throwable, Unit] = versionedBatch match
case (queryResult, _) => queryResult.read.runHead.flatMap {
case None => zlog("No data in the batch, sleeping for the configured interval.") *> ZIO.sleep(
settings.changeCaptureInterval
)
case Some(_) => zlog("Data found in the batch, continuing without sleep.") *> ZIO.unit
}

object MsSqlStreamingDataProvider:

Expand Down
15 changes: 1 addition & 14 deletions src/main/scala/services/mssql/base/QueryResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package services.mssql.base

/** Represents the result of a query to a SQL database.
*/
trait QueryResult[Output] extends AutoCloseable:
trait QueryResult[Output]:

/** The output type of the query result.
*/
Expand All @@ -15,16 +15,3 @@ trait QueryResult[Output] extends AutoCloseable:
* The result of the query.
*/
def read: Output

/** Represents a query result that can peek the head of the result.
*
* @tparam Output
* The type of the output of the query.
*/
trait CanPeekHead[Output]:
/** Peeks the head of the result of the SQL query mapped to an output type.
*
* @return
* The head of the result of the query.
*/
def peekHead: QueryResult[Output] & CanPeekHead[Output]
16 changes: 0 additions & 16 deletions src/main/scala/services/mssql/base/ResultSetOwner.scala

This file was deleted.

87 changes: 0 additions & 87 deletions src/main/scala/services/mssql/query/LazyQueryResult.scala

This file was deleted.

Loading
Loading