Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro schema integrations #168

Merged
merged 5 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/semaphore/daemon/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
formencoded "github.com/jexia/semaphore/pkg/codec/www-form-urlencoded"
"github.com/jexia/semaphore/pkg/codec/xml"
"github.com/jexia/semaphore/pkg/metrics/prometheus"
"github.com/jexia/semaphore/pkg/providers/avros"
"github.com/jexia/semaphore/pkg/providers/hcl"
"github.com/jexia/semaphore/pkg/providers/openapi3"
"github.com/jexia/semaphore/pkg/providers/protobuffers"
Expand All @@ -33,6 +34,7 @@ type Daemon struct {
GRPC GRPC
Prometheus Prometheus
Protobuffers []string
Avro []string
Openapi3 []string
Files []string
}
Expand Down Expand Up @@ -97,6 +99,10 @@ func parseHCL(options *hcl.Options, target *Daemon) {
target.Protobuffers = append(target.Protobuffers, options.Protobuffers...)
}

if len(options.Avro) > 0 {
target.Avro = append(target.Avro, options.Avro...)
}

if len(options.Openapi3) > 0 {
target.Protobuffers = append(target.Openapi3, options.Openapi3...)
}
Expand Down Expand Up @@ -157,6 +163,10 @@ func NewProviders(ctx *broker.Context, core semaphore.Options, params *Daemon) (
options = append(options, providers.WithAfterConstructor(middleware.ServiceSelector(path)))
}

for _, path := range params.Avro {
options = append(options, providers.WithSchema(avros.SchemaResolver(params.Avro, path)))
}

for _, path := range params.Protobuffers {
options = append(options, providers.WithSchema(protobuffers.SchemaResolver(params.Protobuffers, path)))
options = append(options, providers.WithServices(protobuffers.ServiceResolver(params.Protobuffers, path)))
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/rs/cors v1.7.0
github.com/spf13/cobra v0.0.6
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/zclconf/go-cty v1.3.1
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200319234117-63522dbf7eec // indirect
Expand All @@ -32,6 +32,7 @@ require (
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)

replace github.com/francoispqt/gojay v1.2.13 => github.com/Alma-media/gojay v1.2.14
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
Expand Down Expand Up @@ -579,6 +580,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog=
Expand Down Expand Up @@ -893,6 +896,9 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
103 changes: 103 additions & 0 deletions pkg/providers/avros/collect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package avros

import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"

"github.com/jexia/semaphore/pkg/broker"
"github.com/jexia/semaphore/pkg/broker/logger"
"github.com/jexia/semaphore/pkg/providers"
"github.com/jexia/semaphore/pkg/specs"
"go.uber.org/zap"
)

// Collect attempts to collect all the available avro files inside the given path and parses them to resources
func Collect(ctx *broker.Context, paths []string, path string) ([]*AvroSchema, error) {
imports := make([]string, len(paths))
for index, path := range paths {
imports[index] = path
}

logger.Debug(ctx, "collect available avro", zap.String("path", path))
logger.Debug(ctx, "collect available avro with imports", zap.Strings("imports", paths))

path, err := filepath.Abs(path)
if err != nil {
return nil, err
}

logger.Debug(ctx, "absolute avro path", zap.String("path", path))

for index, path := range imports {
path, err := filepath.Abs(path)
if err != nil {
return nil, err
}

imports[index] = path
}

logger.Debug(ctx, "absolute avro imports", zap.Strings("imports", imports))

files, err := providers.ResolvePath(ctx, []string{}, path)
if err != nil {
return nil, err
}

for index, path := range imports {
stat, err := os.Stat(path)
if err != nil {
imports[index] = filepath.Dir(path)
continue
}

if stat.IsDir() {
imports[index] = path
continue
}

imports[index] = filepath.Dir(path)
}

descriptors, err := UnmarshalFiles(imports, files)
if err != nil {
return nil, err
}

return descriptors, nil
}

// SchemaResolver returns a new schema resolver for the given avro collection
func SchemaResolver(imports []string, path string) providers.SchemaResolver {
return func(ctx *broker.Context) (specs.Schemas, error) {
logger.Debug(ctx, "resolving acro schemas", zap.String("path", path))

files, err := Collect(ctx, imports, path)
if err != nil {
return nil, err
}

return NewSchema(files), nil
}
}

// UnmarshalFiles attempts to parse the given HCL files to intermediate resources.
// Files are parsed based from the given import paths
func UnmarshalFiles(imports []string, files []*providers.FileInfo) ([]*AvroSchema, error) {

results := make([]*AvroSchema, 0)
for _, file := range files {
schema, err := ioutil.ReadFile(file.Path)
if err != nil {
return nil, err
}
tempSchema := AvroSchema{}
err = json.Unmarshal(schema, &tempSchema)

results = append(results, &tempSchema)
}

return results, nil
}
90 changes: 90 additions & 0 deletions pkg/providers/avros/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package avros

import (
"github.com/jexia/semaphore/pkg/specs"
"github.com/jexia/semaphore/pkg/specs/labels"
"github.com/jexia/semaphore/pkg/specs/template"
"github.com/jexia/semaphore/pkg/specs/types"
)

