Skip to content

Commit

Permalink
feat: service stabilizations, send ecs tx on drip, new pruned snapsho…
Browse files Browse the repository at this point in the history
…t endpoint (#204)

* feat: set ecs component on drip, multi-tweet verification, relay fix

* feat: wip

* chore: reset the generated proto files to main

* chore: regenerated latest protobuf stubs

* chore: change to interface

* chore: move util function

* chore(snapshot): qol fixes

* feat(network): integrate getStateLatestStreamPruned into sync utils

* fix: return all fields when pruning snapshot

Co-authored-by: alvrs <alvarius@lattice.xyz>
  • Loading branch information
authcall and alvrs authored Oct 21, 2022
1 parent 687f840 commit d0de185
Show file tree
Hide file tree
Showing 28 changed files with 1,029 additions and 283 deletions.
2 changes: 2 additions & 0 deletions packages/network/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface NetworkConfig {
blockExplorer?: string;
cacheAgeThreshold?: number;
cacheInterval?: number;
pruneOptions?: { playerAddress: string; hashedComponentId: string };
}

export interface ClockConfig {
Expand Down Expand Up @@ -127,6 +128,7 @@ export type SyncWorkerConfig = {
cacheInterval?: number;
cacheAgeThreshold?: number;
snapshotNumChunks?: number;
pruneOptions?: { playerAddress: string; hashedComponentId: string };
};

export enum ContractSchemaValue {
Expand Down
11 changes: 7 additions & 4 deletions packages/network/src/workers/SyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
/**
* Pass a loading state component update to the main thread.
* Can be used to indicate the initial loading state on a loading screen.
* @param state {@link SyncState}
* @param msg Message to describe the current loading step.
* @param percentage Number between 0 and 100 to describe the loading progress.
* @param loadingState {
* state: {@link SyncState},
* msg: Message to describe the current loading step.
* percentage: Number between 0 and 100 to describe the loading progress.
* }
* @param blockNumber Optional: block number to pass in the component update.
*/
private setLoadingState(
Expand Down Expand Up @@ -189,7 +191,8 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
worldContract.address,
decode,
config.snapshotNumChunks,
(percentage: number) => this.setLoadingState({ percentage })
(percentage: number) => this.setLoadingState({ percentage }),
config.pruneOptions
);
} else {
this.setLoadingState({ state: SyncState.INITIAL, msg: "Fetching initial state from cache", percentage: 0 });
Expand Down
12 changes: 10 additions & 2 deletions packages/network/src/workers/syncUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,21 @@ export async function fetchSnapshotChunked(
worldAddress: string,
decode: ReturnType<typeof createDecode>,
numChunks = 10,
setPercentage?: (percentage: number) => void
setPercentage?: (percentage: number) => void,
pruneOptions?: { playerAddress: string; hashedComponentId: string }
): Promise<CacheStore> {
const cacheStore = createCacheStore();
const chunkPercentage = 100 / numChunks;

try {
const response = snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
const response = pruneOptions
? snapshotClient.getStateLatestStreamPruned({
worldAddress,
chunkPercentage,
pruneAddress: pruneOptions.playerAddress,
pruneComponentId: pruneOptions.hashedComponentId,
})
: snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
let i = 0;
for await (const responseChunk of response) {
await reduceFetchedState(responseChunk, cacheStore, decode);
Expand Down
28 changes: 16 additions & 12 deletions packages/services/cmd/faucet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ import (
)

var (
wsUrl = flag.String("ws-url", "ws://localhost:8545", "Websocket Url")
port = flag.Int("port", 50081, "gRPC Server Port")
faucetPrivateKey = flag.String("faucet-private-key", "0x", "Private key to use for faucet")
dripAmount = flag.Int64("drip-amount", 10000000000000000, "Drip amount in wei. Default to 0.01 ETH")
dripFrequency = flag.Float64("drip-frequency", 60, "Drip frequency per account in minutes. Default to 60 minutes")
dripLimit = flag.Uint64("drip-limit", 1000000000000000000, "Drip limit in wei per drip frequency interval. Default to 1 ETH")
devMode = flag.Bool("dev", false, "Flag to run the faucet in dev mode, where verification is not required. Default to false")
metricsPort = flag.Int("metrics-port", 6060, "Prometheus metrics http handler port. Defaults to port 6060")
wsUrl = flag.String("ws-url", "ws://localhost:8545", "Websocket Url")
port = flag.Int("port", 50081, "gRPC Server Port")
faucetPrivateKey = flag.String("faucet-private-key", "0x", "Private key to use for faucet")
dripAmount = flag.Int64("drip-amount", 10000000000000000, "Drip amount in wei. Default to 0.01 ETH")
dripFrequency = flag.Float64("drip-frequency", 1, "Drip frequency per account in minutes. Default to 60 minutes")
dripLimit = flag.Uint64("drip-limit", 1000000000000000000, "Drip limit in wei per drip frequency interval. Default to 1 ETH")
numLatestTweets = flag.Int("num-latest-tweets", 5, "Number of latest tweets to check per user when verifying drip tweet. Default to 5")
nameSystemAddress = flag.String("name-system-address", "", "Address of NameSystem to set an address/username mapping when verifying drip tweet. Not specified by default")
devMode = flag.Bool("dev", false, "Flag to run the faucet in dev mode, where verification is not required. Default to false")
metricsPort = flag.Int("metrics-port", 6060, "Prometheus metrics http handler port. Defaults to port 6060")
)

func main() {
Expand All @@ -40,10 +42,12 @@ func main() {

// Create a drip config.
dripConfig := &faucet.DripConfig{
DripAmount: *dripAmount,
DripFrequency: *dripFrequency,
DripLimit: *dripLimit,
DevMode: *devMode,
DripAmount: *dripAmount,
DripFrequency: *dripFrequency,
DripLimit: *dripLimit,
DevMode: *devMode,
NumLatestTweetsForVerify: *numLatestTweets,
NameSystemAddress: *nameSystemAddress,
}
logger.Info("using a drip configuration",
zap.Int64("amount", dripConfig.DripAmount),
Expand Down
4 changes: 4 additions & 0 deletions packages/services/pkg/eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func GetCurrentBalance(client *ethclient.Client, address string) (uint64, error)
}
return balance.Uint64(), nil
}

func GetCurrentNonce(client *ethclient.Client, address common.Address) (uint64, error) {
return client.PendingNonceAt(context.Background(), address)
}
17 changes: 16 additions & 1 deletion packages/services/pkg/faucet/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type DripConfig struct {
DripFrequency float64
DripLimit uint64
DevMode bool

// Miscellaneous.
NumLatestTweetsForVerify int
NameSystemAddress string
}

func TwitterUsernameQuery(username string) string {
Expand All @@ -37,7 +41,7 @@ func FindEmojiPosition(tweetText string) (int, error) {
return i, nil
}
}
return -1, fmt.Errorf("no emoji signature found in tweet")
return -1, fmt.Errorf("no emoji signature found in tweet: %s", tweetText)
}

func ExtractSignatureFromTweet(tweet twitter.Tweet) (string, error) {
Expand All @@ -59,6 +63,17 @@ func ExtractSignatureFromTweet(tweet twitter.Tweet) (string, error) {
return out.String(), nil
}

func VerifyDripRequest(tweets []twitter.Tweet, username string, address string, numLatestTweets int) error {
for idx := 0; idx < utils.Min(len(tweets), numLatestTweets); idx++ {
err := VerifyDripRequestTweet(tweets[idx], username, address)
if err == nil {
return nil
}
logger.GetLogger().Info("error while verifying tweet", zap.String("username", username), zap.Int("tweet", idx), zap.Error(err))
}
return fmt.Errorf("did not find drip tweet in latest %d tweets from user @%s", numLatestTweets, username)
}

func VerifyDripRequestTweet(tweet twitter.Tweet, username string, address string) error {
tweetSignature, err := ExtractSignatureFromTweet(tweet)
if err != nil {
Expand Down
92 changes: 77 additions & 15 deletions packages/services/pkg/grpc/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"latticexyz/mud/packages/services/pkg/eth"
"latticexyz/mud/packages/services/pkg/faucet"
"latticexyz/mud/packages/services/pkg/systems"
pb "latticexyz/mud/packages/services/protobuf/go/faucet"
"math"
"math/big"
Expand Down Expand Up @@ -125,7 +127,7 @@ func (server *faucetServer) Drip(ctx context.Context, request *pb.DripRequest) (
faucet.UpdateDripRequestTimestamp(request.Address)

return &pb.DripResponse{
TxHash: txHash,
DripTxHash: txHash,
}, nil
}

Expand All @@ -144,7 +146,7 @@ func (server *faucetServer) DripDev(ctx context.Context, request *pb.DripDevRequ
}

return &pb.DripResponse{
TxHash: txHash,
DripTxHash: txHash,
}, nil
}

Expand All @@ -162,6 +164,8 @@ func (server *faucetServer) DripVerifyTweet(ctx context.Context, request *pb.Dri
return nil, fmt.Errorf("address required")
}

server.logger.Info("requesting verification via twitter", zap.String("username", request.Username), zap.String("address", request.Address))

// Verify that this request has a valid address / username pairing.
// First verify that no other username is linked to address.
linkedUsername := faucet.GetUsernameForAddress(request.Address)
Expand All @@ -180,10 +184,10 @@ func (server *faucetServer) DripVerifyTweet(ctx context.Context, request *pb.Dri
return nil, err
}

query := faucet.TwitterUsernameQuery(request.Username)
search, resp, err := server.twitterClient.Search.Tweets(&twitter.SearchTweetParams{
Query: query,
TweetMode: "extended",
tweets, resp, err := server.twitterClient.Timelines.UserTimeline(&twitter.UserTimelineParams{
ScreenName: request.Username,
TweetMode: "extended",
Count: server.dripConfig.NumLatestTweetsForVerify,
})

if err != nil {
Expand All @@ -195,14 +199,13 @@ func (server *faucetServer) DripVerifyTweet(ctx context.Context, request *pb.Dri
return nil, fmt.Errorf("response not 200-OK from Twitter API")
}

if len(search.Statuses) == 0 {
server.logger.Error("twitter search did not return any tweets matching query", zap.String("query", query))
if len(tweets) == 0 {
server.logger.Error("twitter search did not return any tweets from timeline")
return nil, fmt.Errorf("did not find the tweet")
}
latestTweet := search.Statuses[0]

// Verify the signature inside of the tweet.
err = faucet.VerifyDripRequestTweet(latestTweet, request.Username, request.Address)
err = faucet.VerifyDripRequest(tweets, request.Username, request.Address, server.dripConfig.NumLatestTweetsForVerify)
if err != nil {
server.logger.Error("tweet drip request verification failed", zap.Error(err))
return nil, err
Expand All @@ -212,6 +215,15 @@ func (server *faucetServer) DripVerifyTweet(ctx context.Context, request *pb.Dri
zap.String("address", request.Address),
)

// Send a tx to link on ECS NameSystem if a system address is specified.
ecsTxHash := ""
if len(server.dripConfig.NameSystemAddress) > 0 {
ecsTxHash, err = server.SendNameSystemTransaction(request.Address, request.Username)
if err != nil {
return nil, err
}
}

// Send a tx dripping the funds.
txHash, err := server.SendDripTransaction(request.Address)
if err != nil {
Expand All @@ -229,13 +241,63 @@ func (server *faucetServer) DripVerifyTweet(ctx context.Context, request *pb.Dri
}

return &pb.DripResponse{
TxHash: txHash,
DripTxHash: txHash,
EcsTxHash: ecsTxHash,
}, nil
}

func (server *faucetServer) GetFaucetAddress() common.Address {
return crypto.PubkeyToAddress(*server.publicKey)
}

func (server *faucetServer) SendNameSystemTransaction(recipientAddress string, recipientUsername string) (string, error) {
nonce, err := eth.GetCurrentNonce(server.ethClient, server.GetFaucetAddress())
if err != nil {
return "", err
}

value := big.NewInt(0)
gasLimit := uint64(1000000)

gasPrice, err := server.ethClient.SuggestGasPrice(context.Background())
if err != nil {
return "", err
}

toNameSystemAddress := common.HexToAddress(server.dripConfig.NameSystemAddress)
input, err := systems.GetSystemsABI().Pack("executeTyped", common.HexToAddress(recipientAddress).Hash().Big(), recipientUsername)
if err != nil {
return "", err
}

tx := types.NewTransaction(nonce, toNameSystemAddress, value, gasLimit, gasPrice, input)

// Get the chain ID.
chainID, err := server.ethClient.NetworkID(context.Background())
if err != nil {
return "", err
}

// Sign the transaction.
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), server.privateKey)
if err != nil {
return "", err
}

// Send the transaction.
err = server.ethClient.SendTransaction(context.Background(), signedTx)
if err != nil {
return "", err
}

txHash := signedTx.Hash().Hex()
server.logger.Info("name system tx sent", zap.String("tx", txHash))

return txHash, nil
}

func (server *faucetServer) SendDripTransaction(recipientAddress string) (string, error) {
fromAddress := crypto.PubkeyToAddress(*server.publicKey)
nonce, err := server.ethClient.PendingNonceAt(context.Background(), fromAddress)
nonce, err := eth.GetCurrentNonce(server.ethClient, server.GetFaucetAddress())
if err != nil {
return "", err
}
Expand Down Expand Up @@ -281,7 +343,7 @@ func (server *faucetServer) GetLinkedTwitterForAddress(ctx context.Context, requ

server.logger.Info("getting linked username for address",
zap.String("address", request.Address),
zap.String("username", linkedUsername),
zap.String("linkedUsername", linkedUsername),
)

return &pb.LinkedTwitterForAddressResponse{
Expand All @@ -294,7 +356,7 @@ func (server *faucetServer) GetLinkedAddressForTwitter(ctx context.Context, requ

server.logger.Info("getting linked address for username",
zap.String("username", request.Username),
zap.String("address", linkedAddress),
zap.String("linkedAddress", linkedAddress),
)
return &pb.LinkedAddressForTwitterResponse{
Address: linkedAddress,
Expand Down
8 changes: 3 additions & 5 deletions packages/services/pkg/grpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ func (server *ecsRelayServer) OpenStream(signature *pb.Signature, stream pb.ECSR
client.Connect()
client.Ping()

server.logger.Info("opened stream", zap.String("client", identity.Name))

relayedMessagesChannel := client.GetChannel()
for {
select {
Expand All @@ -228,11 +230,7 @@ func (server *ecsRelayServer) OpenStream(signature *pb.Signature, stream pb.ECSR
}
return nil
case relayedMessage := <-relayedMessagesChannel:
if relayedMessage == nil {
server.logger.Warn("relayed message is nil")
} else {
stream.Send(relayedMessage)
}
stream.Send(relayedMessage)
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions packages/services/pkg/grpc/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,36 @@ func (server *ecsSnapshotServer) GetStateLatestStream(in *pb.ECSStateRequestLate
return nil
}

func (server *ecsSnapshotServer) GetStateLatestStreamPruned(request *pb.ECSStateRequestLatestStreamPruned, stream pb.ECSStateSnapshotService_GetStateLatestStreamPrunedServer) error {
if !snapshot.IsSnaphotAvailableLatest(request.WorldAddress) {
return fmt.Errorf("no snapshot")
}
if len(request.PruneAddress) == 0 {
return fmt.Errorf("address for which to prune for required")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(request.WorldAddress)
latestSnapshotPruned := snapshot.PruneSnapshotOwnedByComponent(latestSnapshot, request.PruneAddress)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if request.ChunkPercentage != nil {
chunkPercentage = int(*request.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshotPruned, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReply{
State: snapshotChunk.State,
StateComponents: snapshotChunk.StateComponents,
StateEntities: snapshotChunk.StateEntities,
StateHash: snapshotChunk.StateHash,
BlockNumber: snapshotChunk.EndBlockNumber,
})
}
return nil
}

// GetStateLatestStream is a gRPC endpoint that returns the block number for the latest available
// snapshot, if any, for a given WorldAddress provided via ECSStateBlockRequestLatest.
func (server *ecsSnapshotServer) GetStateBlockLatest(ctx context.Context, in *pb.ECSStateBlockRequestLatest) (*pb.ECSStateBlockReply, error) {
Expand Down
Loading

0 comments on commit d0de185

Please sign in to comment.