Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ x-flow-worker-env: &flow-worker-env
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
# Temporary AWS Credentials for local IAM auth testing
AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN:-}
# For GCS, set this to "auto" without the quotes
AWS_REGION: ${AWS_REGION:-}
# For GCS, set this as: https://storage.googleapis.com
Expand Down
16 changes: 16 additions & 0 deletions flow/connectors/postgres/postgres_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ func (c *PostgresConnector) CreateReplConn(ctx context.Context, env map[string]s
}

func (c *PostgresConnector) SetupReplConn(ctx context.Context, env map[string]string) error {
// For IAM-authenticated connections, force a fresh token before connecting.
// This guarantees each activity retry uses a new token rather than a potentially
// stale cached one (cache TTL 10 min; token validity 15 min).
if c.rdsAuth != nil {
host := c.Config.Host
if c.Config.TlsHost != "" {
host = c.Config.TlsHost
}
if _, err := c.rdsAuth.ForceRefreshToken(ctx, utils.RDSConnectionConfig{
Host: host,
Port: c.Config.Port,
User: c.Config.User,
}, "POSTGRES"); err != nil {
return fmt.Errorf("failed to refresh RDS IAM token before replication setup: %w", err)
}
}
conn, wst, err := c.CreateReplConn(ctx, env)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func getPeerDBAWSEnv(connectorName string, awsKey string) string {
func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsProvider {
accessKeyId := getPeerDBAWSEnv(connectorName, "AWS_ACCESS_KEY_ID")
secretAccessKey := getPeerDBAWSEnv(connectorName, "AWS_SECRET_ACCESS_KEY")
sessionToken := getPeerDBAWSEnv(connectorName, "AWS_SESSION_TOKEN")
region := getPeerDBAWSEnv(connectorName, "AWS_REGION")
endpointUrl := getPeerDBAWSEnv(connectorName, "AWS_ENDPOINT_URL_S3")
rootCa := getPeerDBAWSEnv(connectorName, "ROOT_CA")
Expand All @@ -238,6 +239,7 @@ func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsP
AWS: aws.Credentials{
AccessKeyID: accessKeyId,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
},
EndpointUrl: endpointUrlPtr,
}, region, rootCAs, tlsHost)
Expand Down
18 changes: 18 additions & 0 deletions flow/connectors/utils/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ func GetRDSToken(ctx context.Context, connConfig RDSConnectionConfig, rdsAuth *R
}()
}

// ForceRefreshToken bypasses the cache TTL and immediately fetches a new RDS IAM token.
// Use this when a connection break requires a guaranteed-fresh token for reconnect.
func (r *RDSAuth) ForceRefreshToken(
ctx context.Context,
connConfig RDSConnectionConfig,
connectorName string,
) (string, error) {
r.lock.Lock()
defer r.lock.Unlock()
token, err := buildRdsToken(ctx, connConfig, BuildPeerAWSCredentials(r.AwsAuthConfig), connectorName)
if err != nil {
return "", err
}
r.token = token
r.updateTime = time.Now()
return token, nil
}

func buildRdsToken(
ctx context.Context,
connConfig RDSConnectionConfig,
Expand Down
Loading