Skip to content

[Fix]: Increase distributed computing to improve performance #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions planar/strategy.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package planar

import (
"sync"

"github.com/spatial-go/geoos/space/topograph"
"github.com/spatial-go/geoos/space"
)

var algorithmMegrez Algorithm
var once sync.Once
type Strategy interface {
Area(space.Geometry) (float64, error)
ToMultiPart(g space.Geometry) (space.Geometry, error)
}

type newAlgorithm func() Algorithm
type normalStrategy struct{}

// NormalStrategy returns normal algorithm.
func NormalStrategy() Algorithm {
return GetStrategy(NewMegrezAlgorithm)
func NormalStrategy() Strategy {
return normalStrategy{}
}

// GetStrategy returns algorithm by new Algorithm.
func GetStrategy(f newAlgorithm) Algorithm {
return f()
func (s normalStrategy) ToMultiPart(g space.Geometry) (space.Geometry, error) {
return space.ToMultiPart(g)
}

// NewMegrezAlgorithm returns Algorithm that is MegrezAlgorithm.
func NewMegrezAlgorithm() Algorithm {
once.Do(func() {
algorithmMegrez = &megrezAlgorithm{topograph.NormalRelationship()}
})
return algorithmMegrez
}
func (s normalStrategy) Area(g space.Geometry) (float64, error) {
return 0, nil // Placeholder
}
25 changes: 25 additions & 0 deletions space/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,31 @@ func (c Collection) BufferInMeter(width float64, quadsegs int) Geometry {
return pg.bufferInMeter(width, quadsegs)
}

func (c Collection) BufferInMeterDistributed(width float64, quadsegs int, workers int) Geometry {
if workers <= 1 || len(c) == 0 {
return c.BufferInMeter(width, quadsegs) // Fallback to single-threaded
}

pool := NewWorkerPool(workers)
for _, geom := range c {
geomCopy := geom // Capture in closure
pool.AddTask(func() Geometry {
switch g := geomCopy.(type) {
case Ring:
return g.BufferInMeter(width, quadsegs)
case Polygon:
return g.BufferInMeter(width, quadsegs)
case Collection:
return g.BufferInMeter(width, quadsegs)
default:
return geomCopy.BufferInMeter(width, quadsegs)
}
})
}

return pool.Wait()
}

// Envelope returns the minimum bounding box for the supplied geometry, as a geometry.
// The polygon is defined by the corner points of the bounding box
// ((MINX, MINY), (MINX, MAXY), (MAXX, MAXY), (MAXX, MINY), (MINX, MINY)).
Expand Down
72 changes: 72 additions & 0 deletions space/distributed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package space

import (
"sync"
)

// WorkerPool manages a pool of workers for distributed geometry processing.
type WorkerPool struct {
workers int
tasks chan func() Geometry
results chan Geometry
wg sync.WaitGroup
}

// NewWorkerPool initializes a worker pool with a specified number of workers.
func NewWorkerPool(workers int) *WorkerPool {
pool := &WorkerPool{
workers: workers,
tasks: make(chan func() Geometry),
results: make(chan Geometry, workers),
}
pool.start()
return pool
}

// start launches the worker goroutines.
func (p *WorkerPool) start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.tasks {
result := task()
p.results <- result
}
}()
}
}

// AddTask adds a task to the worker pool.
func (p *WorkerPool) AddTask(task func() Geometry) {
p.tasks <- task
}

// Wait waits for all tasks to complete and returns merged results as a Geometry.
func (p *WorkerPool) Wait() Geometry {
close(p.tasks)
p.wg.Wait()
close(p.results)

var results []Geometry
for result := range p.results {
results = append(results, result)
}
return mergeGeometries(results)
}

// mergeGeometries combines multiple Geometry results into a single Geometry.
func mergeGeometries(geometries []Geometry) Geometry {
if len(geometries) == 0 {
return nil
}
if len(geometries) == 1 {
return geometries[0]
}
// For simplicity, assume results are Polygons or Rings and create a Collection
collection := Collection{}
for _, g := range geometries {
collection = append(collection, g)
}
return collection
}
167 changes: 167 additions & 0 deletions space/distributed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package space

import (
"testing"
"github.com/spatial-go/geoos/algorithm/matrix"
)

func TestRingBufferInMeterDistributed(t *testing.T) {
tests := []struct {
name string
ring Ring
width float64
quadsegs int
workers int
wantEmpty bool
}{
{
name: "Valid Ring with 2 Workers",
ring: Ring{{0, 0}, {1, 0}, {2, 0}, {3, 0}, {0, 0}},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: false,
},
{
name: "Single Point Ring (Fallback)",
ring: Ring{{0, 0}},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: true, // Should fallback and handle empty/invalid gracefully
},
{
name: "One Worker (Fallback)",
ring: Ring{{0, 0}, {1, 0}, {2, 0}, {0, 0}},
width: 10,
quadsegs: 8,
workers: 1,
wantEmpty: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.ring.BufferInMeterDistributed(tt.width, tt.quadsegs, tt.workers)
if result == nil || result.IsEmpty() != tt.wantEmpty {
t.Errorf("%s: expected empty=%v, got %v", tt.name, tt.wantEmpty, result)
}
})
}
}

