Skip to content

Commit

Permalink
Merge pull request #9 from ozoncp/Task-8
Browse files Browse the repository at this point in the history
Task 8
  • Loading branch information
echekunov authored Aug 25, 2021
2 parents 7ca3ae1 + 017ea49 commit 6b14d85
Show file tree
Hide file tree
Showing 20 changed files with 1,413 additions and 89 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ COPY --from=builder /home/developer/go/src/github.com/ozoncp/ocp-contact-api/bin
COPY --from=builder /home/developer/go/src/github.com/ozoncp/ocp-contact-api/config.yml .
RUN chown root:root ocp-contact-api
EXPOSE 8002
EXPOSE 9100
CMD ["./ocp-contact-api"]
32 changes: 32 additions & 0 deletions api/ocp-contact-api/ocp-contact-api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ option go_package = "github.com/ozoncp/ocp-contact-api/pkg/ocp-contact-api;ocp_c

service OcpContactApi {

// Update contact
rpc UpdateContactV1(UpdateContactV1Request) returns (UpdateContactV1Response) {
option (google.api.http) = {
put: "/v1/contacts"
body: "*"
};
}

// Create list of contacts
rpc MultiCreateContactsV1(MultiCreateContactsV1Request) returns (MultiCreateContactsV1Response) {
option (google.api.http) = {
post: "/v1/contacts"
body: "*"
};
}

// Returns a list of the contact
rpc ListContactsV1(ListContactsV1Request) returns (ListContactsV1Response) {
option (google.api.http) = {
Expand Down Expand Up @@ -74,6 +90,22 @@ message DescribeContactV1Response {
Contact contact = 1;
}

message UpdateContactV1Request {
Contact contact = 1;
}

message UpdateContactV1Response {
bool updated = 1;
}

message MultiCreateContactsV1Request {
repeated Contact contacts = 1 [(validate.rules).repeated.min_items = 1];
}

message MultiCreateContactsV1Response {
uint64 count = 1;
}

message Contact {
uint64 id = 1;
uint64 userId = 2;
Expand Down
30 changes: 27 additions & 3 deletions cmd/ocp-contact-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package main
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/ozoncp/ocp-contact-api/internal/metrics"
"github.com/ozoncp/ocp-contact-api/internal/producer"
"github.com/ozoncp/ocp-contact-api/internal/repo"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"net/http"
"os"
"time"

Expand All @@ -17,7 +21,7 @@ import (
"google.golang.org/grpc"
)

func runGrpc(config *config.Config, log zerolog.Logger) error {
func runGrpc(config *config.Config, prod producer.Producer, log zerolog.Logger) error {
listen, err := net.Listen("tcp", config.Grpc.Address)
if err != nil {
log.Fatal().Err(err).Msgf("failed to listen port %v: %v", config.Grpc.Address, err)
Expand Down Expand Up @@ -46,7 +50,7 @@ func runGrpc(config *config.Config, log zerolog.Logger) error {
// start Grpc server
s := grpc.NewServer()
newRepo := repo.NewRepo(db)
desc.RegisterOcpContactApiServer(s, api.NewOcpContactApiServer(newRepo, log))
desc.RegisterOcpContactApiServer(s, api.NewOcpContactApiServer(newRepo, prod, config.Request.BatchSize, log))

if err := s.Serve(listen); err != nil {
log.Fatal().Err(err).Msgf("failed to serve: %v", err)
Expand All @@ -55,6 +59,21 @@ func runGrpc(config *config.Config, log zerolog.Logger) error {
return nil
}

func runMetricsServer(uri string, port string, log zerolog.Logger) error {
mux := http.NewServeMux()
mux.Handle(uri, promhttp.Handler())

srv := &http.Server{
Addr: port,
Handler: mux,
}
metrics.RegisterMetrics()
log.Info().Msg("Metrics server started")

return srv.ListenAndServe()
}


func main() {
const configPath string = "./config.yml"
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
Expand All @@ -67,7 +86,12 @@ func main() {
return
}

if err := runGrpc(cfg, log); err != nil {
prod := producer.NewProducer(cfg.Kafka.Topic, cfg.Kafka.Brokers, log)
log.Info().Msg("start producer")

go runMetricsServer(cfg.Prometheus.Uri, cfg.Prometheus.Port, log)

if err = runGrpc(cfg, prod, log); err != nil {
log.Fatal().Err(err)
}

Expand Down
14 changes: 13 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,16 @@ database:
password: "postgres"
database: "postgres"
ssl: "disable"
driver: "postgres"
driver: "postgres"

request:
batchsize: 5

prometheus:
uri: "/metrics"
port: ":9100"

kafka:
topic: "contact"
brokers:
- "kafka:9092"
35 changes: 35 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,45 @@ services:
restart: unless-stopped
depends_on:
- postgres
- kafka
links:
- postgres
ports:
- "8002:8002" # grpc

zookeeper:
image: confluentinc/cp-zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka
container_name: kafka
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_LISTENERS-INTERNAL: //kafka:29092,EXTERNAL://localhost:9092
KAFKA_ADVERTISED: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
ports:
- "9092:9092"
depends_on:
- zookeeper
restart: on-failure

prometheus:
image: prom/prometheus
restart: unless-stopped
ports:
- "9090:9090"
volumes:
- "./prometheus.yml:/etc/prometheus/prometheus.yml"

volumes:
pgdata:
18 changes: 8 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ module github.com/ozoncp/ocp-contact-api
go 1.16

require (
github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Masterminds/squirrel v1.5.0
github.com/Shopify/sarama v1.29.1
github.com/envoyproxy/protoc-gen-validate v0.6.1
github.com/golang/glog v1.0.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/jmoiron/sqlx v1.3.4
github.com/lyft/protoc-gen-star v0.5.3 // indirect
github.com/lib/pq v1.10.2
github.com/mattn/go-sqlite3 v1.14.8 // indirect
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
github.com/pressly/goose/v3 v3.1.0 // indirect
github.com/prometheus/client_golang v1.11.0
github.com/rs/zerolog v1.23.0
github.com/spf13/afero v1.6.0 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/mod v0.5.0 // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8
google.golang.org/grpc v1.40.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 // indirect
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
104 changes: 101 additions & 3 deletions internal/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,33 @@ package api
import (
"context"
"github.com/ozoncp/ocp-contact-api/internal/models"
"github.com/ozoncp/ocp-contact-api/internal/producer"
"github.com/ozoncp/ocp-contact-api/internal/repo"
"github.com/ozoncp/ocp-contact-api/internal/utils"
"github.com/ozoncp/ocp-contact-api/internal/metrics"
desc "github.com/ozoncp/ocp-contact-api/pkg/ocp-contact-api"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"
)

type contactApiServer struct {
repo repo.Repo
prod producer.Producer
batchSize int
log zerolog.Logger
desc.UnimplementedOcpContactApiServer
}

func NewOcpContactApiServer(repo repo.Repo, log zerolog.Logger) desc.OcpContactApiServer {
return &contactApiServer{repo: repo, log: log}
func NewOcpContactApiServer(
repo repo.Repo,
prod producer.Producer,
batchSize int,
log zerolog.Logger,
) desc.OcpContactApiServer {
return &contactApiServer{repo: repo, prod: prod, batchSize: batchSize, log: log}
}

func (s *contactApiServer) ListContactsV1(
Expand Down Expand Up @@ -96,7 +107,7 @@ func (s *contactApiServer) CreateContactV1(
contactId, err := s.repo.CreateContact(context, contact)

if err != nil {
log.Error().Err(err).Msg("create contact failed")
s.log.Error().Err(err).Msg("create contact failed")
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -106,6 +117,19 @@ func (s *contactApiServer) CreateContactV1(

s.log.Info().Msgf("contact was created with id: %v", contactId)

metrics.CreateCounterInc()

event := producer.EventMessage{
Id: contact.Id,
Action: producer.Create.String(),
Timestamp: time.Now().Unix(),
}

msg := producer.CreateMessage(producer.Create, event)
if err = s.prod.Send(msg); err != nil {
s.log.Error().Err(err).Msgf("failed send message to kafka")
}

return response, nil
}

Expand All @@ -127,5 +151,79 @@ func (s *contactApiServer) RemoveContactV1(
}

s.log.Info().Msgf("remove contact with id %v was removed", req.ContactId)

metrics.RemoveCounterInc()

event := producer.EventMessage{
Id: req.ContactId,
Action: producer.Remove.String(),
Timestamp: time.Now().Unix(),
}
msg := producer.CreateMessage(producer.Remove, event)
if err := s.prod.Send(msg); err != nil {
s.log.Error().Err(err).Msgf("failed send message to kafka")
}

return &desc.RemoveContactV1Response{Result: true}, nil
}

func (s *contactApiServer) UpdateContactV1(
ctx context.Context,
req *desc.UpdateContactV1Request,
) (*desc.UpdateContactV1Response, error) {
contact := models.Contact{Id: req.Contact.Id, UserId: req.Contact.UserId,
Type: req.Contact.Type, Text: req.Contact.Text}

if err := s.repo.UpdateContact(ctx, contact); err != nil {
log.Error().Err(err).Msgf("update contact with id %v failed", req.Contact.Id)
return &desc.UpdateContactV1Response{Updated: false}, err
}

metrics.UpdateCounterInc()

event := producer.EventMessage{
Id: contact.Id,
Action: producer.Update.String(),
Timestamp: time.Now().Unix(),
}
msg := producer.CreateMessage(producer.Update, event)
if err := s.prod.Send(msg); err != nil {
s.log.Error().Err(err).Msgf("failed send message to kafka")
}

return &desc.UpdateContactV1Response{Updated: true}, nil
}

func (s *contactApiServer) MultiCreateContactsV1(
ctx context.Context,
req *desc.MultiCreateContactsV1Request,
) (*desc.MultiCreateContactsV1Response, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

contacts := make([]models.Contact, 0, len(req.Contacts))

for _, req := range req.Contacts {
contact := models.Contact{UserId: req.UserId, Type: req.Type, Text: req.Text}
contacts = append(contacts, contact)
}

batches, err := utils.Split(contacts, s.batchSize)
if err != nil {
log.Error().Err(err).Msgf("multiple contacts creation failed")
return nil, status.Error(codes.Internal, err.Error())
}

var count uint64
for _, batch := range batches {
if err := s.repo.AddContacts(ctx, batch); err != nil {
log.Error().Err(err).Msgf("multiple contacts creation failed while adding in repo")
return nil, status.Error(codes.Internal, err.Error())
}
count += uint64(len(batch))
}
return &desc.MultiCreateContactsV1Response{
Count: count,
}, nil
}
Loading

0 comments on commit 6b14d85

Please sign in to comment.