forked from owtf/owtf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
owtftest.py
167 lines (142 loc) · 5.51 KB
/
owtftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
tests.owtftest
~~~~~~~~~~~~~~
Test cases.
"""
from __future__ import print_function
from builtins import input
import os
import copy
import glob
import tornado
import unittest
import mock
from hamcrest import *
from tests.utils import (
load_log,
db_setup,
clean_owtf_review,
DIR_OWTF_REVIEW,
DIR_OWTF_LOGS,
)
from tests.server import WebServerProcess
class OWTFCliTestCase(unittest.TestCase):
    """Basic OWTF test case that initialises basic patches.

    Every test starts from a freshly reset database and review directory.
    Tests launch the ``owtf`` command line through :meth:`run_owtf`, which
    afterwards loads the generated log files so that the ``assert_*`` helpers
    can inspect them.
    """

    DEFAULT_ARGS = ["--nowebui"]
    PROTOCOL = "http"
    IP = "127.0.0.1"
    DOMAIN = "localhost"
    PORT = "8888"

    # Log group name (also the log file name prefix) paired with the instance
    # attribute that stores the entries loaded for that group.
    _LOG_GROUPS = (
        ("MainProcess", "logs_main_process"),
        ("Worker", "logs_worker"),
        ("ProxyProcess", "logs_proxy_process"),
        ("TransactionLogger", "logs_transaction_logger"),
    )

    def __init__(self, methodName="runTest"):
        super(OWTFCliTestCase, self).__init__(methodName)
        self.args = copy.copy(self.DEFAULT_ARGS)

    def setUp(self):
        """Reset the arguments, wipe previous runs and patch ``input``."""
        self.args = copy.copy(self.DEFAULT_ARGS)
        self.clean_old_runs()
        # NOTE(review): the patched input() returns a list (["Y"]) rather than
        # a string -- confirm this is what the prompting code expects.
        self.raw_input_patcher = mock.patch("builtins.input", return_value=["Y"])
        self.raw_input_patcher.start()

    def tearDown(self):
        """Undo the ``input`` patch and drop the logs loaded by the test."""
        self.raw_input_patcher.stop()
        self.clean_logs()

    def run_owtf(self, *extra_args):
        """Run OWTF with args plus ``extra_args`` if any.

        Falls back to ``DEFAULT_ARGS`` when ``self.args`` is empty. The command
        is executed through the shell and its exit status is ignored; the logs
        written by the run are loaded once the command returns.
        """
        args = list(self.args or self.DEFAULT_ARGS)
        args.extend(extra_args)
        print("with the following options: %s" % args)
        os.system("owtf {}".format(" ".join(args)))
        self.load_logs()

    def load_logs(self):
        """Load all file logs generated by OWTF during the run.

        For every log group, the ``<group>*.log`` files found in the OWTF logs
        directory are read with ``load_log`` and stored both in the matching
        ``logs_*`` attribute and in ``self.logs`` under the group name.
        ``self.logs_all`` holds the entries of every group in one flat list.
        """
        abs_path = os.path.join(os.getcwd(), DIR_OWTF_REVIEW, DIR_OWTF_LOGS)
        self.logs = {}
        for group, attr in self._LOG_GROUPS:
            entries = []
            for log_file in glob.glob(os.path.join(abs_path, group + "*.log")):
                entries.extend(load_log(log_file, absolute_path=True))
            setattr(self, attr, entries)
            self.logs[group] = entries
        self.logs_all = self._flatten_logs(self.logs)

    @staticmethod
    def _flatten_logs(logs):
        """Return the entries of every log group as a single flat list."""
        # Iterate over the values only: going through items() would mix the
        # group names and the nested lists into the result instead of the
        # individual log entries.
        merged = []
        for entries in logs.values():
            merged.extend(entries)
        return merged

    def clean_logs(self):
        """Remove old logs that have been loaded during a run."""
        # Only attributes that already exist are reset, so a test that never
        # loaded any logs is left untouched.
        for attr in [attr for _, attr in self._LOG_GROUPS] + ["logs_all"]:
            if hasattr(self, attr):
                setattr(self, attr, [])
        if hasattr(self, "logs"):
            self.logs = {}

    @staticmethod
    def clean_old_runs():
        """Clean the database and the older owtf_review directory."""
        # Reset the database.
        db_setup("clean")
        db_setup("init")
        # Remove old OWTF outputs
        clean_owtf_review()

    # Specific methods that test logs and function calls.

    def _select_logs(self, name=None):
        """Return the logs of group ``name``, or all logs if it is unknown."""
        if name and name in self.logs:
            return self.logs[name]
        return self.logs_all

    def assert_has_been_logged(self, text, name=None, msg=None):
        """Assert that ``text`` is exactly one of the loaded log entries."""
        assert_that(self._select_logs(name), has_item(text), msg)

    def assert_has_not_been_logged(self, text, name=None, msg=None):
        """Assert that ``text`` is not one of the loaded log entries."""
        # The matcher has to be negated with is_not(): Python's ``not`` would
        # turn it into a plain boolean and the entries would never be checked.
        assert_that(self._select_logs(name), is_not(has_item(text)), msg)

    def assert_is_in_logs(self, text, name=None, msg=None):
        """Assert that ``text`` appears somewhere in the stringified logs."""
        self.assertTrue(text in str(self._select_logs(name)), msg)

    def assert_is_not_in_logs(self, text, name=None, msg=None):
        """Assert that ``text`` appears nowhere in the stringified logs."""
        self.assertFalse(text in str(self._select_logs(name)), msg)

    def assert_are_in_logs(self, texts, name=None, msg=None):
        """Assert that every item of ``texts`` appears in the logs."""
        for text in texts:
            self.assert_is_in_logs(text, name, msg)

    def assert_are_not_in_logs(self, texts, name=None, msg=None):
        """Assert that no item of ``texts`` appears in the logs."""
        for text in texts:
            self.assert_is_not_in_logs(text, name, msg)
class OWTFCliWebPluginTestCase(OWTFCliTestCase):
    """OWTF test case that also runs a local web server for each test.

    A ``WebServerProcess`` bound to ``IP``/``PORT`` is started in
    :meth:`setUp` and stopped again in :meth:`tearDown`.
    """

    DEFAULT_ARGS = ["--nowebui"]
    PROTOCOL = "http"
    IP = "127.0.0.1"
    PORT = "8888"
    MATCH_PLUGIN_START = "Execution Start"
    MATCH_BUG = "OWTF BUG"
    DYNAMIC_METHOD_REGEX = "^set_(head|get|post|put|delete|options|connect)_response"

    def setUp(self):
        """Run the base set-up, then start the test web server."""
        super(OWTFCliWebPluginTestCase, self).setUp()
        # Web server initialization.
        self.server = WebServerProcess(self.IP, self.PORT)
        self.server.start()

    def tearDown(self):
        """Run the base tear-down and always stop the test web server."""
        try:
            super(OWTFCliWebPluginTestCase, self).tearDown()
        finally:
            # Stop the server even when the base tear-down fails, otherwise
            # it would keep running after the test has finished.
            self.server.stop()
            tornado.ioloop.IOLoop.clear_instance()