|
17 | 17 |
|
18 | 18 | package org.apache.flink.connector.elasticsearch.sink;
|
19 | 19 |
|
20 |
| -import org.apache.flink.api.common.operators.MailboxExecutor; |
21 | 20 | import org.apache.flink.api.connector.sink2.SinkWriter;
|
22 | 21 | import org.apache.flink.api.connector.sink2.SinkWriter.Context;
|
23 | 22 | import org.apache.flink.api.java.tuple.Tuple2;
|
|
31 | 30 | import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
|
32 | 31 | import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
|
33 | 32 | import org.apache.flink.metrics.testutils.MetricListener;
|
| 33 | +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; |
34 | 34 | import org.apache.flink.runtime.metrics.MetricNames;
|
35 | 35 | import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
|
36 | 36 | import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
|
37 | 37 | import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
38 | 38 | import org.apache.flink.test.junit5.MiniClusterExtension;
|
39 |
| -import org.apache.flink.util.FlinkRuntimeException; |
40 | 39 | import org.apache.flink.util.TestLoggerExtension;
|
41 |
| -import org.apache.flink.util.function.ThrowingRunnable; |
42 | 40 |
|
43 | 41 | import org.apache.http.HttpHost;
|
44 | 42 | import org.elasticsearch.action.ActionListener;
|
@@ -329,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
|
329 | 327 | new DefaultBulkResponseInspector(),
|
330 | 328 | new NetworkClientConfig(null, null, null, null, null, null, null, null),
|
331 | 329 | metricGroup,
|
332 |
| - new TestMailbox()); |
| 330 | + new SyncMailboxExecutor()); |
333 | 331 | }
|
334 | 332 |
|
335 | 333 | private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
|
@@ -481,29 +479,4 @@ GetResponse getResponse(String index, int id) throws IOException {
|
481 | 479 | return client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT);
|
482 | 480 | }
|
483 | 481 | }
|
484 |
| - |
485 |
| - private static class TestMailbox implements MailboxExecutor { |
486 |
| - |
487 |
| - @Override |
488 |
| - public void execute( |
489 |
| - ThrowingRunnable<? extends Exception> command, |
490 |
| - String descriptionFormat, |
491 |
| - Object... descriptionArgs) { |
492 |
| - try { |
493 |
| - command.run(); |
494 |
| - } catch (Exception e) { |
495 |
| - throw new RuntimeException("Unexpected error", e); |
496 |
| - } |
497 |
| - } |
498 |
| - |
499 |
| - @Override |
500 |
| - public void yield() throws InterruptedException, FlinkRuntimeException { |
501 |
| - Thread.sleep(100); |
502 |
| - } |
503 |
| - |
504 |
| - @Override |
505 |
| - public boolean tryYield() throws FlinkRuntimeException { |
506 |
| - return false; |
507 |
| - } |
508 |
| - } |
509 | 482 | }
|
0 commit comments