Skip to content

Fix ingestion pagination for jira / zendesk #318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def _list_channels() -> list:
{
'id': c['id'],
'name': c['name'],
'allowed': False
'allowed': False,
'direct_messages': False
}
for c in data.get('channels', [])
])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.18 on 2025-04-30 14:16

from django.db import migrations, models


class Migration(migrations.Migration):
    # Auto-generated schema migration: adds a boolean `allow_dm` column to the
    # `integration` table so an integration can opt in to bot direct messages.

    # Must run after the `core` app's 0082 migration (gurutype "private" field).
    dependencies = [
        ('core', '0082_gurutype_private'),
    ]

    operations = [
        # New flag defaults to False, so existing integrations keep DMs
        # disabled until explicitly enabled.
        migrations.AddField(
            model_name='integration',
            name='allow_dm',
            field=models.BooleanField(default=False),
        ),
    ]
2 changes: 2 additions & 0 deletions src/gurubase-backend/backend/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,8 @@ class Type(models.TextChoices):
zendesk_api_token = models.TextField(null=True, blank=True)
zendesk_user_email = models.TextField(null=True, blank=True)

allow_dm = models.BooleanField(default=False)

date_created = models.DateTimeField(auto_now_add=True)
date_updated = models.DateTimeField(auto_now=True)

Expand Down
176 changes: 119 additions & 57 deletions src/gurubase-backend/backend/core/requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,32 +947,47 @@ def list_issues(self, jql_query, start=0, max_results=50):
List Jira issues using JQL query with pagination
Args:
jql_query (str): JQL query string to filter issues
start (int): Starting index for pagination
start (int): Starting index for pagination (unused, kept for compatibility)
max_results (int): Maximum number of results to fetch per request
Returns:
list: List of Jira issues matching the query
Raises:
ValueError: If API request fails
"""
try:
# Get issues using JQL
issues_data = self.jira.jql(jql_query, start=start, limit=max_results)
issues = []
all_issues = []
current_start = 0
page_size = max_results

for issue in issues_data.get('issues', []):
formatted_issue = {
'id': issue.get('id'),
# 'key': issue.get('key'),
# 'summary': issue.get('fields', {}).get('summary'),
# 'issue_type': issue.get('fields', {}).get('issuetype', {}).get('name'),
# 'status': issue.get('fields', {}).get('status', {}).get('name'),
# 'priority': issue.get('fields', {}).get('priority', {}).get('name'),
# 'assignee': issue.get('fields', {}).get('assignee', {}).get('displayName'),
'link': f"{self.url}/browse/{issue.get('key')}"
}
issues.append(formatted_issue)
while True:
# Get issues using JQL
issues_data = self.jira.jql(jql_query, start=current_start, limit=page_size)
issues = issues_data.get('issues', [])

if not issues:
break

for issue in issues:
formatted_issue = {
'id': issue.get('id'),
# 'key': issue.get('key'),
# 'summary': issue.get('fields', {}).get('summary'),
# 'issue_type': issue.get('fields', {}).get('issuetype', {}).get('name'),
# 'status': issue.get('fields', {}).get('status', {}).get('name'),
# 'priority': issue.get('fields', {}).get('priority', {}).get('name'),
# 'assignee': issue.get('fields', {}).get('assignee', {}).get('displayName'),
'link': f"{self.url}/browse/{issue.get('key')}"
}
all_issues.append(formatted_issue)

return issues
# If we got fewer issues than requested, we've reached the end
if len(issues) < page_size:
break

# Move to the next page
current_start += page_size

return all_issues
except Exception as e:
logger.error(f"Error listing Jira issues: {str(e)}", exc_info=True)
if "401" in str(e):
Expand Down Expand Up @@ -1296,7 +1311,7 @@ def list_articles(self, batch_size=100):
if article.get('draft') is False:
all_articles.append(self._format_article(article))

url = data.get('next_page')
url = data.get('links', {}).get('next')
return all_articles
except requests.exceptions.RequestException as e:
status_code = e.response.status_code if e.response is not None else None
Expand Down Expand Up @@ -1384,7 +1399,7 @@ def _get_article_comments(self, article_id, batch_size=100):
comments = data.get('comments', [])
all_comments.extend([self._format_article_comment(comment) for comment in comments])

url = data.get('next_page')
url = data.get('links', {}).get('next')
all_comments.sort(key=lambda x: x['created_at'])
return all_comments
except requests.exceptions.RequestException as e:
Expand Down Expand Up @@ -1947,8 +1962,85 @@ def list_pages(self, cql=None):
ValueError: If API request fails
"""
try:
# Get all pages from all spaces
# If CQL is provided, use it directly to get pages
if cql:
cql += " AND type=page"
all_pages = []
seen_page_ids = set() # Track unique page IDs
cql_limit = 100 # Fetch 100 results at a time

