Skip to content

Commit

Permalink
OTT-1824-P31: adding purging of fastxml files
Browse files Browse the repository at this point in the history
  • Loading branch information
pm-viral-vala committed Sep 10, 2024
1 parent 1af0e9d commit 8d4b2b2
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 7 deletions.
3 changes: 1 addition & 2 deletions endpoints/events/vtrack_ow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/base64"
"errors"
"log"
"strings"
"time"

Expand Down Expand Up @@ -65,7 +64,7 @@ func InjectVideoEventTrackers(
isResponseMismatch := (response != fastXMLResponse)

if isResponseMismatch {
log.Printf("[XML_PARSER_TEST] method:[vcr] creative:[%s]", base64.StdEncoding.EncodeToString([]byte(vastXML)))
openrtb_ext.FastXMLLogf("\n[XML_PARSER_TEST] method:[vcr] creative:[%s]", base64.StdEncoding.EncodeToString([]byte(vastXML)))
}

metrics = &openrtb_ext.FastXMLMetrics{
Expand Down
3 changes: 1 addition & 2 deletions exchange/exchange_ow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
"regexp"
"strings"
Expand Down Expand Up @@ -206,7 +205,7 @@ func recordOpenWrapBidResponseMetrics(bidder *bidderAdapter, bidResponse *adapte
recordFastXMLMetrics(bidder.me, "vastbidder", bidResponse.FastXMLMetrics)
if bidResponse.FastXMLMetrics.IsRespMismatch {
resp, _ := jsonutil.Marshal(bidResponse)
log.Printf("\n[XML_PARSER_TEST] method:[vast_bidder] response:[%s]", resp)
openrtb_ext.FastXMLLogf("\n[XML_PARSER_TEST] method:[vast_bidder] response:[%s]", resp)
}
}

Expand Down
224 changes: 221 additions & 3 deletions openrtb_ext/fastxml_ow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
package openrtb_ext

import "math/rand"
import (
"bufio"
"fmt"
"math/rand"
"os"
"path/filepath"
"runtime/debug"
"sort"
"sync"
"time"

"github.com/golang/glog"
)

type RandomGenerator interface {
GenerateIntn(int) int
Expand All @@ -12,12 +24,218 @@ func (RandomNumberGenerator) GenerateIntn(n int) int {
return rand.Intn(n)
}

var rg RandomGenerator
var pid = os.Getpid()

const maxFileSize = 1 * 1024 * 1024 * 1024
const maxBufferSize = 256 * 1024
const maxFiles = 10
const flushInterval = time.Second * time.Duration(300)

// Writer interface can be used to define variable returned by GetWriter() method
type Writer interface {
Write(data []byte) (int, error)
Sync() error
}

// FileWriter ...
type FileWriter struct {
mu sync.Mutex
file Writer
fileName string
}

// NewFileWriter ...
func NewFileWriter(dirPath, fileName, ext string) Writer {
//create directory if not exists
_ = os.MkdirAll(dirPath, 0755)

writer := &FileWriter{
mu: sync.Mutex{},
file: NewBufferFileWriter(dirPath, fileName, ext),
fileName: fileName,
}

go purge(dirPath, fileName, ext)
go writer.flush(flushInterval)

return writer
}

// Flushd ...
func (f *FileWriter) flush(t time.Duration) {
defer func() {
if errInterface := recover(); errInterface != nil {
glog.Infof("Recovered panic \n Error: %v \n StackTrace: %v", errInterface, string(debug.Stack()))
}
}()

for {
f.Sync()
time.Sleep(t)
}
}

// Sync ...
func (f *FileWriter) Sync() (err error) {
f.mu.Lock()
err = f.file.Sync()
f.mu.Unlock()
return err
}

// Write ...
func (f *FileWriter) Write(data []byte) (n int, err error) {
f.mu.Lock()
n, err = f.file.Write(data)
f.mu.Unlock()
return n, err
}

// purge files
func purge(dirPath, fileName, ext string) {
fileFormat := dirPath + fileName + "*" + ext
for {
_purge(fileFormat, maxFiles)
time.Sleep(flushInterval)
}
}

func _purge(fileFormat string, maxFiles int) {
defer func() {
if errInterface := recover(); errInterface != nil {
glog.Infof("Recovered panic \n Error: %v \n StackTrace: %v", errInterface, string(debug.Stack()))
}
}()

files, _ := filepath.Glob(fileFormat)
sort.Strings(files)

//remove last files
if len(files) <= maxFiles {
//no files to purge
return
}

//limit files to max files
files = files[:len(files)-maxFiles]
for _, file := range files {
glog.Infof("[purger] filename:[%s]\n", file)
if err := os.Remove(file); err != nil {
glog.Infof("[purger] error:[purge_failed] file:[%s] message:[%s]", file, err.Error())
//do not delete status file if original file not deleted
continue
}
}
}

// bufferFileWriter ...
type bufferFileWriter struct {
dirPath, fileName, ext string

buf *bufio.Writer
file *os.File
nbytes uint64
}

func NewBufferFileWriter(dirPath, fileName, ext string) *bufferFileWriter {
writer := &bufferFileWriter{
dirPath: dirPath,
fileName: fileName,
ext: ext,
}
return writer
}

// Sync ...
func (b *bufferFileWriter) Sync() (err error) {
if b.buf != nil {
if err = b.buf.Flush(); err != nil {
return err
}
}
if b.file != nil {
if err = b.file.Sync(); err != nil {
return err
}
}
return nil
}

// Write ...
func (b *bufferFileWriter) Write(data []byte) (int, error) {
if b.file == nil {
//create new file
if err := b.create(time.Now()); err != nil {
return 0, err
}
}

if b.nbytes+uint64(len(data)) >= maxFileSize {
//rotate file
if err := b.create(time.Now()); err != nil {
return 0, err
}
}

n, err := b.buf.Write(data)
b.nbytes += uint64(n)
if err != nil {
return 0, err
}

return n, nil
}

func (b *bufferFileWriter) create(t time.Time) (err error) {
if b.file != nil {
if err = b.buf.Flush(); err != nil {
return err
}

if err = b.file.Close(); err != nil {
return err
}
}

fname := filepath.Join(b.dirPath, fileNameFormat(b.fileName, b.ext, t))
b.file, err = os.Create(fname)
b.nbytes = 0
if err != nil {
return err
}

glog.Infof("[file_writer] type:[new_file] filename:[%s]\n", fname)
b.buf = bufio.NewWriterSize(b.file, int(maxBufferSize))
return err
}

func fileNameFormat(name, ext string, t time.Time) string {
return fmt.Sprintf("%s.%04d%02d%02d-%02d%02d%02d.%d%s",
name,
t.Year(),
t.Month(),
t.Day(),
t.Hour(),
t.Minute(),
t.Second(),
pid,
ext)
}

func IsFastXMLEnabled(enabledPercentage int) bool {
return enabledPercentage > 0 && enabledPercentage < rg.GenerateIntn(enabledPercentage)
return enabledPercentage > 0 && enabledPercentage >= rg.GenerateIntn(enabledPercentage)
}

func FastXMLLogf(format string, args ...any) {
if bfw != nil {
fmt.Fprintf(bfw, format, args...)
}
}

var rg RandomGenerator
var bfw Writer

func init() {
rg = &RandomNumberGenerator{}
bfw = NewFileWriter(`/var/log/ssheaderbidding/`, `fastxml`, `.txt`)
}

0 comments on commit 8d4b2b2

Please sign in to comment.