Skip to content

Commit 1ab58ea

Browse files
authored
Fix bugs with connection switching (#2957)
* Add per blockId mutex to block controller resync * Pass initial termsize through to startJob
1 parent 27f77f2 commit 1ab58ea

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

pkg/blockcontroller/blockcontroller.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/wavetermdev/waveterm/pkg/jobcontroller"
2020
"github.com/wavetermdev/waveterm/pkg/remote"
2121
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
22+
"github.com/wavetermdev/waveterm/pkg/util/ds"
2223
"github.com/wavetermdev/waveterm/pkg/util/shellutil"
2324
"github.com/wavetermdev/waveterm/pkg/wavebase"
2425
"github.com/wavetermdev/waveterm/pkg/waveobj"
@@ -75,10 +76,17 @@ type Controller interface {
7576

7677
// Registry for all controllers
7778
var (
78-
controllerRegistry = make(map[string]Controller)
79-
registryLock sync.RWMutex
79+
controllerRegistry = make(map[string]Controller)
80+
registryLock sync.RWMutex
81+
blockResyncMutexMap = ds.MakeSyncMap[*sync.Mutex]()
8082
)
8183

84+
func getBlockResyncMutex(blockId string) *sync.Mutex {
85+
return blockResyncMutexMap.GetOrCreate(blockId, func() *sync.Mutex {
86+
return &sync.Mutex{}
87+
})
88+
}
89+
8290
// Registry operations
8391
func getController(blockId string) Controller {
8492
registryLock.RLock()
@@ -145,6 +153,10 @@ func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts
145153
return fmt.Errorf("invalid tabId or blockId passed to ResyncController")
146154
}
147155

156+
mu := getBlockResyncMutex(blockId)
157+
mu.Lock()
158+
defer mu.Unlock()
159+
148160
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
149161
if err != nil {
150162
return fmt.Errorf("error getting block: %w", err)

pkg/blockcontroller/durableshellcontroller.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (dsc *DurableShellController) Start(ctx context.Context, blockMeta waveobj.
163163

164164
if jobId == "" {
165165
log.Printf("block %q starting new durable shell\n", dsc.BlockId)
166-
newJobId, err := dsc.startNewJob(ctx, blockMeta, dsc.ConnName)
166+
newJobId, err := dsc.startNewJob(ctx, blockMeta, dsc.ConnName, rtOpts)
167167
if err != nil {
168168
return fmt.Errorf("failed to start new job: %w", err)
169169
}
@@ -218,11 +218,14 @@ func (dsc *DurableShellController) SendInput(inputUnion *BlockInputUnion) error
218218
return jobcontroller.SendInput(context.Background(), data)
219219
}
220220

221-
func (dsc *DurableShellController) startNewJob(ctx context.Context, blockMeta waveobj.MetaMapType, connName string) (string, error) {
221+
func (dsc *DurableShellController) startNewJob(ctx context.Context, blockMeta waveobj.MetaMapType, connName string, rtOpts *waveobj.RuntimeOpts) (string, error) {
222222
termSize := waveobj.TermSize{
223223
Rows: shellutil.DefaultTermRows,
224224
Cols: shellutil.DefaultTermCols,
225225
}
226+
if rtOpts != nil && rtOpts.TermSize.Rows > 0 && rtOpts.TermSize.Cols > 0 {
227+
termSize = rtOpts.TermSize
228+
}
226229
cmdStr := blockMeta.GetString(waveobj.MetaKey_Cmd, "")
227230
cwd := blockMeta.GetString(waveobj.MetaKey_CmdCwd, "")
228231
opts, err := remote.ParseOpts(connName)

pkg/util/ds/syncmap.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,14 @@ func (sm *SyncMap[T]) TestAndSet(key string, newValue T, testFn func(T, bool) bo
6262
}
6363
return false
6464
}
65+
66+
func (sm *SyncMap[T]) GetOrCreate(key string, createFn func() T) T {
67+
sm.lock.Lock()
68+
defer sm.lock.Unlock()
69+
if v, ok := sm.m[key]; ok {
70+
return v
71+
}
72+
v := createFn()
73+
sm.m[key] = v
74+
return v
75+
}

0 commit comments

Comments
 (0)