Skip to content

Commit cef2673

Browse files
committed
Merge branch 'feat_solr' into '1.10_test_4.2.x'
Feat solr See merge request dt-insight-engine/flinkStreamSQL!277
2 parents 8c19de4 + 15e20d7 commit cef2673

File tree

16 files changed

+1198
-2
lines changed

16 files changed

+1198
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/constant/PluginParamConsts.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@
2222
* @program: flinkStreamSQL
2323
* @author: wuren
2424
* @create: 2020/09/15
25-
**/
25+
*/
2626
public class PluginParamConsts {

    // ---- Kerberos related option keys ----

    /** Option key for the Kerberos principal. */
    public static final String PRINCIPAL = "principal";
    /** Option key for the Kerberos keytab. */
    public static final String KEYTAB = "keytab";
    /** Option key for the krb5 configuration. */
    public static final String KRB5_CONF = "krb5conf";

    // ---- Sink buffering / retry option keys ----

    /** Option key for the maximum number of buffered rows before a flush. */
    public static final String SINK_BUFFER_FLUSH_MAX_ROWS = "sink.buffer-flush.max-rows";
    /** Option key for the interval between buffer flushes. */
    public static final String SINK_BUFFER_FLUSH_INTERVAL = "sink.buffer-flush.interval";
    /** Option key for the maximum number of sink retries. */
    public static final String SINK_MAX_RETRIES = "sink.max-retries";

    /** Constants holder; not meant to be instantiated. */
    private PluginParamConsts() {
    }

}

core/src/main/java/com/dtstack/flink/sql/krb/KerberosTable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ default void judgeKrbEnable() {
5959
} else if (allNotSet) {
6060
setEnableKrb(false);
6161
} else {
62-
throw new RuntimeException("Missing kerberos parameter! all kerberos params must be set, or all kerberos params are not set");
62+
throw new RuntimeException(
63+
"Missing kerberos parameter! all kerberos params must be set, or all kerberos params are not set");
6364
}
6465
}
6566
}

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package com.dtstack.flink.sql.util;
2020

2121
import com.esotericsoftware.minlog.Log;
22+
import org.apache.flink.runtime.security.DynamicConfiguration;
23+
import org.apache.flink.runtime.security.KerberosUtils;
2224
import org.apache.hadoop.conf.Configuration;
2325
import org.apache.hadoop.security.UserGroupInformation;
2426
import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -27,6 +29,7 @@
2729
import sun.security.krb5.Config;
2830
import sun.security.krb5.KrbException;
2931

32+
import javax.security.auth.login.AppConfigurationEntry;
3033
import java.io.IOException;
3134

