Skip to content
This repository was archived by the owner on Aug 12, 2025. It is now read-only.

Commit 06cf843

Browse files
committed
Update migration cli to stream updates
1 parent 4ff1149 commit 06cf843

File tree

7 files changed

+471
-169
lines changed

7 files changed

+471
-169
lines changed

cmd/migration/main.go

Lines changed: 104 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -14,238 +14,178 @@ limitations under the License.
1414
package main
1515

1616
import (
17-
"bytes"
1817
"context"
18+
"flag"
1919
"fmt"
20-
"io"
2120
"io/ioutil"
21+
"math/rand"
2222
"os"
23-
"strings"
2423
"sync"
24+
"time"
2525

26+
"github.com/charmbracelet/bubbles/spinner"
27+
tea "github.com/charmbracelet/bubbletea"
2628
"github.com/charmbracelet/lipgloss"
27-
"github.com/spf13/cobra"
28-
"golang.org/x/term"
29-
corev1 "k8s.io/api/core/v1"
30-
"k8s.io/apimachinery/pkg/runtime"
31-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3229
_ "k8s.io/client-go/plugin/pkg/client/auth"
33-
"k8s.io/client-go/tools/clientcmd"
34-
"k8s.io/client-go/util/retry"
30+
"sigs.k8s.io/cluster-api-provider-packet/cmd/migration/util"
3531
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
36-
"sigs.k8s.io/cluster-api/controllers/remote"
37-
"sigs.k8s.io/controller-runtime/pkg/client"
3832
)
3933

4034
var (
41-
kubeconfig string //nolint:gochecknoglobals
42-
migrationCmd = &cobra.Command{ //nolint:exhaustivestruct,gochecknoglobals
43-
Use: "migration",
44-
SilenceUsage: true,
45-
Short: "migration is used to handle migration tasks for cluster-api-provider-packet",
46-
RunE: func(cmd *cobra.Command, args []string) error {
47-
return runMigration(context.TODO())
48-
},
49-
}
35+
kubeconfig string //nolint:gochecknoglobals
36+
migrationOutputBuffers *util.OutputBuffers //nolint:gochecknoglobals
37+
migrationOutput *util.OutputCollection //nolint:gochecknoglobals
38+
migrationErrors *util.ErrorCollection //nolint:gochecknoglobals
39+
clusters []clusterv1.Cluster //nolint:gochecknoglobals
40+
buffMutex sync.Mutex //nolint:gochecknoglobals
5041
)
5142

