-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_worker.py
146 lines (133 loc) · 6.56 KB
/
celery_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import time
import requests
from requests.exceptions import RequestException
import json
from celery import Celery
from dotenv import load_dotenv
load_dotenv(".env")
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")
class CallbackTask(celery.Task):
def on_success(self, retval, task_id, args, kwargs):
# retval (object) - The return value of the task.
# task_id (str) - Id of the executed task.
# args (Tuple) - Original arguments for the task that was executed.
# kwargs (Dict) - Original keyword arguments for the task that was executed.
print('{0!r} success: {1!r}'.format(task_id, retval))
try:
headers = args[4]
channel_id = headers['X-CHANNEL-ID']
except:
channel_id = task_id
callback_url = args[5]
if len(callback_url) > 0:
if callback_url.startswith("http://") or callback_url.startswith("https://"):
try:
requests.post(args[5], data=json.dumps({
"status": "success",
"task_id": task_id,
"channel_id": channel_id,
"request": {
"taskname": args[0],
"url": args[1],
"http_method": args[2],
"body": args[3],
"headers": args[4],
"callback_url": args[5]
},
"response": retval,
}), headers={
"Content-Type": "application/json",
})
except RequestException as e:
print(e)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# exc (Exception) - The exception raised by the task.
# args (Tuple) - Original arguments for the task that failed.
# kwargs (Dict) - Original keyword arguments for the task that failed.
print('{0!r} failed: {1!r}'.format(task_id, exc))
callback_url = args[5]
if len(callback_url) > 0:
if callback_url.startswith("http://") or callback_url.startswith("https://"):
try:
requests.post(args[5], data=json.dumps({
"status": "failed",
"task_id": task_id,
"request": {
"taskname": args[0],
"url": args[1],
"http_method": args[2],
"body": args[3],
"headers": args[4],
"callback_url": args[5]
},
"einfo": str(einfo),
}), headers={
"Content-Type": "application/json",
})
except RequestException as e:
print(e)
@celery.task(name="create_task", base=CallbackTask, bind=True, autoretry_for=(RequestException,), retry_backoff=True)
def create_task(self, taskname, url, http_method, body, headers, callback_url):
# try:
headers.update({'X-TASK-ID': self.request.id})
if http_method == "GET":
response = requests.get(url, headers=headers, allow_redirects=True)
elif http_method == "POST":
if headers.get('Content-Type') == 'application/json':
response = requests.post(url, json=body, headers=headers, allow_redirects=True)
elif headers.get('Content-Type') == 'application/x-www-form-urlencoded':
# response = requests.post(url, params=body, headers=headers, allow_redirects=True)
response = requests.post(url, data=body, headers=headers, allow_redirects=True)
else:
response = requests.post(url, data=json.dumps(body), headers=headers, allow_redirects=True)
elif http_method == "PUT":
if headers.get('Content-Type') == 'application/json':
response = requests.put(url, json=body, headers=headers, allow_redirects=True)
elif headers.get('Content-Type') == 'application/x-www-form-urlencoded':
# response = requests.put(url, params=body, headers=headers, allow_redirects=True)
response = requests.put(url, data=body, headers=headers, allow_redirects=True)
else:
response = requests.put(url, data=json.dumps(body), headers=headers, allow_redirects=True)
elif http_method == "PATCH":
if headers.get('Content-Type') == 'application/json':
response = requests.patch(url, json=body, headers=headers, allow_redirects=True)
elif headers.get('Content-Type') == 'application/x-www-form-urlencoded':
# response = requests.patch(url, params=body, headers=headers, allow_redirects=True)
response = requests.patch(url, data=body, headers=headers, allow_redirects=True)
else:
response = requests.patch(url, data=json.dumps(body), headers=headers, allow_redirects=True)
elif http_method == "DELETE":
response = requests.delete(url, headers=headers, allow_redirects=True)
else:
raise RequestException("HTTP Method not supported")
# GET response http code
# response_headers = {}
# for key, value in response.headers.items():
# response_headers[key] = value
if not response.ok:
raise RequestException(f'{url} returned unexpected response code: {response.status_code}')
try:
response_body = response.json()
except ValueError:
response_body = response.text
response_headers = dict(response.headers)
# response_headers = response_headers + ({'X-TASK-ID': self.request.id},)
response_headers.update({'X-TASK-ID': self.request.id})
return {
# "url": url,
# "http_method": http_method,
"headers": response_headers,
# "headers": headers,
# "body": body,
"status_code": response.status_code,
# "response_headers": response_headers,
"body": response_body
}
# return response.json()
# return str(response)
# except Exception as e:
# self.retry(countdown=5, exc=e)
# raise Exception(str(e))