Skip to content

Commit

Permalink
chore(controller): list event support task-scoped aggregation (#2915)
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui authored Oct 27, 2023
1 parent 872e4b1 commit 87994fc
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@

package ai.starwhale.mlops.domain.event;

import ai.starwhale.mlops.api.protocol.event.Event;
import ai.starwhale.mlops.api.protocol.event.Event.EventResourceType;
import ai.starwhale.mlops.api.protocol.event.Event.EventSource;
import ai.starwhale.mlops.api.protocol.event.Event.EventType;
import ai.starwhale.mlops.api.protocol.event.EventRequest;
import ai.starwhale.mlops.api.protocol.event.EventRequest.RelatedResource;
import ai.starwhale.mlops.api.protocol.event.EventVo;
import ai.starwhale.mlops.domain.event.mapper.EventMapper;
import ai.starwhale.mlops.domain.event.po.EventEntity;
import ai.starwhale.mlops.domain.job.JobDao;
import ai.starwhale.mlops.domain.job.step.mapper.StepMapper;
import ai.starwhale.mlops.domain.run.RunEntity;
import ai.starwhale.mlops.domain.run.mapper.RunMapper;
import ai.starwhale.mlops.domain.task.mapper.TaskMapper;
import ai.starwhale.mlops.exception.SwNotFoundException;
import ai.starwhale.mlops.exception.SwNotFoundException.ResourceType;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -84,11 +89,36 @@ public void addEvent(EventRequest event) {

@NotNull
public List<EventVo> getEvents(EventRequest.RelatedResource related) {
var events = eventMapper.listEvents(related.getEventResourceType(), related.getId());
if (events == null) {
List<EventEntity> events = new LinkedList<>();

// returns all the run events belongs to the task when the scope is task.
// we do not support job scope aggregation for now.
if (related.getEventResourceType() == EventResourceType.TASK) {
var runs = runMapper.list(related.getId());
if (runs == null) {
return List.of();
}
var runIds = runs.stream().map(RunEntity::getId).collect(Collectors.toList());
var runEvents = eventMapper.listEventsOfResources(EventResourceType.RUN, runIds);
if (runEvents != null) {
events.addAll(runEvents);
}
}

var resourceEvents = eventMapper.listEventsOfResource(related.getEventResourceType(), related.getId());
if (resourceEvents != null) {
events.addAll(resourceEvents);
}

if (events.isEmpty()) {
return List.of();
}
return events.stream().map(eventConverter::toVo).collect(Collectors.toList());

// order by created time asc
return events.stream()
.map(eventConverter::toVo)
.sorted(Comparator.comparingLong(Event::getTimestamp))
.collect(Collectors.toList());
}

public void addEventForJob(String jobUrl, @NotNull EventRequest event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,17 @@ public interface EventMapper {

@Select("select " + COLUMNS + " from " + TABLE
+ " where resource_type = #{resourceType} and resource_id = #{resourceId} order by created_time asc")
List<EventEntity> listEvents(@NotNull Event.EventResourceType resourceType, @NotNull Long resourceId);
List<EventEntity> listEventsOfResource(@NotNull Event.EventResourceType resourceType, @NotNull Long resourceId);

@Select("<script>"
+ "select " + COLUMNS + " from " + TABLE
+ " where resource_type = #{resourceType} and resource_id in "
+ "<foreach item='item' index='index' collection='resourceIds' open='(' separator=',' close=')'>"
+ "#{item}"
+ "</foreach>"
+ "</script>")
List<EventEntity> listEventsOfResources(
@NotNull Event.EventResourceType resourceType,
@NotNull List<Long> resourceIds
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ void testGetEvents() {
var req = new EventRequest();
req.setRelatedResource(new EventRequest.RelatedResource(EventResourceType.JOB, 2L));
var resp = eventService.getEvents(req.getRelatedResource());
verify(eventMapper).listEvents(EventResourceType.JOB, 2L);
verify(eventMapper).listEventsOfResource(EventResourceType.JOB, 2L);
assertEquals(resp, List.of());

when(eventMapper.listEvents(EventResourceType.JOB, 2L)).thenReturn(
when(eventMapper.listEventsOfResource(EventResourceType.JOB, 2L)).thenReturn(
List.of(EventEntity.builder()
.id(1L)
.type(EventType.INFO)
Expand Down Expand Up @@ -165,21 +165,42 @@ void testGetEventForJob() {

// use job if related is null
eventService.getEventsForJob("1", null);
verify(eventMapper).listEvents(EventResourceType.JOB, 1L);
verify(eventMapper).listEventsOfResource(EventResourceType.JOB, 1L);

// normal case
when(jobDao.getJobId("2")).thenReturn(2L);
eventService.getEventsForJob("2", related);
verify(eventMapper).listEvents(EventResourceType.JOB, 2L);
verify(eventMapper).listEventsOfResource(EventResourceType.JOB, 2L);

// the related resource is task of job
when(taskMapper.findTaskById(3L)).thenReturn(TaskEntity.builder().stepId(4L).build());
when(stepMapper.findById(4L)).thenReturn(StepEntity.builder().jobId(5L).build());
when(jobDao.getJobId("5")).thenReturn(5L);
when(runMapper.list(3L)).thenReturn(
List.of(RunEntity.builder().id(42L).build(), RunEntity.builder().id(43L).build()));
reset(eventMapper);

when(eventMapper.listEventsOfResources(EventResourceType.RUN, List.of(42L, 43L))).thenReturn(
List.of(EventEntity.builder().id(1L).createdTime(new Date(8L)).build(),
EventEntity.builder().id(2L).createdTime(new Date(7L)).build(),
EventEntity.builder().id(3L).createdTime(new Date(9L)).build()));
when(eventMapper.listEventsOfResource(EventResourceType.TASK, 3L)).thenReturn(
List.of(EventEntity.builder().id(4L).createdTime(new Date(3L)).build(),
EventEntity.builder().id(5L).createdTime(new Date(2L)).build(),
EventEntity.builder().id(6L).createdTime(new Date(1L)).build()));

related = new EventRequest.RelatedResource(EventResourceType.TASK, 3L);
eventService.getEventsForJob("5", related);
verify(eventMapper).listEvents(EventResourceType.TASK, 3L);
var events = eventService.getEventsForJob("5", related);
verify(eventMapper).listEventsOfResource(EventResourceType.TASK, 3L);
verify(eventMapper).listEventsOfResources(EventResourceType.RUN, List.of(42L, 43L));
assertEquals(events.size(), 6);
// order by created time asc
assertEquals(events.get(0).getId(), 6L);
assertEquals(events.get(1).getId(), 5L);
assertEquals(events.get(2).getId(), 4L);
assertEquals(events.get(3).getId(), 2L);
assertEquals(events.get(4).getId(), 1L);
assertEquals(events.get(5).getId(), 3L);

reset(eventMapper);
// the related resource is task, but the task is not found
Expand All @@ -191,7 +212,7 @@ void testGetEventForJob() {
when(runMapper.get(71L)).thenReturn(RunEntity.builder().id(71L).taskId(3L).build());
reset(eventMapper);
eventService.getEventsForJob("5", new RelatedResource(EventResourceType.RUN, 71L));
verify(eventMapper).listEvents(EventResourceType.RUN, 71L);
verify(eventMapper).listEventsOfResource(EventResourceType.RUN, 71L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package ai.starwhale.mlops.domain.event.mapper;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import ai.starwhale.mlops.api.protocol.event.Event.EventResourceType;
Expand All @@ -24,6 +25,7 @@
import ai.starwhale.mlops.domain.MySqlContainerHolder;
import ai.starwhale.mlops.domain.event.po.EventEntity;
import java.util.Date;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mybatis.spring.boot.test.autoconfigure.MybatisTest;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -38,7 +40,7 @@ public class EventMapperTest extends MySqlContainerHolder {

@Test
public void testEvents() {
var events = eventMapper.listEvents(null, null);
var events = eventMapper.listEventsOfResource(null, null);
assertEquals(0, events.size());

var entity = EventEntity.builder()
Expand All @@ -61,15 +63,30 @@ public void testEvents() {
.createdTime(new Date(456L * 1000))
.build();

var entity3 = EventEntity.builder()
.type(EventType.INFO)
.source(EventSource.CLIENT)
.resourceType(EventResourceType.TASK)
.resourceId(3L)
.message("baz")
.data("{}")
.createdTime(new Date(789L * 1000))
.build();

eventMapper.insert(entity);
eventMapper.insert(entity2);
eventMapper.insert(entity3);

events = eventMapper.listEvents(EventResourceType.JOB, 1L);
events = eventMapper.listEventsOfResource(EventResourceType.JOB, 1L);
assertEquals(1, events.size());
assertEquals(entity, events.get(0));

events = eventMapper.listEvents(EventResourceType.TASK, 2L);
events = eventMapper.listEventsOfResource(EventResourceType.TASK, 2L);
assertEquals(1, events.size());
assertEquals(entity2, events.get(0));

events = eventMapper.listEventsOfResources(EventResourceType.TASK, List.of(2L, 3L));
assertEquals(2, events.size());
assertThat(events).containsExactly(entity2, entity3);
}
}

0 comments on commit 87994fc

Please sign in to comment.