Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

update clickhouse client database name, username, password, usage of … #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ It will launch kittenhouse with current user and group (empty `-u` and `-g` argu
Here are the essential command-line options that you will most certainly need to modify are the following:

```sh
-u <user> change daemon user (default is "kitten")
-g <group> change daemon group (default is "kitten")
-ch-addr <addr> ClickHouse server address (if you have only one ClickHouse server)
-l <filename> log file (default is STDERR)
-h <host> listen host (default is "0.0.0.0" which may be dangerous!)
-p <port> listen port (default is 13338)
-dir <dirname> directory where kittenhouse stores persistent content
-u <user> change daemon user (default is "kitten")
-g <group> change daemon group (default is "kitten")
-ch-addr <addr> ClickHouse server address (if you have only one ClickHouse server)
-l <filename> log file (default is STDERR)
-h <host> listen host (default is "0.0.0.0" which may be dangerous!)
-p <port> listen port (default is 13338)
-dir <dirname> directory where kittenhouse stores persistent content
-db <database name> ClickHouse database (default is "default")
-ch-user <username> Clickhouse username
-ch-password <password> Clickhouse password
-ch-ssl-cert-path <path> Clickhouse SSL certificate path
```

You can see the full list of options by running `kittenhouse --help`.
Expand Down
68 changes: 58 additions & 10 deletions core/clickhouse/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"sync"
"time"

"crypto/x509"
"crypto/tls"

"github.com/vkcom/engine-go/srvfunc"
cmdconfig "github.com/vkcom/kittenhouse/core/cmdconfig"
"github.com/vkcom/kittenhouse/core/destination"
)

Expand All @@ -39,14 +43,8 @@ var (
}{
m: make(map[destination.ServerHostPort]*kittenMeow),
}

httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 1,
DialContext: srvfunc.CachingDialer,
},
Timeout: time.Minute,
}
// generate HTTP client
httpClient = generateHttpClient()
)

type kittenMeow struct {
Expand Down Expand Up @@ -466,9 +464,26 @@ func flush(dst *destination.Setting, table string, body []byte, rowBinary bool,
compressionArgs = "decompress=1&http_native_compression_disable_checksumming_on_decompress=1&"
}

url := fmt.Sprintf("http://%s/?input_format_values_interpret_expressions=0&%squery=%s", srv, compressionArgs, queryPrefix)
url := fmt.Sprintf("http://%s/?input_format_values_interpret_expressions=0&%squery=%s&database=%s", srv, compressionArgs, queryPrefix, cmdconfig.Argv.ChDatabase)

resp, err := httpClient.Post(url, "application/x-www-form-urlencoded", bytes.NewReader(body))
//resp, err := httpClient.Post(url, "application/x-www-form-urlencoded", bytes.NewReader(body))
// generate request
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// add clickhouse user
if cmdconfig.Argv.ChUser != "" {
req.Header.Add("X-ClickHouse-User", cmdconfig.Argv.ChUser)
}
// add clickhouse password
if cmdconfig.Argv.ChPassword != "" {
req.Header.Add("X-ClickHouse-Key", cmdconfig.Argv.ChPassword)
}
// send request
resp, err := httpClient.Do(req)
// check error
if err != nil {
log.Printf("Could not post to table %s to clickhouse: %s", table, err.Error())
dst.TempDisableHost(srv, checkHostAlive)
Expand Down Expand Up @@ -505,3 +520,36 @@ func flush(dst *destination.Setting, table string, body []byte, rowBinary bool,

return nil
}

// generate HTTP client
func generateHttpClient() *http.Client {
// SSL cert path
var sslCertPath = cmdconfig.Argv.ChSslCertPath
//
if sslCertPath != "" {
// read cert
caCert, err := ioutil.ReadFile(sslCertPath)
// if cert absent, panic
if err != nil { panic(err) }
var caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 1,
DialContext: srvfunc.CachingDialer,
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
},
Timeout: time.Minute,
}
} else {
return &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 1,
DialContext: srvfunc.CachingDialer,
},
Timeout: time.Minute,
}
}
}
119 changes: 34 additions & 85 deletions core/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/vkcom/engine-go/srvfunc"
"github.com/vkcom/kittenhouse/core/clickhouse"
cmdconfig "github.com/vkcom/kittenhouse/core/cmdconfig"
"github.com/vkcom/kittenhouse/core/destination"
"github.com/vkcom/kittenhouse/core/inmem"
"github.com/vkcom/kittenhouse/core/persist"
Expand All @@ -30,31 +31,6 @@ var (
BuildOSUname string
BuildCommit string
buildVersion string // concatination of Build* into a single string
)

