Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions pkg/proxy/parsercmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
s.commands = nil
}

func (s *TerminalParser) GetCursorRow() string {

Check warning on line 104 in pkg/proxy/parsercmd.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbz3CYpU4GBVU6uX&open=AZrCvbz3CYpU4GBVU6uX&pullRequest=1944
switch s.screenType {
case LinuxScreen:
row := s.Screen.GetCursorRow()
Expand Down Expand Up @@ -368,25 +368,34 @@
func (s *TerminalParser) TryTmuxInput() string {
lastLine := s.tmuxParser.TmuxScreen.GetCursorRow()
cmd := strings.TrimPrefix(lastLine.String(), s.Ps1sStr)
s.InputBuf.Reset()
s.resetInputBuf()
return strings.TrimSpace(cmd)
}

func (s *TerminalParser) TryInput() string {
lastLine := s.Screen.GetCursorRow()
cmd := strings.TrimPrefix(lastLine.String(), s.Ps1sStr)
s.InputBuf.Reset()
s.resetInputBuf()
return strings.TrimSpace(cmd)
}

func (s *TerminalParser) TryLastRowInput() string {
rowStr := s.GetCursorRow()
cmd := strings.TrimPrefix(rowStr, s.Ps1sStr)
s.InputBuf.Reset()
s.resetInputBuf()
return strings.TrimSpace(cmd)
}

// resetInputBuf resets the input buffer and prevents memory bloat
func (s *TerminalParser) resetInputBuf() {
s.InputBuf.Reset()
// If capacity is too large, create a new buffer to release memory
if s.InputBuf.Cap() > maxBufSize {
s.InputBuf = bytes.Buffer{}
}
}

func (s *TerminalParser) GetPs1() string {

Check warning on line 398 in pkg/proxy/parsercmd.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbz3CYpU4GBVU6uY&open=AZrCvbz3CYpU4GBVU6uY&pullRequest=1944
rowStr := s.GetCursorRow()
return strings.TrimSuffix(rowStr, s.InputBuf.String())
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/proxy/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
"github.com/jumpserver/koko/pkg/logger"
)

// ftpReadBufferPool provides reusable 32KB buffers for FTP file operations
var ftpReadBufferPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 32*1024)
return &buf
},
}

type CommandRecorder struct {
sessionID string
storage CommandStorage
Expand Down Expand Up @@ -317,7 +325,7 @@
delete(r.ftpLogMap, id)
}

func (r *FTPFileRecorder) getFTPFile(id string) *FTPFileInfo {

Check warning on line 328 in pkg/proxy/recorder.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbygCYpU4GBVU6uU&open=AZrCvbygCYpU4GBVU6uU&pullRequest=1944
r.lock.RLock()
defer r.lock.RUnlock()
return r.ftpLogMap[id]
Expand Down Expand Up @@ -482,7 +490,9 @@
}

