diff --git a/keymutex/BUILD b/keymutex/BUILD new file mode 100644 index 00000000..256ed341 --- /dev/null +++ b/keymutex/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "hashed.go", + "keymutex.go", + ], + importpath = "k8s.io/kubernetes/pkg/util/keymutex", + deps = ["//vendor/github.com/golang/glog:go_default_library"], +) + +go_test( + name = "go_default_test", + srcs = ["keymutex_test.go"], + embed = [":go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/keymutex/hashed.go b/keymutex/hashed.go new file mode 100644 index 00000000..5fe9a025 --- /dev/null +++ b/keymutex/hashed.go @@ -0,0 +1,64 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +import ( + "hash/fnv" + "runtime" + "sync" + + "github.com/golang/glog" +) + +// NewHashed returns a new instance of KeyMutex which hashes arbitrary keys to +// a fixed set of locks. `n` specifies number of locks, if n <= 0, we use +// number of cpus. +// Note that because it uses fixed set of locks, different keys may share same +// lock, so it's possible to wait on same lock. +func NewHashed(n int) KeyMutex { + if n <= 0 { + n = runtime.NumCPU() + } + return &hashedKeyMutex{ + mutexes: make([]sync.Mutex, n), + } +} + +type hashedKeyMutex struct { + mutexes []sync.Mutex +} + +// Acquires a lock associated with the specified ID. +func (km *hashedKeyMutex) LockKey(id string) { + glog.V(5).Infof("hashedKeyMutex.LockKey(...) called for id %q\r\n", id) + km.mutexes[km.hash(id)%len(km.mutexes)].Lock() + glog.V(5).Infof("hashedKeyMutex.LockKey(...) for id %q completed.\r\n", id) +} + +// Releases the lock associated with the specified ID. +func (km *hashedKeyMutex) UnlockKey(id string) error { + glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) called for id %q\r\n", id) + km.mutexes[km.hash(id)%len(km.mutexes)].Unlock() + glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) for id %q completed.\r\n", id) + return nil +} + +func (km *hashedKeyMutex) hash(id string) int { + h := fnv.New32a() + h.Write([]byte(id)) + return int(h.Sum32()) +} diff --git a/keymutex/keymutex.go b/keymutex/keymutex.go new file mode 100644 index 00000000..89dc0223 --- /dev/null +++ b/keymutex/keymutex.go @@ -0,0 +1,27 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +// KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings. +type KeyMutex interface { + // Acquires a lock associated with the specified ID, creates the lock if one doesn't already exist. + LockKey(id string) + + // Releases the lock associated with the specified ID. + // Returns an error if the specified ID doesn't exist. + UnlockKey(id string) error +} diff --git a/keymutex/keymutex_test.go b/keymutex/keymutex_test.go new file mode 100644 index 00000000..ce2f567b --- /dev/null +++ b/keymutex/keymutex_test.go @@ -0,0 +1,105 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +import ( + "testing" + "time" +) + +const ( + callbackTimeout = 1 * time.Second +) + +func newKeyMutexes() []KeyMutex { + return []KeyMutex{ + NewHashed(0), + NewHashed(1), + NewHashed(2), + NewHashed(4), + } +} + +func Test_SingleLock_NoUnlock(t *testing.T) { + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh := make(chan interface{}) + + // Act + go lockAndCallback(km, key, callbackCh) + + // Assert + verifyCallbackHappens(t, callbackCh) + } +} + +func Test_SingleLock_SingleUnlock(t *testing.T) { + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh := make(chan interface{}) + + // Act & Assert + go lockAndCallback(km, key, callbackCh) + verifyCallbackHappens(t, callbackCh) + km.UnlockKey(key) + } +} + +func Test_DoubleLock_DoubleUnlock(t *testing.T) { + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh1stLock := make(chan interface{}) + callbackCh2ndLock := make(chan interface{}) + + // Act & Assert + go lockAndCallback(km, key, callbackCh1stLock) + verifyCallbackHappens(t, callbackCh1stLock) + go lockAndCallback(km, key, callbackCh2ndLock) + verifyCallbackDoesntHappens(t, callbackCh2ndLock) + km.UnlockKey(key) + verifyCallbackHappens(t, callbackCh2ndLock) + km.UnlockKey(key) + } +} + +func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) { + km.LockKey(id) + callbackCh <- true +} + +func verifyCallbackHappens(t *testing.T, callbackCh <-chan interface{}) bool { + select { + case <-callbackCh: + return true + case <-time.After(callbackTimeout): + t.Fatalf("Timed out waiting for callback.") + return false + } +} + +func verifyCallbackDoesntHappens(t *testing.T, callbackCh <-chan interface{}) bool { + select { + case <-callbackCh: + t.Fatalf("Unexpected callback.") + return false + case <-time.After(callbackTimeout): + return true + } +} diff --git a/strings/BUILD b/strings/BUILD new file mode 100644 index 00000000..a7cd124f --- /dev/null +++ b/strings/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "escape.go", + "line_delimiter.go", + "strings.go", + ], + importpath = "k8s.io/kubernetes/pkg/util/strings", +) + +go_test( + name = "go_default_test", + srcs = [ + "escape_test.go", + "line_delimiter_test.go", + "strings_test.go", + ], + embed = [":go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/strings/escape.go b/strings/escape.go new file mode 100644 index 00000000..bae8d81a --- /dev/null +++ b/strings/escape.go @@ -0,0 +1,36 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "strings" +) + +// EscapeQualifiedName converts a plugin name, which might contain a / into a +// string that is safe to use on-disk. This assumes that the input has already +// been validates as a qualified name. we use "~" rather than ":" here in case +// we ever use a filesystem that doesn't allow ":". +func EscapeQualifiedName(in string) string { + return strings.Replace(in, "/", "~", -1) +} + +// UnescapeQualifiedName converts an escaped plugin name (as per EscapeQualifiedName) +// back to its normal form. This assumes that the input has already been +// validates as a qualified name. +func UnescapeQualifiedName(in string) string { + return strings.Replace(in, "~", "/", -1) +} diff --git a/strings/escape_test.go b/strings/escape_test.go new file mode 100644 index 00000000..b42398c0 --- /dev/null +++ b/strings/escape_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "testing" +) + +func TestEscapeQualifiedNameForDisk(t *testing.T) { + testCases := []struct { + input string + output string + }{ + {"kubernetes.io/blah", "kubernetes.io~blah"}, + {"blah/blerg/borg", "blah~blerg~borg"}, + {"kubernetes.io", "kubernetes.io"}, + } + for i, tc := range testCases { + escapee := EscapeQualifiedName(tc.input) + if escapee != tc.output { + t.Errorf("case[%d]: expected (%q), got (%q)", i, tc.output, escapee) + } + original := UnescapeQualifiedName(escapee) + if original != tc.input { + t.Errorf("case[%d]: expected (%q), got (%q)", i, tc.input, original) + } + } +} diff --git a/strings/line_delimiter.go b/strings/line_delimiter.go new file mode 100644 index 00000000..8907869c --- /dev/null +++ b/strings/line_delimiter.go @@ -0,0 +1,64 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "bytes" + "io" + "strings" +) + +// LineDelimiter is a filter that will split input on lines +// and bracket each line with the delimiter string. +type LineDelimiter struct { + output io.Writer + delimiter []byte + buf bytes.Buffer +} + +// NewLineDelimiter allocates a new io.Writer that will split input on lines +// and bracket each line with the delimiter string. This can be useful in +// output tests where it is difficult to see and test trailing whitespace. +func NewLineDelimiter(output io.Writer, delimiter string) *LineDelimiter { + return &LineDelimiter{output: output, delimiter: []byte(delimiter)} +} + +// Write writes buf to the LineDelimiter ld. The only errors returned are ones +// encountered while writing to the underlying output stream. +func (ld *LineDelimiter) Write(buf []byte) (n int, err error) { + return ld.buf.Write(buf) +} + +// Flush all lines up until now. This will assume insert a linebreak at the current point of the stream. +func (ld *LineDelimiter) Flush() (err error) { + lines := strings.Split(ld.buf.String(), "\n") + for _, line := range lines { + if _, err = ld.output.Write(ld.delimiter); err != nil { + return + } + if _, err = ld.output.Write([]byte(line)); err != nil { + return + } + if _, err = ld.output.Write(ld.delimiter); err != nil { + return + } + if _, err = ld.output.Write([]byte("\n")); err != nil { + return + } + } + return +} diff --git a/strings/line_delimiter_test.go b/strings/line_delimiter_test.go new file mode 100644 index 00000000..15bee165 --- /dev/null +++ b/strings/line_delimiter_test.go @@ -0,0 +1,40 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "fmt" + "os" +) + +func Example_trailingNewline() { + ld := NewLineDelimiter(os.Stdout, "|") + defer ld.Flush() + fmt.Fprint(ld, " Hello \n World \n") + // Output: + // | Hello | + // | World | + // || +} +func Example_noTrailingNewline() { + ld := NewLineDelimiter(os.Stdout, "|") + defer ld.Flush() + fmt.Fprint(ld, " Hello \n World ") + // Output: + // | Hello | + // | World | +} diff --git a/strings/strings.go b/strings/strings.go new file mode 100644 index 00000000..8a9f2ece --- /dev/null +++ b/strings/strings.go @@ -0,0 +1,46 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "path" + "strings" +) + +// SplitQualifiedName Splits a fully qualified name and returns its namespace and name. +// Assumes that the input 'str' has been validated. +func SplitQualifiedName(str string) (string, string) { + parts := strings.Split(str, "/") + if len(parts) < 2 { + return "", str + } + return parts[0], parts[1] +} + +// JoinQualifiedName joins 'namespace' and 'name' and returns a fully qualified name +// Assumes that the input is valid. +func JoinQualifiedName(namespace, name string) string { + return path.Join(namespace, name) +} + +// ShortenString returns the first N slice of a string. +func ShortenString(str string, n int) string { + if len(str) <= n { + return str + } + return str[:n] +} diff --git a/strings/strings_test.go b/strings/strings_test.go new file mode 100644 index 00000000..cf1148ee --- /dev/null +++ b/strings/strings_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strings + +import ( + "testing" +) + +func TestSplitQualifiedName(t *testing.T) { + testCases := []struct { + input string + output []string + }{ + {"kubernetes.io/blah", []string{"kubernetes.io", "blah"}}, + {"blah", []string{"", "blah"}}, + {"kubernetes.io/blah/blah", []string{"kubernetes.io", "blah"}}, + } + for i, tc := range testCases { + namespace, name := SplitQualifiedName(tc.input) + if namespace != tc.output[0] || name != tc.output[1] { + t.Errorf("case[%d]: expected (%q, %q), got (%q, %q)", i, tc.output[0], tc.output[1], namespace, name) + } + } +} + +func TestJoinQualifiedName(t *testing.T) { + testCases := []struct { + input []string + output string + }{ + {[]string{"kubernetes.io", "blah"}, "kubernetes.io/blah"}, + {[]string{"blah", ""}, "blah"}, + {[]string{"kubernetes.io", "blah"}, "kubernetes.io/blah"}, + } + for i, tc := range testCases { + res := JoinQualifiedName(tc.input[0], tc.input[1]) + if res != tc.output { + t.Errorf("case[%d]: expected %q, got %q", i, tc.output, res) + } + } +} + +func TestShortenString(t *testing.T) { + testCases := []struct { + input string + outLen int + output string + }{ + {"kubernetes.io", 5, "kuber"}, + {"blah", 34, "blah"}, + {"kubernetes.io", 13, "kubernetes.io"}, + } + for i, tc := range testCases { + res := ShortenString(tc.input, tc.outLen) + if res != tc.output { + t.Errorf("case[%d]: expected %q, got %q", i, tc.output, res) + } + } +}