var (
argv struct {
reverse bool

host string
port uint
help bool
version bool
markAsDone bool
user string
group string
log string

maxOpenFiles uint64
nProc uint
pprofHostPort string
chHost string
config string
dir string
maxSendSize int64
maxFileSize int64
rotateIntervalSec int64
}

logFd *os.File
)
Expand All @@ -77,33 +53,6 @@ func init() {
)

log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds)

// actions
flag.BoolVar(&argv.help, `h`, false, `show this help`)
flag.BoolVar(&argv.version, `version`, false, `show version`)
flag.BoolVar(&argv.reverse, `reverse`, false, `start reverse proxy server instead (ch-addr is used as clickhouse host-port)`)

// common options
flag.StringVar(&argv.host, `host`, `0.0.0.0`, `listening host`)
flag.UintVar(&argv.port, `port`, 13338, `listening port. REQUIRED`)
flag.UintVar(&argv.port, `p`, 13338, `listening port. REQUIRED`)
flag.StringVar(&argv.user, `u`, `kitten`, "setuid user (if needed)")
flag.StringVar(&argv.group, `g`, `kitten`, "setgid user (if needed)")
flag.StringVar(&argv.log, `l`, "", "log file (if needed)")
flag.StringVar(&argv.chHost, `ch-addr`, `127.0.0.1:8123`, `default clickhouse host:port`)
flag.UintVar(&argv.nProc, `cores`, uint(0), `max cpu cores usage`)
flag.StringVar(&argv.pprofHostPort, `pprof`, ``, `host:port for http pprof`)
flag.Uint64Var(&argv.maxOpenFiles, `max-open-files`, 262144, `open files limit`)

// local proxy options
flag.StringVar(&argv.config, `c`, ``, `path to routing config`)
flag.StringVar(&argv.dir, `dir`, `/tmp/kittenhouse`, `dir for persistent logs`)
flag.Int64Var(&argv.maxSendSize, `max-send-size`, 1<<20, `max batch size to be sent to clickhouse in bytes`)
flag.Int64Var(&argv.maxFileSize, `max-file-size`, 50<<20, `max file size in bytes`)
flag.Int64Var(&argv.rotateIntervalSec, `rotate-interval-sec`, 1800, `how often to rotate files`)
flag.BoolVar(&argv.markAsDone, `mark-as-done`, false, `rename files to *.done instead of deleting them upon successful delivery`)

flag.Parse()
}