52-
func getManagementClient(kubeconfig string) (client.Client, error) {
53-
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
54-
if kubeconfig != "" {
55-
loadingRules.ExplicitPath = kubeconfig
56-
}
57-
58-
configOverrides := &clientcmd.ConfigOverrides{} //nolint:exhaustivestruct
59-
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
43+
func copyBuffers() {
44+
buffMutex.Lock()
45+
defer buffMutex.Unlock()
6046

61-
config, err := kubeConfig.ClientConfig()
62-
if err != nil {
63-
return nil, fmt.Errorf("failed to create client configuration for management cluster: %w", err)
64-
}
47+
for _, c := range clusters {
48+
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
6549

66-
scheme := runtime.NewScheme()
67-
_ = clientgoscheme.AddToScheme(scheme)
68-
_ = clusterv1.AddToScheme(scheme)
50+
out, err := ioutil.ReadAll(migrationOutputBuffers.Get(outputKey))
51+
if err != nil {
52+
continue
53+
}
6954

70-
return client.New(config, client.Options{Scheme: scheme}) //nolint:exhaustivestruct,wrapcheck
55+
migrationOutput.Append(outputKey, string(out))
56+
}
7157
}
7258

73-
func runMigration(ctx context.Context) error {
74-
mgmtClient, err := getManagementClient(kubeconfig)
59+
func runMigration() tea.Msg {
60+
mgmtClient, err := util.GetManagementClient(kubeconfig)
7561
if err != nil {
76-
return fmt.Errorf("failed to create client for management cluster: %w", err)
62+
return fatalErr(fmt.Errorf("failed to create client for management cluster: %w", err))
7763
}
7864

7965
clusterList := &clusterv1.ClusterList{} //nolint:exhaustivestruct
80-
if err := mgmtClient.List(ctx, clusterList); err != nil {
81-
return fmt.Errorf("failed to list workload clusters in management cluster: %w", err)
66+
if err := mgmtClient.List(context.TODO(), clusterList); err != nil {
67+
return fatalErr(fmt.Errorf("failed to list workload clusters in management cluster: %w", err))
8268
}
8369

84-
var (
85-
wg sync.WaitGroup
86-
mu sync.Mutex
87-
)
70+
var wg sync.WaitGroup
8871

89-
migrationOutput := make(map[string]*bytes.Buffer, len(clusterList.Items))
90-
migrationErrors := make(map[string]error, len(clusterList.Items))
72+
clusters = clusterList.Items
73+
migrationOutputBuffers, migrationErrors = util.RunMigration(context.TODO(), mgmtClient, clusterList.Items, &wg)
74+
migrationOutput = util.NewOutputCollection(len(clusters))
9175

92-
for i, c := range clusterList.Items {
93-
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
76+
wg.Wait()
9477

95-
clusterKey, err := client.ObjectKeyFromObject(&clusterList.Items[i])
96-
if err != nil {
97-
mu.Lock()
98-
migrationErrors[outputKey] = fmt.Errorf("failed to create object key: %w", err)
99-
mu.Unlock()
78+
return cleanQuit()
79+
}
10080

101-
continue
102-
}
81+
type model struct {
82+
spinner spinner.Model
83+
err error
84+
}
10385

104-
var buf bytes.Buffer
86+
type fatalErr error
10587

106-
mu.Lock()
107-
migrationOutput[outputKey] = &buf
108-
mu.Unlock()
109-
wg.Add(1)
88+
func cleanQuit() tea.Msg {
89+
copyBuffers()
11090

111-
go func() {
112-
defer wg.Done()
91+
// This is to ensure that the buffers are flushed to stdout prior to exiting
92+
time.Sleep(time.Second)
11393

114-
if err := migrateWorkloadCluster(context.TODO(), clusterKey, mgmtClient, &buf); err != nil {
115-
mu.Lock()
116-
migrationErrors[outputKey] = err
117-
mu.Unlock()
94+
return tea.Quit()
95+
}
11896

119-
return
120-
}
121-
}()
122-
}
97+
func initialModel() model {
98+
s := spinner.NewModel()
99+
s.Spinner = spinner.Pulse
100+
s.Style = lipgloss.NewStyle().Foreground(lipgloss.Color("205"))
123101

124-
wg.Wait()
102+
return model{spinner: s} //nolint:exhaustivestruct
103+
}
104+
105+
func (m model) Init() tea.Cmd {
106+
return tea.Batch(runMigration, spinner.Tick)
107+
}
108+
109+
func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
110+
var cmd tea.Cmd
125111

126-
if outputResults(clusterList.Items, migrationOutput, migrationErrors) {
127-
return fmt.Errorf("failed to run migration to completion") //nolint:goerr113
112+
switch msg := msg.(type) {
113+
case tea.KeyMsg:
114+
switch msg.String() {
115+
case "ctrl+c", "esc":
116+
cmd = cleanQuit
117+
}
118+
case fatalErr:
119+
m.err = msg
120+
cmd = cleanQuit
121+
default:
122+
copyBuffers()
123+
124+
m.spinner, cmd = m.spinner.Update(msg)
128125
}
129126

130-
return nil
127+
return m, cmd
131128
}
132129

133-
func outputResults(
134-
clusters []clusterv1.Cluster,
135-
migrationOutput map[string]*bytes.Buffer,
136-
migrationErrors map[string]error,
137-
) bool {
138-
doc := strings.Builder{}
139-
physicalWidth, _, _ := term.GetSize(int(os.Stdout.Fd()))
130+
func (m model) View() string {
131+
// TODO: handle window size detection and text wrapping
140132
errorColor := lipgloss.Color("#dc322f")
141133
infoColor := lipgloss.Color("#859900")
142134
headingColor := lipgloss.Color("#268bd2")
143-
docStyle := lipgloss.NewStyle().Padding(1, 2, 1, 2)
144135
clusterStyle := lipgloss.NewStyle().Foreground(headingColor)
145136
outputHeadings := clusterStyle.Copy().PaddingLeft(4) //nolint:gomnd
146137
clusterOutputStyle := lipgloss.NewStyle().PaddingLeft(8).Foreground(infoColor) //nolint:gomnd
147138
clusterErrorStyle := lipgloss.NewStyle().PaddingLeft(4).Foreground(errorColor) //nolint:gomnd
148139

149-
encounteredErrors := false
140+
s := "CAPP Migration\n\n"
141+
142+
s += fmt.Sprintf("%s Running...\n\n", m.spinner.View())
143+
144+
if m.err != nil {
145+
s += fmt.Sprintf("Error: %v\n", m.err)
146+
}
150147

151148
for _, c := range clusters {
152149
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
153-
doc.WriteString(clusterStyle.Render(fmt.Sprintf("Cluster %s:", outputKey)) + "\n")
150+
s += clusterStyle.Render(fmt.Sprintf("Cluster %s:", outputKey)) + "\n"
154151

155-
out, err := ioutil.ReadAll(migrationOutput[outputKey])
156-
if err != nil {
157-
fmt.Fprintf(os.Stderr, "Error reading output for cluster %s: %v\n", outputKey, err)
158-
}
152+
out := migrationOutput.Get(outputKey)
159153

160154
if len(out) > 0 {
161-
doc.WriteString(outputHeadings.Render("Output:") + "\n")
162-
doc.WriteString(clusterOutputStyle.Render(string(out)) + "\n")
155+
s += outputHeadings.Render("Output:") + "\n"
156+
s += clusterOutputStyle.Render(out) + "\n"
163157
}
164158

165-
if err, ok := migrationErrors[outputKey]; ok {
166-
encounteredErrors = true
167-
168-
doc.WriteString(clusterErrorStyle.Render(fmt.Sprintf("Error: %s", err.Error())) + "\n\n")
159+
if err, ok := migrationErrors.Load(outputKey); ok {
160+
s += clusterErrorStyle.Render(fmt.Sprintf("Error: %s", err.Error())) + "\n"
169161
}
170162
}
171163

172-
if physicalWidth > 0 {
173-
docStyle = docStyle.MaxWidth(physicalWidth)
174-
} else if physicalWidth < 0 {
175-
docStyle = docStyle.MaxWidth(80) //nolint:gomnd
176-
}
177-
178-
fmt.Println(docStyle.Render(doc.String())) //nolint:forbidigo
179-
180-
return encounteredErrors
164+
return s
181165
}
182166

183-
func migrateWorkloadCluster(
184-
ctx context.Context,
185-
clusterKey client.ObjectKey,
186-
mgmtClient client.Client,
187-
stdout io.Writer,
188-
) error {
189-
c, err := remote.NewClusterClient(ctx, mgmtClient, clusterKey, nil)
190-
if err != nil {
191-
return fmt.Errorf("failed to create client: %w", err)
192-
}
193-
194-
nodeList := &corev1.NodeList{} //nolint:exhaustivestruct
195-
if err := c.List(ctx, nodeList); err != nil {
196-
return fmt.Errorf("failed to list nodes: %w", err)
197-
}
198-
199-
for _, node := range nodeList.Items {
200-
// TODO: should this stop at first error, or attempt to continue?
201-
// TODO: should probably give some additional safety to users since this will be
202-
// deleting and re-creating Node resources
203-
if err := migrateNode(ctx, node, c, stdout); err != nil {
204-
return err
205-
}
206-
}
167+
func main() {
168+
rand.Seed(time.Now().UTC().UnixNano())
207169

208-
return nil
209-
}
170+
var showHelp bool
210171

211-
func migrateNode(ctx context.Context, node corev1.Node, workloadClient client.Client, stdout io.Writer) error {
212-
if strings.HasPrefix(node.Spec.ProviderID, "equinixmetal") {
213-
fmt.Fprintf(stdout, "✔ Node %s already has the updated providerID\n", node.Name)
172+
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
214173

215-
return nil
216-
}
217-
218-
if err := workloadClient.Delete(ctx, &node); err != nil {
219-
return fmt.Errorf("failed to delete existing node resource: %w", err)
220-
}
174+
flag.StringVar(&kubeconfig, "kubeconfig", "",
175+
"Path to the kubeconfig for the management cluster. If unspecified, default discovery rules apply.")
176+
flag.BoolVar(&showHelp, "h", false, "show help")
177+
flag.Parse()
221178

222-
node.SetResourceVersion("")
223-
node.Spec.ProviderID = strings.Replace(node.Spec.ProviderID, "packet", "equinixmetal", 1)
224-
225-
if err := retry.OnError(
226-
retry.DefaultRetry,
227-
func(err error) bool {
228-
return true
229-
},
230-
func() error {
231-
return workloadClient.Create(ctx, &node) //nolint:wrapcheck
232-
},
233-
); err != nil {
234-
return fmt.Errorf("failed to create replacement node resource: %w", err)
179+
if showHelp {
180+
flag.Usage()
181+
os.Exit(0)
235182
}
236183

237-
fmt.Fprintf(stdout, "✅ Node %s has been successfully migrated\n", node.Name)
184+
m := initialModel()
238185

239-
return nil
240-
}
241-
242-
func init() { //nolint:gochecknoinits
243-
migrationCmd.Flags().StringVar(&kubeconfig, "kubeconfig", "",
244-
"Path to the kubeconfig for the management cluster. If unspecified, default discovery rules apply.")
245-
}
246-
247-
func main() {
248-
if err := migrationCmd.Execute(); err != nil {
186+
p := tea.NewProgram(m)
187+
if err := p.Start(); err != nil {
188+
fmt.Println("Error starting Bubble Tea program:", err) //nolint: forbidigo
249189
os.Exit(1)
250190
}
251191
}

0 commit comments

Comments
 (0)