forked from EmpireProject/Empire
-
-
Notifications
You must be signed in to change notification settings - Fork 579
/
app.py
185 lines (158 loc) · 6.13 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import json
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from json import JSONEncoder
from pathlib import Path
import socketio
import uvicorn
from fastapi import FastAPI
from starlette.middleware.gzip import GZipMiddleware
from starlette.staticfiles import StaticFiles
from empire.scripts.sync_starkiller import sync_starkiller
from empire.server.api.middleware import EmpireCORSMiddleware
from empire.server.api.v2.websocket.socketio import setup_socket_events
from empire.server.core.config import empire_config
log = logging.getLogger(__name__)
class MyJsonWrapper:
@staticmethod
def dumps(*args, **kwargs):
if "cls" not in kwargs:
kwargs["cls"] = MyJsonEncoder
return json.dumps(*args, **kwargs)
@staticmethod
def loads(*args, **kwargs):
return json.loads(*args, **kwargs)
class MyJsonEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
if isinstance(o, bytes):
return o.decode("latin-1")
if hasattr(o, "json") and callable(o.json):
return o.json()
return JSONEncoder.default(self, o)
def load_starkiller(v2App, ip, port):
try:
sync_starkiller(empire_config.model_dump())
except Exception as e:
log.warning("Failed to load Starkiller: %s", e, exc_info=True)
log.warning(
"If you are trying to pull Starkiller from a private repository ("
"such as Starkiller-Sponsors), make sure you have the proper ssh "
"credentials set in your Empire config. See "
"https://docs.github.com/en/github/authenticating-to-github"
"/connecting-to-github-with-ssh"
)
if (Path(empire_config.starkiller.directory) / "dist").exists():
v2App.mount(
"/",
StaticFiles(directory=f"{empire_config.starkiller.directory}/dist"),
name="static",
)
log.info("Starkiller served at the same ip and port as Empire Server")
log.info(f"Starkiller served at http://localhost:{port}/index.html")
def initialize(
secure: bool = False, ip: str = "0.0.0.0", port: int = 1337, run: bool = True
):
# Not pretty but allows us to use main_menu by delaying the import
from empire.server.api.v2.agent import agent_api, agent_file_api, agent_task_api
from empire.server.api.v2.bypass import bypass_api
from empire.server.api.v2.credential import credential_api
from empire.server.api.v2.download import download_api
from empire.server.api.v2.host import host_api, process_api
from empire.server.api.v2.listener import listener_api, listener_template_api
from empire.server.api.v2.meta import meta_api
from empire.server.api.v2.module import module_api
from empire.server.api.v2.obfuscation import obfuscation_api
from empire.server.api.v2.plugin import plugin_api, plugin_task_api
from empire.server.api.v2.profile import profile_api
from empire.server.api.v2.stager import stager_api, stager_template_api
from empire.server.api.v2.tag import tag_api
from empire.server.api.v2.user import user_api
from empire.server.server import main
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
if main:
main.shutdown()
if sio:
log.info("Shutting down SocketIO...")
await sio.shutdown()
v2App = FastAPI(lifespan=lifespan)
v2App.include_router(listener_template_api.router)
v2App.include_router(listener_api.router)
v2App.include_router(stager_template_api.router)
v2App.include_router(stager_api.router)
v2App.include_router(agent_task_api.router)
v2App.include_router(agent_api.router)
v2App.include_router(agent_file_api.router)
v2App.include_router(user_api.router)
v2App.include_router(module_api.router)
v2App.include_router(bypass_api.router)
v2App.include_router(obfuscation_api.router)
v2App.include_router(process_api.router)
v2App.include_router(profile_api.router)
v2App.include_router(credential_api.router)
v2App.include_router(host_api.router)
v2App.include_router(download_api.router)
v2App.include_router(meta_api.router)
v2App.include_router(plugin_task_api.router)
v2App.include_router(plugin_api.router)
v2App.include_router(tag_api.router)
v2App.add_middleware(
EmpireCORSMiddleware,
allow_origins=[
"*",
"http://localhost",
"http://localhost:8080",
"http://localhost:8081",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["content-disposition"],
)
v2App.add_middleware(GZipMiddleware, minimum_size=500)
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins="*",
# logger=True,
# engineio_logger=True,
# https://github.com/miguelgrinberg/flask-socketio/issues/274#issuecomment-231206374
json=MyJsonWrapper,
)
sio_app = socketio.ASGIApp(
socketio_server=sio, other_asgi_app=v2App, socketio_path="/socket.io/"
)
v2App.add_route("/socket.io/", route=sio_app, methods=["GET", "POST"])
v2App.add_websocket_route("/socket.io/", sio_app)
setup_socket_events(sio, main)
if empire_config.starkiller.enabled:
log.info("Starkiller enabled. Loading.")
load_starkiller(v2App, ip, port)
else:
log.info("Starkiller disabled. Not loading.")
cert_path = Path(empire_config.api.cert_path)
if run:
if not secure:
uvicorn.run(
v2App,
host=ip,
port=port,
log_config=None,
lifespan="on",
# log_level="info",
)
else:
uvicorn.run(
v2App,
host=ip,
port=port,
log_config=None,
lifespan="on",
ssl_keyfile=f"{cert_path}/empire-priv.key",
ssl_certfile=f"{cert_path}/empire-chain.pem",
# log_level="info",
)
return v2App