-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Leonardo Reis
committed
Sep 5, 2023
1 parent
ec2b4d3
commit 56dbd48
Showing
10 changed files
with
545 additions
and
0 deletions.
There are no files selected for viewing
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
* @leoeareis @rjfonseca @rilder-almeida |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# This workflow will build a golang project | ||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go | ||
|
||
name: Go | ||
|
||
on: | ||
push: | ||
branches: | ||
- master | ||
- main | ||
pull_request: | ||
branches: | ||
- master | ||
- main | ||
|
||
jobs: | ||
build: | ||
runs-on: ubuntu-latest | ||
name: Build, Vet & Test | ||
steps: | ||
- uses: actions/checkout@v3 | ||
- name: Set up Go | ||
uses: actions/setup-go@v4 | ||
with: | ||
go-version: "1.20" | ||
- name: Build | ||
run: go build -v ./... | ||
|
||
- name: Vet | ||
run: go vet ./... | ||
|
||
- name: Test | ||
run: go test -v -race ./... | ||
|
||
govulncheck: | ||
runs-on: ubuntu-latest | ||
name: Govulncheck | ||
steps: | ||
- id: govulncheck | ||
uses: golang/govulncheck-action@v1 | ||
with: | ||
go-version-input: 1.20.6 | ||
go-package: ./... | ||
|
||
golangci: | ||
name: Lint | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v3 | ||
- uses: actions/setup-go@v4 | ||
with: | ||
go-version: "1.20" | ||
- name: golangci-lint | ||
uses: golangci/golangci-lint-action@v3 | ||
with: | ||
version: latest | ||
args: --verbose |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# If you prefer the allow list template instead of the deny list, see community template: | ||
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore | ||
# | ||
# Binaries for programs and plugins | ||
*.exe | ||
*.exe~ | ||
*.dll | ||
*.so | ||
*.dylib | ||
|
||
# Test binary, built with `go test -c` | ||
*.test | ||
|
||
# Output of the go coverage tool, specifically when used with LiteIDE | ||
*.out | ||
|
||
# Dependency directories (remove the comment below to include it) | ||
vendor/ | ||
|
||
# Go workspace file | ||
go.work |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Full list on https://golangci-lint.run/usage/linters/ | ||
run: | ||
timeout: 5m | ||
allow-parallel-runners: true | ||
linters: | ||
disable-all: true | ||
enable: | ||
#- exhaustive # check exhaustiveness of enum switch statements | ||
#- funlen # Tool for detection of long functions | ||
#- gocognit # Computes and checks the cognitive complexity of functions | ||
#- godot # Check if comments end in a period | ||
#- golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes | ||
#- lll # Reports long lines | ||
#- scopelint # Scopelint checks for unpinned variables in go programs | ||
- bodyclose # checks whether HTTP response body is closed successfully | ||
- dupl # Tool for code clone detection | ||
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases | ||
- exportloopref # checks for pointers to enclosing loop variables | ||
- goconst # Finds repeated strings that could be replaced by a constant | ||
- gocritic # The most opinionated Go source code linter | ||
- gocyclo # Computes and checks the cyclomatic complexity of functions | ||
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification | ||
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports | ||
- gosec # Inspects source code for security problems | ||
- gosimple # Linter for Go source code that specializes in simplifying a code | ||
- govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string | ||
- ineffassign # Detects when assignments to existing variables are not used | ||
- misspell # Finds commonly misspelled English words in comments | ||
- nakedret # Finds naked returns in functions greater than a specified function length | ||
- noctx # noctx finds sending http request without context.Context | ||
- prealloc # Finds slice declarations that could potentially be preallocated | ||
- rowserrcheck # checks whether Err of rows is checked successfully | ||
- staticcheck # Staticcheck is a go vet on steroids, applying a ton of static analysis checks | ||
- stylecheck # Stylecheck is a replacement for golint | ||
- typecheck # Like the front-end of a Go compiler, parses and type-checks Go code | ||
- unconvert # Remove unnecessary type conversions | ||
- unparam # Reports unused function parameters | ||
- unused # Checks Go code for unused constants, variables, functions and types | ||
- whitespace # Tool for detection of leading and trailing whitespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,46 @@ | ||
# arqbeam-sink-pubsub | ||
An Apache Beam sink for arqbeam-app. | ||
|
||
This implementation uses go/pubsub google sdk to publish messages in Pubsub topic. | ||
|
||
TL;DR | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"github.com/arquivei/arqbeam-sink-pubsub" | ||
errorpubsubio "github.com/arquivei/arqbeam-sink-pubsub/error" | ||
|
||
"github.com/apache/beam/sdks/v2/go/pkg/beam" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" | ||
) | ||
|
||
var ( | ||
pipeline *beam.Pipeline | ||
) | ||
|
||
func getPipeline(ctx context.Context) *beam.Pipeline { | ||
if pipeline != nil { | ||
return pipeline | ||
} | ||
|
||
pipeline := beam.NewPipeline() | ||
s := pipeline.Root() | ||
|
||
// Read some files with textio default from apache beam go sdk | ||
readRows := textio.Read(s, config.GCSInputFile) | ||
|
||
// Send each line to pubsub with pubsubio from arqbeam-sink-pubsub | ||
pbResult := pubsubio.Publish(s, config.Pubsub.Project, config.Pubsub.Topic, config.Pubsub.BatchSize, readRows) | ||
|
||
// Log if any error happened in publish step | ||
errorpubsubio.LogHandler(s, config.Pubsub.BatchSize, pbResult) | ||
|
||
return pipeline | ||
} | ||
|
||
``` | ||
|
||
Comments, discussions, issues and pull-requests are welcomed. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package errorpubsubio | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"cloud.google.com/go/pubsub" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/log" | ||
"github.com/apache/beam/sdks/v2/go/pkg/beam/register" | ||
) | ||
|
||
func init() { | ||
register.DoFn2x0[context.Context, *pubsub.PublishResult](&logHandler{}) | ||
} | ||
|
||
func LogHandler(s beam.Scope, size int, pbResult beam.PCollection) { | ||
s = s.Scope("arqbeam.pubsubio.LogHandler") | ||
|
||
beam.ParDo0(s, &logHandler{ | ||
Size: size, | ||
}, pbResult) | ||
} | ||
|
||
type logHandler struct { | ||
Size int | ||
|
||
resultChan chan *pubsub.PublishResult | ||
wg sync.WaitGroup | ||
} | ||
|
||
func (fn *logHandler) Setup(ctx context.Context) { | ||
fn.resultChan = make(chan *pubsub.PublishResult, fn.Size*3) | ||
|
||
go func() { | ||
for result := range fn.resultChan { | ||
id, err := result.Get(ctx) | ||
if err != nil { | ||
log.Errorf(ctx, "Error in %v: %v", id, err.Error()) | ||
} else { | ||
log.Infof(ctx, "Message processed: %v", id) | ||
} | ||
fn.wg.Done() | ||
} | ||
}() | ||
} | ||
|
||
func (fn *logHandler) ProcessElement(ctx context.Context, r *pubsub.PublishResult) { | ||
fn.wg.Add(1) | ||
fn.resultChan <- r | ||
} | ||
|
||
func (fn *logHandler) FinishBundle() { | ||
fn.wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
module github.com/arquivei/arqbeam-sink-pubsub | ||
|
||
go 1.20 | ||
|
||
require ( | ||
cloud.google.com/go/pubsub v1.33.0 | ||
github.com/apache/beam/sdks/v2 v2.50.0 | ||
) | ||
|
||
require ( | ||
cloud.google.com/go v0.110.4 // indirect | ||
cloud.google.com/go/compute v1.20.1 // indirect | ||
cloud.google.com/go/compute/metadata v0.2.3 // indirect | ||
cloud.google.com/go/iam v1.1.1 // indirect | ||
github.com/dustin/go-humanize v1.0.1 // indirect | ||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect | ||
github.com/golang/protobuf v1.5.3 // indirect | ||
github.com/google/go-cmp v0.5.9 // indirect | ||
github.com/google/s2a-go v0.1.4 // indirect | ||
github.com/google/uuid v1.3.0 // indirect | ||
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect | ||
github.com/googleapis/gax-go/v2 v2.12.0 // indirect | ||
go.opencensus.io v0.24.0 // indirect | ||
golang.org/x/crypto v0.12.0 // indirect | ||
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect | ||
golang.org/x/net v0.14.0 // indirect | ||
golang.org/x/oauth2 v0.11.0 // indirect | ||
golang.org/x/sync v0.3.0 // indirect | ||
golang.org/x/sys v0.11.0 // indirect | ||
golang.org/x/text v0.12.0 // indirect | ||
google.golang.org/api v0.135.0 // indirect | ||
google.golang.org/appengine v1.6.7 // indirect | ||
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect | ||
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230726155614-23370e0ffb3e // indirect | ||
google.golang.org/grpc v1.57.0 // indirect | ||
google.golang.org/protobuf v1.31.0 // indirect | ||
gopkg.in/retry.v1 v1.0.3 // indirect | ||
) |
Oops, something went wrong.