func (f *FTPFileInfo) WriteFromReader(r io.Reader) error {
buf := make([]byte, 32*1024)
bufPtr := ftpReadBufferPool.Get().(*[]byte)
buf := *bufPtr
defer ftpReadBufferPool.Put(bufPtr)
var err error
for {
nr, er := r.Read(buf)
Expand Down Expand Up @@ -528,8 +538,8 @@
return nil
}

func GetFTPFileRecorder(jmsService *service.JMService) *FTPFileRecorder {

Check warning on line 541 in pkg/proxy/recorder.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbygCYpU4GBVU6uV&open=AZrCvbygCYpU4GBVU6uV&pullRequest=1944
terminalConfig, _ := jmsService.GetTerminalConfig()

Check failure on line 542 in pkg/proxy/recorder.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Handle this error explicitly or document why it can be safely ignored.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbygCYpU4GBVU6uW&open=AZrCvbygCYpU4GBVU6uW&pullRequest=1944
maxSize := int64(terminalConfig.MaxStoreFTPFileSize) * 1024 * 1024
recorder := NewFTPFileRecord(jmsService, NewFTPFileStorage(jmsService, &terminalConfig), maxSize)
return recorder
Expand Down
79 changes: 79 additions & 0 deletions pkg/utils/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,85 @@
"sync"
)

// Buffer pool sizes for different use cases
const (
SmallBufferSize = 1024 // 1KB for small reads
MediumBufferSize = 32 * 1024 // 32KB for medium reads
LargeBufferSize = 64 * 1024 // 64KB for large reads
)

// BufferPool provides a sync.Pool based buffer pool with configurable size
type BufferPool struct {
pool sync.Pool
size int
}

// NewBufferPool creates a new buffer pool with the specified buffer size
func NewBufferPool(size int) *BufferPool {
return &BufferPool{
size: size,
pool: sync.Pool{
New: func() interface{} {
buf := make([]byte, size)
return &buf
},
},
}
}

// Get returns a buffer from the pool
func (p *BufferPool) Get() *[]byte {
return p.pool.Get().(*[]byte)
}

// Put returns a buffer to the pool
func (p *BufferPool) Put(buf *[]byte) {
if buf == nil {
return
}
// Only return buffers of the expected size to the pool
if cap(*buf) == p.size {
*buf = (*buf)[:p.size]
p.pool.Put(buf)
}
}

// Global buffer pools for common use cases
var (
// SmallBufferPool for small reads (1KB)
SmallBufferPool = NewBufferPool(SmallBufferSize)
// MediumBufferPool for medium reads (32KB)
MediumBufferPool = NewBufferPool(MediumBufferSize)
// LargeBufferPool for large reads (64KB)
LargeBufferPool = NewBufferPool(LargeBufferSize)
)

// BytesBufferPool provides a sync.Pool for bytes.Buffer instances.
// It helps reduce memory allocations by reusing bytes.Buffer objects.
// Usage pattern:
// buf := GetBytesBuffer()
// defer PutBytesBuffer(buf)
// // use buf...
var BytesBufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

// GetBytesBuffer returns a bytes.Buffer from the pool
func GetBytesBuffer() *bytes.Buffer {

Check warning on line 74 in pkg/utils/buffer.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbx5CYpU4GBVU6uT&open=AZrCvbx5CYpU4GBVU6uT&pullRequest=1944
return BytesBufferPool.Get().(*bytes.Buffer)
}

// PutBytesBuffer returns a bytes.Buffer to the pool after resetting it
func PutBytesBuffer(buf *bytes.Buffer) {
if buf == nil {
return
}
buf.Reset()
BytesBufferPool.Put(buf)
}

type SyncBuffer struct {
maxSize int
mu sync.Mutex
Expand Down
170 changes: 170 additions & 0 deletions pkg/utils/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package utils

import (
"bytes"
"sync"
"testing"
)

func TestBufferPool(t *testing.T) {
pool := NewBufferPool(1024)

// Get a buffer
buf := pool.Get()
if buf == nil {
t.Fatal("Expected non-nil buffer")

Check failure on line 15 in pkg/utils/buffer_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Expected non-nil buffer" 3 times.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbvDCYpU4GBVU6uS&open=AZrCvbvDCYpU4GBVU6uS&pullRequest=1944
}
if len(*buf) != 1024 {
t.Fatalf("Expected buffer length 1024, got %d", len(*buf))
}

// Use the buffer
copy(*buf, []byte("test data"))

Check failure on line 22 in pkg/utils/buffer_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "test data" 3 times.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbvDCYpU4GBVU6uR&open=AZrCvbvDCYpU4GBVU6uR&pullRequest=1944

// Return buffer to pool
pool.Put(buf)

// Get another buffer - should reuse the pooled one
buf2 := pool.Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer")
}
if len(*buf2) != 1024 {
t.Fatalf("Expected buffer length 1024, got %d", len(*buf2))
}
pool.Put(buf2)
}

func TestBufferPoolConcurrent(t *testing.T) {
pool := NewBufferPool(512)
var wg sync.WaitGroup

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buf := pool.Get()
if buf == nil {
t.Error("Expected non-nil buffer")
return
}
// Simulate some work
copy(*buf, []byte("concurrent test"))
pool.Put(buf)
}()
}
wg.Wait()
}

func TestGlobalBufferPools(t *testing.T) {
// Test SmallBufferPool
smallBuf := SmallBufferPool.Get()
if smallBuf == nil || len(*smallBuf) != SmallBufferSize {
t.Errorf("SmallBufferPool returned invalid buffer")
}
SmallBufferPool.Put(smallBuf)

// Test MediumBufferPool
mediumBuf := MediumBufferPool.Get()
if mediumBuf == nil || len(*mediumBuf) != MediumBufferSize {
t.Errorf("MediumBufferPool returned invalid buffer")
}
MediumBufferPool.Put(mediumBuf)

// Test LargeBufferPool
largeBuf := LargeBufferPool.Get()
if largeBuf == nil || len(*largeBuf) != LargeBufferSize {
t.Errorf("LargeBufferPool returned invalid buffer")
}
LargeBufferPool.Put(largeBuf)
}

