
Add safe kafka connectors #68


Merged · 3 commits · Apr 22, 2025
Conversation

@ferenc-csaky ferenc-csaky (Contributor) commented Apr 4, 2025

Relevant changes

  • Added a new Maven module, connectors/kafka.
  • Copied the classes that had to be modified.
    • SafeKafkaDynamicTableFactory and SafeUpsertKafkaDynamicTableFactory introduce new table options (scan.deser.failure-handler, scan.deser.failure-topic) and apply the necessary configuration.
    • SafeKafkaDynamicSource in this context is just a connecting layer that passes through the newly added information.
    • SafeDynamicKafkaDeserializationSchema is modified: a newly added deserWithFailureHandling method now handles deserialization exceptions according to the configured behavior.
  • DeserFailureHandlerType defines the possible deserialization failure handling types.
  • DeserFailureHandlerOptions defines the newly added table options and a validation helper for them.
  • DeserFailureHandler defines a wrapper that executes the configured type of deserialization failure handling.
  • DeserFailureProducer contains the Kafka-related logic for sending failed messages to a given topic.
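To make the shape of these pieces concrete, here is a minimal plain-Java sketch of how the handler types and the wrapper could fit together. The names mirror the classes listed above, but all signatures and behaviors here are illustrative; the real classes integrate with Flink's DynamicTableFactory machinery and a Kafka producer, which this sketch omits.

```java
import java.util.function.BiConsumer;

/** Illustrative sketch of DeserFailureHandlerType: the possible handling modes. */
enum DeserFailureHandlerType {
    NONE,  // rethrow: a bad record fails the job, as the vanilla connector would
    LOG,   // log the bad record and keep consuming
    KAFKA  // forward the raw record to the configured failure topic
}

/** Illustrative sketch of DeserFailureHandler: executes the configured handling. */
class DeserFailureHandler {
    private final DeserFailureHandlerType type;
    // Stand-in for DeserFailureProducer: receives the raw bytes and the cause.
    private final BiConsumer<byte[], Exception> failureProducer;

    DeserFailureHandler(DeserFailureHandlerType type,
                        BiConsumer<byte[], Exception> failureProducer) {
        this.type = type;
        this.failureProducer = failureProducer;
    }

    interface ThrowingConsumer {
        void accept(byte[] record) throws Exception;
    }

    /** Runs one deserialization attempt and applies the configured failure handling. */
    void deserWithFailureHandling(byte[] record, ThrowingConsumer deser) {
        try {
            deser.accept(record);
        } catch (Exception e) {
            switch (type) {
                case NONE:
                    // Fail the job, preserving the original exception where possible.
                    throw e instanceof RuntimeException
                            ? (RuntimeException) e
                            : new RuntimeException(e);
                case LOG:
                    System.err.println("Skipping record that could not be deserialized: " + e);
                    break;
                case KAFKA:
                    // The real DeserFailureProducer sends this to scan.deser.failure-topic.
                    failureProducer.accept(record, e);
                    break;
            }
        }
    }
}
```

The key design point is that the deserialization call site stays unchanged; only the wrapper decides whether a failure propagates, is logged, or is shipped to a dead-letter topic.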

@ferenc-csaky ferenc-csaky mentioned this pull request Apr 4, 2025
@velo velo (Collaborator) left a comment


I added a few comments; I'm not sure how helpful they are at this stage, so feel free to ignore anything you don't find useful right now.

@velo velo (Collaborator) commented Apr 4, 2025

[ ] Add integration tests that use the safe connectors.

FWIW, on sqrl I made a test using Testcontainers, and that feels far better:
https://github.com/DataSQRL/sqrl/pull/1079/files#diff-b02fb4c7981b0e6ee781e45fcbb8d073874f80ca8c896ea6cf7851f66a1f1d28

Eventually I will backport that approach here; it's worth taking a look before expanding on the present setup.

@velo velo requested a review from Copilot April 4, 2025 18:02
@Copilot Copilot AI (Contributor) left a comment


Copilot reviewed 8 out of 13 changed files in this pull request and generated no comments.

Files not reviewed (5)
  • connectors/kafka/pom.xml: Language not supported
  • connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory: Language not supported
  • connectors/pom.xml: Language not supported
  • flink-sql-runner/pom.xml: Language not supported
  • pom.xml: Language not supported
Comments suppressed due to low confidence (2)

connectors/kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java:237

  • Consider revising the error message to correct the grammatical mistake (e.g., remove the extra 'to' so that it reads 'Could not set row kind for output record').
throw new DeserializationException("Invalid null value received in non-upsert mode. Could not to set row kind for output record.");

connectors/kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeDynamicKafkaDeserializationSchema.java:248

  • Using an assert here may not be effective in production environments; consider replacing it with an explicit null check and proper error handling to prevent potential runtime issues.
assert physicalKeyRow != null;
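Copilot's second point could be addressed along these lines; a minimal sketch, where the method name and message are hypothetical:

```java
/** Sketch of the suggested fix: an explicit check instead of `assert`, which is
 *  a no-op unless the JVM runs with -ea, so it would not guard production jobs. */
final class NullChecks {
    static <T> T requireKeyRow(T physicalKeyRow) {
        if (physicalKeyRow == null) {
            throw new IllegalStateException("Physical key row must not be null at this point.");
        }
        return physicalKeyRow;
    }
}
```

Unlike the assertion, this fails fast with a descriptive error on every JVM configuration.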

@velo velo (Collaborator) left a comment


I noticed that project coverage dropped considerably.

I don't think the integration tests are actually exercising the kafka-safe connector.

Could you please take a look?

Comment on lines +31 to +32
"connector" : "kafka-safe",
"format" : "json",
A collaborator left a comment:

One thing I don't like about the compiled-plan tests is that it takes a good chunk of effort to figure out what the important bits are.

I don't have a solution for this particular issue, but if you have any ideas, let me know.

@ferenc-csaky ferenc-csaky (Author) left a comment

Agreed. I just grabbed the old file and used it as is, because that required the least effort.

There could probably be some JSON generation that adds the boilerplate, so that we only define a schema and the necessary metadata, for example. But I'd rather start with refactoring the Docker container setup.
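For context, the compiled-plan fragment above corresponds roughly to table DDL like the following sketch. The table name, columns, and option values are illustrative; only the connector name, format, and the two new option keys come from this PR:

```sql
CREATE TABLE orders (
  order_id BIGINT,
  payload  STRING
) WITH (
  'connector' = 'kafka-safe',
  'format' = 'json',
  -- New options from this PR; values shown here are assumptions.
  'scan.deser.failure-handler' = 'kafka',
  'scan.deser.failure-topic' = 'orders-deser-failures'
);
```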

@ferenc-csaky ferenc-csaky force-pushed the safe-kafka-connectors branch from 3e1b7e9 to 364fec9 Compare April 17, 2025 19:42
@ferenc-csaky ferenc-csaky force-pushed the safe-kafka-connectors branch from 8a6c6d3 to 1e93ed4 Compare April 22, 2025 09:50
@velo velo merged commit a6956b0 into DataSQRL:main Apr 22, 2025
1 of 2 checks passed