Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use knapsack and slogger in kolide service #1436

Merged
merged 15 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 47 additions & 26 deletions cmd/grpc.ext/grpc.go
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (
"errors"
"flag"
"fmt"
"log/slog"
"os"
"path/filepath"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-kit/kit/log/level"
"github.com/kolide/kit/env"
"github.com/kolide/kit/logutil"
"github.com/kolide/kit/version"
"github.com/kolide/launcher/pkg/agent/flags"
"github.com/kolide/launcher/pkg/agent/knapsack"
"github.com/kolide/launcher/pkg/agent/storage"
agentbbolt "github.com/kolide/launcher/pkg/agent/storage/bbolt"
"github.com/kolide/launcher/pkg/log/multislogger"
grpcext "github.com/kolide/launcher/pkg/osquery"
"github.com/kolide/launcher/pkg/service"
osquery "github.com/osquery/osquery-go"
Expand Down Expand Up @@ -49,13 +50,6 @@ func main() {
// allow for osqueryd to create the socket path
time.Sleep(2 * time.Second)

logger := logutil.NewServerLogger(*flVerbose)

client, err := osquery.NewClient(*flSocketPath, timeout, osquery.MaxWaitTime(30*time.Second))
if err != nil {
logutil.Fatal(logger, "err", err, "creating osquery extension client", "stack", fmt.Sprintf("%+v", err))
}

var (
enrollSecret = env.String("KOLIDE_LAUNCHER_ENROLL_SECRET", "")
rootDirectory = env.String("KOLIDE_LAUNCHER_ROOT_DIRECTORY", "")
Expand All @@ -67,27 +61,11 @@ func main() {

// TODO(future pr): these values are unset
// they'll have to be parsed from a string
certPins [][]byte
// certPins [][]byte
rootPool *x509.CertPool
)
conn, err := service.DialGRPC(
serverURL,
insecureTLS,
insecureTransport,
certPins,
rootPool,
logger,
)
if err != nil {
logutil.Fatal(logger, "err", err, "failed to connect to grpc host", "stack", fmt.Sprintf("%+v", err))
}
remote := service.NewGRPCClient(conn, level.Debug(logger))

extOpts := grpcext.ExtensionOpts{
EnrollSecret: enrollSecret,
Logger: logger,
LoggingInterval: loggingInterval,
}
logger := logutil.NewServerLogger(*flVerbose)

db, err := bbolt.Open(filepath.Join(rootDirectory, "launcher.db"), 0600, nil)
if err != nil {
Expand All @@ -100,8 +78,51 @@ func main() {
logutil.Fatal(logger, "err", fmt.Errorf("creating stores: %w", err), "stack", fmt.Sprintf("%+v", err))
}
f := flags.NewFlagController(logger, stores[storage.AgentFlagsStore])

slogLevel := slog.LevelInfo
if *flVerbose {
slogLevel = slog.LevelDebug
}

multiSlogger := new(multislogger.MultiSlogger)
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
multiSlogger.AddHandler(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slogLevel,
}))

k := knapsack.New(stores, f, db, nil, nil)

if err := k.SetKolideServerURL(serverURL); err != nil {
logutil.Fatal(logger, "err", fmt.Errorf("setting kolide server url: %w", err), "stack", fmt.Sprintf("%+v", err))
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
}

if err := k.SetInsecureTLS(insecureTLS); err != nil {
logutil.Fatal(logger, "err", fmt.Errorf("setting insecure tls: %w", err), "stack", fmt.Sprintf("%+v", err))
}

if err := k.SetInsecureTransportTLS(insecureTransport); err != nil {
logutil.Fatal(logger, "err", fmt.Errorf("setting insecure transport tls: %w", err), "stack", fmt.Sprintf("%+v", err))
}

client, err := osquery.NewClient(*flSocketPath, timeout, osquery.MaxWaitTime(30*time.Second))
if err != nil {
logutil.Fatal(logger, "err", err, "creating osquery extension client", "stack", fmt.Sprintf("%+v", err))
}

conn, err := service.DialGRPC(
k,
rootPool,
)
if err != nil {
logutil.Fatal(logger, "err", err, "failed to connect to grpc host", "stack", fmt.Sprintf("%+v", err))
}
remote := service.NewGRPCClient(k, conn)

extOpts := grpcext.ExtensionOpts{
EnrollSecret: enrollSecret,
Logger: logger,
LoggingInterval: loggingInterval,
}

ext, err := grpcext.NewExtension(remote, k, extOpts)
if err != nil {
logutil.Fatal(logger, "err", fmt.Errorf("starting grpc extension: %w", err), "stack", fmt.Sprintf("%+v", err))
Expand Down
6 changes: 3 additions & 3 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul
{
switch k.Transport() {
case "grpc":
grpcConn, err := service.DialGRPC(k.KolideServerURL(), k.InsecureTLS(), k.InsecureTransportTLS(), k.CertPins(), rootPool, logger)
grpcConn, err := service.DialGRPC(k, rootPool)
if err != nil {
return fmt.Errorf("dialing grpc server: %w", err)
}
defer grpcConn.Close()
client = service.NewGRPCClient(grpcConn, logger)
client = service.NewGRPCClient(k, grpcConn)
case "jsonrpc":
client = service.NewJSONRPCClient(k.KolideServerURL(), k.InsecureTLS(), k.InsecureTransportTLS(), k.CertPins(), rootPool, logger)
client = service.NewJSONRPCClient(k, rootPool)
case "osquery":
client = service.NewNoopClient(logger)
default:
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/flags/flag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,10 @@ func (fc *FlagController) LogShippingLevel() string {
value = strings.ToLower(value)

switch value {
case "debug", "info", "warn":
case "debug", "warn", "error":
return value
default:
return "error"
return "info"
}
}),
).get(fc.getControlServerValue(keys.LogShippingLevel))
Expand Down
8 changes: 8 additions & 0 deletions pkg/log/logshipper/logshipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (ls *LogShipper) Ping() {
ls.sender.endpoint = parsedUrl.String()
}

startingLevel := ls.slogLevel.Level()
switch ls.knapsack.LogShippingLevel() {
case "debug":
ls.slogLevel.Set(slog.LevelDebug)
Expand All @@ -104,6 +105,13 @@ func (ls *LogShipper) Ping() {
)
}

if startingLevel != ls.slogLevel.Level() {
ls.knapsack.Slogger().Info("log shipping level changed",
"old_log_level", startingLevel.String(),
"new_log_level", ls.slogLevel.Level().String(),
)
}

ls.isShippingEnabled = ls.sender.endpoint != ""
ls.addDeviceIdentifyingAttributesToLogger()

Expand Down
3 changes: 1 addition & 2 deletions pkg/service/check_health.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service

Check failure on line 1 in pkg/service/check_health.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

: # github.com/kolide/launcher/pkg/service [github.com/kolide/launcher/pkg/service.test]

Check failure on line 1 in pkg/service/check_health.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

: # github.com/kolide/launcher/pkg/service [github.com/kolide/launcher/pkg/service.test]

import (
"context"
Expand Down Expand Up @@ -106,8 +106,7 @@
func (mw logmw) CheckHealth(ctx context.Context) (status int32, err error) {
defer func(begin time.Time) {
uuid, _ := uuid.FromContext(ctx)
mw.logger.Log(
"method", "CheckHealth",
mw.knapsack.Slogger().Debug("check health",
"uuid", uuid,
"status", status,
"err", err,
Expand Down
38 changes: 14 additions & 24 deletions pkg/service/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@ package service
import (
"context"
"crypto/x509"
"fmt"
"net"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpctransport "github.com/go-kit/kit/transport/grpc"
"github.com/kolide/kit/contexts/uuid"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/kolide/launcher/pkg/agent/types"
pb "github.com/kolide/launcher/pkg/pb/launcher"
)

// New creates a new Kolide Client (implementation of the KolideService
// interface) using the provided gRPC client connection.
func NewGRPCClient(conn *grpc.ClientConn, logger log.Logger) KolideService {
func NewGRPCClient(k types.Knapsack, conn *grpc.ClientConn) KolideService {
requestEnrollmentEndpoint := grpctransport.NewClient(
conn,
"kolide.agent.Api",
Expand Down Expand Up @@ -91,7 +89,7 @@ func NewGRPCClient(conn *grpc.ClientConn, logger log.Logger) KolideService {
CheckHealthEndpoint: checkHealthEndpoint,
}

client = LoggingMiddleware(logger)(client)
client = LoggingMiddleware(k)(client)
// Wrap with UUID middleware after logger so that UUID is available in
// the logger context.
client = uuidMiddleware(client)
Expand All @@ -101,39 +99,31 @@ func NewGRPCClient(conn *grpc.ClientConn, logger log.Logger) KolideService {

// dialGRPC creates a grpc client connection.
func DialGRPC(
serverURL string,
insecureTLS bool,
insecureTransport bool,
certPins [][]byte,
k types.Knapsack,
rootPool *x509.CertPool,
logger log.Logger,
opts ...grpc.DialOption, // Used for overrides in testing
) (*grpc.ClientConn, error) {
level.Info(logger).Log(
"msg", "dialing grpc server",
"server", serverURL,
"tls_secure", insecureTLS == false,
"transport_secure", insecureTransport == false,
"cert_pinning", len(certPins) > 0,

k.Slogger().Debug("dialing grpc server",
"server", k.KolideServerURL(),
"tls_secure", k.InsecureTLS() == false,
"transport_secure", k.InsecureTransportTLS() == false,
"cert_pinning", len(k.CertPins()) > 0,
)

grpcOpts := []grpc.DialOption{
grpc.WithTimeout(time.Second),
}
if insecureTransport {
if k.InsecureTransportTLS() {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
} else {
host, _, err := net.SplitHostPort(serverURL)
if err != nil {
return nil, fmt.Errorf("split grpc server host and port: %s: %w", serverURL, err)
}

creds := &tlsCreds{credentials.NewTLS(makeTLSConfig(host, insecureTLS, certPins, rootPool, logger))}
creds := &tlsCreds{credentials.NewTLS(makeTLSConfig(k, rootPool))}
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(creds))
}

grpcOpts = append(grpcOpts, opts...)

conn, err := grpc.Dial(serverURL, grpcOpts...)
conn, err := grpc.Dial(k.KolideServerURL(), grpcOpts...)
return conn, err
}

Expand Down
18 changes: 7 additions & 11 deletions pkg/service/client_jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"net/url"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/kolide/launcher/pkg/agent/types"
)

// forceNoChunkedEncoding forces the connection not to use chunked
Expand Down Expand Up @@ -40,20 +40,16 @@ func forceNoChunkedEncoding(ctx context.Context, r *http.Request) context.Contex
// New creates a new Kolide Client (implementation of the KolideService
// interface) using a JSONRPC client connection.
func NewJSONRPCClient(
serverURL string,
insecureTLS bool,
insecureTransport bool,
certPins [][]byte,
k types.Knapsack,
rootPool *x509.CertPool,
logger log.Logger,
options ...jsonrpc.ClientOption,
) KolideService {
serviceURL := &url.URL{
Scheme: "https",
Host: serverURL,
Host: k.KolideServerURL(),
}

if insecureTransport {
if k.InsecureTransportTLS() {
serviceURL.Scheme = "http"
}

Expand All @@ -63,8 +59,8 @@ func NewJSONRPCClient(
DisableKeepAlives: true,
},
}
if !insecureTransport {
tlsConfig := makeTLSConfig(serverURL, insecureTLS, certPins, rootPool, logger)
if !k.InsecureTransportTLS() {
tlsConfig := makeTLSConfig(k, rootPool)
httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
DisableKeepAlives: true,
Expand Down Expand Up @@ -125,7 +121,7 @@ func NewJSONRPCClient(
CheckHealthEndpoint: checkHealthEndpoint,
}

client = LoggingMiddleware(logger)(client)
client = LoggingMiddleware(k)(client)
// Wrap with UUID middleware after logger so that UUID is available in
// the logger context.
client = uuidMiddleware(client)
Expand Down
20 changes: 15 additions & 5 deletions pkg/service/middleware.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package service

import (
"github.com/go-kit/kit/log"
"log/slog"

"github.com/kolide/launcher/pkg/agent/types"
)

type Middleware func(KolideService) KolideService

func LoggingMiddleware(logger log.Logger) Middleware {
func LoggingMiddleware(k types.Knapsack) Middleware {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just pass slogger, but I think this is fine too.

return func(next KolideService) KolideService {
return logmw{logger, next}
return logmw{k, next}
}
}

type logmw struct {
logger log.Logger
next KolideService
knapsack types.Knapsack
next KolideService
}

func uuidMiddleware(next KolideService) KolideService {
Expand All @@ -24,3 +26,11 @@ func uuidMiddleware(next KolideService) KolideService {
type uuidmw struct {
next KolideService
}

// levelForError returns slog.LevelError if err != nil, else slog.LevelDebug
func levelForError(err error) slog.Level {
if err != nil {
return slog.LevelError
}
return slog.LevelDebug
}
9 changes: 2 additions & 7 deletions pkg/service/publish_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/kolide/kit/contexts/uuid"
"github.com/osquery/osquery-go/plugin/logger"
Expand Down Expand Up @@ -171,12 +170,8 @@ func (s *grpcServer) PublishLogs(ctx context.Context, req *pb.LogCollection) (*p
func (mw logmw) PublishLogs(ctx context.Context, nodeKey string, logType logger.LogType, logs []string) (message, errcode string, reauth bool, err error) {
defer func(begin time.Time) {
uuid, _ := uuid.FromContext(ctx)
logger := level.Debug(mw.logger)
if err != nil {
logger = level.Info(mw.logger)
}
logger.Log(
"method", "PublishLogs",

mw.knapsack.Slogger().Log(ctx, levelForError(err), "publish logs",
"uuid", uuid,
"logType", logType,
"log_count", len(logs),
Expand Down
Loading
Loading