// AvroSchema impliments avro high level schema
type AvroSchema struct {
Type string `json:"type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Symbols []string `json:"symbols"`
Fields []*AvroSchema `json:"fields"`
}

// NewSchema constructs a new schema manifest from the given avro schema
func NewSchema(descriptors []*AvroSchema) specs.Schemas {
result := make(specs.Schemas, 0)

for _, desc := range descriptors {
result[desc.Namespace] = NewMessage("", desc)
}

return result
}

// NewMessage constructs a schema Property with the given avro schema
func NewMessage(path string, message *AvroSchema) *specs.Property {
result := &specs.Property{
Path: message.Namespace,
Name: message.Name,
Position: 1,
Label: labels.Optional,
Template: specs.Template{
Message: make(specs.Message, len(message.Fields)),
},
Options: specs.Options{},
}

for _, field := range message.Fields {
result.Message[field.Name] = NewProperty(template.JoinPath(message.Namespace, message.Name, field.Name), field)
}

return result
}

// NewProperty constructs a schema Property with the given avro schema
func NewProperty(path string, message *AvroSchema) *specs.Property {
result := &specs.Property{
Path: path,
Name: message.Name,
Options: specs.Options{},
}

switch message.Type {
case AvroTypes[types.Enum]:
keys := map[string]*specs.EnumValue{}
positions := map[int32]*specs.EnumValue{}
for i, value := range message.Symbols {
result := &specs.EnumValue{
Key: value,
Position: int32(i),
}
keys[value] = result
positions[int32(i)] = result
}

result.Enum = &specs.Enum{
Name: message.Name,
Keys: keys,
Positions: positions,
}
case AvroTypes[types.Message]:
fields := message.Fields
result.Message = make(specs.Message, len(fields))
for _, field := range fields {
result.Message[field.Name] = NewProperty(template.JoinPath(path, message.Name, field.Name), field)
}
break
default:
result.Scalar = &specs.Scalar{
Type: Types[message.Type],
}
}

return result
}
32 changes: 32 additions & 0 deletions pkg/providers/avros/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package avros

import (
"github.com/jexia/semaphore/pkg/specs/types"
)

// AvroTypes is a lookup table for avro descriptor types
var AvroTypes = map[types.Type]string{
types.Message: "record",
types.Bool: "boolean",
types.Int32: "int",
types.Int64: "int",
types.Float: "float",
types.Double: "double",
types.Bytes: "bytes",
types.String: "string",
types.Enum: "enum",
types.Array: "array",
}

// Types is a lookup table for avro descriptor types
var Types = map[string]types.Type{
"record": types.Message,
"boolean": types.Bool,
"int": types.Int64,
"float": types.Float,
"double": types.Double,
"bytes": types.Bytes,
"string": types.String,
"enum": types.Enum,
"array": types.Array,
}
14 changes: 14 additions & 0 deletions pkg/providers/hcl/hcl.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func GetOptions(ctx *broker.Context, path string) (*Options, error) {
options.Protobuffers = append(options.Protobuffers, definition.Protobuffers...)
}

if len(definition.Avro) > 0 {
options.Avro = append(options.Avro, definition.Avro...)
}

if len(definition.Openapi3) > 0 {
options.Openapi3 = append(options.Openapi3, definition.Openapi3...)
}
Expand Down Expand Up @@ -225,6 +229,16 @@ func ResolvePath(ctx *broker.Context, ignore []string, path string) ([]Manifest,
}
}

if definition.Avro != nil {
for index, avro := range definition.Avro {
if !filepath.IsAbs(avro) {
avro = filepath.Join(filepath.Dir(file.Path), avro)
}

definition.Avro[index] = avro
}
}

if definition.Openapi3 != nil {
for index, doc := range definition.Openapi3 {
if !filepath.IsAbs(doc) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/hcl/intermediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Manifest struct {
GRPC *GRPC `hcl:"grpc,block"`
Prometheus *Prometheus `hcl:"prometheus,block"`
Protobuffers []string `hcl:"protobuffers,optional"`
Avro []string `hcl:"avro,optional"`
Openapi3 []string `hcl:"openapi3,optional"`
Include []string `hcl:"include,optional"`
Error *ParameterMap `hcl:"error,block"`
Expand All @@ -21,7 +22,7 @@ type Manifest struct {
DiscoveryServers Discoveries `hcl:"discovery,block"`
}

// GraphQL represents the GraphQL option definitions.
// GraphQL represents the GraphQL option definitions
type GraphQL struct {
Address string `hcl:"address"`
}
Expand Down
1 change: 1 addition & 0 deletions pkg/providers/hcl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package hcl
type Options struct {
LogLevel string
Protobuffers []string
Avro []string
Openapi3 []string
GraphQL *GraphQL
HTTP *HTTP
Expand Down
1 change: 0 additions & 1 deletion pkg/providers/protobuffers/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func NewSchema(descriptors []*desc.FileDescriptor) specs.Schemas {
result[message.GetFullyQualifiedName()] = NewMessage("", message)
}
}

return result
}

Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/stretchr/testify/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading