Skip to content

Commit

Permalink
🐛 fix: eliminate zombie jobs (#96)
Browse files Browse the repository at this point in the history
* 🎨 [chore] create chainEndpoint constant

This commit moves the hard-coded network chains endpoint into a `chainEndpoint` constant for better visibility and configurability.

* ✨ Feat: add manuscript state machine

Added a Manuscript State Detector which can be instantiated in the ListJobs() function to identify manuscript state accurately and conclusively. The state machine runs a series of checks against container status and Flink logs to resolve to one of five states: RUNNING, INITIALIZING, FAILED, STOPPED, and UNKNOWN (rare). This lays the groundwork for more advanced manuscript detection from the CLI.

* ♻️ [chore] refactor listjobs to accommodate state machine

This commit adds new logic to `jobs_manuscript.go` that allows for the detection of manuscripts by using `manuscript_state.go`

* 💡 [chore] add comments to state machine

Recognizing that there is a lot of complicated logic in the state detector, I added comments explaining its general operation throughout.

* 🐛 Fix: enhance manuscript state detection

Within the DetectState method, manuscripts previously defaulted to initializing state, despite surviving all previous checks. Now, if a manuscript survives all checks it will display as `RUNNING` in the jobs list.

* 🦺 [chore] add graphQL check to state detector

This adds an additional state check: the checkGraphQLEndpoint function, which hits the local endpoint to determine its health and ensure it is live before RUNNING is returned. If the endpoint is not live but the manuscript survives all other checks, the manuscript returns INITIALIZING status.

* 📝 [chore] add additional help documentation for cli `list` cmd

This commit adds additional status indicators to the long help text of the list command.

* ✨ [feat] add alias `ls` to cli `list` command

This commit adds an alias, `ls` to the list command.

* 🦺 [chore] add guard rails for cli init

This commit adds additional guard rails to fix broken functionality of the `init` command that was affecting the jobs list. Previously, a user could run init and partially overwrite an existing manuscript. Now, init refuses to proceed if any of the following holds: the manuscript directory already exists under the manuscript path, the manuscript's containers already exist in Docker, or the manuscript is already listed in the config file.

* ✨ [feat] enhance `manuscript-cli list` cmd

This commit adds new functionality to manuscript-cli list. It now accepts a directory as an argument. If no directory is specified it looks in the default directory for users. It also shows STOPPED jobs by looking at the manuscript yamls in the subdirectories of the specified directory and showing ALL manuscripts. This makes it more informative and usable.
  • Loading branch information
KagemniKarimu authored Dec 16, 2024
1 parent db496fc commit a9bdbdb
Show file tree
Hide file tree
Showing 6 changed files with 419 additions and 67 deletions.
40 changes: 21 additions & 19 deletions client/network_chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type ChainBaseClient struct {
baseURL string
httpClient *http.Client
baseURL string
chainEndpoint string
httpClient *http.Client
}

type ChainBaseDatasetListItem struct {
Expand All @@ -20,23 +21,23 @@ type ChainBaseDatasetListItem struct {
}

type ChainResponse struct {
Code int `json:"code"`
Message string `json:"message"`
GraphData []struct {
Chain struct {
ID string `json:"id"`
Name string `json:"name"`
DatabaseName string `json:"databaseName"`
DataDictionary map[string][]TableInfo `json:"dataDictionary"`
} `json:"chain"`
} `json:"graphData"`
TransactionLogs *[]TransactionLog `json:"transactionLogs,omitempty"`
Code int `json:"code"`
Message string `json:"message"`
GraphData []struct {
Chain struct {
ID string `json:"id"`
Name string `json:"name"`
DatabaseName string `json:"databaseName"`
DataDictionary map[string][]TableInfo `json:"dataDictionary"`
} `json:"chain"`
} `json:"graphData"`
TransactionLogs *[]TransactionLog `json:"transactionLogs,omitempty"`
}

type TransactionLog struct {
Timestamp string `json:"timestamp"`
Action string `json:"action"`
Details string `json:"details"`
Timestamp string `json:"timestamp"`
Action string `json:"action"`
Details string `json:"details"`
}

type TableInfo struct {
Expand All @@ -45,17 +46,18 @@ type TableInfo struct {
Description string `json:"description"`
}

func NewChainBaseClient(baseURL string) *ChainBaseClient {
func NewChainBaseClient(baseURL string, chainEndpoint string) *ChainBaseClient {
return &ChainBaseClient{
baseURL: baseURL,
baseURL: baseURL,
chainEndpoint: chainEndpoint,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}

func (c *ChainBaseClient) GetChainBaseDatasetList() ([]*ChainBaseDatasetListItem, error) {
url := fmt.Sprintf("%s/api/v1/metadata/network_chains", c.baseURL)
url := fmt.Sprintf("%s%s", c.baseURL, c.chainEndpoint)

resp, err := c.httpClient.Get(url)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion client/network_chains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetChainBaseDatasetList(t *testing.T) {
}))
defer mockServer.Close()

client := NewChainBaseClient(mockServer.URL)
client := NewChainBaseClient(mockServer.URL, "/api/v1/metadata/network_chains")

chains, err := client.GetChainBaseDatasetList()

Expand Down
31 changes: 26 additions & 5 deletions commands/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package commands

import (
"fmt"
"log"
"manuscript-core/pkg"
"os"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -47,8 +49,9 @@ You'll be prompted to select:
}

var jobListCmd = &cobra.Command{
Use: "list",
Short: "List all manuscript jobs",
Use: "list [directory]",
Aliases: []string{"ls"},
Short: "List all manuscript jobs",
Long: `πŸ“‹ View all running manuscript jobs
Each job shows:
Expand All @@ -60,10 +63,28 @@ Each job shows:
Status indicators:
🟒 Running - Job is active and processing data
🟑 Warning - Job needs attention
βšͺ️ Other - Various other states`,
Example: `>> manuscript-cli list`,
πŸ”΄ Failed - Job encountered an error
⚫ Stopped - Job was stopped
Usage:
- Run without arguments to check default directory
- Specify a directory path to check manuscripts in that location`,
Example: `>> manuscript-cli ls
>> manuscript-cli list /path/to/manuscripts`,
Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ListJobs()
var dir string
// if no args, use default manuscript directory
if len(args) == 0 {
config, err := pkg.LoadConfig(manuscriptConfig)
if err != nil {
log.Fatalf("Error: Failed to load config: %v", err)
}
dir = fmt.Sprintf("%s/%s", config.BaseDir, manuscriptBaseName)
} else {
dir = args[0] // use specified directory if user input
}
ListJobs(dir)
},
}

Expand Down
68 changes: 58 additions & 10 deletions commands/init_manuscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
)

const (
manuscriptBaseName = "manuscript"
manuscriptBaseDir = "$HOME"
manuscriptConfig = "$HOME/.manuscript_config.ini"
networkChainURL = "https://api.chainbase.com"
defaultDatabase = "zkevm"
defaultTable = "blocks"
defaultSink = "postgres"
graphQLImage = "repository.chainbase.com/manuscript-node/graphql-engine:latest"
graphQLARMImage = "repository.chainbase.com/manuscript-node/graphql-engine-arm64:latest"
manuscriptBaseName = "manuscript"
manuscriptBaseDir = "$HOME"
manuscriptConfig = "$HOME/.manuscript_config.ini"
networkChainURL = "https://api.chainbase.com"
networkChainEndpoint = "/api/v1/metadata/network_chains"
defaultDatabase = "zkevm"
defaultTable = "blocks"
defaultSink = "postgres"
graphQLImage = "repository.chainbase.com/manuscript-node/graphql-engine:latest"
graphQLARMImage = "repository.chainbase.com/manuscript-node/graphql-engine-arm64:latest"
)

func executeInitManuscript(ms pkg.Manuscript) {
Expand Down Expand Up @@ -91,6 +92,10 @@ func InitManuscript() {
fmt.Printf("\033[32mβœ“ Manuscript base directory set to: %s\033[0m\n\n", manuscriptDir)

manuscriptName := promptInput("πŸ‚ 2. Enter your manuscript name (default is demo)\u001B[0m: ", "demo")
if err := checkExistingManuscript(manuscriptName); err != nil {
logErrorAndReturn(fmt.Sprintf("Cannot create manuscript: %v", err), nil)
return
}
if checkDockerContainerExists(manuscriptName) {
logErrorAndReturn(fmt.Sprintf("Manuscript with name [ %s ] already exists. Please choose a different name.", manuscriptName), nil)
}
Expand Down Expand Up @@ -320,10 +325,53 @@ func checkDockerContainerExists(manuscriptName string) bool {
return false
}

// checkExistingManuscript reports whether a new manuscript called name would
// collide with an existing one. It returns nil when no collision is found and
// a descriptive error otherwise.
//
// Three independent checks are made, in order:
//  1. the directory <BaseDir>/<manuscriptBaseName>/<name> must not exist;
//  2. none of the manuscript's expected containers may appear in the output
//     of pkg.RunDockerPs;
//  3. the loaded configuration must not already list a manuscript with this
//     name.
//
// Failures to load the config, stat the directory, or list containers are
// returned as wrapped errors rather than being treated as "no collision".
func checkExistingManuscript(name string) error {
	msConfig, err := pkg.LoadConfig(manuscriptConfig)
	if err != nil {
		return fmt.Errorf("failed to load manuscript config: %w", err)
	}

	// Check 1: manuscript directory on disk. Distinguish "exists" from "could
	// not determine" so that, for example, a permission error is not reported
	// to the user as an already-existing directory.
	manuscriptPath := filepath.Join(msConfig.BaseDir, manuscriptBaseName, name)
	_, statErr := os.Stat(manuscriptPath)
	switch {
	case statErr == nil:
		return fmt.Errorf("manuscript directory already exists at %s.", manuscriptPath)
	case !os.IsNotExist(statErr):
		return fmt.Errorf("failed to check manuscript directory %s: %w", manuscriptPath, statErr)
	}

	// Check 2: containers belonging to this manuscript.
	dockers, err := pkg.RunDockerPs()
	if err != nil {
		return fmt.Errorf("failed to check running containers: %w", err)
	}

	containerNames := []string{
		fmt.Sprintf("%s-jobmanager-1", name),
		fmt.Sprintf("%s-taskmanager-1", name),
		fmt.Sprintf("%s-postgres-1", name),
		fmt.Sprintf("%s-hasura-1", name),
	}

	for _, docker := range dockers {
		for _, containerName := range containerNames {
			if docker.Name == containerName {
				return fmt.Errorf("manuscript containers for '%s' already exist. Please stop and remove them first", name)
			}
		}
	}

	// Check 3: entry in the configuration file.
	for _, ms := range msConfig.Manuscripts {
		if ms.Name == name {
			return fmt.Errorf("manuscript '%s' already exists in configuration. \n Consider cleaning %s", name, manuscriptConfig)
		}
	}

	return nil
}

func fetchChainBaseDatasets() ([]*client.ChainBaseDatasetListItem, error) {
var chains []*client.ChainBaseDatasetListItem
err := pkg.ExecuteStepWithLoading("Checking Datasets From Network", false, func() error {
c := client.NewChainBaseClient(networkChainURL)
c := client.NewChainBaseClient(networkChainURL, networkChainEndpoint)
var err error
chains, err = c.GetChainBaseDatasetList()
if err != nil {
Expand Down
145 changes: 113 additions & 32 deletions commands/jobs_manuscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/json"
"fmt"
"log"
"manuscript-core/client"
"manuscript-core/pkg"
"net/http"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -40,48 +41,128 @@ func formatDurationToMinutes(durationMs int64) string {
return fmt.Sprintf("%d minutes", durationMinutes)
}

func ListJobs() {
func ListJobs(dir string) {
_ = pkg.ExecuteStepWithLoading("Checking jobs", false, func() error {
dockers, err := pkg.RunDockerPs()
// Step 1: Check for running Docker containers
dockers, err := getRunningContainers()
if err != nil {
log.Fatalf("Error: Failed to get docker ps: %v", err)
return err
}

// Always show if no containers are running
if len(dockers) == 0 {
fmt.Println("\r🟑 There are no jobs running...")
fmt.Println("\rπŸ“ There are no jobs running...")
}

// Step 2: Get manuscripts based on source (config or directory)
manuscripts, err := getManuscripts(dir)
if err != nil {
return fmt.Errorf("failed to get manuscripts: %w", err)
}

if len(manuscripts) == 0 {
if dir != "" {
fmt.Printf("\r⚠️ No manuscript files found in %s\n", dir)
}
return nil
}

jobNumber := 0
manuscripts, err := pkg.LoadConfig(manuscriptConfig)
for _, m := range manuscripts.Manuscripts {
if m.Port != 0 {
c := client.NewFlinkUiClient(fmt.Sprintf("http://localhost:%d", m.Port))
jobs, err := c.GetJobsList()
// Step 3: Check and display state for each manuscript
displayManuscriptStates(manuscripts, dockers)
return nil
})
}

// getRunningContainers returns the container list reported by
// pkg.RunDockerPs, wrapping any failure with additional context.
func getRunningContainers() ([]pkg.ContainerInfo, error) {
	containers, psErr := pkg.RunDockerPs()
	if psErr == nil {
		return containers, nil
	}
	return nil, fmt.Errorf("failed to get docker processes: %w", psErr)
}

// getManuscripts picks the manuscript source: when dir is non-empty the
// directory is scanned, otherwise the config file is consulted.
func getManuscripts(dir string) ([]pkg.Manuscript, error) {
	if dir != "" {
		return getManuscriptsFromDirectory(dir)
	}
	return getManuscriptsFromConfig()
}

// getManuscriptsFromConfig returns the Manuscripts field of the configuration
// loaded from manuscriptConfig, or a wrapped error if loading fails.
func getManuscriptsFromConfig() ([]pkg.Manuscript, error) {
	cfg, loadErr := pkg.LoadConfig(manuscriptConfig)
	if loadErr == nil {
		return cfg.Manuscripts, nil
	}
	return nil, fmt.Errorf("failed to load configuration: %w", loadErr)
}

// getManuscriptsFromDirectory scans directory for manuscript.yaml files
func getManuscriptsFromDirectory(dir string) ([]pkg.Manuscript, error) {
// Check if directory exists
if _, err := os.Stat(dir); os.IsNotExist(err) {
return nil, fmt.Errorf("directory does not exist: %s", dir)
}

var manuscripts []pkg.Manuscript
// Read only the immediate subdirectories
entries, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("failed to read directory: %w", err)
}

for _, entry := range entries {
if entry.IsDir() {
// Check for manuscript.yaml in each immediate subdirectory
manuscriptPath := filepath.Join(dir, entry.Name(), "manuscript.yaml")
if _, err := os.Stat(manuscriptPath); err == nil {
manuscript, err := pkg.ParseYAML(manuscriptPath)
if err != nil {
fmt.Printf("\r🟑 Manuscript: \u001B[34m%s\u001B[0m | State: \033[33mInitializing...(may wait for 2 minutes)\033[0m\n", m.Name)
}
if len(jobs) == 0 && err == nil {
fmt.Printf("\r🟑 Manuscript: \u001B[34m%s\u001B[0m | State: \033[33mInitializing...\033[0m\n", m.Name)
}
for _, job := range jobs {
jobNumber++
startTime := formatTimestamp(job.StartTime)
duration := formatDurationToMinutes(job.Duration)

switch job.State {
case "RUNNING":
trackHasuraTable(m.Name)
fmt.Printf("\r🟒 %d: Manuscript: \033[32m%s\033[0m | State: \033[32m%s\033[0m | Start Time: %s | Duration: %v | GraphQL: http://127.0.0.1:%d\n", jobNumber, m.Name, job.State, startTime, duration, m.GraphQLPort)
case "CANCELED":
fmt.Printf("\r🟑 %d: Manuscript: %s | State: \033[33m%s\033[0m | Start Time: %s | Duration: %v\n", jobNumber, m.Name, job.State, startTime, duration)
default:
fmt.Printf("\rβšͺ️ %d: Manuscript: %s | State: %s | Start Time: %s | Duration: %v\n", jobNumber, m.Name, job.State, startTime, duration)
}
log.Printf("Warning: Failed to parse %s: %v", manuscriptPath, err)
continue
}
manuscripts = append(manuscripts, *manuscript)
}
}
return err
})
}

return manuscripts, nil
}

// displayManuscriptStates runs a state detector over every manuscript and
// prints one status line per manuscript, numbered from 1. If detection fails
// a warning is logged and the manuscript is shown with pkg.StateUnknown.
func displayManuscriptStates(manuscripts []pkg.Manuscript, dockers []pkg.ContainerInfo) {
	for idx := range manuscripts {
		// Work on a copy of the element, as ranging by value would.
		ms := manuscripts[idx]

		stateDetector := pkg.NewStateDetector(&ms, dockers)
		current, detectErr := stateDetector.DetectState()
		if detectErr != nil {
			log.Printf("Warning: Failed to detect state for %s: %v", ms.Name, detectErr)
			current = pkg.StateUnknown
		}

		displayJobStatus(idx+1, &ms, current)
	}
}

// displayJobStatus prints a single status line for manuscript m to stdout.
//
// jobNumber is the ordinal shown at the start of the line. The line's icon
// and ANSI colouring depend on state:
//
//	pkg.StateRunning      green circle, name and state in green, plus the
//	                      local GraphQL URL built from m.GraphQLPort
//	pkg.StateInitializing yellow circle, name in blue, state in yellow
//	pkg.StateFailed       red circle, state in red
//	pkg.StateStopped      black circle, state in grey
//	anything else         white circle, no colouring
//
// Every line begins with a carriage return so that it overwrites whatever is
// currently on the terminal line.
func displayJobStatus(jobNumber int, m *pkg.Manuscript, state pkg.ManuscriptState) {
	switch state {
	case pkg.StateRunning:
		fmt.Printf("\r🟢 %d: Manuscript: \033[32m%s\033[0m | State: \033[32m%s\033[0m | GraphQL: http://127.0.0.1:%d\n",
			jobNumber, m.Name, state, m.GraphQLPort)
	case pkg.StateInitializing:
		fmt.Printf("\r🟡 %d: Manuscript: \033[34m%s\033[0m | State: \033[33m%s\033[0m\n",
			jobNumber, m.Name, state)
	case pkg.StateFailed:
		fmt.Printf("\r🔴 %d: Manuscript: %s | State: \033[31m%s\033[0m\n",
			jobNumber, m.Name, state)
	case pkg.StateStopped:
		fmt.Printf("\r⚫ %d: Manuscript: %s | State: \033[90m%s\033[0m\n",
			jobNumber, m.Name, state)
	default:
		fmt.Printf("\r⚪ %d: Manuscript: %s | State: %s\n",
			jobNumber, m.Name, state)
	}
}

func JobLogs(jobName string) {
Expand Down
Loading

0 comments on commit a9bdbdb

Please sign in to comment.