Skip to content

Commit

Permalink
Merge pull request #1 from arquivei/feature/add-pubsub-sink
Browse files Browse the repository at this point in the history
Init project with all dependencies and implementations
  • Loading branch information
leoeareis authored Sep 5, 2023
2 parents ec2b4d3 + 56dbd48 commit 27bb73b
Show file tree
Hide file tree
Showing 10 changed files with 545 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @leoeareis @rjfonseca @rilder-almeida
57 changes: 57 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

on:
push:
branches:
- master
- main
pull_request:
branches:
- master
- main

jobs:
build:
runs-on: ubuntu-latest
name: Build, Vet & Test
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
- name: Build
run: go build -v ./...

- name: Vet
run: go vet ./...

- name: Test
run: go test -v -race ./...

govulncheck:
runs-on: ubuntu-latest
name: Govulncheck
steps:
- id: govulncheck
uses: golang/govulncheck-action@v1
with:
go-version-input: 1.20.6
go-package: ./...

golangci:
name: Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: "1.20"
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: latest
args: --verbose
21 changes: 21 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
vendor/

# Go workspace file
go.work
39 changes: 39 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Full list on https://golangci-lint.run/usage/linters/
run:
timeout: 5m
allow-parallel-runners: true
linters:
disable-all: true
enable:
#- exhaustive # check exhaustiveness of enum switch statements
#- funlen # Tool for detection of long functions
#- gocognit # Computes and checks the cognitive complexity of functions
#- godot # Check if comments end in a period
#- golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
#- lll # Reports long lines
#- scopelint # Scopelint checks for unpinned variables in go programs
- bodyclose # checks whether HTTP response body is closed successfully
- dupl # Tool for code clone detection
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- exportloopref # checks for pointers to enclosing loop variables
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
- gocyclo # Computes and checks the cyclomatic complexity of functions
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- gosec # Inspects source code for security problems
- gosimple # Linter for Go source code that specializes in simplifying a code
- govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string
- ineffassign # Detects when assignments to existing variables are not used
- misspell # Finds commonly misspelled English words in comments
- nakedret # Finds naked returns in functions greater than a specified function length
- noctx # noctx finds sending http request without context.Context
- prealloc # Finds slice declarations that could potentially be preallocated
- rowserrcheck # checks whether Err of rows is checked successfully
- staticcheck # Staticcheck is a go vet on steroids, applying a ton of static analysis checks
- stylecheck # Stylecheck is a replacement for golint
- typecheck # Like the front-end of a Go compiler, parses and type-checks Go code
- unconvert # Remove unnecessary type conversions
- unparam # Reports unused function parameters
- unused # Checks Go code for unused constants, variables, functions and types
- whitespace # Tool for detection of leading and trailing whitespace
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,46 @@
# arqbeam-sink-pubsub
An Apache Beam sink for arqbeam-app.

This implementation uses go/pubsub google sdk to publish messages in Pubsub topic.

TL;DR

```go
package main

import (
"context"
"github.com/arquivei/arqbeam-sink-pubsub"
errorpubsubio "github.com/arquivei/arqbeam-sink-pubsub/error"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var (
pipeline *beam.Pipeline
)

func getPipeline(ctx context.Context) *beam.Pipeline {
if pipeline != nil {
return pipeline
}

pipeline := beam.NewPipeline()
s := pipeline.Root()

// Read some files with textio default from apache beam go sdk
readRows := textio.Read(s, config.GCSInputFile)

// Send each line to pubsub with pubsubio from arqbeam-sink-pubsub
pbResult := pubsubio.Publish(s, config.Pubsub.Project, config.Pubsub.Topic, config.Pubsub.BatchSize, readRows)

// Log if any error happened in publish step
errorpubsubio.LogHandler(s, config.Pubsub.BatchSize, pbResult)

return pipeline
}

```

Comments, discussions, issues and pull-requests are welcomed.
55 changes: 55 additions & 0 deletions error/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package errorpubsubio

import (
"context"
"sync"

"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
register.DoFn2x0[context.Context, *pubsub.PublishResult](&logHandler{})
}

func LogHandler(s beam.Scope, size int, pbResult beam.PCollection) {
s = s.Scope("arqbeam.pubsubio.LogHandler")

beam.ParDo0(s, &logHandler{
Size: size,
}, pbResult)
}

type logHandler struct {
Size int

resultChan chan *pubsub.PublishResult
wg sync.WaitGroup
}

func (fn *logHandler) Setup(ctx context.Context) {
fn.resultChan = make(chan *pubsub.PublishResult, fn.Size*3)

go func() {
for result := range fn.resultChan {
id, err := result.Get(ctx)
if err != nil {
log.Errorf(ctx, "Error in %v: %v", id, err.Error())
} else {
log.Infof(ctx, "Message processed: %v", id)
}
fn.wg.Done()
}
}()
}

func (fn *logHandler) ProcessElement(ctx context.Context, r *pubsub.PublishResult) {
fn.wg.Add(1)
fn.resultChan <- r
}

func (fn *logHandler) FinishBundle() {
fn.wg.Wait()
}
39 changes: 39 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module github.com/arquivei/arqbeam-sink-pubsub

go 1.20

require (
cloud.google.com/go/pubsub v1.33.0
github.com/apache/beam/sdks/v2 v2.50.0
)

require (
cloud.google.com/go v0.110.4 // indirect
cloud.google.com/go/compute v1.20.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/api v0.135.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230726155614-23370e0ffb3e // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/retry.v1 v1.0.3 // indirect
)
Loading

0 comments on commit 27bb73b

Please sign in to comment.