Skip to content

KAFKA-18847: Refactor OAuth layer to improve reusability 1/N #19622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
667bb22
KAFKA-18847: Refactor OAuth layer to improve reusability 1/N
kirktrue May 2, 2025
088d49b
Updates from linter
kirktrue May 2, 2025
aa47fe3
Update OAuthCompatibilityTool to fix import ordering problems caught …
kirktrue May 2, 2025
c45219b
Changed wording in Initable's init() Javadoc
kirktrue May 6, 2025
3d1a2ba
Removed ambiguity in OAuthBearerValidatorCallbackHandler's log messages
kirktrue May 6, 2025
5cb1de3
Removed superfluous no-op init() overrides
kirktrue May 6, 2025
84e2129
Fixed inconsistent Javadoc formatting in ClientJwtValidator
kirktrue May 6, 2025
35d3454
Created a default, no-op close() implementation and removed the no-op…
kirktrue May 6, 2025
f298a16
Updated validateUrlencodeHeader to use ConfigurationUtils’ get() meth…
kirktrue May 6, 2025
e5e97c8
Added clarifying Javadoc for DefaultJwtValidator
kirktrue May 6, 2025
08ec47d
Fixed a missing phrase in the FileJwtRetriever documentation.
kirktrue May 6, 2025
bddc0e2
HttpJwtRetriever Javadoc cleanup
kirktrue May 6, 2025
4c48438
Moved tests from JwtValidatorFactoryTest to OAuthBearerValidatorCallb…
kirktrue May 6, 2025
9882fbe
Fixed import linting issues
kirktrue May 6, 2025
395d779
Update clients/src/main/java/org/apache/kafka/common/security/oauthbe…
kirktrue May 7, 2025
dbb4ac9
Update clients/src/test/java/org/apache/kafka/common/security/oauthbe…
kirktrue May 7, 2025
54e6f69
Update OAuthBearerValidatorCallbackHandlerTest.java
kirktrue May 7, 2025
e95660e
Clean up awkward code for expected audience configuration in DefaultJ…
kirktrue May 7, 2025
df52393
Updates to remove the init() methods from OAuthBearerLoginCallbackHan…
kirktrue May 8, 2025
74fd8cd
Moved createAccessKey to OAuthBearerTest for reuse
kirktrue May 8, 2025
4fa46c5
Refactoring for unit tests to make less public API surface area
kirktrue May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetrieverFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.DefaultJwtRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.DefaultJwtValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -179,55 +180,45 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand

private Map<String, Object> moduleOptions;

private AccessTokenRetriever accessTokenRetriever;
private JwtRetriever jwtRetriever;

private AccessTokenValidator accessTokenValidator;

private boolean isInitialized = false;
private JwtValidator jwtValidator;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);
AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism);
init(accessTokenRetriever, accessTokenValidator);
Map<String, Object> moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
JwtRetriever jwtRetriever = new DefaultJwtRetriever(configs, saslMechanism, moduleOptions);
JwtValidator jwtValidator = new DefaultJwtValidator(configs, saslMechanism);
configure(moduleOptions, jwtRetriever, jwtValidator);
}

public void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) {
this.accessTokenRetriever = accessTokenRetriever;
this.accessTokenValidator = accessTokenValidator;
void configure(Map<String, Object> moduleOptions, JwtRetriever jwtRetriever, JwtValidator jwtValidator) {
this.moduleOptions = moduleOptions;
this.jwtRetriever = jwtRetriever;
this.jwtValidator = jwtValidator;

try {
this.accessTokenRetriever.init();
this.jwtRetriever.init();
} catch (IOException e) {
throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e);
throw new KafkaException("The OAuth login callback encountered an error when initializing the JwtRetriever", e);
}

isInitialized = true;
}

/*
* Package-visible for testing.
*/

AccessTokenRetriever getAccessTokenRetriever() {
return accessTokenRetriever;
try {
this.jwtValidator.init();
} catch (IOException e) {
throw new KafkaException("The OAuth login callback encountered an error when initializing the JwtValidator", e);
}
}

@Override
public void close() {
if (accessTokenRetriever != null) {
try {
this.accessTokenRetriever.close();
} catch (IOException e) {
log.warn("The OAuth login configuration encountered an error when closing the AccessTokenRetriever", e);
}
}
Utils.closeQuietly(jwtRetriever, "The OAuth login callback encountered an error when closing the JwtRetriever");
Utils.closeQuietly(jwtValidator, "The OAuth login callback encountered an error when closing the JwtValidator");
}

@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
checkInitialized();
checkConfigured();

for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
Expand All @@ -241,11 +232,11 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
}

private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
checkInitialized();
String accessToken = accessTokenRetriever.retrieve();
checkConfigured();
String accessToken = jwtRetriever.retrieve();

try {
OAuthBearerToken token = accessTokenValidator.validate(accessToken);
OAuthBearerToken token = jwtValidator.validate(accessToken);
callback.token(token);
} catch (ValidateException e) {
log.warn(e.getMessage(), e);
Expand All @@ -254,7 +245,7 @@ private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOExc
}