# Initial request
url = f"{self.url}/wiki/rest/api/search"
params = {
'cql': cql,
'limit': cql_limit,
'expand': 'space'
}

while True:
response = requests.get(
url,
auth=(self.confluence.username, self.confluence.password),
params=params,
timeout=30
)

if response.status_code == 401:
raise ValueError("Invalid Confluence credentials")
elif response.status_code == 403:
raise ValueError("Confluence API access forbidden")
elif response.status_code != 200:
if 'could not parse' in response.text.lower():
raise ValueError(f"Invalid CQL query.")
else:
split = response.json().get('message', '').split(':', 1)
if len(split) > 1:
raise ValueError(split[1].strip())
else:
raise ValueError(f"Confluence API request failed with status {response.status_code}")

cql_results = response.json()
results = cql_results.get('results', [])

if not results:
break

for result in results:
content = result.get('content', {})
page_id = content.get('id')

# Skip if we've seen this page ID before
if not page_id or page_id in seen_page_ids:
continue

seen_page_ids.add(page_id)

# Get space info from the expanded result
space = content.get('space', {})
space_key = space.get('key')
space_name = space.get('name', '')

# Format and add the page
formatted_page = self._format_page(content, space_key, space_name)
all_pages.append(formatted_page)

# Check if there's a next page
next_link = cql_results.get('_links', {}).get('next')
if not next_link:
break

# Update URL and params for next request
url = f"{self.url}/wiki{next_link}"
params = {} # Clear params as they're included in the next_link

return {
'pages': all_pages,
'page_count': len(all_pages)
}

# If no CQL, get all pages from all spaces
all_pages = []
seen_page_ids = set() # Track unique page IDs

spaces_data = self.confluence.get_all_spaces()
spaces = spaces_data.get('results', [])
Expand All @@ -1972,8 +2064,12 @@ def list_pages(self, cql=None):
break

for page in pages_batch:
formatted_page = self._format_page(page, space_key, space_name)
all_pages.append(formatted_page)
page_id = page.get('id')
# Only add page if we haven't seen its ID before
if page_id and page_id not in seen_page_ids:
seen_page_ids.add(page_id)
formatted_page = self._format_page(page, space_key, space_name)
all_pages.append(formatted_page)

# If we got fewer pages than requested, we've reached the end
if len(pages_batch) < page_limit:
Expand All @@ -1987,40 +2083,6 @@ def list_pages(self, cql=None):
logger.warning(f"Error fetching pages from space {space_key}: {str(e)}")
continue

# Filter pages by CQL if provided
if cql:
# Implement pagination for the CQL query as well
cql_page_ids = set()
cql_start = 0
cql_limit = 100 # Fetch 100 results at a time

while True:
cql_results = self.confluence.cql(cql, start=cql_start, limit=cql_limit)
results = cql_results.get('results', [])

if not results:
break

for result in results:
content = result.get('content', {})
cql_page_ids.add(content.get('id'))

# If we got fewer results than requested, we've reached the end
if len(results) < cql_limit:
break

# Move to the next page
cql_start += cql_limit

# Filter the all_pages list to only include pages that match the CQL
filtered_pages = [page for page in all_pages if page.get('id') in cql_page_ids]

