Skip to content

Commit e7fbf64

Browse files
authored
Merge pull request #408 from WTZ468071157/hotfix_1.10_release_407
[hotfix-407][all] fix bugs and add dirty-plugins.
2 parents 4ecfa40 + c578d1c commit e7fbf64

File tree

137 files changed

+1738
-1656
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

137 files changed

+1738
-1656
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ bin/nohup.out
1616
.DS_Store
1717
bin/sideSql.txt
1818
*.keytab
19-
krb5.conf
19+
krb5.conf
20+
.gradle
21+
gradle

.gitlab-ci.yml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1-
build:
1+
stages:
2+
- validate
3+
- test
4+
5+
test-job:
26
stage: test
37
script:
48
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
9+
only:
10+
- v1.10.0_dev
11+
tags:
12+
- dt-insight-engine
13+
14+
validate-job:
15+
stage: validate
16+
script:
517
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
618
- sh ci/sonar_notify.sh
719
only:
8-
- v1.8.0_dev
20+
- v1.10.0_dev
921
tags:
10-
- dt-insight-engine
22+
- dt-insight-engine

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ FlinkStreamSQL
3131

3232
## 目录
3333

34-
[ 1.0 变更记录](docs/changelog.md)
3534
[ 1.1 demo](docs/demo.md)
3635
[ 1.2 快速开始](docs/quickStart.md)
3736
[ 1.3 参数配置](docs/config.md)

cassandra/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@
1717
</modules>
1818

1919
<dependencies>
20-
<dependency>
21-
<groupId>junit</groupId>
22-
<artifactId>junit</artifactId>
23-
<version>3.8.1</version>
24-
<scope>test</scope>
25-
</dependency>
2620
<dependency>
2721
<groupId>com.dtstack.flink</groupId>
2822
<artifactId>sql.core</artifactId>

clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
package com.dtstack.flink.sql.side.clickhouse;
2020

