95 changes: 50 additions & 45 deletions benchmark-script/read_operation/main.go
@@ -1,40 +1,55 @@
package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"math/rand"
"os"
"path"
"strconv"
"syscall"
"time"
"context"
"go.opencensus.io/stats"

"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
)

var (
fDir = flag.String("dir", "", "Directory file to be opened.")
fNumOfThreads = flag.Int("threads", 1, "Number of threads to read parallel")

fBlockSizeKB = flag.Int("block-size-kb", 1024, "Block size in KB")
"go.opentelemetry.io/otel"
)

fFileSizeMB = flag.Int64("file-size-mb", 1024, "File size in MB")
const (
scopeName = "github.com/raj-prince/warp-test/instrumentation"
OneKB = 1024
)

fileHandles []*os.File
func init() {
var err error
latencyHistogram, err = meter.Float64Histogram("warp_read_latency", metric.WithUnit("ms"), metric.WithDescription("Test sample"))
if err != nil {
log.Fatalf("Failed to start latency histogram: %v", err)
}
}

eG errgroup.Group
var (
// OTLP metrics.
meter = otel.Meter(scopeName)
latencyHistogram metric.Float64Histogram

OneKB = 1024
fDir = flag.String("dir", "", "Directory file to be opened.")

// Workload.
fFilePrefix = flag.String("file-prefix", "", "Prefix file")
fReadType = flag.String("read", "read", "Whether to do sequential reads (read) or random reads (randread)")
fNumOfThreads = flag.Int("threads", 1, "Number of threads to read parallel")
fNumberOfRead = flag.Int("read-count", 1, "Number of read iterations")
fBlockSizeKB = flag.Int("block-size-kb", 1024, "Block size in KB")
fFileSizeMB = flag.Int64("file-size-mb", 1024, "File size in MB")

fOutputDir = flag.String("output-dir", "", "Directory to dump the output")
fFilePrefix = flag.String("file-prefix", "", "Prefix file")
fReadType = flag.String("read", "read", "Whether to do sequential reads (read) or random reads (randread)")
// Helper.
fileHandles []*os.File
eG errgroup.Group
)

var gResult *Result
@@ -76,7 +91,7 @@ func readAlreadyOpenedFile(ctx context.Context, index int) (err error) {
}

readLatency := time.Since(readStart)
stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))
latencyHistogram.Record(ctx, float64(readLatency.Milliseconds()))

throughput := float64(*fFileSizeMB) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)
@@ -107,25 +122,19 @@ func randReadAlreadyOpenedFile(ctx context.Context, index int) (err error) {
for i := 0; i < *fNumberOfRead; i++ {
for j := 0; j < len(pattern); j++ {
offset := pattern[j]

readStart := time.Now()
_, _ = fileHandles[index].Seek(offset, 0)

_, err = fileHandles[index].Read(b)
if err != nil && err != io.EOF {
break
} else {
err = nil
return fmt.Errorf("while reading and discarding content: %v", err)
}

readLatency := time.Since(readStart)
throughput := float64((*fBlockSizeKB) / 1024) / readLatency.Seconds()
throughput := float64((*fBlockSizeKB)/1024) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)
stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))
}

if err != nil {
return fmt.Errorf("while reading and discarding content: %v", err)
latencyHistogram.Record(ctx, float64(readLatency.Milliseconds()))
}
}
return
@@ -191,35 +200,31 @@ func runReadFileOperations(ctx context.Context) (err error) {
}

func main() {
ctx := context.Background()

flag.Parse()
fmt.Println("\n******* Passed flags: *******")
log.Println("Application called with below flags: ")
flag.VisitAll(func(f *flag.Flag) {
fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
log.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
})

// Enable stack-driver exporter.
registerLatencyView()

err := enableSDExporter()
// Setup OTEL with cloud exporter.
ctx := context.Background()
shutdown, err := setupOpenTelemetryWithCloudExporter(ctx, 60*time.Second)
if err != nil {
fmt.Printf("while enabling stackdriver exporter: %v", err)
os.Exit(1)
log.Fatalf("failed to setup OpenTelemetry: %v", err)
}
defer closeSDExporter()
defer func() {
err = shutdown(ctx)
if err != nil {
log.Fatalf("failed to shutdown OpenTelemetry: %v", err)
}
}()

// Start actual workload.
err = runReadFileOperations(ctx)
if err != nil {
fmt.Printf("while performing read: %v", err)
os.Exit(1)
}
if *fOutputDir == "" {
*fOutputDir, _ = os.Getwd()
log.Fatalf("while performing read: %v", err)
}

csvFileName := "metrics_" + *fReadType + ".csv"
gResult.DumpMetricsCSV(path.Join(*fOutputDir, csvFileName))
// TODO: remove once fully switched over to the new metrics.
gResult.PrintStats()

}
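For orientation, a minimal sketch (illustrative only, not part of the diff) of the OpenTelemetry pattern the new main.go adopts: the OpenCensus/Stackdriver plumbing that lived in the now-deleted metrics.go (registerLatencyView, enableSDExporter, closeSDExporter, stats.Record) is replaced by a histogram instrument created from the global meter and recorded per read. Names mirror the diff; the exporter wiring lives in setup.go.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

var (
	// The meter comes from the global provider that setup.go installs via
	// otel.SetMeterProvider; records made before a provider is set are dropped.
	meter            = otel.Meter("github.com/raj-prince/warp-test/instrumentation")
	latencyHistogram metric.Float64Histogram
)

func init() {
	var err error
	latencyHistogram, err = meter.Float64Histogram("warp_read_latency",
		metric.WithUnit("ms"), metric.WithDescription("Test sample"))
	if err != nil {
		log.Fatalf("Failed to create latency histogram: %v", err)
	}
}

func main() {
	ctx := context.Background()
	readStart := time.Now()
	// ... perform one read here ...
	latencyHistogram.Record(ctx, float64(time.Since(readStart).Milliseconds()))
}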
67 changes: 0 additions & 67 deletions benchmark-script/read_operation/metrics.go

This file was deleted.

5 changes: 3 additions & 2 deletions benchmark-script/read_operation/run_warp_test.sh
@@ -4,8 +4,9 @@ set -e
shopt -s expand_aliases

echo "Running sequential read test..."
time go run . --threads 64 --read-count 1 --file-size-mb 64 --dir /home/princer_google_com/warp-test/gcs/64M/ --file-prefix "experiment."
time go run . --threads 64 --read-count 1 --file-size-mb 1024 --dir /home/princer_google_com/warp-test/gcs/64M/ --file-prefix "experiment."
time go run . --threads 4 --read-count 50 --file-size-mb 5120 --dir /home/princer_google_com/warp-test/5G/ --file-prefix "experiment."
exit 0
time go run . --threads 4 --read-count 1 --file-size-mb 1024 --dir /home/princer_google_com/warp-test/gcs/64M/ --file-prefix "experiment."

echo "Running random read test..."
time go run . --threads 64 --read-count 1 --file-size-mb 5120 --dir /home/princer_google_com/warp-test/gcs/5G/ --file-prefix "experiment." --read "randread" --block-size-kb 8192
114 changes: 114 additions & 0 deletions benchmark-script/read_operation/setup.go
@@ -0,0 +1,114 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"errors"
"fmt"
"log"
"strings"
"time"

cloudmetric "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

const (
ServiceName = "gcsfuse-scale-tester"
Version = "0.0.1"
)

func metricFormatter(m metricdata.Metrics) string {
return "custom.googleapis.com/gcsfuse-scale-tester/" + strings.ReplaceAll(m.Name, ".", "/")
}
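// Example (editorial note, not part of the PR): with this formatter, the
// "warp_read_latency" instrument defined in main.go is exported as the metric
// type "custom.googleapis.com/gcsfuse-scale-tester/warp_read_latency"; any
// dots in an instrument name would be rewritten to slashes.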

// getApplicationResource returns a resource describing the application and its run environment.
func getApplicationResource(ctx context.Context) (*resource.Resource, error) {
return resource.New(ctx,
// Use the GCP resource detector to detect information about the GCP platform
resource.WithDetectors(gcp.NewDetector()),
resource.WithTelemetrySDK(),
resource.WithAttributes(
semconv.ServiceName(ServiceName),
semconv.ServiceVersion(Version),
),
// To get pod-specific metrics. Ref: https://github.com/open-telemetry/opentelemetry-go-contrib/blob/7a12292a9f4bfe9f562e8d32cbdddd694280d851/detectors/gcp/README.md?plain=1#L47
resource.WithFromEnv(),
)
}

// setupOpenTelemetryWithCloudExporter sets up OpenTelemetry with a Cloud exporter.
func setupOpenTelemetryWithCloudExporter(ctx context.Context, exportInterval time.Duration) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error

// shutdown combines shutdown functions from multiple OpenTelemetry
// components into a single function.
shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

options := []cloudmetric.Option{
cloudmetric.WithMetricDescriptorTypeFormatter(metricFormatter),
cloudmetric.WithFilteredResourceAttributes(func(kv attribute.KeyValue) bool {
// Ensure that the PID is available as a metric label in the Metrics Explorer.
return kv.Key == semconv.K8SContainerNameKey || kv.Key == semconv.K8SClusterNameKey ||
kv.Key == semconv.ProcessPIDKey || kv.Key == semconv.K8SPodNameKey
}),
cloudmetric.WithProjectID("gcs-tess"),
}

// Create cloud exporter.
exporter, err := cloudmetric.New(options...)
if err != nil {
fmt.Printf("Error while creating Google Cloud exporter:%v\n", err)
return nil, nil
}

r := metric.NewPeriodicReader(exporter, metric.WithInterval(exportInterval))

// Create a resource that describes the application. This is used to
// add context to the metrics that are exported. For example, this
// resource can include information about the environment where the
// application is running, such as the Kubernetes cluster name and pod
// name. This information is useful for filtering and aggregating
// metrics in the Cloud Monitoring console.
resource, err := getApplicationResource(ctx)
if err != nil {
log.Fatalf("failed to create resource: %v", err)
}

mp := metric.NewMeterProvider(
metric.WithReader(r),
metric.WithResource(resource),
)
shutdownFuncs = append(shutdownFuncs, mp.Shutdown)
otel.SetMeterProvider(mp)

return shutdown, nil
}
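A usage sketch (illustrative only; main.go in this PR already wires this up the same way, and the snippet assumes it sits next to setup.go in package main). It is shown to make the shutdown contract explicit: with a 60-second PeriodicReader interval, a benchmark run shorter than one interval relies on the final flush performed when the returned shutdown function is called.

func run(ctx context.Context) error {
	shutdown, err := setupOpenTelemetryWithCloudExporter(ctx, 60*time.Second)
	if err != nil {
		return err
	}
	// Shutdown drains anything the PeriodicReader has not exported yet, which
	// matters when the workload finishes before the first export tick.
	defer func() {
		if err := shutdown(ctx); err != nil {
			log.Printf("OpenTelemetry shutdown: %v", err)
		}
	}()

	// ... run the benchmark; reads record into latencyHistogram ...
	return nil
}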