-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat(p2p): Federation and AI swarms #2723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
53cef0b
Wip p2p enhancements
mudler a16f2e7
get online state
mudler 6a7a5b4
Pass-by token to show in the dashboard
mudler cc3f608
Style
mudler 2b869d1
Minor fixups
mudler 692e59f
parametrize SearchID
mudler d8d0098
Refactoring
mudler a56199c
Allow to expose/bind more services
mudler bd1c9cc
Add federation
mudler 9117b18
Display federated mode in the WebUI
mudler 79b1691
Small fixups
mudler 72755f1
make federated nodes visible from the WebUI
mudler a63c1fa
Fix version display
mudler 3eafecd
improve web page
mudler d0966c4
live page update
mudler c1110f0
visual enhancements
mudler ea40097
enhancements
mudler 0a98a8e
visual enhancements
mudler File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
package cli | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net" | ||
"time" | ||
|
||
"math/rand/v2" | ||
|
||
cliContext "github.com/mudler/LocalAI/core/cli/context" | ||
"github.com/mudler/LocalAI/core/p2p" | ||
"github.com/mudler/edgevpn/pkg/node" | ||
"github.com/mudler/edgevpn/pkg/protocol" | ||
"github.com/mudler/edgevpn/pkg/types" | ||
"github.com/rs/zerolog/log" | ||
) | ||
|
||
type FederatedCLI struct { | ||
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` | ||
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` | ||
} | ||
|
||
func (f *FederatedCLI) Run(ctx *cliContext.Context) error { | ||
|
||
n, err := p2p.NewNode(f.Peer2PeerToken) | ||
if err != nil { | ||
return fmt.Errorf("creating a new node: %w", err) | ||
} | ||
err = n.Start(context.Background()) | ||
if err != nil { | ||
return fmt.Errorf("creating a new node: %w", err) | ||
} | ||
|
||
if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { | ||
return err | ||
} | ||
|
||
return Proxy(context.Background(), n, f.Address, p2p.FederatedID) | ||
} | ||
|
||
func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { | ||
|
||
log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) | ||
// Open local port for listening | ||
l, err := net.Listen("tcp", listenAddr) | ||
if err != nil { | ||
log.Error().Err(err).Msg("Error listening") | ||
return err | ||
} | ||
// ll.Info("Binding local port on", srcaddr) | ||
|
||
ledger, _ := node.Ledger() | ||
|
||
// Announce ourselves so nodes accepts our connection | ||
ledger.Announce( | ||
ctx, | ||
10*time.Second, | ||
func() { | ||
// Retrieve current ID for ip in the blockchain | ||
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) | ||
// If mismatch, update the blockchain | ||
//if !found { | ||
updatedMap := map[string]interface{}{} | ||
updatedMap[node.Host().ID().String()] = &types.User{ | ||
PeerID: node.Host().ID().String(), | ||
Timestamp: time.Now().String(), | ||
} | ||
ledger.Add(protocol.UsersLedgerKey, updatedMap) | ||
// } | ||
}, | ||
) | ||
|
||
defer l.Close() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return errors.New("context canceled") | ||
default: | ||
log.Debug().Msg("New for connection") | ||
// Listen for an incoming connection. | ||
conn, err := l.Accept() | ||
if err != nil { | ||
fmt.Println("Error accepting: ", err.Error()) | ||
continue | ||
} | ||
|
||
// Handle connections in a new goroutine, forwarding to the p2p service | ||
go func() { | ||
var tunnelAddresses []string | ||
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) { | ||
if v.IsOnline() { | ||
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) | ||
} else { | ||
log.Info().Msgf("Node %s is offline", v.ID) | ||
} | ||
} | ||
|
||
// open a TCP stream to one of the tunnels | ||
// chosen randomly | ||
// TODO: optimize this and track usage | ||
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] | ||
|
||
tunnelConn, err := net.Dial("tcp", tunnelAddr) | ||
if err != nil { | ||
log.Error().Err(err).Msg("Error connecting to tunnel") | ||
return | ||
} | ||
|
||
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) | ||
closer := make(chan struct{}, 2) | ||
go copyStream(closer, tunnelConn, conn) | ||
go copyStream(closer, conn, tunnelConn) | ||
<-closer | ||
|
||
tunnelConn.Close() | ||
Check warningCode scanning / gosec Errors unhandled. Warning
Errors unhandled.
|
||
conn.Close() | ||
Check warningCode scanning / gosec Errors unhandled. Warning
Errors unhandled.
|
||
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) | ||
}() | ||
} | ||
} | ||
|
||
} | ||
|
||
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { | ||
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy | ||
io.Copy(dst, src) | ||
Check warningCode scanning / gosec Errors unhandled. Warning
Errors unhandled.
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.