return {
'pages': filtered_pages,
'page_count': len(filtered_pages)
}

# No CQL filtering, just return all pages
return {
'pages': all_pages,
'page_count': len(all_pages)
Expand Down Expand Up @@ -2163,7 +2225,7 @@ def list_spaces(self, start=0, limit=50):

# Use pagination to get all spaces
space_start = 0
space_limit = 100 # Fetch 100 spaces at a time
space_limit = 25 # Fetch 25 spaces at a time
while True:
spaces_data = self.confluence.get_all_spaces(start=space_start, limit=space_limit)
results = spaces_data.get('results', [])
Expand Down
54 changes: 38 additions & 16 deletions src/gurubase-backend/backend/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1941,13 +1941,18 @@ def manage_channels(request, guru_type, integration_type):
try:
channels = request.data.get('channels', [])
integration.channels = channels

if 'allow_dm' in request.data:
integration.allow_dm = request.data['allow_dm']

integration.save()

return Response({
'id': integration.id,
'type': integration.type,
'guru_type': integration.guru_type.slug,
'channels': integration.channels
'channels': integration.channels,
'allow_dm': integration.allow_dm
})
except Exception as e:
logger.error(f"Error updating channels: {e}", exc_info=True)
Expand Down Expand Up @@ -2422,17 +2427,25 @@ async def send_channel_unauthorized_message(
client: WebClient,
channel_id: str,
thread_ts: str,
guru_slug: str
guru_slug: str,
dm: bool
) -> None:
"""Send a message explaining how to authorize the channel."""
try:
base_url = await sync_to_async(get_base_url)()
settings_url = f"{base_url.rstrip('/')}/guru/{guru_slug}/integrations/slack"
message = (
"❌ This channel is not authorized to use the bot.\n\n"
f"Please visit <{settings_url}|Gurubase Settings> to configure "
"the bot and add this channel to the allowed channels list."
)
if dm:
message = (
"❌ Bot direct messages are not enabled.\n\n"
f"Please visit <{settings_url}|Gurubase Settings> to configure "
"the bot and enable direct messages."
)
else:
message = (
"❌ This channel is not authorized to use the bot.\n\n"
f"Please visit <{settings_url}|Gurubase Settings> to configure "
"the bot and add this channel to the allowed channels list."
)
client.chat_postMessage(
channel=channel_id,
thread_ts=thread_ts,
Expand Down Expand Up @@ -2465,12 +2478,17 @@ def process_event():

# Only proceed if it's a message event and not from a bot
if event["type"] == "message" and "subtype" not in event and event.get("user") != event.get("bot_id"):
dm = False
if event['channel_type'] == 'im':
dm = True
# Get bot user ID from authorizations
bot_user_id = data.get("authorizations", [{}])[0].get("user_id")
user_message = event["text"]

# First check if the bot is mentioned
if not (bot_user_id and f"<@{bot_user_id}>" in user_message):
if not dm and not (bot_user_id and f"<@{bot_user_id}>" in user_message):
return
elif dm and event['user'] == bot_user_id:
return

team_id = data.get('team_id')
Expand Down Expand Up @@ -2500,12 +2518,15 @@ def process_event():
channel_id = event["channel"]

# Check if the current channel is allowed
channels = integration.channels
channel_allowed = False
for channel in channels:
if str(channel.get('id')) == channel_id and channel.get('allowed', False):
channel_allowed = True
break
if not dm:
channels = integration.channels
channel_allowed = False
for channel in channels:
if str(channel.get('id')) == channel_id and channel.get('allowed', False):
channel_allowed = True
break
else:
channel_allowed = integration.allow_dm

# Get thread_ts if it exists (means we're in a thread)
thread_ts = event.get("thread_ts") or event.get("ts")
Expand All @@ -2519,7 +2540,8 @@ def process_event():
client=client,
channel_id=channel_id,
thread_ts=thread_ts,
guru_slug=integration.guru_type.slug
guru_slug=integration.guru_type.slug,
dm=dm
))
finally:
loop.close()
Expand Down
Loading