diff --git a/README.md b/README.md
index b6f4c44..829b439 100644
--- a/README.md
+++ b/README.md
@@ -47,19 +47,26 @@ the default of "https://api.us.jupiterone.io" is used.
 ```python
 QUERY = 'FIND Host'
-query_result = j1.query_v1(QUERY)
+query_result = j1.query_v1(query=QUERY)
 
 # Including deleted entities
-query_result = j1.query_v1(QUERY, include_deleted=True)
+query_result = j1.query_v1(query=QUERY, include_deleted=True)
 
 # Tree query
 QUERY = 'FIND Host RETURN TREE'
-query_result = j1.query_v1(QUERY)
+query_result = j1.query_v1(query=QUERY)
 
-# Using cursor graphQL variable to return full set of paginated results
+# Using cursor query to return full set of paginated results
 QUERY = "FIND (Device | Person)"
-cursor_query_r = j1._cursor_query(QUERY)
+cursor_query_r = j1._cursor_query(query=QUERY)
+# Using cursor query with parallel processing
+QUERY = "FIND (Device | Person)"
+cursor_query_r = j1._cursor_query(query=QUERY, max_workers=5)
+
+# Using deferredResponse with J1QL to return large datasets
+QUERY = "FIND UnifiedDevice"
+deferred_response_query_r = j1.query_with_deferred_response(query=QUERY)
 ```
 
 ##### Create an entity:
@@ -67,6 +74,8 @@ cursor_query_r = j1._cursor_query(QUERY)
 Note that the CreateEntity mutation behaves like an upsert, so a non-existent entity will be created or an existing entity will be updated.
 
 ```python
+import time
+
 properties = {
     'myProperty': 'myValue',
     'tag.myTagProperty': 'value_will_be_a_tag'
@@ -80,7 +89,6 @@ entity = j1.create_entity(
    timestamp=int(time.time()) * 1000 # Optional, defaults to current datetime
 )
 print(entity['entity'])
-
 ```
@@ -96,7 +104,6 @@ j1.update_entity(
     entity_id='',
     properties=properties
 )
-
 ```
@@ -116,7 +123,6 @@ j1.create_relationship(
     from_entity_id='',
     to_entity_id=''
 )
-
 ```
 
 ##### Update a relationship
@@ -128,35 +134,30 @@ j1.update_relationship(
         "": "",
     },
 )
-
 ```
 
 ##### Delete a relationship
 
 ```python
 j1.delete_relationship(relationship_id='')
-
 ```
 
 ##### Fetch Graph Entity Properties
 
 ```python
 j1.fetch_all_entity_properties()
-
 ```
 
 ##### Fetch Graph Entity Tags
 
 ```python
 j1.fetch_all_entity_tags()
-
 ```
 
 ##### Fetch Entity Raw Data
 
 ```python
 j1.fetch_entity_raw_data(entity_id='')
-
 ```
 
 ##### Create Integration Instance
@@ -165,14 +166,12 @@ j1.fetch_entity_raw_data(entity_id='')
 j1.create_integration_instance(
     instance_name="Integration Name",
     instance_description="Description Text")
-
 ```
 
 ##### Start Synchronization Job
 
 ```python
 j1.start_sync_job(instance_id='')
-
 ```
 
 ##### Upload Batch of Entities
@@ -204,7 +203,6 @@ entities_payload = [
 
 j1.upload_entities_batch_json(instance_job_id='',
                               entities_list=entities_payload)
-
 ```
 
 ##### Upload Batch of Relationships
@@ -231,7 +229,6 @@ relationships_payload = [
 
 j1.upload_relationships_batch_json(instance_job_id='',
                                    relationships_list=relationships_payload)
-
 ```
 
 ##### Upload Batch of Entities and Relationships
@@ -283,14 +280,12 @@ combined_payload = {
 
 j1.upload_combined_batch_json(instance_job_id='',
                               combined_payload=combined_payload)
-
 ```
 
 ##### Finalize Synchronization Job
 
 ```python
 j1.finalize_sync_job(instance_job_id='')
-
 ```
 
 ##### Fetch Integration Instance Jobs
@@ -305,7 +300,6 @@ j1.fetch_integration_jobs(instance_id='')
 ```python
 j1.fetch_integration_job_events(instance_id='',
                                 instance_job_id='')
-
 ```
 
 ##### Create SmartClass
@@ -313,7 +307,6 @@ j1.fetch_integration_job_events(instance_id='',
 ```python
 j1.create_smartclass(smartclass_name='SmartClassName',
                      smartclass_description='SmartClass Description Text')
-
 ```
 
 ##### Create SmartClass Query
@@ -322,42 +315,36 @@ j1.create_smartclass(smartclass_name='SmartClassName',
 j1.create_smartclass_query(smartclass_id='',
                            query='',
                            query_description='Query Description Text')
-
 ```
 
 ##### Run SmartClass Evaluation
 
 ```python
 j1.evaluate_smartclass(smartclass_id='')
-
 ```
 
 ##### Get SmartClass Details
 
 ```python
 j1.get_smartclass_details(smartclass_id='')
-
 ```
 
 ##### Generate J1QL from Natural Language Prompt
 
 ```python
 j1.generate_j1ql(natural_language_prompt='')
-
 ```
 
 ##### List Alert Rules
 
 ```python
 j1.list_alert_rules()
-
 ```
 
 ##### Get Alert Rule Details
 
 ```python
 j1.get_alert_rule_details(rule_id='')
-
 ```
 
 ##### Create Alert Rule
@@ -372,13 +359,11 @@ j1.create_alert_rule(name="create_alert_rule-name",
                      polling_interval="DISABLED",
                      severity="INFO",
                      j1ql="find jupiterone_user")
-
 ```
 
 ##### Create Alert Rule with Action Config
 
 ```python
-
 webhook_action_config = {
     "type": "WEBHOOK",
     "endpoint": "https://webhook.domain.here/endpoint",
@@ -443,21 +428,17 @@ j1.create_alert_rule(name="create_alert_rule-name",
                      severity="INFO",
                      j1ql="find jupiterone_user",
                      action_configs=webhook_action_config)
-
 ```
 
 ##### Delete Alert Rule
 
 ```python
-
 j1.delete_alert_rule(rule_id=' Dict:
         """Executes query against graphql endpoint"""
@@ -134,28 +125,26 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict:
         # Always ask for variableResultSize
         data.update(flags={"variableResultSize": True})
 
-        # initiate requests session and implement retry logic of 5 request retries with 1 second between
-        s = requests.Session()
-        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
-        s.mount("https://", HTTPAdapter(max_retries=retries))
-
-        response = s.post(self.graphql_url, headers=self.headers, json=data, timeout=60)
+        response = self.session.post(
+            self.graphql_url,
+            headers=self.headers,
+            json=data,
+            timeout=60
+        )
 
         # It is still unclear if all responses will have a status
         # code of 200 or if 429 will eventually be used to
         # indicate rate limits being hit. J1 devs are aware.
         if response.status_code == 200:
-            if response._content:
-                content = json.loads(response._content)
-                if "errors" in content:
-                    errors = content["errors"]
-                    if len(errors) == 1:
-                        if "429" in errors[0]["message"]:
-                            raise JupiterOneApiRetryError(
-                                "JupiterOne API rate limit exceeded"
-                            )
-                    raise JupiterOneApiError(content.get("errors"))
-            return response.json()
+            content = response.json()
+            if "errors" in content:
+                errors = content["errors"]
+                if len(errors) == 1 and "429" in errors[0]["message"]:
+                    raise JupiterOneApiRetryError(
+                        "JupiterOne API rate limit exceeded"
+                    )
+                raise JupiterOneApiError(content.get("errors"))
+            return content
 
         elif response.status_code == 401:
             raise JupiterOneApiError(
@@ -176,18 +165,24 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict:
         if isinstance(content, (bytes, bytearray)):
             content = content.decode("utf-8")
         if "application/json" in response.headers.get("Content-Type", "text/plain"):
-            data = json.loads(content)
+            data = response.json()
             content = data.get("error", data.get("errors", content))
         raise JupiterOneApiError("{}:{}".format(response.status_code, content))
 
     def _cursor_query(
-        self, query: str, cursor: str = None, include_deleted: bool = False
+        self,
+        query: str,
+        cursor: str = None,
+        include_deleted: bool = False,
+        max_workers: Optional[int] = None
     ) -> Dict:
-        """Performs a V1 graph query using cursor pagination
+        """Performs a V1 graph query using cursor pagination with optional parallel processing
+
         args:
             query (str): Query text
             cursor (str): A pagination cursor for the initial query
             include_deleted (bool): Include recently deleted entities in query/search
+            max_workers (int, optional): Maximum number of parallel workers for fetching pages
         """
 
         # If the query itself includes a LIMIT then we must parse that and check if we've reached
@@ -200,33 +195,77 @@ def _cursor_query(
             result_limit = False
 
         results: List = []
 
-        while True:
-            variables = {"query": query, "includeDeleted": include_deleted}
+        def fetch_page(cursor: Optional[str] = None) -> Dict:
+            variables = {"query": query, "includeDeleted": include_deleted}
             if cursor is not None:
                 variables["cursor"] = cursor
-
-            response = self._execute_query(query=CURSOR_QUERY_V1, variables=variables)
-            data = response["data"]["queryV1"]["data"]
-
-            # This means it's a "TREE" query and we have everything
-            if "vertices" in data and "edges" in data:
-                return data
-
-            results.extend(data)
-
-            if result_limit and len(results) >= result_limit:
-                # We can stop paginating if we've collected enough results based on the requested limit
-                break
-            elif (
-                "cursor" in response["data"]["queryV1"]
-                and response["data"]["queryV1"]["cursor"] is not None
-            ):
-                # We got a cursor and haven't collected enough results
+            return self._execute_query(query=CURSOR_QUERY_V1, variables=variables)
+
+        # First page to get initial cursor and data
+        response = fetch_page(cursor)
+        data = response["data"]["queryV1"]["data"]
+
+        # This means it's a "TREE" query and we have everything
+        if "vertices" in data and "edges" in data:
+            return data
+
+        results.extend(data)
+
+        # If no cursor or we've hit the limit, return early
+        if not response["data"]["queryV1"].get("cursor") or (result_limit and len(results) >= result_limit):
+            return {"data": results[:result_limit] if result_limit else results}
+
+        # If parallel processing is enabled and we have more pages to fetch
+        if max_workers and max_workers > 1:
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+                future_to_cursor = {
+                    executor.submit(fetch_page, response["data"]["queryV1"]["cursor"]):
+                        response["data"]["queryV1"]["cursor"]
+                }
+
+                while future_to_cursor:
+                    # Wait for the next future to complete
+                    done, _ = concurrent.futures.wait(
+                        future_to_cursor,
+                        return_when=concurrent.futures.FIRST_COMPLETED
+                    )
+
+                    for future in done:
+                        cursor = future_to_cursor.pop(future)
+                        try:
+                            response = future.result()
+                            page_data = response["data"]["queryV1"]["data"]
+                            results.extend(page_data)
+
+                            # Check if we need to fetch more pages
+                            if (result_limit and len(results) >= result_limit) or \
+                                    not response["data"]["queryV1"].get("cursor"):
+                                # Cancel remaining futures
+                                for f in future_to_cursor:
+                                    f.cancel()
+                                future_to_cursor.clear()
+                                break
+
+                            # Schedule next page fetch
+                            next_cursor = response["data"]["queryV1"]["cursor"]
+                            future_to_cursor[executor.submit(fetch_page, next_cursor)] = next_cursor
+
+                        except Exception as e:
+                            # Log error but continue with other pages
+                            print(f"Error fetching page with cursor {cursor}: {str(e)}")
+        else:
+            # Sequential processing
+            while True:
                 cursor = response["data"]["queryV1"]["cursor"]
-            else:
-                # No cursor returned so we're done
-                break
+                response = fetch_page(cursor)
+                data = response["data"]["queryV1"]["data"]
+                results.extend(data)
+
+                if result_limit and len(results) >= result_limit:
+                    break
+                elif not response["data"]["queryV1"].get("cursor"):
+                    break
 
         # If we detected an inline LIMIT make sure we only return that many results
         if result_limit:
@@ -266,15 +305,15 @@ def _limit_and_skip_query(
             page += 1
 
         return {"data": results}
-    
+
     def query_with_deferred_response(self, query, cursor=None):
         """
         Execute a J1QL query that returns a deferred response for handling large result sets.
-        
+
         Args:
             query (str): The J1QL query to execute
            cursor (str, optional): Pagination cursor for subsequent requests
-        
+
         Returns:
             list: Combined results from all paginated responses
         """
@@ -300,12 +339,8 @@ def query_with_deferred_response(self, query, cursor=None):
 
         for attempt in range(1, max_retries + 1):
-            session = requests.Session()
-            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 429])
-            session.mount('https://', HTTPAdapter(max_retries=retries))
-
             # Get the download URL
-            url_response = session.post(
+            url_response = self.session.post(
                 self.graphql_url,
                 headers=self.headers,
                 json=payload,
@@ -325,7 +360,7 @@ def query_with_deferred_response(self, query, cursor=None):
 
             # Poll the download URL until results are ready
             while True:
-                download_response = session.get(download_url, timeout=60).json()
+                download_response = self.session.get(download_url, timeout=60).json()
                 status = download_response['status']
 
                 if status != 'IN_PROGRESS':
@@ -345,17 +380,13 @@ def query_with_deferred_response(self, query, cursor=None):
             else:
                 print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}")
Status: {url_response.status_code}") - return all_query_results + return all_query_results def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict: """Executes POST request to SyncAPI endpoints""" - # initiate requests session and implement retry logic of 5 request retries with 1 second between - s = requests.Session() - retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) - s.mount("https://", HTTPAdapter(max_retries=retries)) - - response = s.post( + # initiate requests session and implement retry logic of 5 request retries with 1 second between retries + response = self.session.post( self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60 ) @@ -772,7 +803,7 @@ def update_integration_instance_config_value( ): """Update a single config k:v pair existing on a configured Integration Instance.""" - # fetch existing instnace configuration + # fetch existing instance configuration instance_config = self.get_integration_instance_details(instance_id=instance_id) config_dict = instance_config["data"]["integrationInstance"]["config"] @@ -810,7 +841,7 @@ def update_integration_instance_config_value( # remove problem fields from previous response if variables["update"].get("pollingIntervalCronExpression") is not None: - if "__typename" in ["update"]["pollingIntervalCronExpression"]: + if "__typename" in variables["update"]["pollingIntervalCronExpression"]: del variables["update"]["pollingIntervalCronExpression"][ "__typename" ] @@ -1192,12 +1223,8 @@ def fetch_evaluation_result_download_url(self, raw_data_key: str = None): def fetch_downloaded_evaluation_results(self, download_url: str = None): """Return full Alert Rule J1QL results from Download URL""" # initiate requests session and implement retry logic of 5 request retries with 1 second between - s = requests.Session() - retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) - s.mount("https://", HTTPAdapter(max_retries=retries)) - try: - response = s.get(download_url, timeout=60) + response = self.session.get(download_url, timeout=60) return response.json() diff --git a/requirements.txt b/requirements.txt index 92ab860..cee8ba2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ retrying>=1.3.4,<2 -requests>=2,<3 +requests>=2.31.0,<3
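
The hunks above replace every per-call `requests.Session()` construction with a shared `self.session`, but the constructor change that creates that session is not included in this excerpt. A minimal sketch of what the shared session could look like, assuming it keeps the retry settings of the removed code (`total=5`, `backoff_factor=1`, the gateway-error `status_forcelist`, plus 429 as in the removed deferred-response setup); the helper name `_build_shared_session` is illustrative, and only the `self.session` attribute name comes from the diff:

```python
# Illustrative sketch only -- the real __init__ change is not shown in this diff.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def _build_shared_session() -> requests.Session:
    """Build a session with the retry behavior the removed per-call setups used."""
    session = requests.Session()
    retries = Retry(
        total=5,                                # up to 5 retries per request
        backoff_factor=1,                       # back off between attempts
        status_forcelist=[429, 502, 503, 504],  # retry rate limits and gateway errors
    )
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session


# e.g. inside the client constructor (hypothetical placement):
# self.session = _build_shared_session()
```

Beyond centralizing the retry configuration, a single shared session reuses TCP connections across calls, which is the practical win over building a new session for every request.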