|
20 | 20 |
|
21 | 21 | package com.mysql.cj; |
22 | 22 |
|
23 | | -import java.io.Closeable; |
24 | 23 | import java.io.IOException; |
25 | 24 | import java.io.InputStream; |
26 | 25 | import java.io.Reader; |
|
50 | 49 | import com.mysql.cj.telemetry.TelemetrySpan; |
51 | 50 | import com.mysql.cj.telemetry.TelemetrySpanName; |
52 | 51 | import com.mysql.cj.util.StringUtils; |
| 52 | +import com.mysql.cj.util.Util; |
53 | 53 |
|
54 | 54 | // TODO should not be protocol-specific |
55 | 55 |
|
56 | 56 | public class ServerPreparedQuery extends ClientPreparedQuery { |
57 | 57 |
|
58 | | - public static final int BLOB_STREAM_READ_BUF_SIZE = 8192; |
59 | | - public static final byte OPEN_CURSOR_FLAG = 0x01; |
60 | | - public static final byte PARAMETER_COUNT_AVAILABLE = 0x08; |
| 58 | + private static final int READ_BUFFER_SIZE = 8192; |
| 59 | + private static final byte OPEN_CURSOR_FLAG = 0x01; |
| 60 | + private static final byte PARAMETER_COUNT_AVAILABLE = 0x08; |
61 | 61 |
|
62 | 62 | /** The ID that the server uses to identify this PreparedStatement */ |
63 | 63 | private long serverStatementId; |
@@ -409,24 +409,24 @@ private void serverLongData(int parameterIndex, BindValue binding) { |
409 | 409 | this.session.getProtocol() |
410 | 410 | .sendCommand(this.commandBuilder.buildComStmtSendLongData(packet, this.serverStatementId, parameterIndex, (byte[]) value), true, 0); |
411 | 411 | } else if (value instanceof InputStream) { |
412 | | - storeStreamOrReader(parameterIndex, packet, (InputStream) value); |
| 412 | + storeStream(parameterIndex, packet, (InputStream) value, binding.getScaleOrLength()); |
413 | 413 | } else if (value instanceof java.sql.Blob) { |
414 | 414 | try { |
415 | | - storeStreamOrReader(parameterIndex, packet, ((java.sql.Blob) value).getBinaryStream()); |
| 415 | + storeStream(parameterIndex, packet, ((java.sql.Blob) value).getBinaryStream(), binding.getScaleOrLength()); |
416 | 416 | } catch (Throwable t) { |
417 | 417 | throw ExceptionFactory.createException(t.getMessage(), this.session.getExceptionInterceptor()); |
418 | 418 | } |
419 | 419 | } else if (value instanceof Reader) { |
420 | 420 | if (binding.isNational() && !this.charEncoding.equalsIgnoreCase("UTF-8") && !this.charEncoding.equalsIgnoreCase("utf8")) { |
421 | 421 | throw ExceptionFactory.createException(Messages.getString("ServerPreparedStatement.31"), this.session.getExceptionInterceptor()); |
422 | 422 | } |
423 | | - storeStreamOrReader(parameterIndex, packet, (Reader) value); |
| 423 | + storeReader(parameterIndex, packet, (Reader) value, binding.getScaleOrLength()); |
424 | 424 | } else if (value instanceof Clob) { |
425 | 425 | if (binding.isNational() && !this.charEncoding.equalsIgnoreCase("UTF-8") && !this.charEncoding.equalsIgnoreCase("utf8")) { |
426 | 426 | throw ExceptionFactory.createException(Messages.getString("ServerPreparedStatement.31"), this.session.getExceptionInterceptor()); |
427 | 427 | } |
428 | 428 | try { |
429 | | - storeStreamOrReader(parameterIndex, packet, ((Clob) value).getCharacterStream()); |
| 429 | + storeReader(parameterIndex, packet, ((Clob) value).getCharacterStream(), binding.getScaleOrLength()); |
430 | 430 | } catch (Throwable t) { |
431 | 431 | throw ExceptionFactory.createException(t.getMessage(), t); |
432 | 432 | } |
@@ -477,88 +477,126 @@ public void setResultFields(ColumnDefinition resultFields) { |
477 | 477 | this.resultFields = resultFields; |
478 | 478 | } |
479 | 479 |
|
480 | | - private void storeStreamOrReader(int parameterIndex, NativePacketPayload packet, Closeable streamOrReader) { |
481 | | - // TODO consider to use more precise type than just Closable |
| 480 | + private void storeStream(int parameterIndex, NativePacketPayload packet, InputStream inStream, long length) { |
482 | 481 | this.session.checkClosed(); |
483 | | - boolean isStream = InputStream.class.isAssignableFrom(streamOrReader.getClass()); |
484 | | - byte[] bBuf = null; |
485 | | - char[] cBuf = null; |
486 | | - String clobEncoding = null; |
487 | 482 |
|
488 | 483 | this.session.getSessionLock().lock(); |
489 | 484 | try { |
490 | | - if (isStream) { |
491 | | - bBuf = new byte[BLOB_STREAM_READ_BUF_SIZE]; |
492 | | - } else { |
493 | | - clobEncoding = this.session.getPropertySet().getStringProperty(PropertyKey.clobCharacterEncoding).getStringValue(); |
494 | | - if (clobEncoding == null) { |
495 | | - clobEncoding = this.session.getPropertySet().getStringProperty(PropertyKey.characterEncoding).getValue(); |
| 485 | + inStream.mark(Integer.MAX_VALUE); // This same stream may have to be read several times, so it must be reset at the end. |
| 486 | + |
| 487 | + boolean limitLength = length == -1 ? false : this.session.getPropertySet().getBooleanProperty(PropertyKey.useStreamLengthsInPrepStmts).getValue(); |
| 488 | + int sendThreshold = this.session.getPropertySet().getMemorySizeProperty(PropertyKey.blobSendChunkSize).getValue(); |
| 489 | + |
| 490 | + byte[] buffer = new byte[READ_BUFFER_SIZE]; |
| 491 | + boolean dataSent = false; |
| 492 | + int bytesRead = 0; |
| 493 | + int bytesInPacket = 0; |
| 494 | + long lengthLeft = length; |
| 495 | + |
| 496 | + do { |
| 497 | + if (bytesInPacket == 0) { |
| 498 | + packet.setPosition(0); |
| 499 | + this.commandBuilder.buildComStmtSendLongDataHeader(packet, this.serverStatementId, parameterIndex); |
| 500 | + } |
| 501 | + |
| 502 | + if (limitLength) { |
| 503 | + bytesRead = Util.readBlock(inStream, buffer, lengthLeft, this.session.getExceptionInterceptor()); |
| 504 | + lengthLeft -= bytesRead; |
| 505 | + } else { |
| 506 | + bytesRead = Util.readBlock(inStream, buffer, this.session.getExceptionInterceptor()); |
| 507 | + } |
| 508 | + if (bytesRead > 0) { |
| 509 | + packet.writeBytes(StringLengthDataType.STRING_FIXED, buffer, 0, bytesRead); |
| 510 | + bytesInPacket += bytesRead; |
| 511 | + } |
| 512 | + |
| 513 | + if (bytesInPacket >= sendThreshold || bytesRead <= 0 && (!dataSent || bytesInPacket > 0)) { |
| 514 | + this.session.getProtocol().sendCommand(packet, true, 0); |
| 515 | + dataSent = true; |
| 516 | + bytesInPacket = 0; |
496 | 517 | } |
497 | | - int maxBytesChar = 2; |
498 | | - if (clobEncoding != null) { |
499 | | - maxBytesChar = this.session.getServerSession().getCharsetSettings().getMaxBytesPerChar(clobEncoding); |
500 | | - if (maxBytesChar == 1) { |
501 | | - maxBytesChar = 2; // for safety |
| 518 | + } while (bytesRead > 0); |
| 519 | + } finally { |
| 520 | + try { |
| 521 | + inStream.reset(); |
| 522 | + } catch (IOException e) { |
| 523 | + } |
| 524 | + |
| 525 | + if (this.autoClosePStmtStreams.getValue()) { |
| 526 | + if (inStream != null) { |
| 527 | + try { |
| 528 | + inStream.close(); |
| 529 | + } catch (IOException e) { |
502 | 530 | } |
503 | 531 | } |
504 | | - cBuf = new char[ServerPreparedQuery.BLOB_STREAM_READ_BUF_SIZE / maxBytesChar]; |
505 | 532 | } |
506 | 533 |
|
507 | | - boolean readAny = false; |
508 | | - int bytesInPacket = 0; |
509 | | - int totalBytesRead = 0; |
510 | | - int bytesReadAtLastSend = 0; |
511 | | - int packetIsFullAt = this.session.getPropertySet().getMemorySizeProperty(PropertyKey.blobSendChunkSize).getValue(); |
512 | | - int numRead = 0; |
| 534 | + this.session.getSessionLock().unlock(); |
| 535 | + } |
| 536 | + } |
513 | 537 |
|
| 538 | + private void storeReader(int parameterIndex, NativePacketPayload packet, Reader reader, long length) { |
| 539 | + this.session.checkClosed(); |
| 540 | + |
| 541 | + this.session.getSessionLock().lock(); |
| 542 | + try { |
514 | 543 | try { |
515 | | - packet.setPosition(0); |
516 | | - this.commandBuilder.buildComStmtSendLongDataHeader(packet, this.serverStatementId, parameterIndex); |
517 | | - |
518 | | - while ((numRead = isStream ? ((InputStream) streamOrReader).read(bBuf) : ((Reader) streamOrReader).read(cBuf)) != -1) { |
519 | | - readAny = true; |
520 | | - |
521 | | - if (isStream) { |
522 | | - packet.writeBytes(StringLengthDataType.STRING_FIXED, bBuf, 0, numRead); |
523 | | - bytesInPacket += numRead; |
524 | | - totalBytesRead += numRead; |
525 | | - } else { |
526 | | - byte[] valueAsBytes = StringUtils.getBytes(cBuf, 0, numRead, clobEncoding); |
527 | | - packet.writeBytes(StringSelfDataType.STRING_EOF, valueAsBytes); |
528 | | - bytesInPacket += valueAsBytes.length; |
529 | | - totalBytesRead += valueAsBytes.length; |
530 | | - } |
| 544 | + reader.mark(Integer.MAX_VALUE); // This same stream may have to be read several times, so it must be reset at the end. |
| 545 | + } catch (IOException e) { |
| 546 | + } |
531 | 547 |
|
532 | | - if (bytesInPacket >= packetIsFullAt) { |
533 | | - bytesReadAtLastSend = totalBytesRead; |
| 548 | + String clobEncoding = this.session.getPropertySet().getStringProperty(PropertyKey.clobCharacterEncoding).getStringValue(); |
| 549 | + if (clobEncoding == null) { |
| 550 | + clobEncoding = this.session.getPropertySet().getStringProperty(PropertyKey.characterEncoding).getValue(); |
| 551 | + } |
| 552 | + boolean limitLength = length == -1 ? false : this.session.getPropertySet().getBooleanProperty(PropertyKey.useStreamLengthsInPrepStmts).getValue(); |
| 553 | + int sendThreshold = this.session.getPropertySet().getMemorySizeProperty(PropertyKey.blobSendChunkSize).getValue(); |
534 | 554 |
|
535 | | - this.session.getProtocol().sendCommand(packet, true, 0); |
| 555 | + char[] buffer = new char[READ_BUFFER_SIZE]; |
| 556 | + boolean dataSent = false; |
| 557 | + int charsRead = 0; |
| 558 | + int bytesInPacket = 0; |
| 559 | + long lengthLeft = length; |
536 | 560 |
|
537 | | - bytesInPacket = 0; |
538 | | - packet.setPosition(0); |
539 | | - this.commandBuilder.buildComStmtSendLongDataHeader(packet, this.serverStatementId, parameterIndex); |
540 | | - } |
| 561 | + do { |
| 562 | + if (bytesInPacket == 0) { |
| 563 | + packet.setPosition(0); |
| 564 | + this.commandBuilder.buildComStmtSendLongDataHeader(packet, this.serverStatementId, parameterIndex); |
| 565 | + } |
| 566 | + |
| 567 | + if (limitLength) { |
| 568 | + charsRead = Util.readBlock(reader, buffer, lengthLeft, this.session.getExceptionInterceptor()); |
| 569 | + lengthLeft -= charsRead; |
| 570 | + } else { |
| 571 | + charsRead = Util.readBlock(reader, buffer, this.session.getExceptionInterceptor()); |
| 572 | + } |
| 573 | + if (charsRead > 0) { |
| 574 | + byte[] charsAsBytes = StringUtils.getBytes(buffer, 0, charsRead, clobEncoding); |
| 575 | + packet.writeBytes(StringSelfDataType.STRING_EOF, charsAsBytes); |
| 576 | + bytesInPacket += charsAsBytes.length; |
541 | 577 | } |
542 | 578 |
|
543 | | - if (!readAny || totalBytesRead != bytesReadAtLastSend) { |
| 579 | + if (bytesInPacket >= sendThreshold || charsRead <= 0 && (!dataSent || bytesInPacket > 0)) { |
544 | 580 | this.session.getProtocol().sendCommand(packet, true, 0); |
| 581 | + dataSent = true; |
| 582 | + bytesInPacket = 0; |
545 | 583 | } |
546 | | - } catch (IOException ioEx) { |
547 | | - throw ExceptionFactory.createException( |
548 | | - (isStream ? Messages.getString("ServerPreparedStatement.24") : Messages.getString("ServerPreparedStatement.25")) + ioEx.toString(), |
549 | | - ioEx, this.session.getExceptionInterceptor()); |
550 | | - } finally { |
551 | | - if (this.autoClosePStmtStreams.getValue()) { |
552 | | - if (streamOrReader != null) { |
553 | | - try { |
554 | | - streamOrReader.close(); |
555 | | - } catch (IOException ioEx) { |
556 | | - // ignore |
557 | | - } |
| 584 | + } while (charsRead > 0); |
| 585 | + } finally { |
| 586 | + try { |
| 587 | + reader.reset(); |
| 588 | + } catch (IOException e) { |
| 589 | + } |
| 590 | + |
| 591 | + if (this.autoClosePStmtStreams.getValue()) { |
| 592 | + if (reader != null) { |
| 593 | + try { |
| 594 | + reader.close(); |
| 595 | + } catch (IOException ioEx) { |
558 | 596 | } |
559 | 597 | } |
560 | 598 | } |
561 | | - } finally { |
| 599 | + |
562 | 600 | this.session.getSessionLock().unlock(); |
563 | 601 | } |
564 | 602 | } |
|
0 commit comments