Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/dolt/commands/credcmds/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func checkCredAndPrintSuccess(ctx context.Context, dEnv *env.DoltEnv, dc creds.D
if err != nil {
return errhand.BuildDError("error: unable to connect to server with credentials.").AddCause(err).Build()
}
defer conn.Close()

grpcClient := remotesapi.NewCredentialsServiceClient(conn)

Expand Down
1 change: 1 addition & 0 deletions go/cmd/dolt/commands/credcmds/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func updateProfileWithCredentials(ctx context.Context, dEnv *env.DoltEnv, c cred
if err != nil {
return fmt.Errorf("error: unable to connect to server with credentials: %w", err)
}
defer conn.Close()
grpcClient := remotesapi.NewCredentialsServiceClient(conn)
resp, err := grpcClient.WhoAmI(ctx, &remotesapi.WhoAmIRequest{})
if err != nil {
Expand Down
26 changes: 13 additions & 13 deletions go/cmd/dolt/commands/send_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,52 +130,52 @@ func (cmd SendMetricsCmd) Exec(ctx context.Context, commandStr string, args []st

// FlushLoggedEvents flushes any logged events in the directory given to an appropriate event emitter
func FlushLoggedEvents(ctx context.Context, dEnv *env.DoltEnv, userHomeDir string, outputType string) error {
emitter, err := NewEmitter(outputType, dEnv)
emitter, closer, err := NewEmitter(outputType, dEnv)
if err != nil {
return err
}

defer closer()
flusher := events.NewFileFlusher(dEnv.FS, userHomeDir, dbfactory.DoltDir, emitter)
return flusher.Flush(ctx)
}

// NewEmitter returns an emitter for the given configuration provider, of the type named. If an empty name is provided,
// defaults to a file-based emitter.
func NewEmitter(emitterType string, pro EmitterConfigProvider) (events.Emitter, error) {
func NewEmitter(emitterType string, pro EmitterConfigProvider) (events.Emitter, func() error, error) {
switch emitterType {
case events.EmitterTypeNull:
return events.NullEmitter{}, nil
return events.NullEmitter{}, func() error { return nil }, nil
case events.EmitterTypeStdout:
return events.WriterEmitter{Wr: os.Stdout}, nil
return events.WriterEmitter{Wr: os.Stdout}, func() error { return nil }, nil
case events.EmitterTypeGrpc:
return GRPCEmitterForConfig(pro)
case events.EmitterTypeFile:
homeDir, err := pro.GetUserHomeDir()
if err != nil {
return nil, err
return nil, nil, err
}
return events.NewFileEmitter(homeDir, dbfactory.DoltDir), nil
return events.NewFileEmitter(homeDir, dbfactory.DoltDir), func() error { return nil }, nil
case events.EmitterTypeLogger:
return events.NewLoggerEmitter(logrus.DebugLevel), nil
return events.NewLoggerEmitter(logrus.DebugLevel), func() error { return nil }, nil
default:
return nil, fmt.Errorf("unknown emitter type: %s", emitterType)
return nil, nil, fmt.Errorf("unknown emitter type: %s", emitterType)
}
}

// GRPCEmitterForConfig returns an event emitter for the given environment, or nil if the environment cannot
// provide one
func GRPCEmitterForConfig(pro EmitterConfigProvider) (*events.GrpcEmitter, error) {
func GRPCEmitterForConfig(pro EmitterConfigProvider) (*events.GrpcEmitter, func() error, error) {
cfg, err := GRPCEventRemoteConfig(pro)
if err != nil {
return nil, err
return nil, nil, err
}

conn, err := grpc.Dial(cfg.Endpoint, cfg.DialOptions...)
if err != nil {
return nil, err
return nil, nil, err
}

return events.NewGrpcEmitter(conn), nil
return events.NewGrpcEmitter(conn), conn.Close, nil
}

// GRPCEventRemoteConfig returns a GRPCRemoteConfig for the given configuration provider
Expand Down
12 changes: 10 additions & 2 deletions go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ type heartbeatService struct {
version string
eventEmitter events.Emitter
interval time.Duration
closer func() error
}

func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
Expand All @@ -620,7 +621,7 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
return &heartbeatService{} // will be defunct on Run()
}

emitter, err := commands.NewEmitter(emitterType, dEnv)
emitter, closer, err := commands.NewEmitter(emitterType, dEnv)
if err != nil {
return &heartbeatService{} // will be defunct on Run()
}
Expand All @@ -631,11 +632,18 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
version: version,
eventEmitter: emitter,
interval: duration,
closer: closer,
}
}

func (h *heartbeatService) Init(ctx context.Context) error { return nil }
func (h *heartbeatService) Stop() error { return nil }

func (h *heartbeatService) Stop() error {
if h.closer != nil {
return h.closer()
}
return nil
}

func (h *heartbeatService) Run(ctx context.Context) {
// Faulty config settings or disabled metrics can cause us to not have a valid event emitter
Expand Down
4 changes: 3 additions & 1 deletion go/libraries/doltcore/dbfactory/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
csClient := remotesapi.NewChunkStoreServiceClient(conn)
cs, err := remotestorage.NewDoltChunkStoreFromPath(ctx, nbf, urlObj.Path, urlObj.Host, wsValidate, csClient)
if err != nil {
conn.Close()
return nil, fmt.Errorf("could not access dolt url '%s': %w", urlObj.String(), err)
}
cs = cs.WithHTTPFetcher(cfg.HTTPFetcher)
cs.SetFinalizer(conn.Close)

if _, ok := params[NoCachingParameter]; ok {
cs = cs.WithNoopChunkCache()
}

return cs, err
return cs, nil
}
12 changes: 11 additions & 1 deletion go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type DoltChunkStore struct {
host string
root hash.Hash
csClient remotesapi.ChunkStoreServiceClient
finalizer func() error
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
Expand Down Expand Up @@ -159,6 +160,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
repoToken: repoToken,
host: host,
csClient: csClient,
finalizer: func() error { return nil },
cache: newMapChunkCache(),
metadata: metadata,
nbf: nbf,
Expand All @@ -181,6 +183,7 @@ func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
Expand All @@ -198,6 +201,7 @@ func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: noopChunkCache,
metadata: dcs.metadata,
nbf: dcs.nbf,
Expand All @@ -216,6 +220,7 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
Expand All @@ -234,6 +239,7 @@ func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
Expand All @@ -248,6 +254,10 @@ func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
dcs.logger = logger
}

func (dcs *DoltChunkStore) SetFinalizer(f func() error) {
dcs.finalizer = f
}

func (dcs *DoltChunkStore) logf(fmt string, args ...interface{}) {
if dcs.logger != nil {
dcs.logger.Logf(fmt, args...)
Expand Down Expand Up @@ -961,7 +971,7 @@ func (dcs *DoltChunkStore) PersistGhostHashes(ctx context.Context, refs hash.Has
// Close() concurrently with any other ChunkStore method; behavior is
// undefined and probably crashy.
func (dcs *DoltChunkStore) Close() error {
return nil
return dcs.finalizer()
}

// getting this working using the simplest approach first
Expand Down
5 changes: 5 additions & 0 deletions go/libraries/doltcore/sqle/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func (c *Controller) Run() {
c.bcReplication.Run()
}()
wg.Wait()
for _, client := range c.replicationClients {
client.closer()
}
}

func (c *Controller) GracefulStop() error {
Expand Down Expand Up @@ -1127,6 +1130,7 @@ type replicationServiceClient struct {
url string
tls bool
client replicationapi.ReplicationServiceClient
closer func() error
}

func (c *Controller) replicationServiceDialOptions() []grpc.DialOption {
Expand Down Expand Up @@ -1164,6 +1168,7 @@ func (c *Controller) replicationServiceClients(ctx context.Context) ([]*replicat
url: grpcTarget,
tls: c.tlsCfg != nil,
client: client,
closer: cc.Close,
})
}
return ret, nil
Expand Down
1 change: 1 addition & 0 deletions go/libraries/doltcore/sqle/cluster/mysqldb_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (r *mysqlDbReplica) Run() {
r.mu.Lock()
defer r.mu.Unlock()
r.lgr.Tracef("mysqlDbReplica[%s]: running", r.client.remote)
defer r.client.closer()
for !r.shutdown {
if r.role != RolePrimary {
r.wait()
Expand Down