Skip to content

Commit

Permalink
Merge pull request #153 from gleanerio/dv_headlessTesting
Browse files Browse the repository at this point in the history
Headless rework to implement headlessWait in javascript and make headless testable
  • Loading branch information
fils authored Mar 2, 2023
2 parents a7f773a + a6f8b00 commit 14434be
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 57 deletions.
13 changes: 6 additions & 7 deletions cmd/husker/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -55,10 +54,10 @@ func main() {
// url := "http://dx.doi.org/10.7288/V4/MAGIC/15032" // magic
url := "https://dev.rvdata.us/search/fileset/100142" // rvdata
k := "demo"
var (
buf bytes.Buffer
logger = log.New(&buf, "logger: ", log.Lshortfile)
)
//var (
// buf bytes.Buffer
// logger = log.New(&buf, "logger: ", log.Lshortfile)
//)
// setup the KV store to hold a record of indexed resources
db, err := bolt.Open("gleaner.db", 0600, nil)
if err != nil {
Expand All @@ -68,8 +67,8 @@ func main() {
rlogginer, _ := common.LogIssues(v1, k)

runStats := common.NewRunStats()

err = acquire.PageRender(v1, mc, 45*time.Second, url, k, db, rlogginer, runStats)
repostats := runStats.Add(k)
err = acquire.PageRenderAndUpload(v1, mc, 45*time.Second, url, k, db, rlogginer, repostats)
if err != nil {
panic(fmt.Errorf("error when reading config: %v", err))
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ require (
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
)

require github.com/valyala/fasttemplate v1.2.2

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.37.1-0.20220607072126-8a320890c08d h1:xS9QTPgKl9ewGsAOPc+xW7DeStJDqYPfisDmeSCcbco=
github.com/valyala/fasthttp v1.37.1-0.20220607072126-8a320890c08d/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand Down
4 changes: 2 additions & 2 deletions internal/summoner/acquire/acquire.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ func getDomain(v1 *viper.Viper, mc *minio.Client, urls []string, sourceName stri
// TODO is this where I then try headless, and scope the following "for" loop into an else?
log.WithFields(log.Fields{"url": urlloc, "contentType": "Direct access failed, trying headless']"}).Info("Direct access failed, trying headless for ", urlloc)
repologger.WithFields(log.Fields{"url": urlloc, "contentType": "Direct access failed, trying headless']"}).Error() // this needs to go into the issues file
err := PageRender(v1, mc, 60*time.Second, urlloc, sourceName, db, repologger, repoStats) // TODO make delay configurable
err := PageRenderAndUpload(v1, mc, 60*time.Second, urlloc, sourceName, db, repologger, repoStats) // TODO make delay configurable

if err != nil {
log.WithFields(log.Fields{"url": urlloc, "issue": "converting json ld"}).Error("PageRender ", urlloc, "::", err)
log.WithFields(log.Fields{"url": urlloc, "issue": "converting json ld"}).Error("PageRenderAndUpload ", urlloc, "::", err)
repologger.WithFields(log.Fields{"url": urlloc, "issue": "converting json ld"}).Error(err)
}
db.Update(func(tx *bolt.Tx) error {
Expand Down
176 changes: 128 additions & 48 deletions internal/summoner/acquire/headlessNG.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/gleanerio/gleaner/internal/common"
log "github.com/sirupsen/logrus"

"time"

configTypes "github.com/gleanerio/gleaner/internal/config"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/mafredri/cdp/rpcc"
minio "github.com/minio/minio-go/v7"
"github.com/spf13/viper"
"github.com/valyala/fasttemplate"
bolt "go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -55,7 +55,7 @@ func HeadlessNG(v1 *viper.Viper, mc *minio.Client, m map[string][]string, db *bo

for i := range m[k] {

err := PageRender(v1, mc, 60*time.Second, m[k][i], k, db, repologger, r) // TODO make delay configurable
err := PageRenderAndUpload(v1, mc, 60*time.Second, m[k][i], k, db, repologger, r) // TODO make delay configurable
if err != nil {
log.Error(m[k][i], "::", err)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func HeadlessNG(v1 *viper.Viper, mc *minio.Client, m map[string][]string, db *bo
// //thread management
// semaphoreChan <- struct{}{}
//
// err := PageRender(v1, mc, 60*time.Second, m[k][i], k, db) // TODO make delay configurable
// err := PageRenderAndUpload(v1, mc, 60*time.Second, m[k][i], k, db) // TODO make delay configurable
// if err != nil {
// log.Error(m[k][i], "::", err)
// }
Expand All @@ -148,30 +148,78 @@ func HeadlessNG(v1 *viper.Viper, mc *minio.Client, m map[string][]string, db *bo
//
//}

func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k string, db *bolt.DB, repologger *log.Logger, repoStats *common.RepoStats) error {
repologger.WithFields(log.Fields{"url": url}).Trace("PageRender")
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// PageRenderAndUpload renders url through PageRender and stores every JSON-LD
// document that PageRender returns in the object store.
//
// For each returned document it calls Upload, increments the Stored or
// StoreError counter on repoStats, and then records url -> sha in the bolt
// bucket named k. Upload and bolt failures are logged and do not stop the
// remaining documents from being processed; they are not returned.
//
// The returned error is non-nil only when the bucket name cannot be read from
// the configuration or when PageRender itself fails.
func PageRenderAndUpload(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k string, db *bolt.DB, repologger *log.Logger, repoStats *common.RepoStats) error {
	repologger.WithFields(log.Fields{"url": url}).Trace("PageRenderAndUpload")

	// No context is created here: PageRender builds its own timeout context.

	// Top level bucket for all of gleaner operations, taken from the config file.
	bucketName, err := configTypes.GetBucketName(v1)
	if err != nil {
		// This error used to be silently overwritten by the PageRender call.
		log.WithFields(log.Fields{"url": url, "issue": "Reading bucket name"}).Error(err)
		return fmt.Errorf("reading bucket name: %w", err)
	}

	jsonlds, err := PageRender(v1, timeout, url, k, repologger, repoStats)
	if err != nil {
		// PageRender returns no documents alongside an error, so there is
		// nothing to upload. The callers log the returned error.
		return err
	}

	if len(jsonlds) > 1 {
		repologger.WithFields(log.Fields{"url": url, "issue": "Multiple JSON"}).Debug("Multiple JSON-LD documents found")
	}

	for _, jsonld := range jsonlds {
		sha, err := Upload(v1, mc, bucketName, k, url, jsonld)
		if err != nil {
			log.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error("Error uploading jsonld to object store:", url, err, sha)
			repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error(err)
			repoStats.Inc(common.StoreError)
		} else {
			repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Uploaded JSONLD to object store"}).Debug()
			repoStats.Inc(common.Stored)
		}

		// TODO Is here where to add an entry to the KV store
		// The entry is written whether or not the upload succeeded; on a
		// failed upload sha is whatever Upload returned.
		err = db.Update(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte(k))
			if b == nil {
				// Calling Put on a missing bucket would panic.
				return fmt.Errorf("bolt bucket %q does not exist", k)
			}
			return b.Put([]byte(url), []byte(sha))
		})
		if err != nil {
			log.Error("Error writing to bolt: ", err)
		}
	}

	return nil
}

func PageRender(v1 *viper.Viper, timeout time.Duration, url, k string, repologger *log.Logger, repoStats *common.RepoStats) ([]string, error) {
repologger.WithFields(log.Fields{"url": url}).Trace("PageRender")
retries := 3
sources, err := configTypes.GetSources(v1)
source, err := configTypes.GetSourceByName(sources, k)
headlessWait := source.HeadlessWait
if timeout*time.Duration(retries) < time.Duration(headlessWait)*time.Second {
timeout = time.Duration(headlessWait) * time.Second
}

ctx, cancel := context.WithTimeout(context.Background(), timeout*time.Duration(retries))
defer cancel()
response := []string{}
// read config file
mcfg, err := configTypes.ReadSummmonerConfig(v1.Sub("summoner"))

// Use the DevTools HTTP/JSON API to manage targets (e.g. pages, webworkers).
//devt := devtool.New(mcfg["headless"])
devt := devtool.New(mcfg.Headless)

pt, err := devt.Get(ctx, devtool.Page)
if err != nil {
pt, err = devt.Create(ctx)
if err != nil {
log.WithFields(log.Fields{"url": url, "issue": "Not REPO FAULT. Devtools... Is Headless Container running?"}).Error(err)
repologger.WithFields(log.Fields{"url": url}).Error("Not REPO FAULT. Devtools... Is Headless Container running?")
repoStats.Inc(common.HeadlessError)
return err
return response, err
}
}

Expand All @@ -181,7 +229,7 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
log.WithFields(log.Fields{"url": url, "issue": "Not REPO FAULT. Devtools... Is Headless Container running?"}).Error(err)
repologger.WithFields(log.Fields{"url": url}).Error("Not REPO FAULT. Devtools... Is Headless Container running?")
repoStats.Inc(common.HeadlessError)
return err
return response, err
}
defer conn.Close() // Leaving connections open will leak memory.

Expand All @@ -194,7 +242,7 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
log.WithFields(log.Fields{"url": url, "issue": "Not REPO FAULT. Devtools... Is Headless Container running?"}).Error(err)
repologger.WithFields(log.Fields{"url": url}).Error("Not REPO FAULT. Devtools... Is Headless Container running?")
repoStats.Inc(common.HeadlessError)
return err
return response, err
}

// Open a DOMContentEventFired client to buffer this event.
Expand All @@ -203,26 +251,42 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
log.WithFields(log.Fields{"url": url, "issue": "Not REPO FAULT. Devtools... Is Headless Container running?"}).Error(err)
repologger.WithFields(log.Fields{"url": url}).Error("Not REPO FAULT. Devtools... Is Headless Container running?")
repoStats.Inc(common.HeadlessError)
return err
return response, err
}
defer domContent.Close()

// Open a LoadEventFired client to buffer this event.
loadEventFired, err := c.Page.LoadEventFired(ctx)
if err != nil {
log.WithFields(log.Fields{"url": url, "issue": "Not REPO FAULT. Devtools... Is Headless Container running?"}).Error(err)
repologger.WithFields(log.Fields{"url": url}).Error("Not REPO FAULT. Devtools... Is Headless Container running?")
repoStats.Inc(common.HeadlessError)
return response, err
}
defer loadEventFired.Close()

// Create the Navigate arguments with the optional Referrer field set.
navArgs := page.NewNavigateArgs(url)
nav, err := c.Page.Navigate(ctx, navArgs)
if err != nil {
log.WithFields(log.Fields{"url": url, "issue": "Navigate To Headless"}).Error(err)
repologger.WithFields(log.Fields{"url": url, "issue": "Navigate To Headless"}).Error(err)
repoStats.Inc(common.HeadlessError)
return err
return response, err
}

_, err = loadEventFired.Recv()
if err != nil {
return nil, err
}
loadEventFired.Close()

// Wait until we have a DOMContentEventFired event.
if _, err = domContent.Recv(); err != nil {
log.WithFields(log.Fields{"url": url, "issue": "Dom Error"}).Error(err)
repologger.WithFields(log.Fields{"url": url, "issue": "Dom Error"}).Error(err)
repoStats.Inc(common.HeadlessError)
return err
return response, err
}

log.WithFields(log.Fields{"url": url, "issue": "Navigate Complete"}).Debug(nav.FrameID, "for", url)
Expand All @@ -235,7 +299,7 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
* I cannot figure out how to get the cdp Runtime to distinguish between a resolved and a rejected
* promise - so in this case, we simply do not index a document, and fail silently.
**/
expression := `
expressionTmpl := `
function getMetadata() {
return new Promise((resolve, reject) => {
const elements = document.querySelectorAll('script[type="application/ld+json"]');
Expand All @@ -249,12 +313,12 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
resolve(metadata);
}
else {
reject("No JSON-LD present after 1 second.");
reject("No JSON-LD present after {{timeout}} second.");
}
});
}
function retry(fn, retriesLeft = 3, interval = 1000) {
function retry(fn, retriesLeft = {{retries}}, interval = {{timeout}}) {
return new Promise((resolve, reject) => {
fn()
.then(resolve)
Expand All @@ -270,25 +334,35 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
});
});
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
retry(getMetadata);
sleep( {{headlesswait}} ).then( () => { return retry(getMetadata) } );
`

tmpl := fasttemplate.New(expressionTmpl, "{{", "}}")
expression := tmpl.ExecuteString(map[string]interface{}{
"timeout": fmt.Sprintf("%d", timeout.Milliseconds()),
"headlesswait": fmt.Sprintf("%d", headlessWait*1000),
"retries": "3",
})
log.Trace(expression)
evalArgs := runtime.NewEvaluateArgs(expression).SetAwaitPromise(true).SetReturnByValue(true)
eval, err := c.Runtime.Evaluate(ctx, evalArgs)
if err != nil {
log.WithFields(log.Fields{"url": url, "issue": "Headless Evaluate"}).Error(err)
repologger.WithFields(log.Fields{"url": url, "issue": "Headless Evaluate"}).Error(err)
repoStats.Inc(common.Issues)
return (err)
return response, err
}

// Rejecting that promise just sends null as its value,
// so we need to stop if we got that.
if eval.Result.Value == nil {
repologger.WithFields(log.Fields{"url": url, "issue": "Headless Nil Result"}).Trace()
repoStats.Inc(common.EmptyDoc)
return nil
return response, nil
}

// todo: what are the data types that will always be in this json? we
Expand All @@ -298,7 +372,7 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
log.WithFields(log.Fields{"url": url, "issue": "Json Unmarshal"}).Error(err)
repologger.WithFields(log.Fields{"url": url, "issue": "Json Unmarshal"}).Error(err)
repoStats.Inc(common.Issues)
return (err)
return response, err
}

if len(jsonlds) > 1 {
Expand All @@ -307,43 +381,49 @@ func PageRender(v1 *viper.Viper, mc *minio.Client, timeout time.Duration, url, k
for _, jsonld := range jsonlds {
valid, err := isValid(v1, jsonld)
if err != nil {
// there could be one bad jsonld, and one good. We want to process the jsonld
// so, do not set an err
log.WithFields(log.Fields{"url": url, "issue": "invalid JSON"}).Error("error checking for valid json :", err)
repologger.WithFields(log.Fields{"url": url, "issue": "invalid JSON"}).Error(err)
repoStats.Inc(common.Issues)
} else if valid && jsonld != "" { // traps out the root domain... should do this different
sha, err := Upload(v1, mc, bucketName, k, url, jsonld)
if err != nil {
log.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error("Error uploading jsonld to object store:", url, err, sha)
repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error(err)
repoStats.Inc(common.StoreError)
} else {
repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Uploaded JSONLD to object store"}).Debug()
repoStats.Inc(common.Stored)
}
// TODO Is here where to add an entry to the KV store
err = db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(k))
err := b.Put([]byte(url), []byte(sha))
if err != nil {
log.Error("Error writing to bolt", err)
}
return nil
})
response = append(response, jsonld)
err = nil
//sha, err := Upload(v1, mc, bucketName, k, url, jsonld)
//if err != nil {
// log.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error("Error uploading jsonld to object store:", url, err, sha)
// repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Error uploading jsonld to object store"}).Error(err)
// repoStats.Inc(common.StoreError)
//} else {
// repologger.WithFields(log.Fields{"url": url, "sha": sha, "issue": "Uploaded JSONLD to object store"}).Debug()
// repoStats.Inc(common.Stored)
//}
//// TODO Is here where to add an entry to the KV store
//err = db.Update(func(tx *bolt.Tx) error {
// b := tx.Bucket([]byte(k))
// err := b.Put([]byte(url), []byte(sha))
// if err != nil {
// log.Error("Error writing to bolt", err)
// }
// return nil
//})
} else {
// there could be one bad jsonld, and one good. We want to process the jsonld
// so, do not set an err
log.Info("Empty JSON-LD document found. Continuing.", url)
repologger.WithFields(log.Fields{"url": url, "issue": "Empty JSON-LD document found"}).Debug()
repoStats.Inc(common.EmptyDoc)
// TODO Is here where to add an entry to the KV store
err = db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(k))
err := b.Put([]byte(url), []byte("NULL")) // no JOSN-LD found at this URL
if err != nil {
log.Error("Error writing to bolt", err)
}
return nil
})
//err = db.Update(func(tx *bolt.Tx) error {
// b := tx.Bucket([]byte(k))
// err := b.Put([]byte(url), []byte("NULL")) // no JOSN-LD found at this URL
// if err != nil {
// log.Error("Error writing to bolt", err)
// }
// return nil
//})
}
}

return err
return response, err
}
Loading

0 comments on commit 14434be

Please sign in to comment.