From 50b4c8bdde5a7619ebf70c4e4162a6ab9f699c97 Mon Sep 17 00:00:00 2001 From: Michelle Tran Date: Fri, 25 Apr 2025 14:36:28 -0400 Subject: [PATCH] Add Codecov User Resolution Codecov needs to map the Sentry user onto a GitHub identity to be able to link them to the associated Codecov user. This returns the external_id (GitHub id) and the auth_token to be encrypted and used for requests to Codecov. --- src/sentry/codecov/helper.py | 47 +++++++++++++++++++++++++++++ tests/sentry/codecov/__init__.py | 0 tests/sentry/codecov/test_helper.py | 43 ++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 src/sentry/codecov/helper.py create mode 100644 tests/sentry/codecov/__init__.py create mode 100644 tests/sentry/codecov/test_helper.py diff --git a/src/sentry/codecov/helper.py b/src/sentry/codecov/helper.py new file mode 100644 index 00000000000000..fb30d8a23d02ed --- /dev/null +++ b/src/sentry/codecov/helper.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass +from typing import Any + +from sentry.auth.services.access.service import access_service +from sentry.auth.services.auth import RpcAuthIdentity +from sentry.identity.services.identity import identity_service +from sentry.identity.services.identity.model import RpcIdentity + + +@dataclass +class CodecovUser: + external_id: str + auth_token: Any + + +def resolve_codecov_user(user_id: int, organization_id: int) -> CodecovUser | None: + """Given a Sentry User, and an organization id, find the GitHub user ID and GH access_token + that is linked to Codecov's user. + + The user resolution will "fall-through" based on whether the Organization has a GitHub Auth Provider enabled. + """ + # Get identity from an AuthProvider if it's available. + auth_provider = access_service.get_auth_provider(organization_id) + if auth_provider and auth_provider.provider == "github": + auth_identity: RpcAuthIdentity | None = access_service.get_auth_identity_for_user( + auth_provider.id, user_id + ) + if auth_identity: + return CodecovUser( + external_id=str(auth_identity.ident), + auth_token=auth_identity.data.get("access_token"), + ) + + # Get identity from Identity if it's available. + identities: list[RpcIdentity] = identity_service.get_user_identities_by_provider_type( + user_id=user_id, provider_type="github" + ) + if identities: + # Note: There is currently either zero or one GitHub "identity" mapped to a user. + identity = identities[0] + return CodecovUser( + external_id=identity.external_id, + auth_token=identity.data.get("access_token"), + ) + + # No GitHub identities tied to user, return None + return None diff --git a/tests/sentry/codecov/__init__.py b/tests/sentry/codecov/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/tests/sentry/codecov/test_helper.py b/tests/sentry/codecov/test_helper.py new file mode 100644 index 00000000000000..cd9b55e5b6878d --- /dev/null +++ b/tests/sentry/codecov/test_helper.py @@ -0,0 +1,43 @@ +from sentry.codecov.helper import CodecovUser, resolve_codecov_user +from sentry.testutils.cases import TestCase +from sentry.testutils.silo import all_silo_test + + +@all_silo_test +class TestCodecovUserResolution(TestCase): + def setUp(self): + self.organization = self.create_organization() + self.user = self.create_user() + + def test_get_auth_provider_identity_for_user(self): + auth_provider = self.create_auth_provider( + organization_id=self.organization.id, provider="github" + ) + self.create_auth_identity( + auth_provider=auth_provider, + user=self.user, + ident=12345, + data={"access_token": "this_is_the_access_token_stored_here"}, + ) + assert resolve_codecov_user(self.user.id, self.organization.id) == CodecovUser( + external_id="12345", auth_token="this_is_the_access_token_stored_here" + ) + + def test_get_auth_identity_for_user(self): + # Create the Auth Provider to fall through + self.create_auth_provider(organization_id=self.organization.id, provider="github") + + identity_provider = self.create_identity_provider(type="github") + self.create_identity( + user=self.user, + identity_provider=identity_provider, + external_id="identity_id_12345", + data={"access_token": "this_is_the_access_token_stored_here"}, + ) + + assert resolve_codecov_user(self.user.id, self.organization.id) == CodecovUser( + external_id="identity_id_12345", auth_token="this_is_the_access_token_stored_here" + ) + + def test_no_linked_identity(self): + assert resolve_codecov_user(self.user.id, self.organization.id) is None