Flexible mechanism to make execution flow interruptible.
The breaker carries a cancellation signal to interrupt an action execution.
// NewYear is midnight UTC on January 1 of the next year: the zero time.Time is
// January 1 of year 1, so adding the current year to it lands on year+1.
var NewYear = time.Time{}.AddDate(time.Now().Year(), 0, 0)
// Combine several interruption sources into a single breaker.
interrupter := breaker.Multiplex(
breaker.BreakByContext(context.WithTimeout(req.Context(), time.Minute)),
breaker.BreakByDeadline(NewYear),
breaker.BreakBySignal(os.Interrupt),
)
defer interrupter.Close()
<-interrupter.Done() // wait for context cancellation, timeout or interrupt signal
A full description of the idea is available here.
I have to make the retry package:
if err := retry.Retry(breaker.BreakByTimeout(time.Minute), action); err != nil {
log.Fatal(err)
}
and the semaphore package:
if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
log.Fatal(err)
}
more consistent and reliable. Additionally, I want to implement a Graceful Shutdown and Circuit Breaker on the same mechanism.
// Build one breaker out of OS signals and a timeout.
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
// Expose the breaker as a context.Context and attach the message to it.
ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "...")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
panic(err)
}
// resp is declared outside the action so the result of the last attempt
// is still available after retry.Do returns.
var resp *http.Response
// action is the unit of work handed to retry.Do: it re-binds the request to
// the context it receives and copies the context value into the request header.
action := func(ctx context.Context) (err error) {
req = req.Clone(ctx)
// Unchecked type assertion: panics if the received context does not carry
// the value (presumably retry.Do passes a context derived from ctx — verify).
source := ctx.Value(header).(string)
req.Header.Set(header, source)
resp, err = http.DefaultClient.Do(req)
return err
}
if err := retry.Do(ctx, action); err != nil {
panic(err)
}
Full example
package main
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
"github.com/kamilsk/retry/v5"
)
// main starts a slow local test server and requests it through retry.Do,
// using a breaker-backed context so the whole flow stops on an OS signal or
// after the timeout. On success the response body is copied to stdout.
func main() {
	const (
		header  = "X-Message"
		timeout = time.Minute
	)

	// The handler delays every reply by timeout/10 and echoes the X-Message
	// request header back as the response body.
	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		time.Sleep(timeout / 10)
		_, _ = rw.Write([]byte(req.Header.Get(header)))
	}))
	defer server.Close()

	interrupter := breaker.Multiplex(
		breaker.BreakBySignal(os.Interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
		breaker.BreakByTimeout(timeout),
	)
	defer interrupter.Close()

	// Expose the breaker as a context.Context and attach the message to it.
	ctx := breaker.ToContext(interrupter)
	ctx = context.WithValue(ctx, header, "flexible mechanism to make execution flow interruptible")

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
	if err != nil {
		panic(err)
	}

	// resp is declared outside the action so the result of the last attempt
	// is still available after retry.Do returns.
	var resp *http.Response
	action := func(ctx context.Context) (err error) {
		req = req.Clone(ctx)
		source := ctx.Value(header).(string)
		req.Header.Set(header, source)
		resp, err = http.DefaultClient.Do(req)
		return err
	}

	if err := retry.Do(ctx, action); err != nil {
		fmt.Println("error:", err)
		return
	}
	// A response body must always be closed, otherwise the underlying
	// connection cannot be reused or released.
	defer resp.Body.Close()

	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		fmt.Println("error:", err)
	}
}
// Build one breaker out of OS signals and a timeout.
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
server := http.Server{
// BaseContext makes the context of every incoming request derive from
// the breaker's context.
BaseContext: func(net.Listener) context.Context {
return breaker.ToContext(interrupter)
},
}
go func() {
// ListenAndServe always returns a non-nil error; http.ErrServerClosed is
// the expected one after Shutdown, anything else is fatal.
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
// Block until the breaker trips.
<-interrupter.Done()
// Shut the server down only when the breaker reports breaker.Interrupted.
if errors.Is(interrupter.Err(), breaker.Interrupted) {
if err := server.Shutdown(context.TODO()); err != nil {
panic(err)
}
}
Full example
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
)
// main serves HTTP on :8080 until the breaker trips (OS signal or timeout)
// and then shuts the server down gracefully, waiting a bounded amount of time
// for in-flight requests.
func main() {
	const (
		timeout = time.Minute
		// shutdownTimeout bounds how long in-flight requests may delay the
		// exit; without a deadline Shutdown can block indefinitely.
		shutdownTimeout = 10 * time.Second
	)

	interrupter := breaker.Multiplex(
		breaker.BreakBySignal(os.Interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
		breaker.BreakByTimeout(timeout),
	)
	defer interrupter.Close()

	server := http.Server{
		Addr:    ":8080",
		Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}),
		// BaseContext makes the context of every incoming request derive
		// from the breaker's context.
		BaseContext: func(net.Listener) context.Context {
			return breaker.ToContext(interrupter)
		},
	}
	go func() {
		// ListenAndServe always returns a non-nil error; http.ErrServerClosed
		// is the expected one after Shutdown, anything else is fatal.
		if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			log.Fatal(err)
		}
	}()

	// Block until the breaker trips.
	<-interrupter.Done()
	if errors.Is(interrupter.Err(), breaker.Interrupted) {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			panic(err)
		}
	}
	fmt.Println("graceful shutdown")
}
The library uses SemVer for versioning, and it is not backward-compatible across major releases. You can use Go modules to manage its version.
$ go get github.com/kamilsk/breaker@latest
The example shows how to execute console commands for ten minutes.
$ date
# Thu Jan 7 21:02:21
$ breakit after 10m -- database run --port=5432
$ breakit after 10m -- server run --port=8080
$ breakit ps
# +--------------------------+---------------------+
# | Process | Done |
# +--------------------------+---------------------+
# | database run --port=5432 | Thu Jan 7 21:12:24 |
# | server run --port=8080 | Thu Jan 7 21:12:31 |
# +--------------------------+---------------------+
# | Total | 2 |
# +--------------------------+---------------------+
See more details here.
made with ❤️ for everyone