private void handleExtensionsCallback(SaslExtensionsCallback callback) {
checkInitialized();
checkConfigured();

Map<String, String> extensions = new HashMap<>();

Expand Down Expand Up @@ -286,9 +277,9 @@ private void handleExtensionsCallback(SaslExtensionsCallback callback) {
callback.extensions(saslExtensions);
}

private void checkInitialized() {
if (!isInitialized)
throw new IllegalStateException(String.format("To use %s, first call the configure or init method", getClass().getSimpleName()));
private void checkConfigured() {
if (moduleOptions == null || jwtRetriever == null || jwtValidator == null)
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
import org.apache.kafka.common.security.oauthbearer.internals.secured.DefaultJwtValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksVerificationKeyResolver;
import org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
import org.apache.kafka.common.security.oauthbearer.internals.secured.VerificationKeyResolverFactory;
import org.apache.kafka.common.utils.Utils;

import org.jose4j.jws.JsonWebSignature;
import org.jose4j.jwx.JsonWebStructure;
Expand Down Expand Up @@ -119,9 +120,7 @@ public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallback

private CloseableVerificationKeyResolver verificationKeyResolver;

private AccessTokenValidator accessTokenValidator;

private boolean isInitialized = false;
private JwtValidator jwtValidator;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
Expand All @@ -135,37 +134,36 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
new RefCountingVerificationKeyResolver(VerificationKeyResolverFactory.create(configs, saslMechanism, moduleOptions)));
}

AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver);
init(verificationKeyResolver, accessTokenValidator);
JwtValidator jwtValidator = new DefaultJwtValidator(configs, saslMechanism, verificationKeyResolver);
configure(verificationKeyResolver, jwtValidator);
}

public void init(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
void configure(CloseableVerificationKeyResolver verificationKeyResolver, JwtValidator jwtValidator) {
this.verificationKeyResolver = verificationKeyResolver;
this.accessTokenValidator = accessTokenValidator;
this.jwtValidator = jwtValidator;

try {
verificationKeyResolver.init();
} catch (Exception e) {
throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e);
throw new KafkaException("The OAuth validator callback encountered an error when initializing the VerificationKeyResolver", e);
}

isInitialized = true;
try {
jwtValidator.init();
} catch (IOException e) {
throw new KafkaException("The OAuth validator callback encountered an error when initializing the JwtValidator", e);
}
}

@Override
public void close() {
if (verificationKeyResolver != null) {
try {
verificationKeyResolver.close();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
Utils.closeQuietly(jwtValidator, "The OAuth validator callback encountered an error when closing the JwtValidator");
Utils.closeQuietly(verificationKeyResolver, "The OAuth validator callback encountered an error when closing the VerificationKeyResolver");
}

@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
checkInitialized();
checkConfigured();

for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerValidatorCallback) {
Expand All @@ -179,12 +177,12 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
}

private void handleValidatorCallback(OAuthBearerValidatorCallback callback) {
checkInitialized();
checkConfigured();

OAuthBearerToken token;

try {
token = accessTokenValidator.validate(callback.tokenValue());
token = jwtValidator.validate(callback.tokenValue());
callback.token(token);
} catch (ValidateException e) {
log.warn(e.getMessage(), e);
Expand All @@ -193,14 +191,14 @@ private void handleValidatorCallback(OAuthBearerValidatorCallback callback) {
}

private void handleExtensionsValidatorCallback(OAuthBearerExtensionsValidatorCallback extensionsValidatorCallback) {
checkInitialized();
checkConfigured();

extensionsValidatorCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsValidatorCallback.valid(extensionName));
}

private void checkInitialized() {
if (!isInitialized)
throw new IllegalStateException(String.format("To use %s, first call the configure or init method", getClass().getSimpleName()));
private void checkConfigured() {
if (verificationKeyResolver == null || jwtValidator == null)
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE;

/**
* ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used
* {@code BrokerJwtValidator} is an implementation of {@link JwtValidator} that is used
* by the broker to perform more extensive validation of the JWT access token that is received
* from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's
* token endpoint.
Expand All @@ -62,9 +62,9 @@
* </ol>
*/

public class ValidatorAccessTokenValidator implements AccessTokenValidator {
public class BrokerJwtValidator implements JwtValidator {

private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class);
private static final Logger log = LoggerFactory.getLogger(BrokerJwtValidator.class);

private final JwtConsumer jwtConsumer;

Expand All @@ -73,7 +73,7 @@ public class ValidatorAccessTokenValidator implements AccessTokenValidator {
private final String subClaimName;

/**
* Creates a new ValidatorAccessTokenValidator that will be used by the broker for more
* Creates a new {@code BrokerJwtValidator} that will be used by the broker for more
* thorough validation of the JWT.
*
* @param clockSkew The optional value (in seconds) to allow for differences
Expand Down Expand Up @@ -112,12 +112,12 @@ public class ValidatorAccessTokenValidator implements AccessTokenValidator {
* @see VerificationKeyResolver
*/

public ValidatorAccessTokenValidator(Integer clockSkew,
Set<String> expectedAudiences,
String expectedIssuer,
VerificationKeyResolver verificationKeyResolver,
String scopeClaimName,
String subClaimName) {
public BrokerJwtValidator(Integer clockSkew,
Set<String> expectedAudiences,
String expectedIssuer,
VerificationKeyResolver verificationKeyResolver,
String scopeClaimName,
String subClaimName) {
final JwtConsumerBuilder jwtConsumerBuilder = new JwtConsumerBuilder();

if (clockSkew != null)
Expand Down
Loading