Skip to content
This repository was archived by the owner on Aug 12, 2025. It is now read-only.

Commit 523523f

Browse files
committed
Update migration cli to stream updates
1 parent 4ff1149 commit 523523f

File tree

7 files changed

+521
-169
lines changed

7 files changed

+521
-169
lines changed

cmd/migration/main.go

Lines changed: 154 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -14,238 +14,228 @@ limitations under the License.
1414
package main
1515

1616
import (
17-
"bytes"
1817
"context"
18+
"flag"
1919
"fmt"
20-
"io"
2120
"io/ioutil"
21+
"math/rand"
2222
"os"
23-
"strings"
2423
"sync"
24+
"time"
2525

26+
"github.com/charmbracelet/bubbles/spinner"
27+
tea "github.com/charmbracelet/bubbletea"
2628
"github.com/charmbracelet/lipgloss"
27-
"github.com/spf13/cobra"
28-
"golang.org/x/term"
29-
corev1 "k8s.io/api/core/v1"
30-
"k8s.io/apimachinery/pkg/runtime"
31-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3229
_ "k8s.io/client-go/plugin/pkg/client/auth"
33-
"k8s.io/client-go/tools/clientcmd"
34-
"k8s.io/client-go/util/retry"
30+
"sigs.k8s.io/cluster-api-provider-packet/cmd/migration/util"
3531
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
36-
"sigs.k8s.io/cluster-api/controllers/remote"
37-
"sigs.k8s.io/controller-runtime/pkg/client"
3832
)
3933

4034
var (
41-
kubeconfig string //nolint:gochecknoglobals
42-
migrationCmd = &cobra.Command{ //nolint:exhaustivestruct,gochecknoglobals
43-
Use: "migration",
44-
SilenceUsage: true,
45-
Short: "migration is used to handle migration tasks for cluster-api-provider-packet",
46-
RunE: func(cmd *cobra.Command, args []string) error {
47-
return runMigration(context.TODO())
48-
},
49-
}
35+
kubeconfig string //nolint:gochecknoglobals
36+
migrationOutputBuffers *util.OutputBuffers //nolint:gochecknoglobals
37+
migrationOutput *util.OutputCollection //nolint:gochecknoglobals
38+
migrationErrors *util.ErrorCollection //nolint:gochecknoglobals
39+
clusters []clusterv1.Cluster //nolint:gochecknoglobals
40+
buffMutex sync.Mutex //nolint:gochecknoglobals
5041
)
5142

