forked from safing/spn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathop_publish.go
171 lines (145 loc) · 4.64 KB
/
op_publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package captain
import (
"github.com/safing/portbase/container"
"github.com/safing/spn/cabin"
"github.com/safing/spn/conf"
"github.com/safing/spn/docks"
"github.com/safing/spn/hub"
"github.com/safing/spn/terminal"
)
const (
	// PublishOpType is the type name of the publish operation. It is the name
	// the operation is registered under in init, the value returned by
	// PublishOp.Type, and the first string argument used when creating and
	// signing verification requests in this file.
	PublishOpType string = "publish"
)
// PublishOp is the operation registered under PublishOpType. The same struct
// is used on both ends of the exchange: NewPublishOp sets identity, while
// runPublishOp sets requestingHub and verification. Deliver decides how to
// handle incoming data based on which of these fields is set.
type PublishOp struct {
	terminal.OpBase
	// controller is the crane controller terminal the operation runs on and
	// sends its messages through.
	controller *docks.CraneControllerTerminal
	// identity is set by NewPublishOp only. Deliver uses it to sign the
	// verification request it receives.
	identity *cabin.Identity
	// requestingHub is set by runPublishOp only, to the hub returned by the
	// import of the received announcement and status.
	requestingHub *hub.Hub
	// verification is set by runPublishOp only. Deliver uses it to check the
	// signed response against requestingHub.
	verification *cabin.Verification
	// result receives the error passed to End. Both constructors create it
	// with a buffer of one.
	result chan *terminal.Error
}
// Type reports the type name of the operation, which is always PublishOpType.
func (op *PublishOp) Type() string { return PublishOpType }
// init registers the publish operation with the terminal package: it is
// announced under PublishOpType, handled by runPublishOp, and registered with
// the terminal.IsCraneController requirement.
func init() {
	params := terminal.OpParams{
		Type:     PublishOpType,
		Requires: terminal.IsCraneController,
		RunOp:    runPublishOp,
	}
	terminal.RegisterOpType(params)
}
// NewPublishOp creates a publish operation for the given identity and starts
// it on the given crane controller terminal.
//
// The initial message consists of two blocks, in this order: the exported hub
// announcement and the exported hub status of the identity.
//
// It returns an error derived from terminal.ErrInternalError if either export
// fails, or the error reported by controller.OpInit if the operation cannot
// be initialized. In both cases the returned operation is nil.
func NewPublishOp(controller *docks.CraneControllerTerminal, identity *cabin.Identity) (*PublishOp, *terminal.Error) {
	// Set up the operation; identity being set marks this end as the one that
	// signs the verification request in Deliver.
	op := &PublishOp{
		controller: controller,
		identity:   identity,
		result:     make(chan *terminal.Error, 1),
	}
	op.OpBase.Init()

	// Export everything first, then assemble the message.
	announcement, err := identity.ExportAnnouncement()
	if err != nil {
		return nil, terminal.ErrInternalError.With("failed to export announcement: %w", err)
	}
	status, err := identity.ExportStatus()
	if err != nil {
		return nil, terminal.ErrInternalError.With("failed to export status: %w", err)
	}

	// Block order matters: runPublishOp reads the announcement before the
	// status.
	msg := container.New()
	msg.AppendAsBlock(announcement)
	msg.AppendAsBlock(status)

	if tErr := controller.OpInit(op, msg); tErr != nil {
		return nil, tErr
	}
	return op, nil
}
// runPublishOp handles an incoming publish operation. It is the RunOp handler
// registered for PublishOpType.
//
// data must hold two blocks: the hub announcement followed by the hub status.
// Both are imported and verified; the resulting hub is stored as the crane's
// connected hub and, if the import asks for it, both blocks are relayed via
// gossip. The function then replies with a freshly created verification
// request and returns the operation that will check the signed response in
// Deliver.
//
// Errors: terminal.ErrIncorrectUsage if t is not a crane controller terminal,
// terminal.ErrMalformedData if a block is missing, terminal.ErrInternalError
// if no verification request can be created, and the wrapped errors of the
// import and of sending the reply.
func runPublishOp(t terminal.OpTerminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) {
	// Only a crane controller terminal may host this operation.
	ctrl, isController := t.(*docks.CraneControllerTerminal)
	if !isController {
		return nil, terminal.ErrIncorrectUsage.With("publish op may only be started by a crane controller terminal, but was started by %T", t)
	}

	// Read the two blocks in the order NewPublishOp appended them.
	announcement, err := data.GetNextBlock()
	if err != nil {
		return nil, terminal.ErrMalformedData.With("failed to get announcement: %w", err)
	}
	status, err := data.GetNextBlock()
	if err != nil {
		return nil, terminal.ErrMalformedData.With("failed to get status: %w", err)
	}

	// Import and verify the hub described by the two blocks.
	publishingHub, relay, tErr := docks.ImportAndVerifyHubInfo(module.Ctx, "", announcement, status, conf.MainMapName, conf.MainMapScope)
	if tErr != nil {
		return nil, tErr.Wrap("failed to import and verify hub")
	}

	// The import may have produced a different hub object; keep the crane's
	// reference current.
	ctrl.Crane.ConnectedHub = publishingHub

	// Pass the data on to other hubs when the import says so.
	if relay {
		gossipRelayMsg(ctrl.Crane.ID, GossipHubAnnouncementMsg, announcement)
		gossipRelayMsg(ctrl.Crane.ID, GossipHubStatusMsg, status)
	}

	// Prepare the challenge the other side has to sign.
	verification, request, err := cabin.CreateVerificationRequest(PublishOpType, "", "")
	if err != nil {
		return nil, terminal.ErrInternalError.With("failed to create verification request: %w", err)
	}

	// Build the operation; requestingHub being set marks this end as the one
	// that verifies the signed response in Deliver.
	op := &PublishOp{
		controller:    ctrl,
		requestingHub: publishingHub,
		verification:  verification,
		result:        make(chan *terminal.Error, 1),
	}
	op.OpBase.Init()
	op.OpBase.SetID(opID)

	// Answer with the verification request.
	if tErr = ctrl.OpSend(op, container.New(request)); tErr != nil {
		return nil, tErr.Wrap("failed to send verification request")
	}
	return op, nil
}
// Deliver handles data received for the operation. What happens depends on
// which end of the exchange this operation represents:
//
//   - Client (identity is set): the received data is treated as a
//     verification request, signed with the identity, and the signed response
//     is sent back.
//   - Server (requestingHub is set): the received data is treated as the
//     signed response and verified against the requesting hub. On success,
//     terminal.ErrExplicitAck is returned.
//
// Signing and verification failures are reported as
// terminal.ErrPermissinDenied. If neither field is set, an error derived from
// terminal.ErrInternalError is returned.
func (op *PublishOp) Deliver(c *container.Container) *terminal.Error {
	switch {
	case op.identity != nil:
		// Client: sign the received verification request and return it.
		signed, err := op.identity.SignVerificationRequest(c.CompileData(), PublishOpType, "", "")
		if err != nil {
			return terminal.ErrPermissinDenied.With("signing verification request failed: %w", err)
		}
		return op.controller.OpSend(op, container.New(signed))

	case op.requestingHub != nil:
		// Server: check the signed request against the requesting hub.
		if err := op.verification.Verify(c.CompileData(), op.requestingHub); err != nil {
			return terminal.ErrPermissinDenied.With("checking verification request failed: %w", err)
		}
		return terminal.ErrExplicitAck

	default:
		return terminal.ErrInternalError.With("invalid operation state")
	}
}
// Result exposes the receive side of the channel to which End submits the
// error the operation ended with.
func (op *PublishOp) Result() <-chan *terminal.Error { return op.result }
// End finishes the operation with the given error.
//
// If tErr is terminal.ErrExplicitAck, the crane is published: the crane's
// connected hub is set to the requesting hub if it is still unset, then
// Crane.Publish is called. When publishing succeeds, the crane is notified of
// the update. When it fails, tErr is replaced by an error derived from
// terminal.ErrInternalError and the crane is stopped with that error.
//
// The final tErr is then offered to the result channel without blocking; it
// is dropped if the channel cannot accept it right now.
func (op *PublishOp) End(tErr *terminal.Error) {
	if tErr.Is(terminal.ErrExplicitAck) {
		// TODO: Check for concurrent access.
		if op.controller.Crane.ConnectedHub == nil {
			op.controller.Crane.ConnectedHub = op.requestingHub
		}

		// Publish the crane; a crane that cannot be published is stopped.
		if err := op.controller.Crane.Publish(); err == nil {
			op.controller.Crane.NotifyUpdate()
		} else {
			tErr = terminal.ErrInternalError.With("failed to publish crane: %w", err)
			op.controller.Crane.Stop(tErr)
		}
	}

	// Report the outcome, but never block on a full or unread channel.
	select {
	case op.result <- tErr:
	default:
	}
}