Add safe kafka connectors #68
Conversation
I added a few comments; I'm not sure whether they are helpful at this stage, so feel free to ignore anything you don't consider useful right now.
...org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java (outdated, resolved)
connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory (outdated, resolved)
...org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java (outdated, resolved)
FWIW, on sqrl I made a test using Testcontainers and that feels far better; eventually I will backport this approach here. It's worth taking a look before trying to expand on the present setup.
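For context, a minimal sketch of what the Testcontainers-based approach mentioned above typically looks like; this is not the actual sqrl test, and the image tag, class name, and assertion are illustrative:

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class SafeKafkaConnectorIT {

    // Spins up a throwaway Kafka broker for the duration of the test class,
    // instead of relying on a hand-managed Docker setup.
    @Container
    private static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    @Test
    void brokerIsReachable() {
        // getBootstrapServers() yields the address to point the
        // 'properties.bootstrap.servers' table option at in a real test.
        assertNotNull(KAFKA.getBootstrapServers());
    }
}
```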
Copilot reviewed 8 out of 13 changed files in this pull request and generated no comments.
Files not reviewed (5)
- connectors/kafka/pom.xml: Language not supported
- connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory: Language not supported
- connectors/pom.xml: Language not supported
- flink-sql-runner/pom.xml: Language not supported
- pom.xml: Language not supported
Comments suppressed due to low confidence (2)
connectors/kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java:237
- Consider revising the error message to correct the grammatical mistake (e.g., remove the extra 'to' so that it reads 'Could not set row kind for output record').
throw new DeserializationException("Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
connectors/kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java:248
- Using an assert here may not be effective in production environments; consider replacing it with an explicit null check and proper error handling to prevent potential runtime issues.
assert physicalKeyRow != null;
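For illustration, a hedged sketch of the explicit check being suggested here; `physicalKeyRow` and `DeserializationException` come from the quoted code, while the message wording is an assumption:

```java
// Replacing the assert with an explicit check, as suggested above, so the
// guard also holds in production where assertions are usually disabled.
if (physicalKeyRow == null) {
    throw new DeserializationException(
            "Physical key row is null; cannot emit output record.");
}
```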
I noticed that project coverage dropped considerably.
I don't think the integration tests are actually exercising the kafka-safe connector.
Could you please take a look?
"connector" : "kafka-safe", | ||
"format" : "json", |
One thing I don't like about the compiled-plans tests is that it takes a good chunk of effort to figure out which bits are the important ones. I don't have a solution for this particular issue, but let me know if you have any ideas.
Agreed. I just grabbed the old file and used it as-is, because that required the least effort.
There could probably be some JSON generation that adds the boilerplate, so that we only define a schema and the necessary metadata, for example (a rough sketch of that idea follows). But I'd rather start with refactoring the Docker container setup.
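A rough sketch of that JSON-generation idea, purely illustrative and not part of the PR: keep the compiled-plan boilerplate in a template file and inject only what a test actually cares about. The `tableOptions` pointer into the plan is an assumption; the real compiled-plan layout would dictate where the injected pieces live.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.file.Files;
import java.nio.file.Path;

public class CompiledPlanTemplate {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Reads the shared boilerplate plan and overrides just the table options,
    // so each test only states the bits that matter to it.
    public static String planWithOptions(Path template, ObjectNode tableOptions)
            throws Exception {
        ObjectNode plan = (ObjectNode) MAPPER.readTree(Files.readString(template));
        plan.set("tableOptions", tableOptions); // assumed location in the plan
        return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(plan);
    }
}
```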
flink-sql-runner/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java (resolved)
flink-sql-runner/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java (resolved)
Force-pushed from 3e1b7e9 to 364fec9
Force-pushed from 8a6c6d3 to 1e93ed4
Relevant changes
connectors/kafka:
- SafeKafkaDynamicTableFactory and SafeUpsertKafkaDynamicTableFactory have new table options (scan.deser.failure-handler, scan.deser.failure-topic) and also do the necessary configuration.
- SafeKafkaDynamicSource in this context is just a connecting layer that passes through the newly added information.
- SafeDynamicKafkaDeserializationSchema is modified; a newly added deserWithFailureHandling method can now handle the exceptions accordingly.
- DeserFailureHandlerType defines the possible deserialization handling types.
- DeserFailureHandlerOptions defines the newly added table options and a validation helper for them.
- DeserFailureHandler defines a wrapper which executes the configured type of deserialization handling (a hedged sketch of this flow follows the list).
- DeserFailureProducer contains the Kafka-related logic to send failed messages to a given topic.
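To make that flow concrete, here is a minimal hedged sketch of how the handler and producer could fit together. Only the class names, the deserWithFailureHandling method name, and the two table options come from this PR; every signature, field, and enum value below is an assumption:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Possible handling types; the actual enum constants in the PR may differ.
enum DeserFailureHandlerType { NONE, LOG, KAFKA }

class DeserFailureProducer {
    private final KafkaProducer<byte[], byte[]> producer;
    private final String failureTopic; // value of scan.deser.failure-topic

    DeserFailureProducer(Properties kafkaProps, String failureTopic) {
        kafkaProps.put("key.serializer", ByteArraySerializer.class.getName());
        kafkaProps.put("value.serializer", ByteArraySerializer.class.getName());
        this.producer = new KafkaProducer<>(kafkaProps);
        this.failureTopic = failureTopic;
    }

    void send(byte[] key, byte[] value) {
        // Forward the raw record that failed deserialization to the failure topic.
        producer.send(new ProducerRecord<>(failureTopic, key, value));
    }
}

class DeserFailureHandler {
    private final DeserFailureHandlerType type;  // value of scan.deser.failure-handler
    private final DeserFailureProducer producer; // used only for the KAFKA type

    DeserFailureHandler(DeserFailureHandlerType type, DeserFailureProducer producer) {
        this.type = type;
        this.producer = producer;
    }

    // Runs the actual deserialization and applies the configured handling on failure.
    void deserWithFailureHandling(byte[] key, byte[] value, Runnable deser) {
        try {
            deser.run();
        } catch (Exception e) {
            switch (type) {
                case NONE:
                    throw new RuntimeException(e); // keep the old failing behavior
                case LOG:
                    System.err.println("Skipping record that failed to deserialize: " + e);
                    break;
                case KAFKA:
                    producer.send(key, value); // dead-letter the raw record
                    break;
            }
        }
    }
}
```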