52-
func getManagementClient(kubeconfig string) (client.Client, error) {
53-
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
54-
if kubeconfig != "" {
55-
loadingRules.ExplicitPath = kubeconfig
56-
}
57-
58-
configOverrides := &clientcmd.ConfigOverrides{} //nolint:exhaustivestruct
59-
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
43+
// func outputResults(
44+
// clusters []clusterv1.Cluster,
45+
// migrationOutput *util.OutputCollection,
46+
// migrationErrors *util.ErrorCollection,
47+
// ) bool {
48+
// doc := strings.Builder{}
49+
// physicalWidth, _, _ := term.GetSize(int(os.Stdout.Fd()))
50+
// errorColor := lipgloss.Color("#dc322f")
51+
// infoColor := lipgloss.Color("#859900")
52+
// headingColor := lipgloss.Color("#268bd2")
53+
// docStyle := lipgloss.NewStyle().Padding(1, 2, 1, 2)
54+
// clusterStyle := lipgloss.NewStyle().Foreground(headingColor)
55+
// outputHeadings := clusterStyle.Copy().PaddingLeft(4) //nolint:gomnd
56+
// clusterOutputStyle := lipgloss.NewStyle().PaddingLeft(8).Foreground(infoColor) //nolint:gomnd
57+
// clusterErrorStyle := lipgloss.NewStyle().PaddingLeft(4).Foreground(errorColor) //nolint:gomnd
58+
59+
// encounteredErrors := false
60+
61+
// for _, c := range clusters {
62+
// outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
63+
// doc.WriteString(clusterStyle.Render(fmt.Sprintf("Cluster %s:", outputKey)) + "\n")
64+
65+
// out, err := ioutil.ReadAll(migrationOutput.Get(outputKey))
66+
// if err != nil {
67+
// fmt.Fprintf(os.Stderr, "Error reading output for cluster %s: %v\n", outputKey, err)
68+
// }
69+
70+
// if len(out) > 0 {
71+
// doc.WriteString(outputHeadings.Render("Output:") + "\n")
72+
// doc.WriteString(clusterOutputStyle.Render(string(out)) + "\n")
73+
// }
74+
75+
// if err, ok := migrationErrors.Load(outputKey); ok {
76+
// encounteredErrors = true
77+
78+
// doc.WriteString(clusterErrorStyle.Render(fmt.Sprintf("Error: %s", err.Error())) + "\n\n")
79+
// }
80+
// }
81+
82+
// if physicalWidth > 0 {
83+
// docStyle = docStyle.MaxWidth(physicalWidth)
84+
// } else if physicalWidth < 0 {
85+
// docStyle = docStyle.MaxWidth(80) //nolint:gomnd
86+
// }
87+
88+
// fmt.Println(docStyle.Render(doc.String())) //nolint:forbidigo
89+
90+
// return encounteredErrors
91+
// }
92+
93+
func copyBuffers() {
94+
buffMutex.Lock()
95+
defer buffMutex.Unlock()
6096

61-
config, err := kubeConfig.ClientConfig()
62-
if err != nil {
63-
return nil, fmt.Errorf("failed to create client configuration for management cluster: %w", err)
64-
}
97+
for _, c := range clusters {
98+
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
6599

66-
scheme := runtime.NewScheme()
67-
_ = clientgoscheme.AddToScheme(scheme)
68-
_ = clusterv1.AddToScheme(scheme)
100+
out, err := ioutil.ReadAll(migrationOutputBuffers.Get(outputKey))
101+
if err != nil {
102+
continue
103+
}
69104

70-
return client.New(config, client.Options{Scheme: scheme}) //nolint:exhaustivestruct,wrapcheck
105+
migrationOutput.Append(outputKey, string(out))
106+
}
71107
}
72108

73-
func runMigration(ctx context.Context) error {
74-
mgmtClient, err := getManagementClient(kubeconfig)
109+
func runMigration() tea.Msg {
110+
mgmtClient, err := util.GetManagementClient(kubeconfig)
75111
if err != nil {
76-
return fmt.Errorf("failed to create client for management cluster: %w", err)
112+
return fatalErr(fmt.Errorf("failed to create client for management cluster: %w", err))
77113
}
78114

79115
clusterList := &clusterv1.ClusterList{} //nolint:exhaustivestruct
80-
if err := mgmtClient.List(ctx, clusterList); err != nil {
81-
return fmt.Errorf("failed to list workload clusters in management cluster: %w", err)
116+
if err := mgmtClient.List(context.TODO(), clusterList); err != nil {
117+
return fatalErr(fmt.Errorf("failed to list workload clusters in management cluster: %w", err))
82118
}
83119

84-
var (
85-
wg sync.WaitGroup
86-
mu sync.Mutex
87-
)
120+
var wg sync.WaitGroup
88121

89-
migrationOutput := make(map[string]*bytes.Buffer, len(clusterList.Items))
90-
migrationErrors := make(map[string]error, len(clusterList.Items))
122+
clusters = clusterList.Items
123+
migrationOutputBuffers, migrationErrors = util.RunMigration(context.TODO(), mgmtClient, clusterList.Items, &wg)
124+
migrationOutput = util.NewOutputCollection(len(clusters))
91125

92-
for i, c := range clusterList.Items {
93-
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
126+
wg.Wait()
94127

95-
clusterKey, err := client.ObjectKeyFromObject(&clusterList.Items[i])
96-
if err != nil {
97-
mu.Lock()
98-
migrationErrors[outputKey] = fmt.Errorf("failed to create object key: %w", err)
99-
mu.Unlock()
128+
return cleanQuit()
129+
}
100130

101-
continue
102-
}
131+
type model struct {
132+
spinner spinner.Model
133+
err error
134+
}
103135

104-
var buf bytes.Buffer
136+
type fatalErr error
105137

106-
mu.Lock()
107-
migrationOutput[outputKey] = &buf
108-
mu.Unlock()
109-
wg.Add(1)
138+
func cleanQuit() tea.Msg {
139+
copyBuffers()
110140

111-
go func() {
112-
defer wg.Done()
141+
// This is to ensure that the buffers are flushed to stdout prior to exiting
142+
time.Sleep(time.Second)
113143

114-
if err := migrateWorkloadCluster(context.TODO(), clusterKey, mgmtClient, &buf); err != nil {
115-
mu.Lock()
116-
migrationErrors[outputKey] = err
117-
mu.Unlock()
144+
return tea.Quit()
145+
}
118146

119-
return
120-
}
121-
}()
122-
}
147+
func initialModel() model {
148+
s := spinner.NewModel()
149+
s.Spinner = spinner.Pulse
150+
s.Style = lipgloss.NewStyle().Foreground(lipgloss.Color("205"))
123151

124-
wg.Wait()
152+
return model{spinner: s} //nolint:exhaustivestruct
153+
}
154+
155+
func (m model) Init() tea.Cmd {
156+
return tea.Batch(runMigration, spinner.Tick)
157+
}
158+
159+
func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
160+
var cmd tea.Cmd
125161

126-
if outputResults(clusterList.Items, migrationOutput, migrationErrors) {
127-
return fmt.Errorf("failed to run migration to completion") //nolint:goerr113
162+
switch msg := msg.(type) {
163+
case tea.KeyMsg:
164+
switch msg.String() {
165+
case "ctrl+c", "esc":
166+
cmd = cleanQuit
167+
}
168+
case fatalErr:
169+
m.err = msg
170+
cmd = cleanQuit
171+
default:
172+
copyBuffers()
173+
174+
m.spinner, cmd = m.spinner.Update(msg)
128175
}
129176

130-
return nil
177+
return m, cmd
131178
}
132179

133-
func outputResults(
134-
clusters []clusterv1.Cluster,
135-
migrationOutput map[string]*bytes.Buffer,
136-
migrationErrors map[string]error,
137-
) bool {
138-
doc := strings.Builder{}
139-
physicalWidth, _, _ := term.GetSize(int(os.Stdout.Fd()))
180+
func (m model) View() string {
181+
// TODO: handle window size detection and text wrapping
140182
errorColor := lipgloss.Color("#dc322f")
141183
infoColor := lipgloss.Color("#859900")
142184
headingColor := lipgloss.Color("#268bd2")
143-
docStyle := lipgloss.NewStyle().Padding(1, 2, 1, 2)
144185
clusterStyle := lipgloss.NewStyle().Foreground(headingColor)
145186
outputHeadings := clusterStyle.Copy().PaddingLeft(4) //nolint:gomnd
146187
clusterOutputStyle := lipgloss.NewStyle().PaddingLeft(8).Foreground(infoColor) //nolint:gomnd
147188
clusterErrorStyle := lipgloss.NewStyle().PaddingLeft(4).Foreground(errorColor) //nolint:gomnd
148189

149-
encounteredErrors := false
190+
s := "CAPP Migration\n\n"
191+
192+
s += fmt.Sprintf("%s Running...\n\n", m.spinner.View())
193+
194+
if m.err != nil {
195+
s += fmt.Sprintf("Error: %v\n", m.err)
196+
}
150197

151198
for _, c := range clusters {
152199
outputKey := fmt.Sprintf("%s/%s", c.Namespace, c.Name)
153-
doc.WriteString(clusterStyle.Render(fmt.Sprintf("Cluster %s:", outputKey)) + "\n")
200+
s += clusterStyle.Render(fmt.Sprintf("Cluster %s:", outputKey)) + "\n"
154201

155-
out, err := ioutil.ReadAll(migrationOutput[outputKey])
156-
if err != nil {
157-
fmt.Fprintf(os.Stderr, "Error reading output for cluster %s: %v\n", outputKey, err)
158-
}
202+
out := migrationOutput.Get(outputKey)
159203

160204
if len(out) > 0 {
161-
doc.WriteString(outputHeadings.Render("Output:") + "\n")
162-
doc.WriteString(clusterOutputStyle.Render(string(out)) + "\n")
205+
s += outputHeadings.Render("Output:") + "\n"
206+
s += clusterOutputStyle.Render(out) + "\n"
163207
}
164208

165-
if err, ok := migrationErrors[outputKey]; ok {
166-
encounteredErrors = true
167-
168-
doc.WriteString(clusterErrorStyle.Render(fmt.Sprintf("Error: %s", err.Error())) + "\n\n")
209+
if err, ok := migrationErrors.Load(outputKey); ok {
210+
s += clusterErrorStyle.Render(fmt.Sprintf("Error: %s", err.Error())) + "\n"
169211
}
170212
}
171213

172-
if physicalWidth > 0 {
173-
docStyle = docStyle.MaxWidth(physicalWidth)
174-
} else if physicalWidth < 0 {
175-
docStyle = docStyle.MaxWidth(80) //nolint:gomnd
176-
}
177-
178-
fmt.Println(docStyle.Render(doc.String())) //nolint:forbidigo
179-
180-
return encounteredErrors
214+
return s
181215
}
182216

183-
func migrateWorkloadCluster(
184-
ctx context.Context,
185-
clusterKey client.ObjectKey,
186-
mgmtClient client.Client,
187-
stdout io.Writer,
188-
) error {
189-
c, err := remote.NewClusterClient(ctx, mgmtClient, clusterKey, nil)
190-
if err != nil {
191-
return fmt.Errorf("failed to create client: %w", err)
192-
}
193-
194-
nodeList := &corev1.NodeList{} //nolint:exhaustivestruct
195-
if err := c.List(ctx, nodeList); err != nil {
196-
return fmt.Errorf("failed to list nodes: %w", err)
197-
}
198-
199-
for _, node := range nodeList.Items {
200-
// TODO: should this stop at first error, or attempt to continue?
201-
// TODO: should probably give some additional safety to users since this will be
202-
// deleting and re-creating Node resources
203-
if err := migrateNode(ctx, node, c, stdout); err != nil {
204-
return err
205-
}
206-
}
217+
func main() {
218+
rand.Seed(time.Now().UTC().UnixNano())
207219

208-
return nil
209-
}
220+
var showHelp bool
210221

211-
func migrateNode(ctx context.Context, node corev1.Node, workloadClient client.Client, stdout io.Writer) error {
212-
if strings.HasPrefix(node.Spec.ProviderID, "equinixmetal") {
213-
fmt.Fprintf(stdout, "✔ Node %s already has the updated providerID\n", node.Name)
222+
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
214223

215-
return nil
216-
}
217-
218-
if err := workloadClient.Delete(ctx, &node); err != nil {
219-
return fmt.Errorf("failed to delete existing node resource: %w", err)
220-
}
224+
flag.StringVar(&kubeconfig, "kubeconfig", "",
225+
"Path to the kubeconfig for the management cluster. If unspecified, default discovery rules apply.")
226+
flag.BoolVar(&showHelp, "h", false, "show help")
227+
flag.Parse()
221228

222-
node.SetResourceVersion("")
223-
node.Spec.ProviderID = strings.Replace(node.Spec.ProviderID, "packet", "equinixmetal", 1)
224-
225-
if err := retry.OnError(
226-
retry.DefaultRetry,
227-
func(err error) bool {
228-
return true
229-
},
230-
func() error {
231-
return workloadClient.Create(ctx, &node) //nolint:wrapcheck
232-
},
233-
); err != nil {
234-
return fmt.Errorf("failed to create replacement node resource: %w", err)
229+
if showHelp {
230+
flag.Usage()
231+
os.Exit(0)
235232
}
236233

237-
fmt.Fprintf(stdout, "✅ Node %s has been successfully migrated\n", node.Name)
234+
m := initialModel()
238235

239-
return nil
240-
}
241-
242-
func init() { //nolint:gochecknoinits
243-
migrationCmd.Flags().StringVar(&kubeconfig, "kubeconfig", "",
244-
"Path to the kubeconfig for the management cluster. If unspecified, default discovery rules apply.")
245-
}
246-
247-
func main() {
248-
if err := migrationCmd.Execute(); err != nil {
236+
p := tea.NewProgram(m)
237+
if err := p.Start(); err != nil {
238+
fmt.Println("Error starting Bubble Tea program:", err) //nolint: forbidigo
249239
os.Exit(1)
250240
}
251241
}

0 commit comments

Comments
 (0)