21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2122
import com.dtstack.flink.sql.side.FieldInfo;
2223
import com.dtstack.flink.sql.side.JoinInfo;
2324
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2425
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
25-
import com.dtstack.flink.sql.util.JDBCUtils;
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public ClickhouseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fiel
4545
public Connection getConn(String dbUrl, String userName, String passWord) {
4646
try {
4747
Connection connection ;
48-
JDBCUtils.forName(CLICKHOUSE_DRIVER, getClass().getClassLoader());
48+
ClassLoaderManager.forName(CLICKHOUSE_DRIVER, getClass().getClassLoader());
4949
// ClickHouseProperties contains all properties
5050
if (userName == null) {
5151
connection = DriverManager.getConnection(dbUrl);

clickhouse/clickhouse-side/clickhouse-side-core/src/main/java/com/dtstack/flink/sql/side/clickhouse/table/ClickhouseSideParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.clickhouse.table;
2121

22+
import com.dtstack.flink.sql.core.rdb.JdbcCheckKeys;
2223
import com.dtstack.flink.sql.side.rdb.table.RdbSideParser;
2324
import com.dtstack.flink.sql.table.AbstractTableInfo;
2425
import ru.yandex.clickhouse.domain.ClickHouseDataType;
@@ -39,6 +40,7 @@ public class ClickhouseSideParser extends RdbSideParser {
3940

4041
@Override
4142
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
43+
props.put(JdbcCheckKeys.DRIVER_NAME, "ru.yandex.clickhouse.ClickHouseDriver");
4244
AbstractTableInfo clickhouseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
4345
clickhouseTableInfo.setType(CURR_TYPE);
4446
return clickhouseTableInfo;

clickhouse/clickhouse-side/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@
3131
<artifactId>sql.side.rdb</artifactId>
3232
<version>${rdb.side.version}</version>
3333
</dependency>
34+
35+
<!-- test dependencies -->
36+
37+
<dependency>
38+
<groupId>com.dtstack.flink</groupId>
39+
<artifactId>sql.side.rdb</artifactId>
40+
<version>${rdb.side.version}</version>
41+
<type>test-jar</type>
42+
<scope>test</scope>
43+
</dependency>
3444
</dependencies>
3545

3646
</project>

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5050
.setKeyFields(primaryKeys)
5151
.setAllReplace(allReplace)
5252
.setUpdateMode(updateMode)
53+
.setErrorLimit(errorLimit)
5354
.build();
5455
}
5556

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/table/ClickhouseSinkParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.sink.clickhouse.table;
2121

22+
import com.dtstack.flink.sql.core.rdb.JdbcCheckKeys;
2223
import com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser;
2324
import com.dtstack.flink.sql.table.AbstractTableInfo;
2425
import ru.yandex.clickhouse.domain.ClickHouseDataType;
@@ -31,6 +32,7 @@ public class ClickhouseSinkParser extends RdbSinkParser {
3132

3233
@Override
3334
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
35+
props.put(JdbcCheckKeys.DRIVER_NAME, "ru.yandex.clickhouse.ClickHouseDriver");
3436
AbstractTableInfo clickhouseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
3537
clickhouseTableInfo.setType(CURR_TYPE);
3638
return clickhouseTableInfo;

console/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@
1616
</modules>
1717

1818
<dependencies>
19-
<dependency>
20-
<groupId>junit</groupId>
21-
<artifactId>junit</artifactId>
22-
<version>3.8.1</version>
23-
<scope>test</scope>
24-
</dependency>
2519
<dependency>
2620
<groupId>com.dtstack.flink</groupId>
2721
<artifactId>sql.core</artifactId>

core/pom.xml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
<include name="${project.artifactId}-${project.version}.jar" />
177177
</fileset>
178178
</copy>
179+
<!--suppress UnresolvedMavenProperty -->
179180
<move file="${basedir}/../sqlplugins/${project.artifactId}-${project.version}.jar"
180181
tofile="${basedir}/../sqlplugins/${project.name}-${git.branch}.jar" />
181182
</tasks>
@@ -184,7 +185,17 @@
184185
</executions>
185186
</plugin>
186187

187-
188+
<plugin>
189+
<groupId>org.apache.maven.plugins</groupId>
190+
<artifactId>maven-jar-plugin</artifactId>
191+
<executions>
192+
<execution>
193+
<goals>
194+
<goal>test-jar</goal>
195+
</goals>
196+
</execution>
197+
</executions>
198+
</plugin>
188199
</plugins>
189200
</build>
190201
</project>

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.lang.reflect.Method;
3131
import java.net.URL;
3232
import java.net.URLClassLoader;
33+
import java.sql.DriverManager;
3334
import java.util.ArrayList;
3435
import java.util.Arrays;
3536
import java.util.Comparator;
@@ -45,8 +46,28 @@
4546
public class ClassLoaderManager {
4647

4748
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
49+
private static final Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
50+
private static final Object LOCK = new Object();
4851

49-
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
52+
public static void forName(String clazz, ClassLoader classLoader) {
53+
synchronized (LOCK) {
54+
try {
55+
Class.forName(clazz, true, classLoader);
56+
DriverManager.setLoginTimeout(10);
57+
} catch (Exception e) {
58+
throw new RuntimeException(e);
59+
}
60+
}
61+
}
62+
63+
public synchronized static void forName(String clazz) {
64+
try {
65+
Class<?> driverClass = Class.forName(clazz);
66+
driverClass.newInstance();
67+
} catch (Exception e) {
68+
throw new RuntimeException(e);
69+
}
70+
}
5071

5172
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
5273
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
@@ -109,11 +130,10 @@ public static List<URL> getClassPath() {
109130
}
110131

111132

112-
113133
public static URLClassLoader loadExtraJar(List<URL> jarUrlList, URLClassLoader classLoader)
114-
throws IllegalAccessException, InvocationTargetException {
115-
for(URL url : jarUrlList){
116-
if(url.toString().endsWith(".jar")){
134+
throws IllegalAccessException, InvocationTargetException {
135+
for (URL url : jarUrlList) {
136+
if (url.toString().endsWith(".jar")) {
117137
urlClassLoaderAddUrl(classLoader, url);
118138
}
119139
}

core/src/main/java/com/dtstack/flink/sql/exception/ExceptionTrace.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.dtstack.flink.sql.exception;
22

3+
import org.apache.flink.runtime.execution.SuppressRestartsException;
4+
35
import java.util.Objects;
46

57
/**
@@ -18,4 +20,21 @@ public static String traceOriginalCause(Throwable e) {
1820
}
1921
return errorMsg;
2022
}
23+
24+
/**
25+
* 根据异常的种类来判断是否需要强制跳过Flink的重启{@link SuppressRestartsException}
26+
* @param e exception
27+
* @param errorMsg 需要抛出的异常信息
28+
*/
29+
public static void dealExceptionWithSuppressStart(Exception e, String errorMsg) {
30+
if (e instanceof SuppressRestartsException) {
31+
throw new SuppressRestartsException(
32+
new Throwable(
33+
errorMsg
34+
)
35+
);
36+
} else {
37+
throw new RuntimeException(errorMsg);
38+
}
39+
}
2140
}

0 commit comments

Comments
 (0)