Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
prastamaha committed Apr 11, 2021
0 parents commit 39dab64
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
./bin
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
build:
cd microservice1 && GOOS=linux GOARCH=amd64 go build -o ../bin/microservice1
cd microservice2 && GOOS=linux GOARCH=amd64 go build -o ../bin/microservice2

clean:
rm -rf bin/*

run1:
go run microservice1/producer.go

run2:
go run microservice2/consumer.go
88 changes: 88 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## Kafka Example Project

This project written using golang with following the module:

- [confluent-kafka-go](github.com/confluentinc/confluent-kafka-go)
- [mux](github.com/gorilla/mux)
- [mgo](gopkg.in/mgo.v2)

---
### Prerequisites

- [Kafka Cluster installation](kafka-cluster/README.md)
- [MongoDB installation](mongodb/README.md)

### Architecture
![](images/kafka-riset.png)

### Build program
```
git clone go.btech.id/prasta/kafka-riset.git
cd kafka-riset
# Microservice1
cd microservice1
GOOS=linux GOARCH=amd64 go build -o ../bin/microservice1
# Microservice2
cd ../microservice2
GOOS=linux GOARCH=amd64 go build -o ../bin/microservice2
```

### Program arguments

#### Microservice1

| Args | Default | Description |
|---|---|---|
| --kafka-address | localhost:9092 | [Address]:[Port] kafka listen |
| --kafka-topic | kafka-riset-topic | Topics to be written by producer |
| --listen | 0.0.0.0:9090 | [Address]:[Port] that will be listened to by the webservice |

#### Microservice2

| Args | Default | Description |
|---|---|---|
| --kafka-address | localhost:9092 | [Address]:[Port] kafka listen |
| --kafka-topic | kafka-riset-topic | Topics to be subcribe by consumer |
| --kafka-group | kafka-riset-group | Consumer group name |
| --mongo-addr | localhost:27017 | [Address]:[Port] Mongodb listen |
| --mongo-db | job | Mongodb target database |
| --mongo-col | ops | [Mongodb target database |
| --mongo-username | - | Mongodb password |
| --mongo-password | - | Mongodb username |

### Run Program

#### Running Microservice1
```
# Terminal 1
./bin/microservice1
```

#### Running Microservice2
```
# Terminal 2
./bin/microservice1
```

### Test Program

#### Using CURL
```
curl -X POST --data '{"name": "prasta maha","division": "project","position": "staff"}' http://localhost:9090/ops
```

#### Using Postman

Open postman with `POST` request to http://localhost:9090/ops (default addr)

Add body request with raw format and paste text below
```
{
"name": "prasta maha",
"division": "project",
"position": "staff"
}
```

Binary file added bin/microservice1
Binary file not shown.
Binary file added bin/microservice2
Binary file not shown.
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/prastamaha/golang-kafka-example

go 1.16

require (
github.com/confluentinc/confluent-kafka-go v1.6.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.6.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
github.com/confluentinc/confluent-kafka-go v1.6.1 h1:YxM/UtMQ2vgJX2gIgeJFUD0ANQYTEvfo4Cs4qKUlmGE=
github.com/confluentinc/confluent-kafka-go v1.6.1/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.6.1 h1:nKc5Vj4Kko8O6khwOIxQ2UqkEZP7ZZ91vb/lI+ephvk=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.6.1/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
Binary file added images/kafka-cluster.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/kafka-riset.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 9 additions & 0 deletions kafka-cluster/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Run Kafka Cluster

![](../images/kafka-cluster.png)

> image source: [https://betterprogramming.pub](https://betterprogramming.pub/a-simple-apache-kafka-cluster-with-docker-kafdrop-and-python-cf45ab99e2b9)
```yaml
docker-compose up -d
```
57 changes: 57 additions & 0 deletions kafka-cluster/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
version: '3'
services:

zookeeper:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888

kafka1:
image: confluentinc/cp-kafka:6.1.0
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper

kafka2:
image: confluentinc/cp-kafka:6.1.0
hostname: kafka2
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper

kafka3:
image: confluentinc/cp-kafka:6.1.0
hostname: kafka3
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
96 changes: 96 additions & 0 deletions microservice1/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"encoding/json"
"flag"
"io/ioutil"
"log"
"net/http"

"github.com/gorilla/mux"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// flag variable
var (
listen string
kafkaAddr string
kafkaTopic string
)

// ops object struct
type Ops struct {
Name string `json:"name"`
Division string `json:"division"`
Position string `json:"position"`
}

func main() {

// Flag arguments
flag.StringVar(&listen, "listen", "0.0.0.0:9090", "[Address]:[Port] that will be listened to by the webservice")
flag.StringVar(&kafkaAddr, "kafka-address", "localhost:9092", "[Address]:[Port] kafka listen")
flag.StringVar(&kafkaTopic, "kafka-topic", "kafka-riset-topic", "topics to be written by producer")
flag.Parse()

// Listen web service with Mux
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/ops", opsHandler).Methods("POST")
log.Printf("Service running on %s\n", listen)
log.Fatalln(http.ListenAndServe(listen, router))
}

func opsHandler(w http.ResponseWriter, r *http.Request) {

b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
panic(err)
}

var ops Ops
err = json.Unmarshal(b, &ops)
if err != nil {
http.Error(w, err.Error(), 500)
return
}

produceToKafka(ops)

opsString, err := json.Marshal(ops)
if err != nil {
http.Error(w, err.Error(), 500)
return
}

w.Header().Set("content-type", "aplication/json")

w.Write(opsString)
}

func produceToKafka(ops Ops) {

// Convert object into bytes
opsString, err := json.Marshal(ops)
if err != nil {
panic(err)
}

// more configuration, check on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": kafkaAddr,
})
if err != nil {
panic(err)
}

// Produce messages to topic (asynchronously)
for _, word := range []string{string(opsString)} {
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
log.Printf("Event produce to Kafka topic %s: %v\n", kafkaTopic, ops)
}

}
Loading

0 comments on commit 39dab64

Please sign in to comment.