Skip to content

Commit 7640171

Browse files
Updated NewRayClusterClient function logic to utilise RayClusterClientConfig and removed waitForJobStatus function
1 parent 42d52e8 commit 7640171

File tree

1 file changed

+21
-37
lines changed

1 file changed

+21
-37
lines changed

support/ray_cluster_client.go

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import (
2424
"io"
2525
"net/http"
2626
"net/url"
27-
"time"
28-
29-
. "github.com/onsi/gomega"
3027
)
3128

3229
type RayJobSetup struct {
@@ -50,6 +47,8 @@ type RayJobLogsResponse struct {
5047
}
5148

5249
type RayClusterClientConfig struct {
50+
Address string
51+
Client *http.Client
5352
SkipTlsVerification bool
5453
}
5554

@@ -66,15 +65,28 @@ type RayClusterClient interface {
6665
GetJobDetails(jobID string) (*RayJobDetailsResponse, error)
6766
GetJobLogs(jobID string) (string, error)
6867
GetJobs() ([]map[string]interface{}, error)
69-
WaitForJobStatus(test Test, jobID string) string
7068
}
7169

72-
func NewRayClusterClient(dashboardEndpoint url.URL, config RayClusterClientConfig, bearerToken string) RayClusterClient {
73-
tr := &http.Transport{
74-
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification},
75-
Proxy: http.ProxyFromEnvironment,
70+
var rayClusterApiClient RayClusterClient
71+
72+
func NewRayClusterClient(config RayClusterClientConfig, bearerToken string) (RayClusterClient, error) {
73+
if rayClusterApiClient == nil {
74+
if config.Client == nil {
75+
tr := &http.Transport{
76+
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.SkipTlsVerification},
77+
Proxy: http.ProxyFromEnvironment,
78+
}
79+
config.Client = &http.Client{Transport: tr}
80+
}
81+
endpoint, err := url.Parse(config.Address)
82+
if err != nil {
83+
return nil, fmt.Errorf("invalid dashboard endpoint address")
84+
}
85+
rayClusterApiClient = &rayClusterClient{
86+
endpoint: *endpoint, httpClient: config.Client, bearerToken: bearerToken,
87+
}
7688
}
77-
return &rayClusterClient{endpoint: dashboardEndpoint, httpClient: &http.Client{Transport: tr}, bearerToken: bearerToken}
89+
return rayClusterApiClient, nil
7890
}
7991

8092
func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobResponse, err error) {
@@ -190,31 +202,3 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
190202
err = json.Unmarshal(respData, &jobLogs)
191203
return jobLogs.Logs, err
192204
}
193-
194-
func (client *rayClusterClient) WaitForJobStatus(test Test, jobID string) string {
195-
var status string
196-
fmt.Printf("Waiting for job to be Succeeded...\n")
197-
198-
test.Eventually(func() string {
199-
resp, err := client.GetJobDetails(jobID)
200-
test.Expect(err).ToNot(HaveOccurred())
201-
statusVal := resp.Status
202-
if statusVal == "SUCCEEDED" || statusVal == "FAILED" {
203-
fmt.Printf("JobStatus : %s\n", statusVal)
204-
status = statusVal
205-
return status
206-
}
207-
if status != statusVal && statusVal != "SUCCEEDED" {
208-
fmt.Printf("JobStatus : %s...\n", statusVal)
209-
status = statusVal
210-
}
211-
return status
212-
}, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
213-
214-
if status == "SUCCEEDED" {
215-
fmt.Printf("Job succeeded !\n")
216-
} else {
217-
fmt.Printf("Job failed !\n")
218-
}
219-
return status
220-
}

0 commit comments

Comments
 (0)