Skip to content

Commit

Permalink
Merge pull request #540 from ipfs-force-community/opt/generate-index
Browse files Browse the repository at this point in the history
opt: support generate direct deal index
  • Loading branch information
simlecode authored Oct 16, 2024
2 parents 4fd1901 + 10918cc commit 5be3521
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 20 deletions.
13 changes: 7 additions & 6 deletions tools/index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@

主要有两个功能,一个是给未生成索引的 active 订单生成索引,另一个是迁移 top index 到 MongoDB,迁移 shard 到 MySQL。

### 编译
## 编译

```
```bash
make index
```

### 生成索引
## 生成索引

先去 droplet 获取订单状态是 active 的订单,然后去遍历 car 文件,如果被 active 订单使用且未生成索引,则为其生成索引。

* --car-dir:存储 car 文件的目录。
* --index-dir:存储索引文件的目录,`droplet` 默认在 `~/.droplet/dagstore/index`
* --car-dir:存储 car 文件的目录,需要用绝对路径
* --index-dir:存储索引文件的目录,需要用绝对路径,`droplet` 默认在 `~/.droplet/dagstore/index`
* --mongo-url:MongoDB 的连接地址,用于存储 top index,数据库是 `market_index`,collection 是 `top_index`
* --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`
* --droplet-url:droplet 服务的 RPC 地址。
* --droplet-token:droplet 服务的 token。
* --start:订单创建时间需大于设置的值。
* --end:订单创建时间需小于设置的值。
* --concurrency:生成索引的并发数,默认是 1。
* --miner-addr:指定 miner 生成索引,未设置则给所有 miner 生成索引。

```bash
./index-tool gen-index \
Expand All @@ -34,7 +35,7 @@ make index

> 成功生成索引会输出类似日志:`generate index success: xxxxxx`
### 迁移索引
## 迁移索引

目前 top index 和 shard 都是存储在 badger 中,这样在有多个 droplet 时无法共享数据,所以需要把 top index 存储到 MongoDB,shard 存储到 MySQL,方便共享数据。

Expand Down
90 changes: 76 additions & 14 deletions tools/index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1"
Expand Down Expand Up @@ -54,9 +55,9 @@ var (
Required: true,
}
dropletURLFlag = &cli.StringFlag{
Name: "droplet-url",
Usage: "droplet url",
Required: true,
Name: "droplet-url",
Usage: "droplet url",
Value: "/ip4/127.0.0.1/tcp/41264",
}
dropletTokenFlag = &cli.StringFlag{
Name: "droplet-token",
Expand All @@ -76,6 +77,10 @@ var (
Usage: "Concurrent number of indexes generated",
Value: 1,
}
minersAddrFlag = &cli.StringFlag{
Name: "miner-addr",
Usage: "miner address, eg --miner-addr t010001 or --miner-addr t010001,t010002",
}
)

func main() {
Expand Down Expand Up @@ -106,11 +111,12 @@ var generateIndexCmd = &cli.Command{
dropletURLFlag,
startFlag,
endFlag,
minersAddrFlag,
concurrencyFlag,
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
carDir := cctx.String("car-dir")
carDir := cctx.String(carDirFlag.Name)
indexDir := cctx.String(indexDirFlag.Name)
p, err := paramsFromContext(cctx)
if err != nil {
Expand Down Expand Up @@ -149,10 +155,33 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
mysqlURL := cctx.String(mysqlURLFlag.Name)
url := cctx.String(dropletURLFlag.Name)
token := cctx.String(dropletTokenFlag.Name)

minerAddrStr := cctx.String(minersAddrFlag.Name)
fmt.Println("mongo url:", mongoURL)
fmt.Println("mysql url:", mysqlURL)
fmt.Println("droplet url:", url, "token:", token)
fmt.Println("miner addr:", minerAddrStr)

minerAddrs := make(map[address.Address]struct{})
for _, addr := range strings.Split(minerAddrStr, ",") {
if len(addr) == 0 {
continue
}
addr, err := address.NewFromString(addr)
if err != nil {
return nil, err
}
minerAddrs[addr] = struct{}{}
}

filter := func(addr address.Address) bool {
if len(minerAddrs) == 0 {
return false
}

_, ok := minerAddrs[addr]

return !ok
}

api, close, err := marketapi.DialIMarketRPC(ctx, url, token, nil)
if err != nil {
Expand Down Expand Up @@ -182,13 +211,39 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
if end != nil && end.Before(deal.CreationTime.Time()) {
continue
}
if filter(deal.Proposal.Provider) {
continue
}
p := deal.Proposal.PieceCID.String()
if _, ok := pieces[p]; !ok {
pieces[p] = struct{}{}
pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.Proposal.PieceSize)})
}
}
fmt.Printf("active deals: %d, valid deals: %d, pieces: %d\n", len(deals), len(pieceInfos), len(pieces))

activeDirectDeal := market.DealActive
directDeals, err := api.ListDirectDeals(ctx, market.DirectDealQueryParams{State: &activeDirectDeal})
if err != nil {
return nil, fmt.Errorf("list direct deal failed: %v", err)
}
for _, deal := range directDeals {
if start != nil && start.After(time.Unix(int64(deal.CreatedAt), 0)) {
continue
}
if end != nil && end.Before(time.Unix(int64(deal.CreatedAt), 0)) {
continue
}
if filter(deal.Provider) {
continue
}
p := deal.PieceCID.String()
if _, ok := pieces[p]; !ok {
pieces[p] = struct{}{}
pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.PieceSize)})
}
}

fmt.Printf("active deals: %d, valid deals: %d\n", len(deals)+len(directDeals), len(pieceInfos))

var topIndexRepo *dagstore.MongoTopIndex
if len(mongoURL) != 0 {
Expand Down Expand Up @@ -238,7 +293,7 @@ func getStartEndTime(cctx *cli.Context) (*time.Time, *time.Time, error) {
func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error {
doGenIndex := func(pi *pieceInfo) error {
piece := pi.piece
f, err := os.Open(filepath.Join(carDir, piece))
f, err := openCar(carDir, piece)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,12 +327,12 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param
var globalErr error
for _, pi := range p.pieceInfos {
pi := pi
has, err := hasIndex(ctx, pi.piece, indexDir)
has, err := hasIndex(pi.piece, indexDir)
if err != nil {
return err
}
if has {
fmt.Println("already had index:", pi.piece)
// fmt.Println("already had index:", pi.piece)
continue
}
if globalErr != nil {
Expand Down Expand Up @@ -305,7 +360,17 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param
return globalErr
}

func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) {
// openCar opens the CAR file that belongs to pieceCID inside carDir.
//
// Two file names are tried, in order: "<pieceCID>.car" and then the bare
// "<pieceCID>". If the first open fails for any reason, its error is dropped
// and the result of opening the bare name is returned as-is, so a failure of
// both attempts reports only the bare path. The caller owns the returned file
// and must close it.
func openCar(carDir, pieceCID string) (*os.File, error) {
	withSuffix := filepath.Join(carDir, pieceCID+".car")
	if carFile, err := os.Open(withSuffix); err == nil {
		return carFile, nil
	}

	// Fall back to the file name without the ".car" extension.
	bare := filepath.Join(carDir, pieceCID)
	return os.Open(bare)
}

func hasIndex(piece string, indexDir string) (bool, error) {
_, err := os.Stat(filepath.Join(indexDir, piece+indexSuffix))
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -370,8 +435,6 @@ var migrateIndexCmd = &cli.Command{
return err
}

fmt.Println("index dir:", indexDir)

return migrateIndex(ctx, indexDir, p)
},
}
Expand All @@ -396,7 +459,7 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error {
return err
}
if has {
fmt.Println("already had shard:", piece)
// fmt.Println("already had shard:", piece)
return nil
}

Expand Down Expand Up @@ -449,7 +512,6 @@ var indexInfoCmd = &cli.Command{
}

indexFile := cctx.Args().First()
fmt.Println("index file: ", indexFile)
f, err := os.Open(indexFile)
if err != nil {
return err
Expand Down

0 comments on commit 5be3521

Please sign in to comment.