diff --git a/tools/index/README.md b/tools/index/README.md index 277ab29c..fb9eba83 100644 --- a/tools/index/README.md +++ b/tools/index/README.md @@ -2,18 +2,18 @@ 主要有两个功能,一个是给未生成索引的 active 订单生成索引,另一个是迁移 top index 到 MongoDB,迁移 shard 到 MySQL。 -### 编译 +## 编译 -``` +```bash make index ``` -### 生成索引 +## 生成索引 先去 droplet 获取订单状态是 active 的订单,然后去遍历 car 文件,如果被 active 订单使用且未生成索引,则为其生成索引。 -* --car-dir:存储 car 文件的目录。 -* --index-dir:存储索引文件的目录,`droplet` 默认在 `~/.droplet/dagstore/index`。 +* --car-dir:存储 car 文件的目录,需要用绝对路径。 +* --index-dir:存储索引文件的目录,需要用绝对路径,`droplet` 默认在 `~/.droplet/dagstore/index`。 * --mongo-url:MongoDB 的连接地址,用于存储 top index,数据库是 `market_index`,collection 是 `top_index`。 * --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`。 * --droplet-url:droplet 服务的 RPC 地址。 @@ -21,6 +21,7 @@ make index * --start:订单创建时间需大于设置的值。 * --end:订单创建时间需小于设置的值。 * --concurrency:生成索引的并发数,默认是 1。 +* --miner-addr:指定要生成索引的 miner,多个 miner 用逗号分隔(如 `t010001,t010002`),未设置则给所有 miner 生成索引。 ```bash ./index-tool gen-index \ @@ -34,7 +35,7 @@ make index > 成功生成索引会输出类似日志:`generate index success: xxxxxx` -### 迁移索引 +## 迁移索引 目前 top index 和 shard 都是存储在 badger,这样多个 droplet 时不能共享,所以需要把 top index 存储到 MongoDB,shard 存储到 MySQL,方便共享数据。 diff --git a/tools/index/main.go b/tools/index/main.go index 47c09b01..ab7ecd40 100644 --- a/tools/index/main.go +++ b/tools/index/main.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/shard" "github.com/filecoin-project/dagstore/throttle" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-jsonrpc" marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1" @@ -54,9 +55,9 @@ var ( Required: true, } dropletURLFlag = &cli.StringFlag{ - Name: "droplet-url", - Usage: "droplet url", - Required: true, + Name: "droplet-url", + Usage: "droplet url", + Value: "/ip4/127.0.0.1/tcp/41264", } dropletTokenFlag = &cli.StringFlag{ Name: "droplet-token", @@ -76,6 
+77,10 @@ var ( Usage: "Concurrent number of indexes generated", Value: 1, } + minersAddrFlag = &cli.StringFlag{ + Name: "miner-addr", + Usage: "miner address, eg --miner-addr t010001 or --miner-addr t010001,t010002", + } ) func main() { @@ -106,11 +111,12 @@ var generateIndexCmd = &cli.Command{ dropletURLFlag, startFlag, endFlag, + minersAddrFlag, concurrencyFlag, }, Action: func(cctx *cli.Context) error { ctx := cctx.Context - carDir := cctx.String("car-dir") + carDir := cctx.String(carDirFlag.Name) indexDir := cctx.String(indexDirFlag.Name) p, err := paramsFromContext(cctx) if err != nil { @@ -149,10 +155,33 @@ func paramsFromContext(cctx *cli.Context) (*params, error) { mysqlURL := cctx.String(mysqlURLFlag.Name) url := cctx.String(dropletURLFlag.Name) token := cctx.String(dropletTokenFlag.Name) - + minerAddrStr := cctx.String(minersAddrFlag.Name) fmt.Println("mongo url:", mongoURL) fmt.Println("mysql url:", mysqlURL) fmt.Println("droplet url:", url, "token:", token) + fmt.Println("miner addr:", minerAddrStr) + + minerAddrs := make(map[address.Address]struct{}) + for _, addr := range strings.Split(minerAddrStr, ",") { + if len(addr) == 0 { + continue + } + addr, err := address.NewFromString(addr) + if err != nil { + return nil, err + } + minerAddrs[addr] = struct{}{} + } + + filter := func(addr address.Address) bool { + if len(minerAddrs) == 0 { + return false + } + + _, ok := minerAddrs[addr] + + return !ok + } api, close, err := marketapi.DialIMarketRPC(ctx, url, token, nil) if err != nil { @@ -182,13 +211,39 @@ func paramsFromContext(cctx *cli.Context) (*params, error) { if end != nil && end.Before(deal.CreationTime.Time()) { continue } + if filter(deal.Proposal.Provider) { + continue + } p := deal.Proposal.PieceCID.String() if _, ok := pieces[p]; !ok { pieces[p] = struct{}{} pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.Proposal.PieceSize)}) } } - fmt.Printf("active deals: %d, valid deals: %d, 
pieces: %d\n", len(deals), len(pieceInfos), len(pieces)) + + activeDirectDeal := market.DealActive + directDeals, err := api.ListDirectDeals(ctx, market.DirectDealQueryParams{State: &activeDirectDeal}) + if err != nil { + return nil, fmt.Errorf("list direct deal failed: %v", err) + } + for _, deal := range directDeals { + if start != nil && start.After(time.Unix(int64(deal.CreatedAt), 0)) { + continue + } + if end != nil && end.Before(time.Unix(int64(deal.CreatedAt), 0)) { + continue + } + if filter(deal.Provider) { + continue + } + p := deal.PieceCID.String() + if _, ok := pieces[p]; !ok { + pieces[p] = struct{}{} + pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.PieceSize)}) + } + } + + fmt.Printf("active deals: %d, valid deals: %d\n", len(deals)+len(directDeals), len(pieceInfos)) var topIndexRepo *dagstore.MongoTopIndex if len(mongoURL) != 0 { @@ -238,7 +293,7 @@ func getStartEndTime(cctx *cli.Context) (*time.Time, *time.Time, error) { func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error { doGenIndex := func(pi *pieceInfo) error { piece := pi.piece - f, err := os.Open(filepath.Join(carDir, piece)) + f, err := openCar(carDir, piece) if err != nil { return err } @@ -272,12 +327,12 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param var globalErr error for _, pi := range p.pieceInfos { pi := pi - has, err := hasIndex(ctx, pi.piece, indexDir) + has, err := hasIndex(pi.piece, indexDir) if err != nil { return err } if has { - fmt.Println("already had index:", pi.piece) + // fmt.Println("already had index:", pi.piece) continue } if globalErr != nil { @@ -305,7 +360,17 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param return globalErr } -func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) { +func openCar(carDir, pieceCID string) (*os.File, error) { + carPath := filepath.Join(carDir, 
pieceCID+".car") + f, err := os.Open(carPath) + if err == nil { + return f, nil + } + + return os.Open(filepath.Join(carDir, pieceCID)) +} + +func hasIndex(piece string, indexDir string) (bool, error) { _, err := os.Stat(filepath.Join(indexDir, piece+indexSuffix)) if err != nil { if os.IsNotExist(err) { @@ -370,8 +435,6 @@ var migrateIndexCmd = &cli.Command{ return err } - fmt.Println("index dir:", indexDir) - return migrateIndex(ctx, indexDir, p) }, } @@ -396,7 +459,7 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error { return err } if has { - fmt.Println("already had shard:", piece) + // fmt.Println("already had shard:", piece) return nil } @@ -449,7 +512,6 @@ var indexInfoCmd = &cli.Command{ } indexFile := cctx.Args().First() - fmt.Println("index file: ", indexFile) f, err := os.Open(indexFile) if err != nil { return err