Skip to content

Commit 65fa505

Browse files
committed
rgw-broker: store instance info in rgw
Store the instance info in the backend so that it can be used if pod is restarted. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
1 parent 00f01d6 commit 65fa505

File tree

4 files changed

+132
-23
lines changed

4 files changed

+132
-23
lines changed

chart/templates/broker-deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,5 @@ spec:
5656
value: {{ .Values.RGWUIDPrefix }}
5757
- name: RGW_GC_USER
5858
value: {{ .Values.RGWGCUser }}
59+
- name: RGW_DATA_BUCKET
60+
value: {{ .Values.RGWDataBucket }}

chart/values.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Specify
44
# --set image=gcr.io/openshift-gce-devel/cns-obj-broker:<tag>
55
# with `helm install` to use a non-default version. Useful for running WIP images.
6-
image: 172.17.8.1:5000/cns-obj-broker:d709f01-dirty-49
6+
image: 172.17.8.1:5000/cns-obj-broker:00f01d6-dirty-52
77
# ImagePullPolicy; valid values are "IfNotPresent", "Never", and "Always"
88
imagePullPolicy: Always
99

@@ -13,3 +13,4 @@ S3AccessKey: KWB4HK2NTY4D0YR7
1313
S3Secret: Fjg7ACac4uCVhxZcFkOeJofXUM7tXdQW
1414
RGWUIDPrefix: mykube-
1515
RGWGCUser: kube-gc
16+
RGWDataBucket: kube-rgw-data

chart/values.yaml.template

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ S3AccessKey: KWB4HK2NTY4D0YR7
1313
S3Secret: Fjg7ACac4uCVhxZcFkOeJofXUM7tXdQW
1414
RGWUIDPrefix: mykube-
1515
RGWGCUser: kube-gc
16+
RGWDataBucket: kube-rgw-data

pkg/broker/broker.go

Lines changed: 127 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ const (
4646
SECRET_KEY = "secretKey"
4747
)
4848

