@@ -330,6 +330,8 @@ void exit(){
330
330
private boolean ifCallback_ = false ;
331
331
private ColInfo [] colInfos_ ;
332
332
private boolean enableActualSendTime_ = false ;
333
+ private boolean isSetStreamTableTimestamp ;
334
+
333
335
public MultithreadedTableWriter (String hostName , int port , String userId , String password ,
334
336
String dbName , String tableName , boolean useSSL ,
335
337
boolean enableHighAvailability , String [] highAvailabilitySites ,
@@ -561,6 +563,17 @@ private void init(String hostName, int port, String userId, String password,
561
563
562
564
BasicTable colDefs = (BasicTable )schema .get (new BasicString ("colDefs" ));
563
565
BasicIntVector colDefsTypeInt = (BasicIntVector )colDefs .getColumn ("typeInt" );
566
+ try {
567
+ BasicString streamTableTimestampColName = (BasicString ) pConn .run ("getStreamTableTimestamp(" + tableName_ + ")" );
568
+ if (Objects .nonNull (streamTableTimestampColName )) {
569
+ isSetStreamTableTimestamp = true ;
570
+ }
571
+ } catch (Exception e ) {
572
+ if (!e .getMessage ().contains ("Cannot recognize the token getStreamTableTimestamp" )) {
573
+ throw e ;
574
+ }
575
+ }
576
+
564
577
int columnSize = colDefs .rows ();
565
578
if (Objects .nonNull (compressTypes_ )) {
566
579
if (!enableActualSendTime && compressTypes_ .length != columnSize )
@@ -574,11 +587,18 @@ private void init(String hostName, int port, String userId, String password,
574
587
compressTypes_ = copy ;
575
588
}
576
589
}
577
- if (callbackHandler != null ){
590
+
591
+ if (callbackHandler != null ) {
578
592
ifCallback_ = true ;
579
- colInfos_ = new ColInfo [columnSize +1 ];
580
- }else
581
- colInfos_ = new ColInfo [columnSize ];
593
+ columnSize ++;
594
+ }
595
+
596
+ if (isSetStreamTableTimestamp ) {
597
+ columnSize --;
598
+ }
599
+
600
+ colInfos_ = new ColInfo [columnSize ];
601
+
582
602
for (int i = 0 ; i < colInfos_ .length ; i ++){
583
603
ColInfo colInfo = new ColInfo ();
584
604
colInfos_ [i ] = colInfo ;
@@ -852,7 +872,8 @@ public ErrorCodeInfo insert(Object... args){
852
872
throw new RuntimeException ("Thread is exiting. " );
853
873
}
854
874
if ((!enableActualSendTime_ && args .length != colInfos_ .length ) ||
855
- (enableActualSendTime_ && args .length != colInfos_ .length - 1 )) {
875
+ (enableActualSendTime_ && !isSetStreamTableTimestamp && args .length != colInfos_ .length - 1 ) ||
876
+ enableActualSendTime_ && isSetStreamTableTimestamp && args .length != colInfos_ .length - 1 ) {
856
877
return new ErrorCodeInfo (ErrorCodeInfo .Code .EC_InvalidParameter , "Column counts don't match." );
857
878
}
858
879
0 commit comments