This repository was archived by the owner on Jul 29, 2024. It is now read-only.

Commit 9a7f0b9

Update connector.py
1 parent 852450f commit 9a7f0b9

File tree

1 file changed (+48, −51 lines)

labelpandas/connector.py

Lines changed: 48 additions & 51 deletions
@@ -29,42 +29,36 @@ def create_upload_dict(df:pandas.core.frame.DataFrame, lb_client:Client, base_cl
     metadata_schema_to_name_key = base_client.get_metadata_schema_to_name_key(lb_mdo=False, divider=divider, invert=False)
     metadata_name_key_to_schema = base_client.get_metadata_schema_to_name_key(lb_mdo=False, divider=divider, invert=True)
     global_key_to_upload_dict = {}
-    try:
-        with ThreadPoolExecutor() as exc:
-            futures = []
-            x = 0
-            dupe_print = 0
-            if verbose:
-                print(f'Submitting data rows...')
-            for index, row in df.iterrows():
-                futures.append(exc.submit(create_data_rows, lb_client, base_client, row, metadata_name_key_to_schema, metadata_schema_to_name_key, row_data_col, global_key_col, external_id_col, metadata_index, local_files, divider))
-            if verbose:
-                print(f'Processing data rows...')
-                for f in tqdm(as_completed(futures)):
-                    res = f.result()
-                    global_key_to_upload_dict[str(res["global_key"])] = res
-            else:
-                for f in as_completed(futures):
-                    res = f.result()
-                    global_key_to_upload_dict[str(res["global_key"])] = res
-        # for f in as_completed(futures):
-        #     res = f.result()
-        #     global_key_to_upload_dict[str(res["global_key"])] = res
-        #     if verbose:
-        #         x += 1
-        #         percent_complete = math.ceil((x / len(df)*100))
-        #         if percent_complete%1 == 0 and (percent_complete!=dupe_print):
-        #             print(f'{str(percent_complete)}% complete')
-        #             dupe_print = percent_complete
+    with ThreadPoolExecutor(max_workers=8) as exc:
+        failed_global_keys = []
+        futures = []
         if verbose:
-            print(f'Generated upload list - {len(global_key_to_upload_dict)} data rows to upload')
-        return True, global_key_to_upload_dict
-    except Exception as e:
-        print(e)
-        if res:
-            return False, res
+            print(f'Submitting data rows...')
+        for index, row in df.iterrows():
+            futures.append(exc.submit(create_data_rows, lb_client, base_client, row, metadata_name_key_to_schema, metadata_schema_to_name_key, row_data_col, global_key_col, external_id_col, metadata_index, local_files, divider))
+        if verbose:
+            print(f'Processing data rows...')
+            for f in tqdm(as_completed(futures)):
+                res = f.result()
+                if type(res)==dict:
+                    global_key_to_upload_dict[str(res["global_key"])] = res
+                else:
+                    failed_global_keys.append(res)
         else:
-            return False, False
+            for f in as_completed(futures):
+                res = f.result()
+                if type(res)==dict:
+                    global_key_to_upload_dict[str(res["global_key"])] = res
+                else:
+                    failed_global_keys.append(res)
+    if verbose:
+        print(f'Generated upload list - {len(global_key_to_upload_dict)} data rows to upload')
+    if failed_global_keys:
+        if global_key_to_upload_dict:
+            print(f'There were {len(failed_global_keys)} errors in creating your upload list - upload will continue and return a list of failed global keys')
+        else:
+            print(f'There were {len(failed_global_keys)} errors in creating your upload list - upload will not continue')
+    return global_key_to_upload_dict, failed_global_keys

 def create_data_rows(lb_client:Client, base_client:baseClient, row:pandas.core.series.Series,
                      metadata_name_key_to_schema:dict, metadata_schema_to_name_key:dict, row_data_col:str,
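This hunk replaces the function-wide try/except, which printed the exception and returned `(True, dict)` on success or `(False, ...)` on failure, with per-row failure collection: the pool is capped at `max_workers=8`, any result that is not a dict is treated as a failure, and the function now always returns the upload dict together with a list of failed global keys. A minimal caller-side sketch of the new contract, assuming the keyword names match the truncated signature above; the column names, `divider` value, and the `df`/client objects are placeholders, not part of the commit:

```python
from labelpandas.connector import create_upload_dict

# Assumes df is an existing pandas DataFrame and lb_client/base_client are
# already-constructed Labelbox and Labelbase clients; the column names and
# divider below are illustrative only.
global_key_to_upload_dict, failed_global_keys = create_upload_dict(
    df=df,
    lb_client=lb_client,
    base_client=base_client,
    row_data_col="row_data",
    global_key_col="global_key",
    external_id_col="external_id",
    metadata_index={},
    local_files=False,
    divider="///",
    verbose=True,
)

if failed_global_keys and not global_key_to_upload_dict:
    # Mirrors the new behavior: with no usable rows, the upload should not continue.
    raise RuntimeError(f"{len(failed_global_keys)} rows failed; nothing to upload")
```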
@@ -85,23 +79,26 @@ def create_data_rows(lb_client:Client, base_client:baseClient, row:pandas.core.s
     Returns:
         Two items - the global_key, and a dictionary with "row_data", "global_key", "external_id" and "metadata_fields" keys
     """
-    row_data = lb_client.upload_file(str(row[row_data_col])) if local_files else str(row[row_data_col])
-    # row_data = base_client.upload_local_file(file_path=str(row[row_data_col])) if local_files else str(row[row_data_col])
-    metadata_fields = [{"schema_id" : metadata_name_key_to_schema['lb_integration_source'], "value" : "Pandas"}]
-    if metadata_index:
-        for metadata_field_name in metadata_index.keys():
-            input_metadata = base_client.process_metadata_value(
-                metadata_value=row[metadata_field_name],
-                metadata_type=metadata_index[metadata_field_name],
-                parent_name=metadata_field_name,
-                metadata_name_key_to_schema=metadata_name_key_to_schema,
-                divider=divider
-            )
-            if input_metadata:
-                metadata_fields.append({"schema_id" : metadata_name_key_to_schema[metadata_field_name], "value" : input_metadata})
-            else:
-                continue
-    return {"row_data":row_data,"global_key":str(row[global_key_col]),"external_id":str(row[external_id_col]),"metadata_fields":metadata_fields}
+    try:
+        row_data = lb_client.upload_file(str(row[row_data_col])) if local_files else str(row[row_data_col])
+        metadata_fields = [{"schema_id" : metadata_name_key_to_schema['lb_integration_source'], "value" : "Pandas"}]
+        if metadata_index:
+            for metadata_field_name in metadata_index.keys():
+                input_metadata = base_client.process_metadata_value(
+                    metadata_value=row[metadata_field_name],
+                    metadata_type=metadata_index[metadata_field_name],
+                    parent_name=metadata_field_name,
+                    metadata_name_key_to_schema=metadata_name_key_to_schema,
+                    divider=divider
+                )
+                if input_metadata:
+                    metadata_fields.append({"schema_id" : metadata_name_key_to_schema[metadata_field_name], "value" : input_metadata})
+                else:
+                    continue
+        return_value = {"row_data":row_data,"global_key":str(row[global_key_col]),"external_id":str(row[external_id_col]),"metadata_fields":metadata_fields}
+    except Exception as e:
+        return_value = [str(row[global_key_col]), e]
+    return return_value

 def get_columns_function(df):
     """Grabs all column names from a Pandas DataFrame