3235
/**
@@ -62,4 +65,13 @@ public static UserGroupInformation loginAndReturnUgi(String principal, String ke
6265
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
6366
}
6467

68+
public synchronized static void appendJaasConf(String name, String keytab, String principal) {
69+
javax.security.auth.login.Configuration priorConfig = javax.security.auth.login.Configuration.getConfiguration();
70+
// construct a dynamic JAAS configuration
71+
DynamicConfiguration currentConfig = new DynamicConfiguration(priorConfig);
72+
// wire up the configured JAAS login contexts to use the krb5 entries
73+
AppConfigurationEntry krb5Entry = KerberosUtils.keytabEntry(keytab, principal);
74+
currentConfig.addAppConfigurationEntry(name, krb5Entry);
75+
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
76+
}
6577
}

pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
<module>aws</module>
4343
<module>file</module>
4444
<module>http</module>
45+
<module>solr</module>
4546

4647
<!-- 脏数据插件 -->
4748
<module>dirtyData</module>

solr/pom.xml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the flinkStreamSQL root build. -->
    <parent>
        <groupId>com.dtstack.flink</groupId>
        <artifactId>flink.sql</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- Aggregator for the Solr plugin modules. -->
    <artifactId>sql.solr</artifactId>
    <packaging>pom</packaging>

    <properties>
        <sql.core.version>1.0-SNAPSHOT</sql.core.version>
    </properties>

    <modules>
        <module>solr-sink</module>
    </modules>

    <!-- Dependencies shared by every Solr plugin module. -->
    <dependencies>
        <!-- Core module; declared with provided scope. -->
        <dependency>
            <groupId>com.dtstack.flink</groupId>
            <artifactId>sql.core</artifactId>
            <version>${sql.core.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- SolrJ client; its transitive slf4j-api is excluded. -->
        <dependency>
            <groupId>org.apache.solr</groupId>
            <artifactId>solr-solrj</artifactId>
            <version>7.4.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

solr/solr-sink/pom.xml

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Child of the Solr aggregator POM one directory up. -->
    <parent>
        <groupId>com.dtstack.flink</groupId>
        <artifactId>sql.solr</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>solr-sink</artifactId>

    <dependencies>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.15</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Step 1 (package phase): build the shaded plugin jar. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <!-- No artifacts are excluded from the shaded jar. -->
                            <artifactSet>
                                <excludes>

                                </excludes>
                            </artifactSet>
                            <!-- Drop signature files from every bundled artifact. -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Step 2 (package phase, declared after shade): copy the jar into sqlplugins/solrsink and rename it. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- Bound to the package phase. -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/solrsink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/solrsink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/solrsink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.solr;
20+
21+
import com.dtstack.flink.sql.factory.DTThreadFactory;
22+
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
23+
import com.dtstack.flink.sql.sink.solr.client.CloudSolrClientProvider;
24+
import com.dtstack.flink.sql.sink.solr.options.KerberosOptions;
25+
import com.dtstack.flink.sql.sink.solr.options.SolrClientOptions;
26+
import com.dtstack.flink.sql.sink.solr.options.SolrWriteOptions;
27+
import org.apache.flink.api.java.tuple.Tuple2;
28+
import org.apache.flink.configuration.Configuration;
29+
import org.apache.flink.types.Row;
30+
import org.apache.solr.client.solrj.SolrServerException;
31+
import org.apache.solr.common.SolrInputDocument;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import sun.security.krb5.KrbException;
35+
36+
import javax.security.auth.login.LoginException;
37+
import java.io.IOException;
38+
import java.security.PrivilegedActionException;
39+
import java.util.concurrent.ScheduledExecutorService;
40+
import java.util.concurrent.ScheduledFuture;
41+
import java.util.concurrent.ScheduledThreadPoolExecutor;
42+
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.atomic.AtomicInteger;
44+
45+
/**
46+
* @author wuren
47+
* @program flinkStreamSQL
48+
* @create 2021/05/17
49+
*/
50+
public class SolrOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(SolrOutputFormat.class);
53+
54+
// private transient SolrClient solrClient;
55+
private final SolrClientOptions solrClientOptions;
56+
private final SolrWriteOptions solrWriteOptions;
57+
private final KerberosOptions kerberosOptions;
58+
private transient AtomicInteger rowCount;
59+
protected String[] fieldNames;
60+
private transient ScheduledExecutorService scheduler;
61+
private transient ScheduledFuture<?> scheduledFuture;
62+
CloudSolrClientProvider provider;
63+
64+
public SolrOutputFormat(
65+
SolrClientOptions solrClientOptions,
66+
SolrWriteOptions solrWriteOptions,
67+
KerberosOptions kerberosOptions,
68+
String[] fieldNames) {
69+
this.solrClientOptions = solrClientOptions;
70+
this.solrWriteOptions = solrWriteOptions;
71+
this.kerberosOptions = kerberosOptions;
72+
this.fieldNames = fieldNames;
73+
}
74+
75+
@Override
76+
public void configure(Configuration parameters) {}
77+
78+
@Override
79+
public void open(int taskNumber, int numTasks) throws IOException {
80+
provider = new CloudSolrClientProvider(solrClientOptions, kerberosOptions);
81+
try {
82+
provider.getClient();
83+
} catch (KrbException | LoginException e) {
84+
LOG.error("", e);
85+
}
86+
initMetric();
87+
initIntervalFlush();
88+
rowCount = new AtomicInteger(0);
89+
}
90+
91+
@Override
92+
public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
93+
Row row = record.getField(1);
94+
SolrInputDocument solrDocument = new SolrInputDocument();
95+
int columnIndex = 0;
96+
for (String name : fieldNames) {
97+
solrDocument.setField(name, row.getField(columnIndex));
98+
columnIndex++;
99+
}
100+
try {
101+
provider.add(solrDocument);
102+
} catch (SolrServerException | PrivilegedActionException e) {
103+
LOG.error("", e);
104+
}
105+
if (rowCount.incrementAndGet() >= solrWriteOptions.getBufferFlushMaxRows()) {
106+
flush();
107+
}
108+
}
109+
110+
@Override
111+
public void close() throws IOException {
112+
flush();
113+
try {
114+
provider.close();
115+
} catch (PrivilegedActionException e) {
116+
LOG.error("", e);
117+
}
118+
}
119+
120+
private void initIntervalFlush() {
121+
if (solrWriteOptions.getBufferFlushIntervalMillis() != 0
122+
&& solrWriteOptions.getBufferFlushMaxRows() != 1) {
123+
this.scheduler =
124+
new ScheduledThreadPoolExecutor(1, new DTThreadFactory("solr-batch-flusher"));
125+
this.scheduledFuture =
126+
this.scheduler.scheduleWithFixedDelay(
127+
() -> {
128+
flush();
129+
},
130+
solrWriteOptions.getBufferFlushIntervalMillis(),
131+
solrWriteOptions.getBufferFlushIntervalMillis(),
132+
TimeUnit.MILLISECONDS);
133+
}
134+
}
135+
136+
private synchronized void flush() {
137+
try {
138+
provider.commit();
139+
rowCount.set(0);
140+
} catch (SolrServerException | IOException | PrivilegedActionException e) {
141+
LOG.error("", e);
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)