Skip to content

Commit 2445eb2

Browse files
authored
[COP-1575 COP-1573] cache protos and other changes (#2018)
1 parent 2ffe949 commit 2445eb2

File tree

8 files changed

+114
-88
lines changed

8 files changed

+114
-88
lines changed

book/src/framework/components/chipingresset/chip_ingress.md

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,10 @@ if outErr != nil {
6060
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
6161
defer cancel()
6262

63-
// we recommend to use GITHUB_TOKEN with read access to repositories with protos to avoid heavy rate limiting
64-
var client *github.Client
65-
if token := os.Getenv("GITHUB_TOKEN"); token != "" {
66-
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
67-
tc := oauth2.NewClient(ctx, ts)
68-
client = github.NewClient(tc)
69-
} else {
70-
client = github.NewClient(nil)
71-
}
72-
73-
protoErr := chipingressset.DefaultRegisterAndFetchProtos(ctx, client, []chipingressset.RepoConfiguration{
63+
protoErr := chipingressset.DefaultRegisterAndFetchProtos(
64+
ctx,
65+
nil, // GH client will be created dynamically, if needed
66+
[]chipingressset.RepoConfiguration{
7467
{
7568
URI: "https://github.com/smartcontractkit/chainlink-protostractkit",
7669
Ref: "626c42d55bdcb36dffe0077fff58abba40acc3e5",
@@ -123,4 +116,8 @@ for _, schemaSet := range configFiles {
123116

124117
Registration logic is very simple and should handle cases of protos that import other protos as long they are all available in the `ProtoSchemaSet`s provided to the registration function. That function uses an algorithm called "topological sorting by trail", which will try to register all protos in a loop until it cannot register any more protos or it has registered all of them. That allows us to skip dependency parsing completely.
125118

126-
Kafka doesn't have any automatic discoverability mechanism for subject - schema relationship (it has to be provided out-of-band). Currenly, we create the subject in the following way: <subject_prefix>.<package>.<1st-message-name>. Subject prefix is optional and if it's not present, then subject is equal to: <package>.<1st-message-name>. Only the first message in the `.proto` file is ever registered.
119+
Kafka doesn't have any automatic discoverability mechanism for subject - schema relationship (it has to be provided out-of-band). Currenly, we create the subject in the following way: <subject_prefix>.<package>.<1st-message-name>. Subject prefix is optional and if it's not present, then subject is equal to: <package>.<1st-message-name>. Only the first message in the `.proto` file is ever registered.
120+
121+
## Protobuf caching
122+
123+
Once fetched from `https://github.com` protobuf files will be saved in `.local/share/beholder/protobufs/<OWNER>/<REPOSTIORY>/<SHA>` folder and subsequently used. If saving to cache or reading from it fails, we will load files from the original source.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
- Added caching of protobuf files, to avoid constant download each time Beholder starts
2+
- Removed dynamic port mapping, since external ports are always static
3+
- Red Panda images will be pulled from Dockerhub instead of docker.redpanda.com to allow by-passing rate limiting by logging into Dockerhub

framework/components/dockercompose/chip_ingress_set/chip_ingress.go

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ import (
1111

1212
networkTypes "github.com/docker/docker/api/types/network"
1313
"github.com/docker/docker/client"
14-
"github.com/docker/go-connections/nat"
1514
"github.com/pkg/errors"
1615
"github.com/smartcontractkit/chainlink-testing-framework/framework"
17-
"github.com/testcontainers/testcontainers-go"
1816
"github.com/testcontainers/testcontainers-go/modules/compose"
1917
"github.com/testcontainers/testcontainers-go/wait"
2018
)
@@ -166,11 +164,6 @@ func New(in *Input) (*Output, error) {
166164
if chipIngressExternalHostErr != nil {
167165
return nil, errors.Wrap(chipIngressExternalHostErr, "failed to get host for Chip Ingress")
168166
}
169-
// for some magical reason mapped port sometimes cannot be found, even though we wait for it to be ready, when starting the services
170-
chipIngressExternalPort, chipIngressExternalPortErr := findMappdePort(ctx, 31*time.Second, chipIngressContainer, DEFAULT_CHIP_INGRESS_GRPC_PORT)
171-
if chipIngressExternalPortErr != nil {
172-
return nil, errors.Wrap(chipIngressExternalPortErr, "failed to get mapped port for Chip Ingress")
173-
}
174167

175168
redpandaContainer, redpandaErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_SERVICE_NAME)
176169
if redpandaErr != nil {
@@ -181,14 +174,6 @@ func New(in *Input) (*Output, error) {
181174
if redpandaExternalHostErr != nil {
182175
return nil, errors.Wrap(redpandaExternalHostErr, "failed to get host for Red Panda")
183176
}
184-
redpandaExternalKafkaPort, redpandaExternalKafkaPortErr := findMappdePort(ctx, 31*time.Second, redpandaContainer, DEFAULT_RED_PANDA_KAFKA_PORT)
185-
if redpandaExternalKafkaPortErr != nil {
186-
return nil, errors.Wrap(redpandaExternalKafkaPortErr, "failed to get mapped port for Red Panda")
187-
}
188-
redpandaExternalSchemaRegistryPort, redpandaExternalSchemaRegistryPortErr := findMappdePort(ctx, 31*time.Second, redpandaContainer, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT)
189-
if redpandaExternalSchemaRegistryPortErr != nil {
190-
return nil, errors.Wrap(redpandaExternalSchemaRegistryPortErr, "failed to get mapped port for Red Panda")
191-
}
192177

193178
redpandaConsoleContainer, redpandaConsoleErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_CONSOLE_SERVICE_NAME)
194179
if redpandaConsoleErr != nil {
@@ -198,22 +183,18 @@ func New(in *Input) (*Output, error) {
198183
if redpandaExternalConsoleHostErr != nil {
199184
return nil, errors.Wrap(redpandaExternalConsoleHostErr, "failed to get host for Red Panda Console")
200185
}
201-
redpandaExternalConsolePort, redpandaExternalConsolePortErr := findMappdePort(ctx, 31*time.Second, redpandaConsoleContainer, DEFAULT_RED_PANDA_CONSOLE_PORT)
202-
if redpandaExternalConsolePortErr != nil {
203-
return nil, errors.Wrap(redpandaExternalConsolePortErr, "failed to get mapped port for Red Panda Console")
204-
}
205186

206187
output := &Output{
207188
ChipIngress: &ChipIngressOutput{
208189
GRPCInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_CHIP_INGRESS_SERVICE_NAME, DEFAULT_CHIP_INGRESS_GRPC_PORT),
209-
GRPCExternalURL: fmt.Sprintf("http://%s:%s", chipIngressExternalHost, chipIngressExternalPort.Port()),
190+
GRPCExternalURL: fmt.Sprintf("http://%s:%s", chipIngressExternalHost, DEFAULT_CHIP_INGRESS_GRPC_PORT),
210191
},
211192
RedPanda: &RedPandaOutput{
212193
SchemaRegistryInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT),
213-
SchemaRegistryExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalHost, redpandaExternalSchemaRegistryPort.Port()),
194+
SchemaRegistryExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalHost, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT),
214195
KafkaInternalURL: fmt.Sprintf("%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_KAFKA_PORT),
215-
KafkaExternalURL: fmt.Sprintf("%s:%s", redpandaExternalHost, redpandaExternalKafkaPort.Port()),
216-
ConsoleExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalConsoleHost, redpandaExternalConsolePort.Port()),
196+
KafkaExternalURL: fmt.Sprintf("%s:%s", redpandaExternalHost, DEFAULT_RED_PANDA_KAFKA_PORT),
197+
ConsoleExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalConsoleHost, DEFAULT_RED_PANDA_CONSOLE_PORT),
217198
},
218199
}
219200

@@ -252,33 +233,6 @@ func composeFilePath(rawFilePath string) (string, error) {
252233
return tempFile.Name(), nil
253234
}
254235

255-
func findMappdePort(ctx context.Context, timeout time.Duration, container *testcontainers.DockerContainer, port nat.Port) (nat.Port, error) {
256-
forCtx, cancel := context.WithTimeout(ctx, timeout)
257-
defer cancel()
258-
259-
tickerInterval := 5 * time.Second
260-
ticker := time.NewTicker(5 * time.Second)
261-
defer ticker.Stop()
262-
263-
for {
264-
select {
265-
case <-forCtx.Done():
266-
return "", fmt.Errorf("timeout while waiting for mapped port for %s", port)
267-
case <-ticker.C:
268-
portCtx, portCancel := context.WithTimeout(ctx, tickerInterval)
269-
defer portCancel()
270-
mappedPort, mappedPortErr := container.MappedPort(portCtx, port)
271-
if mappedPortErr != nil {
272-
return "", errors.Wrapf(mappedPortErr, "failed to get mapped port for %s", port)
273-
}
274-
if mappedPort.Port() == "" {
275-
return "", fmt.Errorf("mapped port for %s is empty", port)
276-
}
277-
return mappedPort, nil
278-
}
279-
}
280-
}
281-
282236
func connectNetwork(connCtx context.Context, timeout time.Duration, dockerClient *client.Client, containerID, networkName, stackIdentifier string) error {
283237
ticker := time.NewTicker(500 * time.Millisecond)
284238
defer ticker.Stop()

framework/components/dockercompose/chip_ingress_set/docker-compose.yml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ services:
4343
retries: 10
4444

4545
redpanda-0:
46-
# using a specific version of the redpanda image to exclude potentially breaking changes
47-
image: docker.redpanda.com/redpandadata/redpanda:v24.3.16
46+
# modified from the original:
47+
# - pin point a specific version of the redpanda image to exclude potentially breaking changes
48+
# - pull image from dockerhub instead of docker.redpanda.com
49+
image: redpandadata/redpanda:v24.3.16
4850
container_name: redpanda-0
4951
hostname: redpanda-0
5052
command:
@@ -74,9 +76,12 @@ services:
7476
start_period: 1s
7577

7678
redpanda-console:
77-
# using a specific version of the console image to ensure compatibility, because v3.x.x uses incompatible configuration format
79+
7880
container_name: redpanda-console
79-
image: docker.redpanda.com/redpandadata/console:v2.8.6
81+
# modified from the original:
82+
# - pin point a specific version of the console image to ensure compatibility, because v3.x.x uses incompatible configuration format
83+
# - pull image from dockerhub instead of docker.redpanda.com
84+
image: redpandadata/console:v2.8.6
8085
entrypoint: /bin/sh
8186
command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
8287
environment:

framework/components/dockercompose/chip_ingress_set/protos.go

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/google/go-github/v72/github"
1616
"github.com/pkg/errors"
1717
"github.com/smartcontractkit/chainlink-testing-framework/framework"
18+
"golang.org/x/oauth2"
1819
)
1920

2021
type protoFile struct {
@@ -83,8 +84,27 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
8384
}
8485
}
8586

87+
ghClientFn := func() *github.Client {
88+
if client != nil {
89+
return client
90+
}
91+
92+
var client *github.Client
93+
94+
if token := os.Getenv("GITHUB_TOKEN"); token != "" {
95+
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
96+
tc := oauth2.NewClient(ctx, ts)
97+
client = github.NewClient(tc)
98+
} else {
99+
framework.L.Warn().Msg("GITHUB_TOKEN is not set, using unauthenticated GitHub client. This may cause rate limiting issues when downloading proto files")
100+
client = github.NewClient(nil)
101+
}
102+
103+
return client
104+
}
105+
86106
for _, protoSchemaSet := range protoSchemaSets {
87-
protos, protosErr := fetchProtoFilesInFolders(ctx, client, protoSchemaSet.URI, protoSchemaSet.Ref, protoSchemaSet.Folders)
107+
protos, protosErr := fetchProtoFilesInFolders(ctx, ghClientFn, protoSchemaSet.URI, protoSchemaSet.Ref, protoSchemaSet.Folders)
88108
if protosErr != nil {
89109
return errors.Wrapf(protosErr, "failed to fetch protos from %s", protoSchemaSet.URI)
90110
}
@@ -167,7 +187,7 @@ func extractTopLevelMessageNamesWithRegex(protoSrc string) ([]string, error) {
167187
}
168188

169189
// Fetches .proto files from a GitHub repo optionally scoped to specific folders. It is recommended to use `*github.Client` with auth token to avoid rate limiting.
170-
func fetchProtoFilesInFolders(ctx context.Context, client *github.Client, uri, ref string, folders []string) ([]protoFile, error) {
190+
func fetchProtoFilesInFolders(ctx context.Context, clientFn func() *github.Client, uri, ref string, folders []string) ([]protoFile, error) {
171191
framework.L.Debug().Msgf("Fetching proto files from %s in folders: %s", uri, strings.Join(folders, ", "))
172192

173193
if strings.HasPrefix(uri, "file://") {
@@ -176,14 +196,20 @@ func fetchProtoFilesInFolders(ctx context.Context, client *github.Client, uri, r
176196

177197
parts := strings.Split(strings.TrimPrefix(uri, "https://"), "/")
178198

179-
return fetchProtosFromGithub(ctx, client, parts[1], parts[2], ref, folders)
199+
return fetchProtosFromGithub(ctx, clientFn, parts[1], parts[2], ref, folders)
180200
}
181201

182-
func fetchProtosFromGithub(ctx context.Context, client *github.Client, owner, repository, ref string, folders []string) ([]protoFile, error) {
183-
if client == nil {
184-
return nil, errors.New("github client cannot be nil")
202+
func fetchProtosFromGithub(ctx context.Context, clientFn func() *github.Client, owner, repository, ref string, folders []string) ([]protoFile, error) {
203+
cachedFiles, found, cacheErr := loadCachedProtoFiles(owner, repository, ref, folders)
204+
if cacheErr != nil {
205+
framework.L.Warn().Msgf("Failed to load cached proto files for %s/%s at ref %s: %v", owner, repository, ref, cacheErr)
206+
}
207+
if cacheErr == nil && found {
208+
framework.L.Debug().Msgf("Using cached proto files for %s/%s at ref %s", owner, repository, ref)
209+
return cachedFiles, nil
185210
}
186211

212+
client := clientFn()
187213
var files []protoFile
188214

189215
sha, shaErr := resolveRefSHA(ctx, client, owner, repository, ref)
@@ -255,9 +281,61 @@ searchLoop:
255281
return nil, fmt.Errorf("no proto files found in %s/%s in folders %s", owner, repository, strings.Join(folders, ", "))
256282
}
257283

284+
saveErr := saveProtoFilesToCache(owner, repository, ref, files)
285+
if saveErr != nil {
286+
framework.L.Warn().Msgf("Failed to save proto files to cache for %s/%s at ref %s: %v", owner, repository, ref, saveErr)
287+
}
288+
258289
return files, nil
259290
}
260291

292+
func loadCachedProtoFiles(owner, repository, ref string, _ []string) ([]protoFile, bool, error) {
293+
cachePath, cacheErr := cacheFilePath(owner, repository, ref)
294+
if cacheErr != nil {
295+
return nil, false, errors.Wrapf(cacheErr, "failed to get cache file path for %s/%s at ref %s", owner, repository, ref)
296+
}
297+
298+
if _, err := os.Stat(cachePath); os.IsNotExist(err) {
299+
return nil, false, nil // cache not found
300+
}
301+
302+
cachedFiles, cachedErr := fetchProtosFromFilesystem("file://"+cachePath, []string{}) // ignore folders since, we already filtered them when fetching from GitHub
303+
if cachedErr != nil {
304+
return nil, false, errors.Wrapf(cachedErr, "failed to load cached proto files from %s", cachePath)
305+
}
306+
307+
return cachedFiles, true, nil
308+
}
309+
310+
func saveProtoFilesToCache(owner, repository, ref string, files []protoFile) error {
311+
cachePath, cacheErr := cacheFilePath(owner, repository, ref)
312+
if cacheErr != nil {
313+
return errors.Wrapf(cacheErr, "failed to get cache file path for %s/%s at ref %s", owner, repository, ref)
314+
}
315+
316+
for _, file := range files {
317+
path := filepath.Join(cachePath, file.Path)
318+
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
319+
return errors.Wrapf(err, "failed to create directory for cache file %s", cachePath)
320+
}
321+
if writeErr := os.WriteFile(path, []byte(file.Content), 0755); writeErr != nil {
322+
return errors.Wrapf(writeErr, "failed to write cached proto files to %s", cachePath)
323+
}
324+
}
325+
326+
framework.L.Debug().Msgf("Saved %d proto files to cache at %s", len(files), cachePath)
327+
328+
return nil
329+
}
330+
331+
func cacheFilePath(owner, repository, ref string) (string, error) {
332+
homeDir, homeErr := os.UserHomeDir()
333+
if homeErr != nil {
334+
return "", errors.Wrap(homeErr, "failed to get user home directory")
335+
}
336+
return filepath.Join(homeDir, ".local", "share", "beholder", "protobufs", owner, repository, ref), nil
337+
}
338+
261339
func fetchProtosFromFilesystem(uri string, folders []string) ([]protoFile, error) {
262340
var files []protoFile
263341

framework/components/dockercompose/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ replace github.com/smartcontractkit/chainlink-testing-framework/framework => ../
77
require (
88
github.com/confluentinc/confluent-kafka-go v1.9.2
99
github.com/docker/docker v28.0.4+incompatible
10-
github.com/docker/go-connections v0.5.0
1110
github.com/google/go-github/v72 v72.0.0
1211
github.com/pkg/errors v0.9.1
1312
github.com/smartcontractkit/chainlink-testing-framework/framework v0.0.0-00010101000000-000000000000
1413
github.com/testcontainers/testcontainers-go v0.37.0
1514
github.com/testcontainers/testcontainers-go/modules/compose v0.37.0
15+
golang.org/x/oauth2 v0.25.0
1616
)
1717

1818
require (
@@ -65,6 +65,7 @@ require (
6565
github.com/docker/distribution v2.8.3+incompatible // indirect
6666
github.com/docker/docker-credential-helpers v0.8.2 // indirect
6767
github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c // indirect
68+
github.com/docker/go-connections v0.5.0 // indirect
6869
github.com/docker/go-metrics v0.0.1 // indirect
6970
github.com/docker/go-units v0.5.0 // indirect
7071
github.com/ebitengine/purego v0.8.2 // indirect
@@ -201,7 +202,6 @@ require (
201202
golang.org/x/crypto v0.37.0 // indirect
202203
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
203204
golang.org/x/net v0.38.0 // indirect
204-
golang.org/x/oauth2 v0.25.0 // indirect
205205
golang.org/x/sync v0.13.0 // indirect
206206
golang.org/x/sys v0.32.0 // indirect
207207
golang.org/x/term v0.31.0 // indirect

framework/examples/myproject/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ require (
1515
github.com/blocto/solana-go-sdk v1.30.0
1616
github.com/ethereum/go-ethereum v1.15.0
1717
github.com/go-resty/resty/v2 v2.16.5
18-
github.com/google/go-github/v72 v72.0.0
1918
github.com/smartcontractkit/chainlink-testing-framework/framework v0.8.9
2019
github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose v0.0.0-00010101000000-000000000000
2120
github.com/smartcontractkit/chainlink-testing-framework/framework/components/fake v0.0.0-20250707095700-c7855f06ddd1
@@ -58,6 +57,7 @@ require (
5857
github.com/go-ini/ini v1.67.0 // indirect
5958
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
6059
github.com/gofrs/flock v0.12.1 // indirect
60+
github.com/google/go-github/v72 v72.0.0 // indirect
6161
github.com/google/go-querystring v1.1.0 // indirect
6262
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
6363
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
@@ -382,7 +382,7 @@ require (
382382
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
383383
golang.org/x/mod v0.22.0 // indirect
384384
golang.org/x/net v0.40.0 // indirect
385-
golang.org/x/oauth2 v0.27.0
385+
golang.org/x/oauth2 v0.27.0 // indirect
386386
golang.org/x/sync v0.14.0 // indirect
387387
golang.org/x/sys v0.33.0 // indirect
388388
golang.org/x/term v0.32.0 // indirect

framework/examples/myproject/smoke_chip_ingress_test.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/google/go-github/v72/github"
109
"github.com/smartcontractkit/chainlink-testing-framework/framework"
1110
chipingressset "github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose/chip_ingress_set"
1211
"github.com/stretchr/testify/require"
13-
"golang.org/x/oauth2"
1412
)
1513

1614
type ChipConfig struct {
@@ -33,19 +31,10 @@ func TestChipIngressSmoke(t *testing.T) {
3331
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
3432
defer cancel()
3533

36-
var client *github.Client
37-
if token := os.Getenv("GITHUB_TOKEN"); token != "" {
38-
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
39-
tc := oauth2.NewClient(ctx, ts)
40-
client = github.NewClient(tc)
41-
} else {
42-
client = github.NewClient(nil)
43-
}
44-
4534
createTopicsErr := chipingressset.CreateTopics(ctx, out.RedPanda.KafkaExternalURL, []string{"cre"})
4635
require.NoError(t, createTopicsErr, "failed to create topics")
4736

48-
err := chipingressset.DefaultRegisterAndFetchProtos(ctx, client, []chipingressset.ProtoSchemaSet{
37+
err := chipingressset.DefaultRegisterAndFetchProtos(ctx, nil, []chipingressset.ProtoSchemaSet{
4938
{
5039
URI: "https://github.com/smartcontractkit/chainlink-protos",
5140
Ref: "95decc005a91a1fd2621af9d9f00cb36d8061067",

0 commit comments

Comments
 (0)