Skip to content

add query_with_deferred_response method #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions jupiterone/client.py
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
UPDATE_RELATIONSHIPV2,
DELETE_RELATIONSHIP,
CURSOR_QUERY_V1,
DEFERRED_RESPONSE_QUERY,
CREATE_INSTANCE,
INTEGRATION_JOB_VALUES,
INTEGRATION_INSTANCE_EVENT_VALUES,
@@ -265,6 +266,86 @@ def _limit_and_skip_query(
page += 1

return {"data": results}

def query_with_deferred_response(self, query, cursor=None):
"""
Execute a J1QL query that returns a deferred response for handling large result sets.

Args:
query (str): The J1QL query to execute
cursor (str, optional): Pagination cursor for subsequent requests

Returns:
list: Combined results from all paginated responses
"""
all_query_results = []
current_cursor = cursor

while True:
variables = {
"query": query,
"deferredResponse": "FORCE",
"cursor": current_cursor,
"flags": {"variableResultSize": True}
}

payload = {
"query": DEFERRED_RESPONSE_QUERY,
"variables": variables
}

# Use session with retries for reliability
max_retries = 5
backoff_factor = 2

for attempt in range(1, max_retries + 1):

session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 429])
session.mount('https://', HTTPAdapter(max_retries=retries))

# Get the download URL
url_response = session.post(
self.graphql_url,
headers=self.headers,
json=payload,
timeout=60
)

if url_response.status_code == 429:
retry_after = int(url_response.headers.get("Retry-After", backoff_factor ** attempt))
print(f"Rate limited. Retrying in {retry_after} seconds...")
time.sleep(retry_after)
else:
break # Exit on success or other non-retryable error

if url_response.ok:

download_url = url_response.json()['data']['queryV1']['url']

# Poll the download URL until results are ready
while True:
download_response = session.get(download_url, timeout=60).json()
status = download_response['status']

if status != 'IN_PROGRESS':
break

time.sleep(0.2) # Sleep 200 milliseconds between checks

# Add results to the collection
all_query_results.extend(download_response['data'])

# Check for more pages
if 'cursor' in download_response:
current_cursor = download_response['cursor']
else:
break

else:
print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}")

return all_query_results

def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict:
"""Executes POST request to SyncAPI endpoints"""
37 changes: 27 additions & 10 deletions jupiterone/constants.py
Original file line number Diff line number Diff line change
@@ -131,7 +131,6 @@
entityRawDataLegacy(entityId: $entityId, , source: $source) {
entityId
payload {

... on RawDataJSONEntityLegacy {
contentType
name
@@ -360,7 +359,7 @@
...IntegrationDefinitionConfigFragment @include(if: $includeConfig)
__typename
}

fragment IntegrationDefinitionConfigFragment on IntegrationDefinition {
configFields {
...ConfigFieldsRecursive
@@ -387,7 +386,7 @@
}
__typename
}

fragment ConfigFieldsRecursive on ConfigField {
...ConfigFieldValues
configFields {
@@ -400,7 +399,7 @@
}
__typename
}

fragment ConfigFieldValues on ConfigField {
key
displayName
@@ -450,7 +449,7 @@
}
__typename
}

query IntegrationInstances($definitionId: String, $cursor: String, $limit: Int, $filter: ListIntegrationInstancesSearchFilter) {
integrationInstancesV2(
definitionId: $definitionId
@@ -507,7 +506,7 @@
collectorPoolId
__typename
}

fragment IntegrationInstanceJobValues on IntegrationJob {
id
status
@@ -517,7 +516,7 @@
hasSkippedSteps
__typename
}

query IntegrationInstance($integrationInstanceId: String!) {
integrationInstance(id: $integrationInstanceId) {
...IntegrationInstanceValues
@@ -577,6 +576,24 @@
}
}
"""
DEFERRED_RESPONSE_QUERY = """
query J1QL(
$query: String!
$variables: JSON
$cursor: String
$deferredResponse: DeferredResponseOption
) {
queryV1(
query: $query
variables: $variables
deferredResponse: $deferredResponse
cursor: $cursor
) {
type
url
}
}
"""
J1QL_FROM_NATURAL_LANGUAGE = """
query j1qlFromNaturalLanguage($input: J1qlFromNaturalLanguageInput!) {
j1qlFromNaturalLanguage(input: $input) {
@@ -660,7 +677,7 @@
__typename
}
}

fragment RuleInstanceFields on QuestionRuleInstance {
id
accountId
@@ -726,7 +743,7 @@
__typename
}
}

fragment RuleInstanceFields on QuestionRuleInstance {
id
accountId
@@ -869,7 +886,7 @@
__typename
}
}

fragment QuestionFields on Question {
id
sourceId
Loading