diff --git a/.changes/next-release/feature-eks-46018.json b/.changes/next-release/feature-eks-46018.json new file mode 100644 index 000000000000..4d77e22b9417 --- /dev/null +++ b/.changes/next-release/feature-eks-46018.json @@ -0,0 +1,5 @@ +{ + "type": "feature", + "category": "``eks``", + "description": "Add assume-role-arn option to update-kubeconfig command for cross-account access" +} diff --git a/awscli/customizations/eks/update_kubeconfig.py b/awscli/customizations/eks/update_kubeconfig.py index 0e6971b85f2b..7a3781344ce7 100644 --- a/awscli/customizations/eks/update_kubeconfig.py +++ b/awscli/customizations/eks/update_kubeconfig.py @@ -103,6 +103,14 @@ class UpdateKubeconfigCommand(BasicCommand): 'help_text': ("Alias for the generated user name. " "Defaults to match cluster ARN."), 'required': False + }, + { + 'name': 'assume-role-arn', + 'help_text': ('To assume a role for retrieving cluster information, ' + 'specify an IAM role ARN with this option. ' + 'Use this for cross-account access to get cluster details ' + 'from the account where the cluster resides.'), + 'required': False } ] @@ -249,27 +257,42 @@ def cluster_description(self): Cache the response in self._cluster_description. describe-cluster will only be called once. """ - if self._cluster_description is None: - if self._parsed_globals is None: - client = self._session.create_client("eks") - else: - client = self._session.create_client( - "eks", - region_name=self._parsed_globals.region, - endpoint_url=self._parsed_globals.endpoint_url, - verify=self._parsed_globals.verify_ssl - ) - full_description = client.describe_cluster(name=self._cluster_name) - self._cluster_description = full_description["cluster"] - - if "status" not in self._cluster_description: - raise EKSClusterError("Cluster not found") - if self._cluster_description["status"] not in ["ACTIVE", "UPDATING"]: - raise EKSClusterError("Cluster status is {0}".format( - self._cluster_description["status"] - )) - - return self._cluster_description + if self._cluster_description is not None: + return self._cluster_description + + client_kwargs = {} + if self._parsed_globals: + client_kwargs.update({ + "region_name": self._parsed_globals.region, + "endpoint_url": self._parsed_globals.endpoint_url, + "verify": self._parsed_globals.verify_ssl, + }) + + # Handle role assumption if needed + if getattr(self._parsed_args, 'assume_role_arn', None): + sts_client = self._session.create_client('sts') + credentials = sts_client.assume_role( + RoleArn=self._parsed_args.assume_role_arn, + RoleSessionName='EKSDescribeClusterSession' + )["Credentials"] + + client_kwargs.update({ + "aws_access_key_id": credentials["AccessKeyId"], + "aws_secret_access_key": credentials["SecretAccessKey"], + "aws_session_token": credentials["SessionToken"], + }) + + client = self._session.create_client("eks", **client_kwargs) + full_description = client.describe_cluster(name=self._cluster_name) + cluster = full_description.get("cluster") + + if not cluster or "status" not in cluster: + raise EKSClusterError("Cluster not found") + if cluster["status"] not in ["ACTIVE", "UPDATING"]: + raise EKSClusterError(f"Cluster status is {cluster['status']}") + + self._cluster_description = cluster + return cluster def get_cluster_entry(self): """ diff --git a/tests/functional/eks/test_update_kubeconfig.py b/tests/functional/eks/test_update_kubeconfig.py index 9ceb97885c15..16d0fbe16565 100644 --- a/tests/functional/eks/test_update_kubeconfig.py +++ b/tests/functional/eks/test_update_kubeconfig.py @@ -32,7 +32,8 @@ KubeconfigInaccessableError) from tests.functional.eks.test_util import (describe_cluster_response, describe_cluster_creating_response, - get_testdata) + get_testdata, + assume_role_response) def sanitize_output(output): """ @@ -66,6 +67,15 @@ def setUp(self): self.client.describe_cluster.return_value = describe_cluster_response() self.mock_create_client.return_value = self.client + # Set up the sts_client_mock + self.sts_client_mock = mock.Mock() + self.sts_client_mock.assume_role.return_value = assume_role_response() + + # Ensure the mock_create_client correctly returns the appropriate mock + self.mock_create_client.side_effect = lambda service_name, **kwargs: ( + self.sts_client_mock if service_name == "sts" else self.client + ) + self.command = UpdateKubeconfigCommand(self.session) self.maxDiff = None @@ -422,3 +432,59 @@ def test_update_old_api_version(self): self.assert_cmd(configs, passed, environment) self.assert_config_state("valid_old_api_version", "valid_old_api_version_updated") + + def test_assume_role(self): + """ + Test that assume_role_arn is handled correctly when provided. + """ + configs = ["valid_existing"] + self.initialize_tempfiles(configs) + + # Include the --assume-role-arn argument + args = [ + "--name", "ExampleCluster", + "--assume-role-arn", "arn:aws:iam::123456789012:role/test-role" + ] + + # Mock environment variables and paths + kubeconfig_path = self._get_temp_config("valid_existing") + default_path = self._get_temp_config("default_temp") + + with mock.patch.dict(os.environ, {'KUBECONFIG': kubeconfig_path}): + with mock.patch("awscli.customizations.eks.update_kubeconfig.DEFAULT_PATH", default_path): + self.command(args, None) + + # Verify that assume_role was called with the correct parameters + self.sts_client_mock.assume_role.assert_called_once_with( + RoleArn="arn:aws:iam::123456789012:role/test-role", + RoleSessionName="EKSDescribeClusterSession" + ) + + # Verify that the EKS client was created with the assumed credentials + self.mock_create_client.assert_any_call( + "eks", + aws_access_key_id="test-access-key", + aws_secret_access_key="test-secret-key", + aws_session_token="test-session-token" + ) + + # Verify that the cluster was described + self.client.describe_cluster.assert_called_once_with(name="ExampleCluster") + + # Assert the configuration state + self.assert_config_state("valid_existing", "output_combined") + + def test_no_assume_role(self): + """ + Test that assume_role_arn is not used when not provided. + """ + configs = ["valid_existing"] + passed = "valid_existing" + environment = [] + + self.client.describe_cluster = mock.Mock(return_value=describe_cluster_response()) + self.assert_cmd(configs, passed, environment) + + # Verify that assume_role was not called + self.mock_create_client.assert_called_once_with("eks") + self.client.describe_cluster.assert_called_once_with(name="ExampleCluster") diff --git a/tests/functional/eks/test_util.py b/tests/functional/eks/test_util.py index 4bb2916c5395..bcc9bc9a3636 100644 --- a/tests/functional/eks/test_util.py +++ b/tests/functional/eks/test_util.py @@ -176,3 +176,12 @@ def describe_cluster_deleting_response(): "createdAt": 1500000000.000 } } + +def assume_role_response(): + return { + "Credentials": { + "AccessKeyId": "test-access-key", + "SecretAccessKey": "test-secret-key", + "SessionToken": "test-session-token" + } + }