func TestBytesBufferPool(t *testing.T) {
buf := GetBytesBuffer()
if buf == nil {
t.Fatal("Expected non-nil bytes.Buffer")
}

buf.WriteString("test data")
if buf.String() != "test data" {
t.Errorf("Expected 'test data', got '%s'", buf.String())
}

PutBytesBuffer(buf)

// Get another buffer
buf2 := GetBytesBuffer()
if buf2 == nil {
t.Fatal("Expected non-nil bytes.Buffer")
}
// Buffer should be reset
if buf2.Len() != 0 {
t.Errorf("Expected empty buffer, got length %d", buf2.Len())
}
PutBytesBuffer(buf2)
}

func TestSyncBuffer(t *testing.T) {
buf := NewMaxSizeBuffer(100)

// Test write within limit
data := []byte("hello world")

Check failure on line 111 in pkg/utils/buffer_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "hello world" 3 times.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvbvDCYpU4GBVU6uQ&open=AZrCvbvDCYpU4GBVU6uQ&pullRequest=1944
n, err := buf.Write(data)
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if n != len(data) {
t.Errorf("Expected write of %d bytes, got %d", len(data), n)
}

if buf.String() != "hello world" {
t.Errorf("Expected 'hello world', got '%s'", buf.String())
}

// Test write exceeding limit
largeData := make([]byte, 100)
n, err = buf.Write(largeData)
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if n != len(largeData) {
t.Errorf("Expected write of %d bytes, got %d", len(largeData), n)
}
// Should not have written as it would exceed max size
if buf.String() != "hello world" {
t.Errorf("Buffer should not have changed, got '%s'", buf.String())
}
}

func BenchmarkBufferPoolGet(b *testing.B) {
pool := NewBufferPool(1024)
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf := pool.Get()
pool.Put(buf)
}
}

func BenchmarkRawBufferAlloc(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := make([]byte, 1024)
_ = buf
}
}

func BenchmarkBytesBufferPoolGet(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf := GetBytesBuffer()
buf.WriteString("test data for benchmarking")
PutBytesBuffer(buf)
}
}

func BenchmarkRawBytesBufferAlloc(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := new(bytes.Buffer)
buf.WriteString("test data for benchmarking")
_ = buf
}
}
11 changes: 10 additions & 1 deletion pkg/zmodem/zsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
const (
bin16HeaderLen = 7
bin32HeaderLen = 9

// maxSubPacketBufCap is the maximum capacity for subPacketBuf before resetting
// to release memory. This helps prevent memory bloat from large file transfers.
maxSubPacketBufCap = 64 * 1024
)

func DecodeHexFrameHeader(p []byte) (h ZmodemHeader, offset int, ok bool) {
Expand Down Expand Up @@ -252,6 +256,8 @@
gotZDLE bool
endSubPacket bool
)
// Reuse parsedSubPacket slice capacity instead of allocating new
s.parsedSubPacket = s.parsedSubPacket[:0]
for i := range buf {
switch buf[i] {
case ZDLE:
Expand Down Expand Up @@ -280,8 +286,11 @@
}
}
s.subPacketBuf.Reset()
// Reset subPacketBuf capacity if it's too large to avoid memory bloat
if s.subPacketBuf.Cap() > maxSubPacketBufCap {
s.subPacketBuf = bytes.Buffer{}
}
s.onSubPacket(s.parsedSubPacket)
s.parsedSubPacket = nil
s.consume(buf[offset+1:])
}

Expand Down Expand Up @@ -315,7 +324,7 @@
s.currentHd = nil
}

func (s *ZSession) getHexHeader(p []byte) {

Check warning on line 327 in pkg/zmodem/zsession.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvb1ACYpU4GBVU6uZ&open=AZrCvb1ACYpU4GBVU6uZ&pullRequest=1944
if hd, offset, ok := DecodeHexFrameHeader(p); ok {
s.onHeader(&hd)
if s.IsNeedSubPacket() {
Expand All @@ -324,7 +333,7 @@
}
}

func (s *ZSession) getB16Header(p []byte) {

Check warning on line 336 in pkg/zmodem/zsession.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvb1ACYpU4GBVU6ua&open=AZrCvb1ACYpU4GBVU6ua&pullRequest=1944
if hd, offset, ok := DecodeB16FrameHeader(p); ok {
s.onHeader(&hd)
if s.IsNeedSubPacket() {
Expand All @@ -333,7 +342,7 @@
}
}

func (s *ZSession) getB32Header(p []byte) {

Check warning on line 345 in pkg/zmodem/zsession.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=jumpserver_koko&issues=AZrCvb1ACYpU4GBVU6ub&open=AZrCvb1ACYpU4GBVU6ub&pullRequest=1944
if hd, offset, ok := DecodeB32FrameHeader(p); ok {
s.onHeader(&hd)
if s.IsNeedSubPacket() {
Expand Down