49-
type s3ServiceInstance struct {
49+
type rgwServiceInstance struct {
5050
// k8s namespace
5151
Namespace string
5252
// binding credential created during Bind()
@@ -93,12 +93,13 @@ type broker struct {
9393
// rwMutex controls concurrent R and RW access
9494
rwMutex sync.RWMutex
9595
// instanceMap maps instanceIDs to the ID's userProvidedServiceInstance values
96-
instanceMap map[string]*s3ServiceInstance
96+
instanceMap map[string]*rgwServiceInstance
9797

9898
rgw RGWClient
9999

100100
uidPrefix string
101101
gcUser string
102+
dataBucket string
102103

103104
// client used to access kubernetes
104105
kubeClient *clientset.Clientset
@@ -112,7 +113,7 @@ type bucketInstance struct {
112113

113114
// Initialize the rgw service broker. This function is called by `server.Start()`.
114115
func CreateBroker() Broker {
115-
var instanceMap = make(map[string]*s3ServiceInstance)
116+
var instanceMap = make(map[string]*rgwServiceInstance)
116117
glog.Info("Generating new s3 broker.")
117118

118119
// get the kubernetes client
@@ -124,6 +125,7 @@ func CreateBroker() Broker {
124125

125126
client := RGWClient{}
126127
uidPrefix := "kube-rgw."
128+
dataBucket := "kube-rgw-data"
127129
gcUser := ""
128130

129131
for _, e := range os.Environ() {
@@ -139,6 +141,8 @@ func CreateBroker() Broker {
139141
uidPrefix = pair[1]
140142
case "RGW_GC_USER":
141143
gcUser = pair[1]
144+
case "RGW_DATA_BUCKET":
145+
dataBucket = pair[1]
142146
}
143147
}
144148

@@ -157,13 +161,20 @@ func CreateBroker() Broker {
157161
}
158162
}
159163

164+
err = client.createBucket(dataBucket)
165+
if (err != nil) {
166+
glog.Fatalf("Error: failed to create bucket %s", dataBucket)
167+
return nil
168+
}
169+
160170
glog.Infof("New Broker for s3 endpoint: %s", client.endpoint)
161171
return &broker{
162172
instanceMap: instanceMap,
163173
rgw: client,
164174
kubeClient: cs,
165175
uidPrefix: uidPrefix,
166176
gcUser: gcUser,
177+
dataBucket: dataBucket,
167178
}
168179
}
169180
// Implements the `Catalog` interface method.
@@ -188,6 +199,18 @@ func (b *broker) Catalog() (*brokerapi.Catalog, error) {
188199
}, nil
189200
}
190201

202+
func (b *broker) findInstance(instanceID string) (*rgwServiceInstance, error) {
203+
instance, ok := b.instanceMap[instanceID]
204+
if !ok {
205+
var err error
206+
instance, err = b.getInstanceInfo(instanceID)
207+
if err != nil {
208+
return nil, retErrInfof("InstanceID %q not found.", instanceID)
209+
}
210+
}
211+
return instance, nil
212+
}
213+
191214
// The `GetServiceInstanceLastOperation` interface method is not implemented.
192215
func (b *broker) GetServiceInstanceLastOperation(instanceID, serviceID, planID, operation string) (*brokerapi.LastOperationResponse, error) {
193216
glog.Info("GetServiceInstanceLastOperation not yet implemented.")
@@ -202,10 +225,12 @@ func (b *broker) CreateServiceInstance(instanceID string, req *brokerapi.CreateS
202225
b.rwMutex.Lock()
203226
defer b.rwMutex.Unlock()
204227
// does service instance exist?
205-
if _, ok := b.instanceMap[instanceID]; ok {
206-
glog.Errorf("Instance requested already exists.")
207-
return nil, fmt.Errorf("ServiceInstance %q already exists", instanceID)
228+
229+
_, err := b.findInstance(instanceID)
230+
if err == nil {
231+
return nil, retErrInfof("Instance requested already exists.")
208232
}
233+
209234
// Check required parameter "bucketName"
210235
bucketName, ok := req.Parameters["bucketName"].(string)
211236
if !ok {
@@ -224,9 +249,10 @@ func (b *broker) CreateServiceInstance(instanceID string, req *brokerapi.CreateS
224249
return nil, err
225250
}
226251

227-
newClient := RGWClient{}
228-
newClient.user = *newUser
229-
newClient.endpoint = b.rgw.endpoint
252+
newClient := RGWClient{
253+
user: *newUser,
254+
endpoint: b.rgw.endpoint,
255+
}
230256
err = newClient.init()
231257
if err != nil {
232258
glog.Errorf("Failed to init s3 client for new user: %v", err)
@@ -236,7 +262,8 @@ func (b *broker) CreateServiceInstance(instanceID string, req *brokerapi.CreateS
236262
if err := newClient.createBucket(bucketName); err != nil {
237263
return nil, err
238264
}
239-
b.instanceMap[instanceID] = &s3ServiceInstance{
265+
266+
instanceInfo := rgwServiceInstance{
240267
Namespace: req.ContextProfile.Namespace,
241268
Credential: brokerapi.Credential{
242269
USER_NAME: newUser.name,
@@ -246,6 +273,14 @@ func (b *broker) CreateServiceInstance(instanceID string, req *brokerapi.CreateS
246273
SECRET_KEY: newUser.secret,
247274
},
248275
}
276+
277+
err = b.storeInstanceInfo(instanceID, instanceInfo)
278+
if (err != nil) {
279+
return nil, retErrInfof("Error: failed to store instance info: %s", err)
280+
}
281+
282+
b.instanceMap[instanceID] = &instanceInfo
283+
249284
return nil, nil
250285
}
251286

@@ -254,15 +289,17 @@ func (b *broker) RemoveServiceInstance(instanceID, serviceID, planID string, acc
254289
glog.Infof("RemoveServiceInstance called. instanceID: %s", instanceID)
255290
b.rwMutex.Lock()
256291
defer b.rwMutex.Unlock()
257-
instance, ok := b.instanceMap[instanceID]
258-
if !ok {
259-
glog.Errorf("InstanceID %q not found.", instanceID)
260-
return nil, fmt.Errorf("Broker cannot find instanceID %q.", instanceID)
292+
instance, err := b.findInstance(instanceID)
293+
if err != nil {
294+
glog.Errorf("InstanceID %q not found.", instanceID)
295+
/* don't return error, if it wasn't found it was already removed */
296+
return nil, nil
261297
}
298+
262299
userName := instance.Credential[USER_NAME].(string)
263300
bucketName := instance.Credential[BUCKET_NAME].(string)
264301

265-
err := b.rgw.suspendUser(instance.Credential[USER_NAME].(string))
302+
err = b.rgw.suspendUser(instance.Credential[USER_NAME].(string))
266303
if err != nil {
267304
glog.Errorf("Error failed to suspend user: %v", err)
268305
return nil, fmt.Errorf("Error failed to suspend user: %v", err)
@@ -284,6 +321,11 @@ func (b *broker) RemoveServiceInstance(instanceID, serviceID, planID string, acc
284321
return nil, fmt.Errorf("Error failed to unlink bucket %s/%s: %v", userName, bucketName, err)
285322
}
286323

324+
err = b.removeInstanceInfo(instanceID)
325+
if err != nil {
326+
glog.Infof("Warning: failed to clean instance info: instanceID=%s: %s", instanceID, err)
327+
}
328+
287329
delete(b.instanceMap, BUCKET_NAME)
288330
glog.Infof("Remove bucket %q succeeded.", bucketName)
289331
return nil, nil
@@ -292,11 +334,11 @@ func (b *broker) RemoveServiceInstance(instanceID, serviceID, planID string, acc
292334
// Implements the `Bind` interface method.
293335
func (b *broker) Bind(instanceID, bindingID string, req *brokerapi.BindingRequest) (*brokerapi.CreateServiceBindingResponse, error) {
294336
glog.Infof("Bind called. instanceID: %q", instanceID)
295-
instance, ok := b.instanceMap[instanceID]
296-
if !ok {
297-
glog.Errorf("Instance ID %q not found.")
337+
instance, err := b.findInstance(instanceID)
338+
if err != nil {
298339
return nil, fmt.Errorf("Instance ID %q not found.", instanceID)
299340
}
341+
300342
if len(instance.Credential) == 0 {
301343
glog.Errorf("Instance %q is missing credentials.", instanceID)
302344
return nil, fmt.Errorf("No credentials found for instance %q.", instanceID)
@@ -314,6 +356,66 @@ func (b *broker) UnBind(instanceID, bindingID, serviceID, planID string) error {
314356
return nil
315357
}
316358

359+
func (b *broker) storeInfo(section, id string, object interface{}) error {
360+
data, err := json.Marshal(object)
361+
if err != nil {
362+
glog.Errorf("Error failed to marshal object %s/%s: %s", section, id, err)
363+
return fmt.Errorf("Error failed to marshal object %s/%s: %s", section, id, err)
364+
}
365+
366+
n, err := b.rgw.client.PutObject(b.dataBucket, section + "/" +id, bytes.NewReader(data), "application/json")
367+
if err != nil {
368+
glog.Errorf("Error failed to PutObject() %s/%s: %s", section, id, err)
369+
return fmt.Errorf("Error failed to PutObject() %s/%s", section, id)
370+
}
371+
if n != int64(len(data)) {
372+
glog.Errorf("Error PutObject() unexpected num of bytes written %s/%s: expected %d wrote %d", section, id, len(data), n)
373+
return fmt.Errorf("Error PutObject() unexpected num of bytes written %s/%s: expected %d wrote %d", section, id, len(data), n)
374+
}
375+
return nil
376+
}
377+
378+
func (b *broker) readInfo(section, id string, object interface{}) error {
379+
r, err := b.rgw.client.GetObject(b.dataBucket, section + "/" + id)
380+
if err != nil {
381+
return retErrInfof("Error failed to GetObject() %s/%s: %s", section, id, err)
382+
}
383+
384+
buf, err := ioutil.ReadAll(r)
385+
if err != nil {
386+
return retErrInfof("Error failed to ReadAll() %s/%s: %s", section, id, err)
387+
}
388+
389+
err = json.Unmarshal(buf, &object)
390+
if err != nil {
391+
return retErrInfof("Error failed to unmarshal object %s/%s: %s", section, id, err)
392+
}
393+
return nil
394+
}
395+
396+
func (b *broker) removeInfo(section, id string) error {
397+
err := b.rgw.client.RemoveObject(b.dataBucket, section + "/" + id)
398+
if err != nil {
399+
return retErrInfof("Error failed to RemoveObject() %s/%s: %s", section, id, err)
400+
}
401+
return nil
402+
}
403+
404+
405+
func (b *broker) storeInstanceInfo(id string, info rgwServiceInstance) error {
406+
return b.storeInfo("instance", id, info)
407+
}
408+
409+
func (b *broker) getInstanceInfo(id string) (*rgwServiceInstance, error) {
410+
info := new(rgwServiceInstance)
411+
err := b.readInfo("instance", id, info)
412+
return info, err
413+
}
414+
415+
func (b *broker) removeInstanceInfo(id string) error {
416+
return b.removeInfo("instance", id)
417+
}
418+
317419
func (rgw *RGWClient) rgwAdminRequestRaw(method, section string, params url.Values) (*http.Response, error) {
318420
httpClient := &http.Client{
319421
Timeout: 30 * time.Second,
@@ -366,7 +468,6 @@ func (rgw *RGWClient) rgwAdminRequest(method, section string, params url.Values)
366468
return body, nil
367469
}
368470

369-
370471
func (rgw *RGWClient) provisionUser(userName, displayName string, successIfExists bool) (*RGWUser, error) {
371472
glog.Infof("Creating user %q", userName)
372473

@@ -452,8 +553,7 @@ func (rgw *RGWClient) getBucketId(bucketName string) (string, error) {
452553
res := bucketEntrypointInfo{}
453554
err = json.Unmarshal(body, &res)
454555
if (err != nil) {
455-
glog.Errorf("Error failed to unmarshal bucket entrypoint info: %v", err)
456-
return "", fmt.Errorf("Error failed to unmarshal bucket entrypoint info: %v", err)
556+
return "", retErrInfof("Error failed to unmarshal bucket entrypoint info: %v", err)
457557
}
458558

459559
glog.Infof("retrieved bucket_id=%s)", res.Data.Bucket.BucketId)
@@ -481,7 +581,7 @@ func (rgw *RGWClient) unlinkBucket(userName, bucketName string) error {
481581
}
482582

483583
func (rgw *RGWClient) linkBucket(userName, bucketName, bucketId string) error {
484-
glog.Infof("Linking bucket %s/%s", userName, bucketName)
584+
glog.Infof("Linking bucket %s/%s to user %s", userName, bucketName, userName)
485585

486586
// Set request parameters.
487587
params := make(url.Values)
@@ -553,3 +653,8 @@ func getKubeClient() (*clientset.Clientset, error) {
553653
return cs, err
554654
}
555655

656+
657+
func retErrInfof(format string, args ...interface{}) error {
658+
glog.Infof(format, args)
659+
return fmt.Errorf(format, args)
660+
}

0 commit comments

Comments
 (0)