Skip to content

Commit 7f5ecbd

Browse files
committed
extracted plugin and volume extension point specific code from original plugin_mode branch
1 parent 7fdf4a0 commit 7f5ecbd

File tree

9 files changed

+275
-4
lines changed

9 files changed

+275
-4
lines changed

daemon/container.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"io/ioutil"
10+
"net"
1011
"os"
1112
"path"
1213
"path/filepath"
@@ -30,6 +31,7 @@ import (
3031
"github.com/docker/docker/pkg/networkfs/resolvconf"
3132
"github.com/docker/docker/pkg/promise"
3233
"github.com/docker/docker/pkg/symlink"
34+
"github.com/docker/docker/plugins"
3335
"github.com/docker/docker/runconfig"
3436
"github.com/docker/docker/utils"
3537
)
@@ -1330,9 +1332,53 @@ func (container *Container) waitForStart() error {
13301332
return err
13311333
}
13321334

1335+
if container.hostConfig.Plugin {
1336+
if err := container.waitForPluginSock(); err != nil {
1337+
return err
1338+
}
1339+
}
1340+
13331341
return nil
13341342
}
13351343

1344+
func (container *Container) waitForPluginSock() error {
1345+
pluginSock, err := container.getPluginSocketPath()
1346+
if err != nil {
1347+
return err
1348+
}
1349+
1350+
chConn := make(chan net.Conn)
1351+
chStop := make(chan struct{})
1352+
go func() {
1353+
log.Debugf("waiting for plugin socket at: %s", pluginSock)
1354+
for {
1355+
conn, err := net.DialTimeout("unix", pluginSock, 100*time.Millisecond)
1356+
// If the file doesn't exist yet, that's ok, maybe plugin hasn't created it yet
1357+
if err != nil {
1358+
select {
1359+
case <-chStop:
1360+
return
1361+
default:
1362+
continue
1363+
}
1364+
}
1365+
log.Debugf("got plugin socket")
1366+
chConn <- conn
1367+
return
1368+
}
1369+
}()
1370+
1371+
select {
1372+
case conn := <-chConn:
1373+
// We can close this net.Conn since the plugin system will establish it's own connection
1374+
conn.Close()
1375+
return plugins.Repo.RegisterPlugin(pluginSock)
1376+
case <-time.After(30 * time.Second):
1377+
chStop <- struct{}{}
1378+
return fmt.Errorf("connection to plugin sock timed out")
1379+
}
1380+
}
1381+
13361382
func (container *Container) allocatePort(eng *engine.Engine, port nat.Port, bindings nat.PortMap) error {
13371383
binding := bindings[port]
13381384
if container.hostConfig.PublishAllPorts && len(binding) == 0 {
@@ -1415,3 +1461,7 @@ func (container *Container) getNetworkedContainer() (*Container, error) {
14151461
func (container *Container) Stats() (*execdriver.ResourceStats, error) {
14161462
return container.daemon.Stats(container)
14171463
}
1464+
1465+
func (container *Container) getPluginSocketPath() (string, error) {
1466+
return container.getRootResourcePath(filepath.Join("p", "plugin.sock"))
1467+
}

daemon/daemon.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ func (daemon *Daemon) restore() error {
329329
return err
330330
}
331331

332+
// TODO: sort out plugins from normal containers and load plugins first
332333
for _, v := range dir {
333334
id := v.Name()
334335
container, err := daemon.load(id)

daemon/volumes.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (container *Container) registerVolumes() {
130130
if rw, exists := container.VolumesRW[path]; exists {
131131
writable = rw
132132
}
133-
v, err := container.daemon.volumes.FindOrCreateVolume(path, writable)
133+
v, err := container.daemon.volumes.FindOrCreateVolume(path, container.ID, writable)
134134
if err != nil {
135135
log.Debugf("error registering volume %s: %v", path, err)
136136
continue
@@ -159,7 +159,7 @@ func (container *Container) parseVolumeMountConfig() (map[string]*Mount, error)
159159
return nil, err
160160
}
161161
// Check if a volume already exists for this and use it
162-
vol, err := container.daemon.volumes.FindOrCreateVolume(path, writable)
162+
vol, err := container.daemon.volumes.FindOrCreateVolume(path, container.ID, writable)
163163
if err != nil {
164164
return nil, err
165165
}
@@ -184,7 +184,7 @@ func (container *Container) parseVolumeMountConfig() (map[string]*Mount, error)
184184
continue
185185
}
186186

187-
vol, err := container.daemon.volumes.FindOrCreateVolume("", true)
187+
vol, err := container.daemon.volumes.FindOrCreateVolume("", container.ID, true)
188188
if err != nil {
189189
return nil, err
190190
}
@@ -318,6 +318,20 @@ func (container *Container) setupMounts() error {
318318
mounts = append(mounts, execdriver.Mount{Source: container.HostsPath, Destination: "/etc/hosts", Writable: true, Private: true})
319319
}
320320

321+
if container.hostConfig.Plugin {
322+
// We are going to create this socket then close/unlink it so it can be
323+
// bind-mounted into the container and used by the plugin process.
324+
socketPath, err := container.getPluginSocketPath()
325+
if err != nil {
326+
return err
327+
}
328+
pluginDir := filepath.Dir(socketPath)
329+
if err := os.MkdirAll(pluginDir, 0700); err != nil {
330+
return err
331+
}
332+
mounts = append(mounts, execdriver.Mount{Source: pluginDir, Destination: "/var/run/docker-plugin", Writable: true, Private: true})
333+
}
334+
321335
for _, m := range mounts {
322336
if err := label.SetFileLabel(m.Source, container.MountLabel); err != nil {
323337
return err

plugins/client.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package plugins
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net"
9+
"net/http"
10+
"net/http/httputil"
11+
"time"
12+
13+
log "github.com/Sirupsen/logrus"
14+
"github.com/docker/docker/pkg/ioutils"
15+
)
16+
17+
const pluginApiVersion = "v1"
18+
19+
func connect(addr string) (*httputil.ClientConn, error) {
20+
c, err := net.DialTimeout("unix", addr, 30*time.Second)
21+
if err != nil {
22+
return nil, err
23+
}
24+
return httputil.NewClientConn(c, nil), nil
25+
}
26+
27+
func call(addr, method, path string, data interface{}) (io.ReadCloser, error) {
28+
client, err := connect(addr)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
reqBody, err := json.Marshal(data)
34+
if err != nil {
35+
return nil, err
36+
}
37+
38+
log.Debugf("sending request for extension:\n%s", string(reqBody))
39+
path = "/" + pluginApiVersion + "/" + path
40+
req, err := http.NewRequest(method, path, bytes.NewBuffer(reqBody))
41+
if err != nil {
42+
client.Close()
43+
return nil, err
44+
}
45+
req.Header.Set("Content-Type", "application/json")
46+
47+
resp, err := client.Do(req)
48+
if err != nil {
49+
client.Close()
50+
return nil, err
51+
}
52+
53+
// FIXME: this should be better defined
54+
if resp.StatusCode >= 400 {
55+
return nil, fmt.Errorf("got bad status: %s", resp.Status)
56+
}
57+
58+
return ioutils.NewReadCloserWrapper(resp.Body, func() error {
59+
if err := resp.Body.Close(); err != nil {
60+
return err
61+
}
62+
return client.Close()
63+
}), nil
64+
}

plugins/plugin.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package plugins
2+
3+
import (
4+
"encoding/json"
5+
"io"
6+
)
7+
8+
type Plugin struct {
9+
addr string
10+
kind string
11+
}
12+
13+
type handshakeResp struct {
14+
InterestedIn []string
15+
Name string
16+
Author string
17+
Org string
18+
Website string
19+
}
20+
21+
func (p *Plugin) Call(method, path string, data interface{}) (io.ReadCloser, error) {
22+
path = p.kind + "/" + path
23+
return call(p.addr, method, path, data)
24+
}
25+
26+
func (p *Plugin) handshake() (*handshakeResp, error) {
27+
// Don't use the local `call` because this shouldn't be namespaced
28+
respBody, err := call(p.addr, "POST", "handshake", nil)
29+
if err != nil {
30+
return nil, err
31+
}
32+
defer respBody.Close()
33+
34+
var data handshakeResp
35+
return &data, json.NewDecoder(respBody).Decode(&data)
36+
}

plugins/repository.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package plugins
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
)
7+
8+
// Temporary singleton
9+
var Repo = NewRepository()
10+
11+
var ErrNotRegistered = errors.New("plugin type is not registered")
12+
13+
type Repository struct {
14+
plugins map[string]Plugins
15+
}
16+
17+
type Plugins []*Plugin
18+
19+
func (repository *Repository) GetPlugins(kind string) (Plugins, error) {
20+
plugins, exists := repository.plugins[kind]
21+
// TODO: check whether 'kind' is a supportedPluginType
22+
if !exists {
23+
// If no plugins have been registered for this kind yet, that's
24+
// OK. Just set and return an empty list.
25+
repository.plugins[kind] = make([]*Plugin, 0)
26+
return repository.plugins[kind], nil
27+
}
28+
return plugins, nil
29+
}
30+
31+
var supportedPluginTypes = map[string]struct{}{
32+
"volume": {},
33+
}
34+
35+
func NewRepository() *Repository {
36+
return &Repository{
37+
plugins: make(map[string]Plugins),
38+
}
39+
}
40+
41+
func (repository *Repository) RegisterPlugin(addr string) error {
42+
plugin := &Plugin{addr: addr}
43+
resp, err := plugin.handshake()
44+
if err != nil {
45+
return fmt.Errorf("error in plugin handshake: %v", err)
46+
}
47+
48+
for _, interest := range resp.InterestedIn {
49+
if _, exists := supportedPluginTypes[interest]; !exists {
50+
return fmt.Errorf("plugin type %s is not supported", interest)
51+
}
52+
53+
if _, exists := repository.plugins[interest]; !exists {
54+
repository.plugins[interest] = []*Plugin{}
55+
}
56+
plugin.kind = interest
57+
repository.plugins[interest] = append(repository.plugins[interest], plugin)
58+
}
59+
60+
return nil
61+
}

runconfig/hostconfig.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type HostConfig struct {
119119
RestartPolicy RestartPolicy
120120
SecurityOpt []string
121121
ReadonlyRootfs bool
122+
Plugin bool
122123
}
123124

124125
// This is used by the create command when you want to set both the
@@ -150,6 +151,7 @@ func ContainerHostConfigFromJob(job *engine.Job) *HostConfig {
150151
IpcMode: IpcMode(job.Getenv("IpcMode")),
151152
PidMode: PidMode(job.Getenv("PidMode")),
152153
ReadonlyRootfs: job.GetenvBool("ReadonlyRootfs"),
154+
Plugin: job.GetenvBool("Plugin"),
153155
}
154156

155157
job.GetenvJson("LxcConf", &hostConfig.LxcConf)

runconfig/parse.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func Parse(cmd *flag.FlagSet, args []string) (*Config, *HostConfig, *flag.FlagSe
6464
flIpcMode = cmd.String([]string{"-ipc"}, "", "Default is to create a private IPC namespace (POSIX SysV IPC) for the container\n'container:<name|id>': reuses another container shared memory, semaphores and message queues\n'host': use the host shared memory,semaphores and message queues inside the container. Note: the host mode gives the container full access to local shared memory and is therefore considered insecure.")
6565
flRestartPolicy = cmd.String([]string{"-restart"}, "", "Restart policy to apply when a container exits (no, on-failure[:max-retry], always)")
6666
flReadonlyRootfs = cmd.Bool([]string{"-read-only"}, false, "Mount the container's root filesystem as read only")
67+
flPlugin = cmd.Bool([]string{"-plugin"}, false, "Enable plugin mode!")
6768
)
6869

6970
cmd.Var(&flAttach, []string{"a", "-attach"}, "Attach to STDIN, STDOUT or STDERR.")
@@ -314,6 +315,7 @@ func Parse(cmd *flag.FlagSet, args []string) (*Config, *HostConfig, *flag.FlagSe
314315
RestartPolicy: restartPolicy,
315316
SecurityOpt: flSecurityOpt.GetAll(),
316317
ReadonlyRootfs: *flReadonlyRootfs,
318+
Plugin: *flPlugin,
317319
}
318320

319321
// When allocating stdin in attached mode, close stdin at client disconnect

volumes/repository.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package volumes
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"io/ioutil"
67
"os"
@@ -9,9 +10,19 @@ import (
910

1011
log "github.com/Sirupsen/logrus"
1112
"github.com/docker/docker/daemon/graphdriver"
13+
"github.com/docker/docker/plugins"
1214
"github.com/docker/docker/utils"
1315
)
1416

17+
type VolumeExtensionReq struct {
18+
HostPath string
19+
ContainerID string
20+
}
21+
22+
type VolumeExtensionResp struct {
23+
ModifiedHostPath string
24+
}
25+
1526
type Repository struct {
1627
configPath string
1728
driver graphdriver.Driver
@@ -195,10 +206,40 @@ func (r *Repository) createNewVolumePath(id string) (string, error) {
195206
return path, nil
196207
}
197208

198-
func (r *Repository) FindOrCreateVolume(path string, writable bool) (*Volume, error) {
209+
func (r *Repository) FindOrCreateVolume(path, containerId string, writable bool) (*Volume, error) {
199210
r.lock.Lock()
200211
defer r.lock.Unlock()
201212

213+
plugins, err := plugins.Repo.GetPlugins("volume")
214+
if err != nil {
215+
return nil, err
216+
}
217+
218+
for _, plugin := range plugins {
219+
data := VolumeExtensionReq{
220+
HostPath: path,
221+
ContainerID: containerId,
222+
}
223+
224+
resp, err := plugin.Call("POST", "volumes", data)
225+
if err != nil {
226+
return nil, fmt.Errorf("got error calling volume extension: %v", err)
227+
}
228+
defer resp.Close()
229+
230+
var extResp VolumeExtensionResp
231+
log.Debugf("decoding volume extension response")
232+
if err := json.NewDecoder(resp).Decode(&extResp); err != nil {
233+
return nil, err
234+
}
235+
236+
// Use the path provided by the extension instead of creating one
237+
if extResp.ModifiedHostPath != "" {
238+
log.Debugf("using modified host path for volume extension")
239+
path = extResp.ModifiedHostPath
240+
}
241+
}
242+
202243
if path == "" {
203244
return r.newVolume(path, writable)
204245
}

0 commit comments

Comments
 (0)