-
Notifications
You must be signed in to change notification settings - Fork 183
/
Copy pathazuredevops.py
123 lines (107 loc) · 5.17 KB
/
azuredevops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import datetime
import logging
from typing import Dict, List
import requests
import base64
import urllib.parse
from dataclasses import dataclass
from azure.devops.connection import Connection
from msrest.authentication import BasicAuthentication
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
from data_source_api.basic_document import DocumentType, BasicDocument
from data_source_api.exception import InvalidDataSourceConfig
from index_queue import IndexQueue
from parsers.html import html_to_text
from data_source_api.utils import parse_with_workers
logger = logging.getLogger(__name__)
@dataclass
class DevOpsConfig:
    """Connection settings for one Azure DevOps data source.

    Every field is normalised on construction: surrounding whitespace is
    removed, and any trailing ``/`` is dropped from ``organization_url`` so
    that URLs built by appending ``/<segment>`` to it never contain ``//``.
    """

    organization_url: str  # base URL of the organization, e.g. https://dev.azure.com/org
    access_token: str      # personal access token used to authenticate
    project_name: str      # name of the project
    query_id: str          # id of the saved work item query

    def __post_init__(self):
        self.query_id = self.query_id.strip()
        self.access_token = self.access_token.strip()
        self.project_name = self.project_name.strip()
        # Strip whitespace first so a value like "https://host/org/ " is
        # still recognised as ending with a slash.
        self.organization_url = self.organization_url.strip().rstrip('/')
