Skip to content

fix(kafka): apply sts_header_overrides to MSK IAM SASL auth#6925

Open
hozkaya2000 wants to merge 1 commit into
opensearch-project:mainfrom
hozkaya2000:main
Open

fix(kafka): apply sts_header_overrides to MSK IAM SASL auth#6925
hozkaya2000 wants to merge 1 commit into
opensearch-project:mainfrom
hozkaya2000:main

Conversation

@hozkaya2000

@hozkaya2000 hozkaya2000 commented Jun 11, 2026

Copy link
Copy Markdown

Description

The Kafka plugin already applies the configured sts_header_overrides to the STS AssumeRole call used to fetch MSK bootstrap brokers, but not to the data-plane SASL (AWS_MSK_IAM) connection used for the actual consume/produce. That broker authentication delegates credential resolution to the aws-msk-iam-auth library's internal role assumption, which has no way to apply STS header overrides, so the configured headers are silently dropped for the broker connection.

This change makes sts_header_overrides apply consistently across all STS AssumeRole calls the plugin performs for MSK:

  • When aws_msk_iam: role is configured with sts_header_overrides, the SASL client callback handler is switched to a custom handler that resolves credentials from the same header-aware StsAssumeRoleCredentialsProvider already used for bootstrap-broker discovery, and the JAAS config is set to default mode so the library does not perform its own (header-less) role assumption.
  • The role is assumed once, with the overrides applied, and the resolved temporary credentials are converted to the SDK v1 credential type that aws-msk-iam-auth expects.

Behavior is unchanged when no sts_header_overrides are configured, and for aws_msk_iam: default.

Issues Resolved

Resolves #6924

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Resolves opensearch-project#6923.

Signed-off-by: Halit Ozkaya <hozkaya@amazon.com>

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @hozkaya2000 for this contribution!

properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS, MskIamAuthCredentialsCallbackHandler.class.getName());
properties.put(SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
} else {
String baseIamAuthConfig = "software.amazon.msk.auth.iam.IAMLoginModule required " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about having split authentication approaches. We should always take the same authentication rather than make it a condition on STS header overrides.

final Map<String, String> stsHeaderOverrides = awsConfig.getAwsStsHeaderOverrides();
if (Objects.nonNull(stsHeaderOverrides) && !stsHeaderOverrides.isEmpty()) {
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS, MskIamAuthCredentialsCallbackHandler.class.getName());
properties.put(SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are loading this IAMLoginModule from the AWS MSK library. What prevents the MSK library from using the default credentials provider (from AWS SDK) directly instead of the callback handler? Might there be code paths that result in this bad path?

I'm wondering if there is a path to writing our own IAMLoginModule and not using the MSK one at all.

I noticed this line in IAMClientCallbackHandler:

provider = configEntry.map(c -> (AWSCredentialsProvider) new MSKCredentialProvider(c.getOptions()))
                .orElse(DefaultAWSCredentialsProviderChain.getInstance());

It seems that this will return DefaultAWSCredentialsProviderChain.getInstance() which is not at all what we want.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also want to be sure that we don't use MSKCredentialProvider. From what I can tell this is used only by IAMClientCallbackHandler and IAMOAuthBearerLoginCallbackHandler.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe writing our own IAMLoginModule won't have an impact. From what I can tell this doesn't choose the mechanism to load credentials. I think it configures internal security providers.

* <p>The header-aware provider is AWS SDK v2 while aws-msk-iam-auth 2.0.3 expects SDK v1
* {@link AWSCredentials}, so the resolved temporary credentials are translated v2 to v1.
*/
public class MskIamAuthCredentialsCallbackHandler extends IAMClientCallbackHandler {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we extend from AuthenticateCallbackHandler and not use the AWS MSK library at all? This would also let us use only AWS SDK v2.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hozkaya2000 ,

I took another look at the AWS MSK library. It performs the bridge between AWS credentials and SASL. Most of this is in their internals package. We should keep all of that. It is not worth rewriting because it maps credentials to SASL networking interactions.

What we should be sure to do is fully change the way that we get those AWS credentials. I think you should make this implement AuthenticateCallbackHandler directly instead of using the IAMClientCallbackHandler sub-class. But you can copy a lot of the existing code from IAMClientCallbackHandler. We just need to be sure to not load credentials from the SDK except through our AWS plugin.

We will need to retain v1 to keep using their internals package.

@github-actions

Copy link
Copy Markdown

⚠️ License Header Violations Found

The following newly added files are missing required license headers:

  • data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/authenticator/MskIamAuthCredentialsCallbackHandler.java
  • data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/authenticator/MskIamAuthCredentialsCallbackHandlerTest.java

Please add the appropriate license header to each file and push your changes.

See the license header requirements: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing my "request changes" hold

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants