Skip to content

Commit

Permalink
feat(fxcore): Added core tasks system
Browse files Browse the repository at this point in the history
  • Loading branch information
ekkinox committed Mar 11, 2025
1 parent 9e3a6d0 commit 4e3358d
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 153 deletions.
57 changes: 57 additions & 0 deletions fxcore/info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package fxcore

import (
"fmt"
"sort"

"github.com/ankorstore/yokai/config"
"github.com/ankorstore/yokai/log"
"github.com/ankorstore/yokai/trace"
Expand Down Expand Up @@ -129,3 +132,57 @@ func (i *FxCoreModuleInfo) Data() map[string]interface{} {
"extra": i.ExtraInfos,
}
}

// FxModuleInfoRegistry is the registry collecting info about registered modules.
type FxModuleInfoRegistry struct {
infos map[string]FxModuleInfo
}

// FxModuleInfoRegistryParam allows injection of the required dependencies in [NewFxModuleInfoRegistry].
type FxModuleInfoRegistryParam struct {
fx.In
Infos []any `group:"core-module-infos"`
}

// NewFxModuleInfoRegistry returns a new [FxModuleInfoRegistry].
func NewFxModuleInfoRegistry(p FxModuleInfoRegistryParam) *FxModuleInfoRegistry {
infos := make(map[string]FxModuleInfo)

for _, info := range p.Infos {
if castInfo, ok := info.(FxModuleInfo); ok {
infos[castInfo.Name()] = castInfo
}
}

return &FxModuleInfoRegistry{
infos: infos,
}
}

func (r *FxModuleInfoRegistry) Names() []string {
names := make([]string, len(r.infos))

i := 0
for name := range r.infos {
names[i] = name
i++
}

sort.Strings(names)

return names
}

// All returns a map of all registered [FxModuleInfo].
func (r *FxModuleInfoRegistry) All() map[string]FxModuleInfo {
return r.infos
}