func TestPolygonBufferInMeterDistributed(t *testing.T) {
tests := []struct {
name string
poly Polygon
width float64
quadsegs int
workers int
wantEmpty bool
}{
{
name: "Valid Polygon with 2 Workers",
poly: Polygon{{{0, 0}, {1, 0}, {1, 1}, {0, 1}, {0, 0}}},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: false,
},
{
name: "Empty Polygon",
poly: Polygon{},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.poly.BufferInMeterDistributed(tt.width, tt.quadsegs, tt.workers)
if result == nil || result.IsEmpty() != tt.wantEmpty {
t.Errorf("%s: expected empty=%v, got %v", tt.name, tt.wantEmpty, result)
}
})
}
}

func TestCollectionBufferInMeterDistributed(t *testing.T) {
ring := Ring{{0, 0}, {1, 0}, {2, 0}, {3, 0}, {0, 0}}
poly := Polygon{{{0, 0}, {1, 0}, {1, 1}, {0, 1}, {0, 0}}}
tests := []struct {
name string
coll Collection
width float64
quadsegs int
workers int
wantEmpty bool
}{
{
name: "Valid Collection with 2 Workers",
coll: Collection{ring, poly},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: false,
},
{
name: "Empty Collection",
coll: Collection{},
width: 10,
quadsegs: 8,
workers: 2,
wantEmpty: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.coll.BufferInMeterDistributed(tt.width, tt.quadsegs, tt.workers)
if result == nil || result.IsEmpty() != tt.wantEmpty {
t.Errorf("%s: expected empty=%v, got %v", tt.name, tt.wantEmpty, result)
}
})
}
}

func BenchmarkBufferInMeterDistributed(b *testing.B) {
ring := Ring(make(LineString, 1000))
for i := 0; i < 1000; i++ {
ring[i] = Point{float64(i), 0}
}
poly := Polygon{matrix.LineMatrix(ring)}
coll := Collection{ring, poly}

b.Run("RingSingle", func(b *testing.B) {
for i := 0; i < b.N; i++ {
ring.BufferInMeter(10, 8)
}
})
b.Run("RingDistributed4", func(b *testing.B) {
for i := 0; i < b.N; i++ {
ring.BufferInMeterDistributed(10, 8, 4)
}
})
b.Run("PolySingle", func(b *testing.B) {
for i := 0; i < b.N; i++ {
poly.BufferInMeter(10, 8)
}
})
b.Run("PolyDistributed4", func(b *testing.B) {
for i := 0; i < b.N; i++ {
poly.BufferInMeterDistributed(10, 8, 4)
}
})
b.Run("CollSingle", func(b *testing.B) {
for i := 0; i < b.N; i++ {
coll.BufferInMeter(10, 8)
}
})
b.Run("CollDistributed4", func(b *testing.B) {
for i := 0; i < b.N; i++ {
coll.BufferInMeterDistributed(10, 8, 4)
}
})
}
17 changes: 17 additions & 0 deletions space/polygon.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,23 @@ func (p Polygon) BufferInMeter(width float64, quadsegs int) Geometry {
return pg.bufferInMeter(width, quadsegs)
}

func (p Polygon) BufferInMeterDistributed(width float64, quadsegs int, workers int) Geometry {
if workers <= 1 || len(p) == 0 {
return p.BufferInMeter(width, quadsegs) // Fallback to single-threaded
}

pool := NewWorkerPool(workers)
for _, ring := range p {
ringCopy := Ring(ring) // Capture in closure
pool.AddTask(func() Geometry {
return ringCopy.BufferInMeter(width, quadsegs)
})
}

return pool.Wait()
}


// Envelope returns the minimum bounding box for the supplied geometry, as a geometry.
// The polygon is defined by the corner points of the bounding box
// ((MINX, MINY), (MINX, MAXY), (MAXX, MAXY), (MAXX, MINY), (MINX, MINY)).
Expand Down
23 changes: 23 additions & 0 deletions space/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,29 @@ func (r Ring) BufferInMeter(width float64, quadsegs int) Geometry {
return LineString(r).BufferInMeter(width, quadsegs)
}

// BufferInMeterDistributed buffers the Ring using distributed computing.
func (r Ring) BufferInMeterDistributed(width float64, quadsegs int, workers int) Geometry {
if workers <= 1 || len(r) < 2 {
return r.BufferInMeter(width, quadsegs) // Fallback to single-threaded
}

pool := NewWorkerPool(workers)
chunkSize := (len(r) + workers - 1) / workers // Divide points into chunks

for i := 0; i < len(r)-1; i += chunkSize {
end := i + chunkSize
if end > len(r) {
end = len(r)
}
segment := r[i:end]
pool.AddTask(func() Geometry {
return LineString(segment).BufferInMeter(width, quadsegs)
})
}

return pool.Wait()
}

// Envelope returns the minimum bounding box for the supplied geometry, as a geometry.
// The polygon is defined by the corner points of the bounding box
// ((MINX, MINY), (MINX, MAXY), (MAXX, MAXY), (MAXX, MINY), (MINX, MINY)).
Expand Down
Loading