-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathuav.py
190 lines (147 loc) · 5.78 KB
/
uav.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from dataclasses import dataclass
from math import floor, sqrt
from enum import Enum, auto
from twisted.internet import reactor
from pyspades.protocol import BaseProtocol
from pyspades.common import Vertex3
from pyspades.world import Grenade
from pyspades.team import Team
from piqueserver.commands import command, get_player, CommandError
from piqueserver.config import config
from milsim.blast import sendGrenadePacket
from milsim.common import alive_only
section = config.section("drone")
drone_phase = section.option("phase", 90).get()
drone_delay = section.option("delay", 240).get()
drone_rate = section.option("rate", 1).get()
drone_timeout = section.option("timeout", 60).get()
drone_teamkill = section.option("teamkill", False).get()
drone_grenades = section.option("grenades", 5).get()
class Status(Enum):
inflight = auto()
awaiting = auto()
inwork = auto()
@dataclass
class Drone:
name : str
team : Team
protocol : BaseProtocol
def __post_init__(self):
self.init()
def init(self, by_server = False):
self.status = Status.inflight
self.callback = None
self.player_id = None
self.target_id = None
self.grenades = 0
self.passed = 0
if by_server:
self.callback = reactor.callLater(drone_phase, self.start)
def report(self, msg):
self.protocol.broadcast_chat(
"<{}> {}. Over.".format(self.name, msg),
global_message=False, team=self.team
)
def start(self):
if self.status != Status.inflight:
return
self.arrive()
def stop(self):
if self.callback and self.callback.active():
self.callback.cancel()
self.callback = None
def arrive(self):
self.grenades = drone_grenades
self.status = Status.awaiting
self.report("Drone on the battlefield")
def track(self, player, target):
self.player_id = player.player_id
self.target_id = target.player_id
self.status = Status.inwork
self.passed = 0
self.report("Received. Watching for {}".format(target.name))
self.callback = reactor.callLater(drone_rate, self.ping)
def free(self):
self.status = Status.awaiting
self.passed = 0
self.player_id = None
self.target_id = None
def ping(self):
self.passed += drone_rate
if self.target_id not in self.protocol.players:
self.report("Don't see the target. Awaiting for further instructions")
return self.free()
target = self.protocol.players[self.target_id]
if self.passed > drone_timeout:
self.report("Don't see {}. Awaiting for further instructions".format(target.name))
return self.free()
if target.dead():
self.callback = reactor.callLater(drone_rate, self.ping)
return
x, y, z = target.world_object.position.get()
H = self.protocol.map.get_z(floor(x), floor(y))
if z <= H:
fuse = sqrt(z) / 4
position = Vertex3(x, y, 0)
velocity = Vertex3(0, 0, 0)
player = self.protocol.take_player(self.player_id)
grenade = self.protocol.world.create_object(
Grenade, fuse, position, None, velocity, player.grenade_exploded
)
grenade.name = 'grenade'
sendGrenadePacket(self.protocol, player.player_id, position, velocity, fuse)
self.grenades -= 1
self.passed = 0
self.target_id = None
self.player_id = None
if self.grenades > 0:
self.status = Status.awaiting
self.callback = None
self.report("Bombed out. Awaiting for further instructions")
else:
self.status = Status.inflight
self.callback = reactor.callLater(drone_delay, self.arrive)
self.report("Bombed out. Will be ready in {} seconds".format(drone_delay))
else:
self.callback = reactor.callLater(drone_rate, self.ping)
def remaining(self):
if self.callback is not None:
return self.callback.getTime() - reactor.seconds()
@command('drone', 'd')
@alive_only
def drone(conn, nickname = None):
"""
Commands the drone to follow the player
/drone <player>
"""
if drone := conn.get_drone():
if drone.status == Status.inflight:
if rem := drone.remaining():
approx = (rem // 5 + 1) * 5
drone.report("Will be on the battlefield in {:.0f} seconds".format(approx))
elif drone.status == Status.inwork:
drone.report("Drone is busy")
elif nickname is None:
return "Usage: /drone <player>"
elif drone.status == Status.awaiting:
player = get_player(conn.protocol, nickname, spectators = False)
if player.team.id == conn.team.id and not drone_teamkill:
raise CommandError("Expected enemy's nickname")
drone.track(conn, player)
def apply_script(protocol, connection, config):
class UAVProtocol(protocol):
def __init__(self, *w, **kw):
protocol.__init__(self, *w, **kw)
self.drones = {
self.team_1.id : Drone("DJI Mavic 3", self.team_1, self),
self.team_2.id : Drone("DJI Phantom 4", self.team_2, self)
}
def on_map_change(self, M):
for drone in self.drones.values():
drone.stop()
drone.init(by_server = True)
protocol.on_map_change(self, M)
class UAVConnection(connection):
def get_drone(self):
return self.protocol.drones.get(self.team.id)
return UAVProtocol, UAVConnection