// Find returns a [FxModuleInfo] by name.
func (r *FxModuleInfoRegistry) Find(name string) (FxModuleInfo, error) {
if info, ok := r.infos[name]; ok {
return info, nil
}

return nil, fmt.Errorf("fx module info with name %s was not found", name)
}
80 changes: 79 additions & 1 deletion fxcore/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNewFxCoreModuleInfo(t *testing.T) {
type testModuleInfo struct{}

func (i *testModuleInfo) Name() string {
return "test"
}

func (i *testModuleInfo) Data() map[string]interface{} {
return map[string]interface{}{}
}

func TestFxCoreModuleInfo(t *testing.T) {
t.Setenv("APP_ENV", "test")

cfg, err := config.NewDefaultConfigFactory().Create(
Expand Down Expand Up @@ -54,3 +64,71 @@ func TestNewFxCoreModuleInfo(t *testing.T) {
info.Data(),
)
}

func TestFxModuleInfoRegistry(t *testing.T) {
t.Parallel()

createRegistry := func(tb testing.TB) *fxcore.FxModuleInfoRegistry {
t.Helper()

cfg, err := config.NewDefaultConfigFactory().Create(
config.WithFilePaths("./testdata/config"),
)
assert.NoError(tb, err)

return fxcore.NewFxModuleInfoRegistry(fxcore.FxModuleInfoRegistryParam{
Infos: []interface{}{
&testModuleInfo{},
fxcore.NewFxCoreModuleInfo(fxcore.FxCoreModuleInfoParam{
Config: cfg,
ExtraInfos: []fxcore.FxExtraInfo{},
}),
"invalid",
},
})
}

t.Run("test type", func(t *testing.T) {
t.Parallel()

registry := createRegistry(t)

assert.IsType(t, &fxcore.FxModuleInfoRegistry{}, registry)
})

t.Run("test all", func(t *testing.T) {
t.Parallel()

registry := createRegistry(t)

assert.Len(t, registry.All(), 2)
})

t.Run("test names", func(t *testing.T) {
t.Parallel()

registry := createRegistry(t)

assert.Equal(t, []string{fxcore.ModuleName, "test"}, registry.Names())
})

t.Run("test find", func(t *testing.T) {
t.Parallel()

registry := createRegistry(t)

testInfo, err := registry.Find("test")
assert.NoError(t, err)
assert.Equal(t, "test", testInfo.Name())

coreInfo, err := registry.Find(fxcore.ModuleName)
assert.NoError(t, err)
assert.Equal(t, fxcore.ModuleName, coreInfo.Name())

invalidInfo, err := registry.Find("invalid")
assert.Error(t, err)
assert.Equal(t, "fx module info with name invalid was not found", err.Error())
assert.Nil(t, invalidInfo)
})

}
58 changes: 54 additions & 4 deletions fxcore/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"fmt"
"io"
"net/http"
"strconv"

Expand Down Expand Up @@ -37,6 +38,7 @@ const (
DefaultHealthCheckStartupPath = "/healthz"
DefaultHealthCheckLivenessPath = "/livez"
DefaultHealthCheckReadinessPath = "/readyz"
DefaultTasksPath = "/tasks/:name"
DefaultDebugConfigPath = "/debug/config"
DefaultDebugPProfPath = "/debug/pprof"
DefaultDebugBuildPath = "/debug/build"
Expand All @@ -63,6 +65,7 @@ var FxCoreModule = fx.Module(
fxhealthcheck.FxHealthcheckModule,
fx.Provide(
NewFxModuleInfoRegistry,
NewTaskRegistry,
NewFxCore,
fx.Annotate(
NewFxCoreModuleInfo,
Expand Down Expand Up @@ -92,7 +95,8 @@ type FxCoreParam struct {
Checker *healthcheck.Checker
Config *config.Config
Logger *log.Logger
Registry *FxModuleInfoRegistry
InfoRegistry *FxModuleInfoRegistry
TaskRegistry *TaskRegistry
MetricsRegistry *prometheus.Registry
}

Expand Down Expand Up @@ -232,7 +236,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
dashboardEnabled := p.Config.GetBool("modules.core.server.dashboard.enabled")

// dashboard overview
overviewInfo, err := p.Registry.Find(ModuleName)
overviewInfo, err := p.InfoRegistry.Find(ModuleName)
if err != nil {
return nil, err
}
Expand All @@ -248,6 +252,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
overviewTraceProcessorExpose := p.Config.GetBool("modules.core.server.dashboard.overview.trace_processor")

// template expositions
tasksExpose := p.Config.GetBool("modules.core.server.tasks.expose")
metricsExpose := p.Config.GetBool("modules.core.server.metrics.expose")
startupExpose := p.Config.GetBool("modules.core.server.healthcheck.startup.expose")
livenessExpose := p.Config.GetBool("modules.core.server.healthcheck.liveness.expose")
Expand All @@ -260,6 +265,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
modulesExpose := p.Config.GetBool("modules.core.server.debug.modules.expose")

// template paths
tasksPath := p.Config.GetString("modules.core.server.tasks.path")
metricsPath := p.Config.GetString("modules.core.server.metrics.path")
startupPath := p.Config.GetString("modules.core.server.healthcheck.startup.path")
livenessPath := p.Config.GetString("modules.core.server.healthcheck.liveness.path")
Expand All @@ -271,6 +277,48 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
buildPath := p.Config.GetString("modules.core.server.debug.build.path")
modulesPath := p.Config.GetString("modules.core.server.debug.modules.path")

// tasks
if tasksExpose {
if tasksPath == "" {
tasksPath = DefaultTasksPath
}

coreServer.POST(metricsPath, func(c echo.Context) error {
ctx := c.Request().Context()

logger := log.CtxLogger(ctx)

name := c.Param("name")

input, err := io.ReadAll(c.Request().Body)
if err != nil {
logger.Error().Err(err).Str("task", name).Msg("request body read error")

return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("cannot read request body: %v", err.Error()))
}

err = c.Request().Body.Close()
if err != nil {
logger.Error().Err(err).Str("task", name).Msg("request body close error")

return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("cannot close request body: %v", err.Error()))
}

res, err := p.TaskRegistry.Run(ctx, name, input)
if err != nil {
logger.Error().Err(err).Str("task", name).Msg("task execution error")

return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("task execution error: %v", err.Error()))
}

logger.Info().Str("task", name).Msg("task execution success")

return c.String(http.StatusOK, string(res))
})

coreServer.Logger.Debug("registered tasks handler")
}

// metrics
if metricsExpose {
if metricsPath == "" {
Expand Down Expand Up @@ -400,7 +448,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
}

coreServer.GET(fmt.Sprintf("%s/:name", modulesPath), func(c echo.Context) error {
info, err := p.Registry.Find(c.Param("name"))
info, err := p.InfoRegistry.Find(c.Param("name"))
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, err.Error())
}
Expand Down Expand Up @@ -466,6 +514,8 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
"overviewLogOutputExpose": overviewLogOutputExpose,
"overviewTraceSamplerExpose": overviewTraceSamplerExpose,
"overviewTraceProcessorExpose": overviewTraceProcessorExpose,
"tasksExpose": tasksExpose,
"tasksPath": tasksPath,
"metricsExpose": metricsExpose,
"metricsPath": metricsPath,
"startupExpose": startupExpose,
Expand All @@ -486,7 +536,7 @@ func withHandlers(coreServer *echo.Echo, p FxCoreParam) (*echo.Echo, error) {
"buildPath": buildPath,
"modulesExpose": modulesExpose || appDebug,
"modulesPath": modulesPath,
"modulesNames": p.Registry.Names(),
"modulesNames": p.InfoRegistry.Names(),
"theme": theme,
})
})
Expand Down
11 changes: 11 additions & 0 deletions fxcore/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,14 @@ func AsCoreExtraInfo(name string, value string) fx.Option {
),
)
}

// AsTask registers a task.
func AsTask(constructor any) fx.Option {
return fx.Provide(
fx.Annotate(
constructor,
fx.As(new(Task)),
fx.ResultTags(`group:"core-tasks"`),
),
)
}
62 changes: 0 additions & 62 deletions fxcore/registry.go

This file was deleted.

Loading

0 comments on commit 4e3358d

Please sign in to comment.