Skip to content

Commit 08d9a55

Browse files
committed
超时数据处理==>返回null,并记录日志
1 parent bdaecef commit 08d9a55

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements
5151
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
5252
private static final long serialVersionUID = 2098635244857937717L;
5353

54+
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
55+
private int timeOutNum = 0;
56+
5457
protected SideInfo sideInfo;
5558
protected transient Counter parseErrorRecords;
5659

@@ -119,7 +122,13 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
119122

120123
@Override
121124
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
122-
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out. input:" + input.toString()));
125+
126+
//TODO 需要添加数据指标
127+
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
128+
LOG.info("Async function call has timed out. input:" + input.toString());
129+
}
130+
131+
resultFuture.complete(null);
123132
}
124133

125134

0 commit comments

Comments
 (0)