From 1ce604058930261dd37259d7215cfa847ac132cc Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Fri, 30 Aug 2024 13:14:23 +0100 Subject: [PATCH 1/3] Fix Bearer auth to work when authenticating against K8S from Airflow Operator (#3897) --- third_party/airflow/armada/log_manager.py | 5 +---- third_party/airflow/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/third_party/airflow/armada/log_manager.py b/third_party/airflow/armada/log_manager.py index 0a94ddeabbb..b7c763eb085 100644 --- a/third_party/airflow/armada/log_manager.py +++ b/third_party/airflow/armada/log_manager.py @@ -47,15 +47,12 @@ def _k8s_client(self, k8s_context) -> client.CoreV1Api: k8s_client = client.CoreV1Api( api_client=client.ApiClient(configuration=configuration) ) - k8s_client.api_client.configuration.api_key_prefix["authorization"] = ( - "Bearer" - ) KubernetesPodLogManager.CLIENTS[k8s_context] = k8s_client return KubernetesPodLogManager.CLIENTS[k8s_context] def _with_bearer_auth(self, client): client.api_client.configuration.api_key["authorization"] = ( - self._token_retriever.get_token() + f"Bearer {self._token_retriever.get_token()}" ) def fetch_container_logs( diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 3c8471bde44..76229ca7e73 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "armada_airflow" -version = "1.0.2" +version = "1.0.3" description = "Armada Airflow Operator" readme='README.md' authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] From 2573a91b6f432d3c659ef65ea881fb1efd5496cb Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Mon, 2 Sep 2024 18:59:27 +0300 Subject: [PATCH 2/3] Remove Armada UUIDs from Event Ingester and Event Api (#3886) * wip Signed-off-by: Chris Martin * wip Signed-off-by: Chris Martin * wip Signed-off-by: Chris Martin --------- Signed-off-by: Chris Martin Co-authored-by: JamesMurkin --- internal/common/eventutil/eventutil.go | 8 +- .../eventingester/convert/conversions_test.go | 21 ++- .../server/event/conversion/conversions.go | 149 ++++-------------- .../event/conversion/conversions_test.go | 85 +++++----- .../server/event/event_repository_test.go | 15 +- internal/server/event/event_test.go | 20 +-- 6 files changed, 87 insertions(+), 211 deletions(-) diff --git a/internal/common/eventutil/eventutil.go b/internal/common/eventutil/eventutil.go index 9c808403a02..a5eb49f9b61 100644 --- a/internal/common/eventutil/eventutil.go +++ b/internal/common/eventutil/eventutil.go @@ -83,12 +83,6 @@ func ShortSequenceString(sequence *armadaevents.EventSequence) string { // ApiJobFromLogSubmitJob converts a SubmitJob log message into an api.Job struct, which is used by Armada internally. func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, jobSetName string, time time.Time, e *armadaevents.SubmitJob) (*api.Job, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - err = errors.WithStack(err) - return nil, err - } - if e == nil || e.MainObject == nil || e.MainObject.Object == nil { return nil, errors.Errorf("SubmitJob or one of its member pointers is nil") } @@ -146,7 +140,7 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j } return &api.Job{ - Id: jobId, + Id: e.JobIdStr, ClientId: e.DeduplicationId, Queue: queueName, JobSetId: jobSetName, diff --git a/internal/eventingester/convert/conversions_test.go b/internal/eventingester/convert/conversions_test.go index ca36b11a812..d9e52e03bcc 100644 --- a/internal/eventingester/convert/conversions_test.go +++ b/internal/eventingester/convert/conversions_test.go @@ -7,7 +7,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/gogo/protobuf/proto" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/armadaproject/armada/internal/common/armadacontext" @@ -26,8 +25,6 @@ const ( ) var ( - jobIdProto, _ = armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") baseTimeProto = protoutil.ToTimestamp(baseTime) ) @@ -37,8 +34,8 @@ var jobRunSucceeded = &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ JobRunSucceeded: &armadaevents.JobRunSucceeded{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, }, }, } @@ -48,7 +45,7 @@ var cancelled = &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -93,8 +90,8 @@ func TestCancelled(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelJob{ CancelJob: &armadaevents.CancelJob{ - JobId: jobIdProto, - Reason: "some reason 1", + JobIdStr: jobIdString, + Reason: "some reason 1", }, }, }, &armadaevents.EventSequence_Event{ @@ -108,8 +105,8 @@ func TestCancelled(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: jobIdProto, - Reason: "some reason 3", + JobIdStr: jobIdString, + Reason: "some reason 3", }, }, }) @@ -124,7 +121,7 @@ func TestCancelled(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelJob{ CancelJob: &armadaevents.CancelJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, }, @@ -138,7 +135,7 @@ func TestCancelled(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, }, diff --git a/internal/server/event/conversion/conversions.go b/internal/server/event/conversion/conversions.go index 02200dbde80..9698cffa22a 100644 --- a/internal/server/event/conversion/conversions.go +++ b/internal/server/event/conversion/conversions.go @@ -75,18 +75,13 @@ func FromEventSequence(es *armadaevents.EventSequence) ([]*api.EventMessage, err } func FromInternalSubmit(owner string, groups []string, queue string, jobSet string, time time.Time, e *armadaevents.SubmitJob) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - job, err := eventutil.ApiJobFromLogSubmitJob(owner, groups, queue, jobSet, time, e) if err != nil { return nil, err } submitEvent := &api.JobSubmittedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSet, Queue: queue, Created: protoutil.ToTimestamp(time), @@ -94,7 +89,7 @@ func FromInternalSubmit(owner string, groups []string, queue string, jobSet stri } queuedEvent := &api.JobQueuedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSet, Queue: queue, Created: protoutil.ToTimestamp(time), @@ -115,16 +110,11 @@ func FromInternalSubmit(owner string, groups []string, queue string, jobSet stri } func FromInternalPreemptionRequested(userId string, queueName string, jobSetName string, time time.Time, e *armadaevents.JobPreemptionRequested) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - return []*api.EventMessage{ { Events: &api.EventMessage_Preempting{ Preempting: &api.JobPreemptingEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -136,16 +126,11 @@ func FromInternalPreemptionRequested(userId string, queueName string, jobSetName } func FromInternalCancel(userId string, queueName string, jobSetName string, time time.Time, e *armadaevents.CancelJob) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - return []*api.EventMessage{ { Events: &api.EventMessage_Cancelling{ Cancelling: &api.JobCancellingEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -157,16 +142,11 @@ func FromInternalCancel(userId string, queueName string, jobSetName string, time } func FromInternalCancelled(userId string, queueName string, jobSetName string, time time.Time, e *armadaevents.CancelledJob) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - return []*api.EventMessage{ { Events: &api.EventMessage_Cancelled{ Cancelled: &api.JobCancelledEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -178,15 +158,11 @@ func FromInternalCancelled(userId string, queueName string, jobSetName string, t } func FromInternalReprioritiseJob(userId string, queueName string, jobSetName string, time time.Time, e *armadaevents.ReprioritiseJob) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } return []*api.EventMessage{ { Events: &api.EventMessage_Reprioritizing{ Reprioritizing: &api.JobReprioritizingEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -199,15 +175,11 @@ func FromInternalReprioritiseJob(userId string, queueName string, jobSetName str } func FromInternalReprioritisedJob(userId string, queueName string, jobSetName string, time time.Time, e *armadaevents.ReprioritisedJob) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } return []*api.EventMessage{ { Events: &api.EventMessage_Reprioritized{ Reprioritized: &api.JobReprioritizedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -220,15 +192,11 @@ func FromInternalReprioritisedJob(userId string, queueName string, jobSetName st } func FromInternalLogJobRunLeased(queueName string, jobSetName string, time time.Time, e *armadaevents.JobRunLeased) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } return []*api.EventMessage{ { Events: &api.EventMessage_Leased{ Leased: &api.JobLeasedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -240,13 +208,8 @@ func FromInternalLogJobRunLeased(queueName string, jobSetName string, time time. } func FromInternalJobSucceeded(queueName string, jobSetName string, time time.Time, e *armadaevents.JobSucceeded) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - apiEvent := &api.JobSucceededEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -272,11 +235,6 @@ func FromInternalJobSucceeded(queueName string, jobSetName string, time time.Tim } func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Time, e *armadaevents.JobRunErrors) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - events := make([]*api.EventMessage, 0) for _, msgErr := range e.GetErrors() { switch reason := msgErr.Reason.(type) { @@ -284,7 +242,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim event := &api.EventMessage{ Events: &api.EventMessage_LeaseExpired{ LeaseExpired: &api.JobLeaseExpiredEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -297,7 +255,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim event := &api.EventMessage{ Events: &api.EventMessage_UnableToSchedule{ UnableToSchedule: &api.JobUnableToScheduleEvent{ - JobId: jobId, + JobId: e.JobIdStr, ClusterId: objectMeta.GetExecutorId(), PodNamespace: objectMeta.GetNamespace(), PodName: objectMeta.GetName(), @@ -317,7 +275,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim event := &api.EventMessage{ Events: &api.EventMessage_LeaseReturned{ LeaseReturned: &api.JobLeaseReturnedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -338,10 +296,6 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim } func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, e *armadaevents.JobErrors) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } events := make([]*api.EventMessage, 0) for _, msgErr := range e.GetErrors() { if !msgErr.Terminal { @@ -351,7 +305,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, case *armadaevents.Error_PodError: event := &api.EventMessage{ Events: &api.EventMessage_Failed{ - Failed: makeJobFailed(jobId, queueName, jobSetName, time, reason), + Failed: makeJobFailed(e.JobIdStr, queueName, jobSetName, time, reason), }, } events = append(events, event) @@ -359,7 +313,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, event := &api.EventMessage{ Events: &api.EventMessage_Failed{ Failed: &api.JobFailedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -372,7 +326,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, event := &api.EventMessage{ Events: &api.EventMessage_Failed{ Failed: &api.JobFailedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -385,7 +339,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, event := &api.EventMessage{ Events: &api.EventMessage_Failed{ Failed: &api.JobFailedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -398,7 +352,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, event := &api.EventMessage{ Events: &api.EventMessage_Failed{ Failed: &api.JobFailedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -409,11 +363,11 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, } events = append(events, event) default: - log.Warnf("unknown error %T for job %s", reason, jobId) + log.Warnf("unknown error %T for job %s", reason, e.JobIdStr) event := &api.EventMessage{ Events: &api.EventMessage_Failed{ Failed: &api.JobFailedEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -427,13 +381,8 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time, } func FromInternalJobRunRunning(queueName string, jobSetName string, time time.Time, e *armadaevents.JobRunRunning) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - apiEvent := &api.JobRunningEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -459,13 +408,8 @@ func FromInternalJobRunRunning(queueName string, jobSetName string, time time.Ti } func FromInternalJobRunAssigned(queueName string, jobSetName string, time time.Time, e *armadaevents.JobRunAssigned) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - apiEvent := &api.JobPendingEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -494,39 +438,12 @@ func FromInternalJobRunPreempted(queueName string, jobSetName string, time time. return nil, nil } - jobId, err := armadaevents.UlidStringFromProtoUuid(e.PreemptedJobId) - if err != nil { - return nil, err - } - runId, err := armadaevents.UuidStringFromProtoUuid(e.PreemptedRunId) - if err != nil { - return nil, err - } - - preemptiveJobId := "" - preemptiveRunId := "" - - if e.PreemptiveJobId != nil { - preemptiveJobId, err = armadaevents.UlidStringFromProtoUuid(e.PreemptiveJobId) - if err != nil { - return nil, err - } - } - if e.PreemptiveRunId != nil { - preemptiveRunId, err = armadaevents.UuidStringFromProtoUuid(e.PreemptiveRunId) - if err != nil { - return nil, err - } - } - apiEvent := &api.JobPreemptedEvent{ - JobId: jobId, - JobSetId: jobSetName, - Queue: queueName, - Created: protoutil.ToTimestamp(time), - RunId: runId, - PreemptiveJobId: preemptiveJobId, - PreemptiveRunId: preemptiveRunId, + JobId: e.PreemptedJobIdStr, + JobSetId: jobSetName, + Queue: queueName, + Created: protoutil.ToTimestamp(time), + RunId: e.PreemptedRunIdStr, } return []*api.EventMessage{ @@ -539,13 +456,8 @@ func FromInternalJobRunPreempted(queueName string, jobSetName string, time time. } func FromInternalResourceUtilisation(queueName string, jobSetName string, time time.Time, e *armadaevents.ResourceUtilisation) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - apiEvent := &api.JobUtilisationEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), @@ -569,13 +481,8 @@ func FromInternalResourceUtilisation(queueName string, jobSetName string, time t } func FromInternalStandaloneIngressInfo(queueName string, jobSetName string, time time.Time, e *armadaevents.StandaloneIngressInfo) ([]*api.EventMessage, error) { - jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId) - if err != nil { - return nil, err - } - apiEvent := &api.JobIngressInfoEvent{ - JobId: jobId, + JobId: e.JobIdStr, JobSetId: jobSetName, Queue: queueName, Created: protoutil.ToTimestamp(time), diff --git a/internal/server/event/conversion/conversions_test.go b/internal/server/event/conversion/conversions_test.go index fbe3ef0c840..625f1e21fc5 100644 --- a/internal/server/event/conversion/conversions_test.go +++ b/internal/server/event/conversion/conversions_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/google/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" v11 "k8s.io/api/networking/v1" @@ -18,19 +17,13 @@ import ( ) const ( - jobIdString = "01f3j0g1md4qx7z5qb148qnh4r" - preemptiveJobIdString = "02f3j0g1md4qx7z5qb148qnh4r" - runIdString = "123e4567-e89b-12d3-a456-426614174000" - preemptiveRunIdString = "123e4567-e89b-12d3-a456-426614174001" + jobIdString = "01f3j0g1md4qx7z5qb148qnh4r" + runIdString = "123e4567-e89b-12d3-a456-426614174000" ) var ( - jobIdProto, _ = armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) - preemptiveJobIdProto, _ = armadaevents.ProtoUuidFromUlidString(preemptiveJobIdString) - preemptiveRunIdRunIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(preemptiveRunIdString)) - baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") - baseTimeProto = protoutil.ToTimestamp(baseTime) + baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") + baseTimeProto = protoutil.ToTimestamp(baseTime) ) const ( @@ -51,7 +44,7 @@ func TestConvertSubmitted(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_SubmitJob{ SubmitJob: &armadaevents.SubmitJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, Priority: priority, ObjectMeta: &armadaevents.ObjectMeta{ Namespace: namespace, @@ -126,7 +119,7 @@ func TestConvertCancel(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelJob{ CancelJob: &armadaevents.CancelJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -155,7 +148,7 @@ func TestConvertCancelled(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -184,7 +177,7 @@ func TestConvertReprioritising(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_ReprioritiseJob{ ReprioritiseJob: &armadaevents.ReprioritiseJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -213,7 +206,7 @@ func TestConvertReprioritised(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_ReprioritisedJob{ ReprioritisedJob: &armadaevents.ReprioritisedJob{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -242,7 +235,7 @@ func TestConvertLeased(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunLeased{ JobRunLeased: &armadaevents.JobRunLeased{ - JobId: jobIdProto, + JobIdStr: jobIdString, ExecutorId: executorId, PodRequirementsOverlay: &schedulerobjects.PodRequirements{ Tolerations: []v1.Toleration{ @@ -281,8 +274,8 @@ func TestConvertLeaseExpired(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: jobIdProto, - RunId: runIdProto, + JobIdStr: jobIdString, + RunIdStr: runIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -318,8 +311,8 @@ func TestConvertPodUnschedulable(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: jobIdProto, - RunId: runIdProto, + JobIdStr: jobIdString, + RunIdStr: runIdString, Errors: []*armadaevents.Error{ { Terminal: false, @@ -372,8 +365,8 @@ func TestConvertPodLeaseReturned(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: jobIdProto, - RunId: runIdProto, + JobIdStr: jobIdString, + RunIdStr: runIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -424,7 +417,7 @@ func TestConvertJobError(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: jobIdProto, + JobIdStr: jobIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -463,7 +456,7 @@ func TestConvertJobError(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: jobIdProto, + JobIdStr: jobIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -530,7 +523,7 @@ func TestConvertJobSucceeded(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobSucceeded{ JobSucceeded: &armadaevents.JobSucceeded{ - JobId: jobIdProto, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -580,8 +573,8 @@ func TestConvertJobRunning(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunRunning{ JobRunRunning: &armadaevents.JobRunRunning{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -631,8 +624,8 @@ func TestIgnoredEventDoesntDuplicate(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: jobIdProto, - RunId: runIdProto, + JobIdStr: jobIdString, + RunIdStr: runIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -675,8 +668,8 @@ func TestConvertJobAssigned(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -725,8 +718,8 @@ func TestConvertResourceUtilisation(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_ResourceUtilisation{ ResourceUtilisation: &armadaevents.ResourceUtilisation{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfo: &armadaevents.KubernetesResourceInfo{ ObjectMeta: &armadaevents.ObjectMeta{ ExecutorId: executorId, @@ -790,8 +783,8 @@ func TestConvertIngressInfo(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_StandaloneIngressInfo{ StandaloneIngressInfo: &armadaevents.StandaloneIngressInfo{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ObjectMeta: &armadaevents.ObjectMeta{ ExecutorId: executorId, Namespace: namespace, @@ -841,7 +834,7 @@ func TestConvertJobPreemptionRequested(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobPreemptionRequested{ JobPreemptionRequested: &armadaevents.JobPreemptionRequested{ - JobId: jobIdProto, + JobIdStr: jobIdString, }, }, } @@ -870,10 +863,8 @@ func TestConvertJobRunPreempted(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunPreempted{ JobRunPreempted: &armadaevents.JobRunPreempted{ - PreemptedJobId: jobIdProto, - PreemptedRunId: runIdProto, - PreemptiveJobId: preemptiveJobIdProto, - PreemptiveRunId: preemptiveRunIdRunIdProto, + PreemptedJobIdStr: jobIdString, + PreemptedRunIdStr: runIdString, }, }, } @@ -882,13 +873,11 @@ func TestConvertJobRunPreempted(t *testing.T) { { Events: &api.EventMessage_Preempted{ Preempted: &api.JobPreemptedEvent{ - JobId: jobIdString, - JobSetId: jobSetName, - Queue: queue, - Created: protoutil.ToTimestamp(baseTime), - RunId: runIdString, - PreemptiveJobId: preemptiveJobIdString, - PreemptiveRunId: preemptiveRunIdString, + JobId: jobIdString, + JobSetId: jobSetName, + Queue: queue, + Created: protoutil.ToTimestamp(baseTime), + RunId: runIdString, }, }, }, diff --git a/internal/server/event/event_repository_test.go b/internal/server/event/event_repository_test.go index e21ea7e0047..36012050795 100644 --- a/internal/server/event/event_repository_test.go +++ b/internal/server/event/event_repository_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" @@ -23,8 +22,6 @@ const ( ) var ( - jobIdProto, _ = armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") baseTimeProto = protoutil.ToTimestamp(baseTime) ) @@ -47,8 +44,8 @@ var assigned = &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -73,8 +70,8 @@ var running = &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunRunning{ JobRunRunning: &armadaevents.JobRunRunning{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { Info: &armadaevents.KubernetesResourceInfo_PodInfo{ @@ -93,8 +90,8 @@ var runSucceeded = &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ JobRunSucceeded: &armadaevents.JobRunSucceeded{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { Info: &armadaevents.KubernetesResourceInfo_PodInfo{ diff --git a/internal/server/event/event_test.go b/internal/server/event/event_test.go index 7662798a354..44410a98210 100644 --- a/internal/server/event/event_test.go +++ b/internal/server/event/event_test.go @@ -7,7 +7,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" - "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" @@ -74,8 +73,6 @@ func TestEventServer_ForceNew(t *testing.T) { runIdString := "123e4567-e89b-12d3-a456-426614174000" baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") baseTimeProto := protoutil.ToTimestamp(baseTime) - jobIdProto, _ := armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto := armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) err := s.queueRepository.(armadaqueue.QueueRepository).CreateQueue(ctx, q) require.NoError(t, err) @@ -86,8 +83,8 @@ func TestEventServer_ForceNew(t *testing.T) { Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, }, }, } @@ -218,15 +215,13 @@ func TestEventServer_GetJobSetEvents_ErrorIfMissing(t *testing.T) { runIdString := "123e4567-e89b-12d3-a456-426614174000" baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") baseTimeProto := protoutil.ToTimestamp(baseTime) - jobIdProto, _ := armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto := armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) assigned := &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, }, }, } @@ -263,15 +258,12 @@ func TestEventServer_GetJobSetEvents_ErrorIfMissing(t *testing.T) { runIdString := "123e4567-e89b-12d3-a456-426614174000" baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") baseTimeProto := protoutil.ToTimestamp(baseTime) - jobIdProto, _ := armadaevents.ProtoUuidFromUlidString(jobIdString) - runIdProto := armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString)) - assigned := &armadaevents.EventSequence_Event{ Created: baseTimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: runIdProto, - JobId: jobIdProto, + RunIdStr: runIdString, + JobIdStr: jobIdString, }, }, } From 41b3f7979fa35e2ddda9739e81dd23c22b7c153c Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Mon, 2 Sep 2024 19:32:12 +0300 Subject: [PATCH 3/3] Move code in common.go to more sensible places (#3893) * Move common.go to more sensible places Signed-off-by: Chris Martin * fix typo Signed-off-by: Chris Martin --------- Signed-off-by: Chris Martin Co-authored-by: JamesMurkin --- internal/scheduler/common.go | 72 --------------- internal/scheduler/common_test.go | 92 ------------------- internal/scheduler/context/job.go | 62 +++++++++++++ .../scheduler/preempting_queue_scheduler.go | 4 +- internal/scheduler/queue/queue_cache_test.go | 2 +- .../schedulerobjects/resourcelist_test.go | 82 +++++++++++++++++ 6 files changed, 147 insertions(+), 167 deletions(-) delete mode 100644 internal/scheduler/common.go delete mode 100644 internal/scheduler/common_test.go diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go deleted file mode 100644 index 066ab502505..00000000000 --- a/internal/scheduler/common.go +++ /dev/null @@ -1,72 +0,0 @@ -package scheduler - -import ( - "fmt" - - "golang.org/x/exp/maps" - - "github.com/armadaproject/armada/internal/common/armadacontext" - armadamaps "github.com/armadaproject/armada/internal/common/maps" - armadaslices "github.com/armadaproject/armada/internal/common/slices" - schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - -// PrintJobSummary logs a summary of the job scheduling context -// It will log a high level summary at Info level, and a list of all queues + jobs affected at debug level -func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*schedulercontext.JobSchedulingContext) { - if len(jctxs) == 0 { - return - } - jobsByQueue := armadaslices.MapAndGroupByFuncs( - jctxs, - func(jctx *schedulercontext.JobSchedulingContext) string { - return jctx.Job.Queue() - }, - func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { - return jctx.Job - }, - ) - resourcesByQueue := armadamaps.MapValues( - jobsByQueue, - func(jobs []*jobdb.Job) schedulerobjects.ResourceList { - rv := schedulerobjects.NewResourceListWithDefaultSize() - for _, job := range jobs { - rv.AddV1ResourceList(job.ResourceRequirements().Requests) - } - return rv - }, - ) - jobCountPerQueue := armadamaps.MapValues( - jobsByQueue, - func(jobs []*jobdb.Job) int { - return len(jobs) - }, - ) - jobIdsByQueue := armadamaps.MapValues( - jobsByQueue, - func(jobs []*jobdb.Job) []string { - rv := make([]string, len(jobs)) - for i, job := range jobs { - rv[i] = job.Id() - } - return rv - }, - ) - summary := fmt.Sprintf( - "affected queues %v; resources %v; jobs per queue %v", - maps.Keys(jobsByQueue), - armadamaps.MapValues( - resourcesByQueue, - func(rl schedulerobjects.ResourceList) string { - return rl.CompactString() - }, - ), - jobCountPerQueue, - ) - verbose := fmt.Sprintf("affected jobs %v", jobIdsByQueue) - - ctx.Infof("%s %s", prefix, summary) - ctx.Debugf("%s %s", prefix, verbose) -} diff --git a/internal/scheduler/common_test.go b/internal/scheduler/common_test.go deleted file mode 100644 index abb6594ec32..00000000000 --- a/internal/scheduler/common_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package scheduler - -import ( - "math" - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/api/resource" - - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - -func TestResourceListAsWeightedMillis(t *testing.T) { - tests := map[string]struct { - rl schedulerobjects.ResourceList - weights map[string]float64 - expected int64 - }{ - "default": { - rl: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("2"), - "bar": resource.MustParse("10Gi"), - "baz": resource.MustParse("1"), - }, - }, - weights: map[string]float64{ - "foo": 1, - "bar": 0.1, - "baz": 10, - }, - expected: (1 * 2 * 1000) + (1 * 1000 * 1024 * 1024 * 1024) + (10 * 1 * 1000), - }, - "zeroes": { - rl: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0"), - "bar": resource.MustParse("1"), - "baz": resource.MustParse("2"), - }, - }, - weights: map[string]float64{ - "foo": 1, - "bar": 0, - }, - expected: 0, - }, - "1Pi": { - rl: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1Pi"), - }, - }, - weights: map[string]float64{ - "foo": 1, - }, - expected: int64(math.Pow(1024, 5)) * 1000, - }, - "rounding": { - rl: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, - }, - weights: map[string]float64{ - "foo": 0.3006, - }, - expected: 301, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights)) - }) - } -} - -func BenchmarkResourceListAsWeightedMillis(b *testing.B) { - rl := schedulerobjects.NewResourceList(3) - rl.Set("cpu", resource.MustParse("2")) - rl.Set("memory", resource.MustParse("10Gi")) - rl.Set("nvidia.com/gpu", resource.MustParse("1")) - weights := map[string]float64{ - "cpu": 1, - "memory": 0.1, - "nvidia.com/gpu": 10, - } - b.ResetTimer() - for n := 0; n < b.N; n++ { - rl.AsWeightedMillis(weights) - } -} diff --git a/internal/scheduler/context/job.go b/internal/scheduler/context/job.go index 8e5342ba5c7..57b75523902 100644 --- a/internal/scheduler/context/job.go +++ b/internal/scheduler/context/job.go @@ -9,8 +9,12 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" v1 "k8s.io/api/core/v1" + "github.com/armadaproject/armada/internal/common/armadacontext" + armadamaps "github.com/armadaproject/armada/internal/common/maps" + armadaslices "github.com/armadaproject/armada/internal/common/slices" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/internaltypes" @@ -187,3 +191,61 @@ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext { GangInfo: gangInfo, } } + +// PrintJobSummary logs a summary of the job scheduling context +// It will log a high level summary at Info level, and a list of all queues + jobs affected at debug level +func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSchedulingContext) { + if len(jctxs) == 0 { + return + } + jobsByQueue := armadaslices.MapAndGroupByFuncs( + jctxs, + func(jctx *JobSchedulingContext) string { + return jctx.Job.Queue() + }, + func(jctx *JobSchedulingContext) *jobdb.Job { + return jctx.Job + }, + ) + resourcesByQueue := armadamaps.MapValues( + jobsByQueue, + func(jobs []*jobdb.Job) schedulerobjects.ResourceList { + rv := schedulerobjects.NewResourceListWithDefaultSize() + for _, job := range jobs { + rv.AddV1ResourceList(job.ResourceRequirements().Requests) + } + return rv + }, + ) + jobCountPerQueue := armadamaps.MapValues( + jobsByQueue, + func(jobs []*jobdb.Job) int { + return len(jobs) + }, + ) + jobIdsByQueue := armadamaps.MapValues( + jobsByQueue, + func(jobs []*jobdb.Job) []string { + rv := make([]string, len(jobs)) + for i, job := range jobs { + rv[i] = job.Id() + } + return rv + }, + ) + summary := fmt.Sprintf( + "affected queues %v; resources %v; jobs per queue %v", + maps.Keys(jobsByQueue), + armadamaps.MapValues( + resourcesByQueue, + func(rl schedulerobjects.ResourceList) string { + return rl.CompactString() + }, + ), + jobCountPerQueue, + ) + verbose := fmt.Sprintf("affected jobs %v", jobIdsByQueue) + + ctx.Infof("%s %s", prefix, summary) + ctx.Debugf("%s %s", prefix, verbose) +} diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 105f87a746e..de36595c395 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -237,8 +237,8 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche } ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs") - PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs) - PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs) + schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs) + schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs) // TODO: Show failed jobs. if sch.enableAssertions { diff --git a/internal/scheduler/queue/queue_cache_test.go b/internal/scheduler/queue/queue_cache_test.go index 3d43e491ea3..7687d5772ea 100644 --- a/internal/scheduler/queue/queue_cache_test.go +++ b/internal/scheduler/queue/queue_cache_test.go @@ -30,7 +30,7 @@ func TestFetch(t *testing.T) { {Name: "testQueue2"}, }, }, - "Immediate Steam Error": { + "Immediate Stream Error": { queues: []*api.Queue{}, streamError: true, }, diff --git a/internal/scheduler/schedulerobjects/resourcelist_test.go b/internal/scheduler/schedulerobjects/resourcelist_test.go index c3c989880ba..c3d7d738461 100644 --- a/internal/scheduler/schedulerobjects/resourcelist_test.go +++ b/internal/scheduler/schedulerobjects/resourcelist_test.go @@ -1,6 +1,7 @@ package schedulerobjects import ( + "math" "testing" "github.com/stretchr/testify/assert" @@ -796,6 +797,87 @@ func TestV1ResourceListConversion(t *testing.T) { assert.True(t, maps.Equal(v1rlCopy, v1rl)) } +func TestResourceListAsWeightedMillis(t *testing.T) { + tests := map[string]struct { + rl ResourceList + weights map[string]float64 + expected int64 + }{ + "default": { + rl: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("2"), + "bar": resource.MustParse("10Gi"), + "baz": resource.MustParse("1"), + }, + }, + weights: map[string]float64{ + "foo": 1, + "bar": 0.1, + "baz": 10, + }, + expected: (1 * 2 * 1000) + (1 * 1000 * 1024 * 1024 * 1024) + (10 * 1 * 1000), + }, + "zeroes": { + rl: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("0"), + "bar": resource.MustParse("1"), + "baz": resource.MustParse("2"), + }, + }, + weights: map[string]float64{ + "foo": 1, + "bar": 0, + }, + expected: 0, + }, + "1Pi": { + rl: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("1Pi"), + }, + }, + weights: map[string]float64{ + "foo": 1, + }, + expected: int64(math.Pow(1024, 5)) * 1000, + }, + "rounding": { + rl: ResourceList{ + Resources: map[string]resource.Quantity{ + "foo": resource.MustParse("1"), + }, + }, + weights: map[string]float64{ + "foo": 0.3006, + }, + expected: 301, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights)) + }) + } +} + +func BenchmarkResourceListAsWeightedMillis(b *testing.B) { + rl := NewResourceList(3) + rl.Set("cpu", resource.MustParse("2")) + rl.Set("memory", resource.MustParse("10Gi")) + rl.Set("nvidia.com/gpu", resource.MustParse("1")) + weights := map[string]float64{ + "cpu": 1, + "memory": 0.1, + "nvidia.com/gpu": 10, + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + rl.AsWeightedMillis(weights) + } +} + func BenchmarkResourceListZeroAdd(b *testing.B) { rla := NewResourceList(3) rlb := NewResourceList(3)