func updateThread(ch chan os.Signal) {
Expand All @@ -127,15 +76,15 @@ func updateConfig() {
var confHash string
var err error

if argv.config != "" {
newConf, ts, confHash, err = parseConfigFile(argv.config)
if cmdconfig.Argv.Config != "" {
newConf, ts, confHash, err = parseConfigFile(cmdconfig.Argv.Config)
if err != nil {
log.Printf("Error: Bad config: %s", err.Error())
return
}
} else {
ts = time.Now()
newConf, confHash, err = parseConfig(bytes.NewBufferString(`* ` + argv.chHost))
newConf, confHash, err = parseConfig(bytes.NewBufferString(`* ` + cmdconfig.Argv.ChHost))
if err != nil {
log.Printf("Error: Bad default config: %s", err.Error())
return
Expand All @@ -158,14 +107,14 @@ func updateConfig() {
}

func reopenLog() {
if argv.log == "" {
if cmdconfig.Argv.Log == "" {
return
}

var err error
logFd, err = srvfunc.LogRotate(logFd, argv.log)
logFd, err = srvfunc.LogRotate(logFd, cmdconfig.Argv.Log)
if err != nil {
os.Stderr.WriteString(fmt.Sprintf(`Cannot log to file "%s": %s`, argv.log, err.Error()))
os.Stderr.WriteString(fmt.Sprintf(`Cannot log to file "%s": %s`, cmdconfig.Argv.Log, err.Error()))
return
}

Expand All @@ -180,8 +129,8 @@ func tryIncreaseRlimit() {
return
}

rLimit.Max = argv.maxOpenFiles
rLimit.Cur = argv.maxOpenFiles
rLimit.Max = cmdconfig.Argv.MaxOpenFiles
rLimit.Cur = cmdconfig.Argv.MaxOpenFiles

err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
Expand Down Expand Up @@ -231,58 +180,58 @@ func heartbeatThread() {

// Main is actual main function for kittenhouse but allows to register certain hooks beforehand.
func Main() {
if argv.version {
if cmdconfig.Argv.Version {
fmt.Fprint(os.Stderr, buildVersion, "\n")
return
} else if argv.help {
} else if cmdconfig.Argv.Help {
flag.Usage()
return
}

if argv.nProc > 0 {
runtime.GOMAXPROCS(int(argv.nProc))
if cmdconfig.Argv.NProc > 0 {
runtime.GOMAXPROCS(int(cmdconfig.Argv.NProc))
} else {
argv.nProc = uint(runtime.NumCPU())
cmdconfig.Argv.NProc = uint(runtime.NumCPU())
}

if argv.pprofHostPort != `` {
if cmdconfig.Argv.PprofHostPort != `` {
go func() {
if err := http.ListenAndServe(argv.pprofHostPort, nil); err != nil {
if err := http.ListenAndServe(cmdconfig.Argv.PprofHostPort, nil); err != nil {
log.Printf(`pprof listen fail: %s`, err.Error())
}
}()
}

tryIncreaseRlimit()

if argv.group != "" {
if err := srvfunc.ChangeGroup(argv.group); err != nil {
log.Fatalf("Could not change group to %s: %s", argv.group, err.Error())
if cmdconfig.Argv.Group != "" {
if err := srvfunc.ChangeGroup(cmdconfig.Argv.Group); err != nil {
log.Fatalf("Could not change group to %s: %s", cmdconfig.Argv.Group, err.Error())
}
}

if argv.user != "" {
if err := srvfunc.ChangeUser(argv.user); err != nil {
log.Fatalf("Could not change user to %s: %s", argv.user, err.Error())
if cmdconfig.Argv.User != "" {
if err := srvfunc.ChangeUser(cmdconfig.Argv.User); err != nil {
log.Fatalf("Could not change user to %s: %s", cmdconfig.Argv.User, err.Error())
}
}

if argv.reverse {
listenAddr := fmt.Sprintf("%s:%d", argv.host, argv.port)
log.Printf("Starting reverse proxy at %s (proxy to %s)", listenAddr, argv.chHost)
err := clickhouse.RunReverseProxy(listenAddr, argv.chHost)
if cmdconfig.Argv.Reverse {
listenAddr := fmt.Sprintf("%s:%d", cmdconfig.Argv.Host, cmdconfig.Argv.Port)
log.Printf("Starting reverse proxy at %s (proxy to %s)", listenAddr, cmdconfig.Argv.ChHost)
err := clickhouse.RunReverseProxy(listenAddr, cmdconfig.Argv.ChHost)
log.Fatalf("Could not run reverse proxy: %s", err.Error())
}

clickhouse.Init()

persist.Init(persist.Config{
Dir: argv.dir,
MaxSendSize: argv.maxSendSize,
MaxFileSize: argv.maxFileSize,
RotateInterval: time.Duration(argv.rotateIntervalSec) * time.Second,
MarkAsDone: argv.markAsDone,
Port: argv.port,
Dir: cmdconfig.Argv.Dir,
MaxSendSize: cmdconfig.Argv.MaxSendSize,
MaxFileSize: cmdconfig.Argv.MaxFileSize,
RotateInterval: time.Duration(cmdconfig.Argv.RotateIntervalSec) * time.Second,
MarkAsDone: cmdconfig.Argv.MarkAsDone,
Port: cmdconfig.Argv.Port,
})

persist.InternalLog("start", "", 0, "", "version: "+buildVersion+" args:"+fmt.Sprint(os.Args))
Expand All @@ -295,11 +244,11 @@ func Main() {
go heartbeatThread()

go func() {
if err := StartServerCallback(argv.host, argv.port); err != nil {
if err := StartServerCallback(cmdconfig.Argv.Host, cmdconfig.Argv.Port); err != nil {
log.Fatalf("Could not listen rpc: %s", err.Error())
}

log.Printf("Listening %s:%d (TCP)", argv.host, argv.port)
log.Printf("Listening %s:%d (TCP)", cmdconfig.Argv.Host, cmdconfig.Argv.Port)
}()

go listenUDP()
Expand Down
5 changes: 3 additions & 2 deletions core/cmd/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"syscall"

cmdconfig "github.com/vkcom/kittenhouse/core/cmdconfig"
"github.com/vkcom/kittenhouse/core/inmem"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func parseUDPPacket(buf []byte) (table string, data []byte, flags byte, err erro
func listenUDP() {
buf := make([]byte, maxUDPPacketSize)

addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", argv.host, argv.port))
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", cmdconfig.Argv.Host, cmdconfig.Argv.Port))
if err != nil {
log.Fatalf("Could not resolve udp addr: %s", err.Error())
}
Expand All @@ -68,7 +69,7 @@ func listenUDP() {
}
defer conn.Close()

log.Printf("Listening %s:%d (UDP)", argv.host, argv.port)
log.Printf("Listening %s:%d (UDP)", cmdconfig.Argv.Host, cmdconfig.Argv.Port)

for {
n, _, err := conn.ReadFromUDP(buf)
Expand Down
Loading