class AzuredevopsDataSource(BaseDataSource):
    """Data source that indexes the comments of Azure DevOps work items.

    The work items are the ones returned by the configured saved query
    (``query_id``). Each comment created at or after the last index time is
    converted to a ``BasicDocument`` and put on the ``IndexQueue``.
    """

    # Upper bound, in seconds, for a single comments request so that a stalled
    # connection cannot block indexing forever.
    _REQUEST_TIMEOUT_SECONDS = 30
    # Number of parsed documents buffered before they are put on the queue.
    _FEED_BATCH_SIZE = 50
    # Accepted ``createdDate`` layouts: with and without fractional seconds.
    _CREATED_DATE_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def get_config_fields() -> List[ConfigField]:
        """Return the form fields needed to configure this data source."""
        return [
            ConfigField(label="AzureDevOps organization URL", placeholder="https://dev.azure.com/org", name="organization_url"),
            ConfigField(label="Personal Access Token", name="access_token", type=HTMLInputType.PASSWORD),
            ConfigField(label="Project Name", name="project_name"),
            ConfigField(label="Query ID", name="query_id"),
        ]

    @classmethod
    def get_display_name(cls) -> str:
        """Return the human-readable name of this data source."""
        return "Azure DevOps"

    @staticmethod
    def validate_config(config: Dict) -> None:
        """Check that ``config`` can be used to list the organization's projects.

        Raises:
            InvalidDataSourceConfig: if building the config, connecting, or
                listing projects fails for any reason; the original exception
                is chained as the cause.
        """
        try:
            devops_config = DevOpsConfig(**config)
            credentials = BasicAuthentication('', devops_config.access_token)
            connection = Connection(base_url=devops_config.organization_url, creds=credentials)
            core_client = connection.clients.get_core_client()
            core_client.get_projects()
        except Exception as e:
            raise InvalidDataSourceConfig from e

    def __init__(self, *args, **kwargs):
        """Build the typed config and the work item tracking client."""
        super().__init__(*args, **kwargs)
        self._devops_config = DevOpsConfig(**self._config)
        credentials = BasicAuthentication('', self._devops_config.access_token)
        connection = Connection(base_url=self._devops_config.organization_url, creds=credentials)
        self._work_item_tracking_client = connection.clients.get_work_item_tracking_client()

    @classmethod
    def _parse_created_date(cls, value: str) -> datetime.datetime:
        """Parse a ``createdDate`` string into a naive ``datetime``.

        Tries each layout in ``_CREATED_DATE_FORMATS`` so that a timestamp
        without fractional seconds does not abort the whole worker.

        Raises:
            ValueError: if ``value`` matches none of the known layouts.
        """
        for date_format in cls._CREATED_DATE_FORMATS:
            try:
                return datetime.datetime.strptime(value, date_format)
            except ValueError:
                continue
        raise ValueError(f'Unrecognised createdDate format: {value!r}')

    def _parse_documents_worker(self, raw_docs: List[Dict]):
        """Convert raw comment payloads to documents and feed the index queue.

        Args:
            raw_docs: comment-list payloads as returned by
                ``_list_work_item_comments``; each holds a ``comments`` list.
        """
        logger.info(f'Worker parsing {len(raw_docs)} documents')

        project_name = self._devops_config.project_name
        # Loop-invariant prefix of every work item link. The trailing slash is
        # removed here so the link is well-formed however the URL was entered.
        work_item_base_url = (
            f"{self._devops_config.organization_url.rstrip('/')}"
            f"/{urllib.parse.quote(project_name)}/_workitems/edit"
        )

        parsed_docs = []
        total_fed = 0
        for item in raw_docs:
            for raw_comment in item.get('comments') or []:
                create_date = self._parse_created_date(raw_comment['createdDate'])
                # Skip comments created before the last index time.
                if create_date < self._last_index_time:
                    continue

                created_by = raw_comment['createdBy']
                author = created_by['displayName']
                work_item_id = raw_comment['workItemId']

                parsed_docs.append(BasicDocument(
                    # NOTE(review): every comment of a work item shares this id
                    # (the work item id) — confirm the index tolerates that.
                    id=work_item_id,
                    data_source_id=self._data_source_id,
                    author=author,
                    author_image_url=created_by['_links']['avatar']['href'],
                    content=html_to_text(raw_comment['text']),
                    type=DocumentType.COMMENT,
                    title=f"{work_item_id} - {author}",
                    timestamp=create_date,
                    location=project_name,
                    url=f"{work_item_base_url}/{work_item_id}",
                ))

                # Flush in batches so the buffer stays bounded.
                if len(parsed_docs) >= self._FEED_BATCH_SIZE:
                    total_fed += len(parsed_docs)
                    IndexQueue.get_instance().put(docs=parsed_docs)
                    parsed_docs = []

        # Flush whatever is left over from the last partial batch.
        IndexQueue.get_instance().put(docs=parsed_docs)
        total_fed += len(parsed_docs)

        if total_fed > 0:
            logger.info(f'Worker fed {total_fed} documents')

    def _list_work_item_comments(self, work_item_url) -> Dict:
        """Fetch the comments payload of one work item.

        Args:
            work_item_url: URL of the work item; ``/comments`` is appended.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.RequestException: on timeout, connection failure or a
                non-success HTTP status.
        """
        # Basic auth with an empty user name and the access token as password.
        raw_credentials = ':' + self._devops_config.access_token
        authorization = base64.b64encode(raw_credentials.encode('ascii')).decode('ascii')
        headers = {
            'Accept': 'application/json',
            'Authorization': 'Basic ' + authorization
        }
        response = requests.get(
            url=work_item_url + '/comments',
            headers=headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        # Fail here with the HTTP status instead of later with a KeyError on
        # an error body that has no 'totalCount'.
        response.raise_for_status()
        return response.json()

    def _feed_new_documents(self) -> None:
        """Run the saved query and hand the commented work items to workers."""
        logger.info('Feeding new Azure DevOps Work Items')
        work_item_results = self._work_item_tracking_client.query_by_id(self._devops_config.query_id)

        raw_docs = []
        # NOTE(review): work_items is presumably None for some query results
        # (e.g. non-flat queries) — treated as "nothing to index"; confirm.
        for work_item in work_item_results.work_items or []:
            result = self._list_work_item_comments(work_item.url)
            if result.get('totalCount', 0) > 0:
                raw_docs.append(result)

        parse_with_workers(self._parse_documents_worker, raw_docs)