1515import tech .ydb .jdbc .YdbQueryResult ;
1616import tech .ydb .jdbc .YdbStatement ;
1717import tech .ydb .jdbc .YdbTracer ;
18+ import tech .ydb .jdbc .exception .YdbStatusable ;
1819import tech .ydb .jdbc .impl .YdbQueryResultExplain ;
1920import tech .ydb .jdbc .impl .YdbQueryResultReader ;
2021import tech .ydb .jdbc .impl .YdbQueryResultStatic ;
2122import tech .ydb .jdbc .impl .YdbResultSetMemory ;
2223import tech .ydb .jdbc .query .QueryType ;
2324import tech .ydb .jdbc .query .YdbQuery ;
2425import tech .ydb .jdbc .settings .YdbOperationProperties ;
26+ import tech .ydb .jdbc .spi .YdbQueryExtentionService ;
2527import tech .ydb .query .QueryClient ;
2628import tech .ydb .query .QuerySession ;
2729import tech .ydb .query .QueryStream ;
3133import tech .ydb .query .settings .CommitTransactionSettings ;
3234import tech .ydb .query .settings .ExecuteQuerySettings ;
3335import tech .ydb .query .settings .QueryExecMode ;
34- import tech .ydb .query .settings .QueryStatsMode ;
3536import tech .ydb .query .settings .RollbackTransactionSettings ;
3637import tech .ydb .query .tools .QueryReader ;
3738import tech .ydb .table .query .Params ;
@@ -44,6 +45,7 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
4445 private final Duration sessionTimeout ;
4546 private final QueryClient queryClient ;
4647 private final boolean useStreamResultSet ;
48+ private final YdbQueryExtentionService querySpi ;
4749
4850 private int transactionLevel ;
4951 private boolean isReadOnly ;
@@ -59,6 +61,7 @@ public QueryServiceExecutor(YdbContext ctx) throws SQLException {
5961 this .sessionTimeout = options .getSessionTimeout ();
6062 this .queryClient = ctx .getQueryClient ();
6163 this .useStreamResultSet = options .getUseStreamResultSets ();
64+ this .querySpi = ctx .getQuerySpi ();
6265
6366 this .transactionLevel = options .getTransactionLevel ();
6467 this .isAutoCommit = options .isAutoCommit ();
@@ -225,17 +228,21 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
225228 }
226229
227230 @ Override
228- public YdbResultSetMemory [] executeInMemoryQuery (YdbStatement statement , String preparedYql , Params params )
229- throws SQLException {
231+ public YdbResultSetMemory [] executeInMemoryQuery (YdbStatement statement , YdbQuery query , String preparedYql ,
232+ Params params ) throws SQLException {
230233 ensureOpened ();
231234
232235 YdbValidator validator = statement .getValidator ();
236+
237+ String yql = prefixPragma + preparedYql ;
238+ YdbQueryExtentionService .QueryCall spi = querySpi .newDataQuery (statement , query , yql );
239+
233240 int timeout = statement .getQueryTimeout ();
234241 ExecuteQuerySettings .Builder settings = ExecuteQuerySettings .newBuilder ();
235- settings = settings .withStatsMode (QueryStatsMode .valueOf (statement .getStatsCollectionMode ().name ()));
236242 if (timeout > 0 ) {
237243 settings = settings .withRequestTimeout (timeout , TimeUnit .SECONDS );
238244 }
245+ settings = spi .prepareQuerySettings (settings );
239246
240247 QueryTransaction nextTx = tx .get ();
241248 while (nextTx == null ) {
@@ -246,11 +253,9 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String
246253 }
247254 }
248255
249- final QueryTransaction localTx = nextTx ;
256+ QueryTransaction localTx = nextTx ;
250257 YdbTracer tracer = statement .getConnection ().getCtx ().getTracer ();
251258
252- String yql = prefixPragma + preparedYql ;
253-
254259 try {
255260 tracer .trace ("--> data query" );
256261 tracer .query (yql );
@@ -266,8 +271,19 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String
266271 readers [idx ] = new YdbResultSetMemory (types , statement , result .getResultSet (idx ));
267272 }
268273
269- // queryResult.setQueryStats(result.getQueryInfo().getStats());
274+ if (result .getQueryInfo ().hasStats ()) {
275+ spi .onQueryStats (result .getQueryInfo ().getStats ());
276+ }
277+
278+ spi .onQueryResult (Status .SUCCESS , null );
270279 return readers ;
280+ } catch (SQLException | RuntimeException ex ) {
281+ if (ex instanceof YdbStatusable ) {
282+ spi .onQueryResult (((YdbStatusable ) ex ).getStatus (), null );
283+ } else {
284+ spi .onQueryResult (null , ex );
285+ }
286+ throw ex ;
271287 } finally {
272288 if (!localTx .isActive ()) {
273289 if (tx .compareAndSet (localTx , null )) {
@@ -289,16 +305,20 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
289305 ensureOpened ();
290306
291307 if (!useStreamResultSet ) {
292- YdbResultSetMemory [] readers = executeInMemoryQuery (statement , preparedYql , params );
308+ YdbResultSetMemory [] readers = executeInMemoryQuery (statement , query , preparedYql , params );
293309 return updateCurrentResult (new YdbQueryResultStatic (query , readers ));
294310 }
295311
296312 YdbValidator validator = statement .getValidator ();
313+ String yql = prefixPragma + preparedYql ;
314+ YdbQueryExtentionService .QueryCall spi = querySpi .newDataQuery (statement , query , yql );
315+
297316 int timeout = statement .getQueryTimeout ();
298317 ExecuteQuerySettings .Builder settings = ExecuteQuerySettings .newBuilder ();
299318 if (timeout > 0 ) {
300319 settings = settings .withRequestTimeout (timeout , TimeUnit .SECONDS );
301320 }
321+ settings = spi .prepareQuerySettings (settings );
302322
303323 QueryTransaction nextTx = tx .get ();
304324 while (nextTx == null ) {
@@ -309,18 +329,16 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
309329 }
310330 }
311331
312- final QueryTransaction localTx = nextTx ;
332+ QueryTransaction localTx = nextTx ;
313333 YdbTracer tracer = statement .getConnection ().getCtx ().getTracer ();
314-
315- String yql = prefixPragma + preparedYql ;
316-
317334 tracer .trace ("--> stream query" );
318335 tracer .query (yql );
319336 String msg = "STREAM_QUERY >>\n " + yql ;
320337
321338 YdbQueryResultReader reader = new YdbQueryResultReader (types , statement , query ) {
322339 @ Override
323340 public void onClose (Status status , Throwable th ) {
341+ spi .onQueryResult (status , th );
324342 if (th != null ) {
325343 tracer .trace ("<-- " + th .getMessage ());
326344 }
@@ -344,7 +362,7 @@ public void onClose(Status status, Throwable th) {
344362
345363 settings = settings .withGrpcFlowControl (reader );
346364 QueryStream stream = localTx .createQuery (yql , isAutoCommit , params , settings .build ());
347- validator .execute (msg , tracer , () -> reader .load (validator , stream ));
365+ validator .execute (msg , tracer , () -> reader .load (validator , stream , spi :: onQueryStats ));
348366 return updateCurrentResult (reader );
349367 }
350368
0 commit comments