
Parallel processing of hydrants in real time path (peon) #16077

Open
kaisun2000 opened this issue Mar 8, 2024 · 1 comment
@kaisun2000
Contributor

Motivation

Recently we have been working on the real-time query performance of Druid. We found that the query/segment/time metric can frequently reach 20 seconds or more in the Peons, on Druid release 25.0.0.

It turned out that each segment is processed in a single processing thread. A segment may have multiple hydrants to process, but they are processed sequentially. More specifically, SinkQuerySegmentWalker uses DirectQueryProcessingPool.INSTANCE to process the hydrants, which serializes them:

          return new SpecificSegmentQueryRunner<>(
              withPerSinkMetrics(
                  new BySegmentQueryRunner<>(
                      sinkSegmentId,
                      descriptor.getInterval().getStart(),
                      factory.mergeRunners(
                          DirectQueryProcessingPool.INSTANCE,
                          perHydrantRunners
                      )
                  ),
                  toolChest,
                  sinkSegmentId,
                  cpuTimeAccumulator
              ),
              new SpecificSegmentSpec(descriptor)
          );
        }
    );

In our case, there are 20 or so hydrants per segment due to the large volume of ingestion traffic, and each hydrant can take several hundred milliseconds to about one second to process. Thus, the total time is around 20 seconds or more.
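As a back-of-envelope check, this is why serializing the hydrants hurts: sequential latencies add up, while parallel latency is bounded by the number of "waves" of work. The numbers below are the rough figures from above; the hydrant-pool size of 10 is a hypothetical choice for illustration only.

```java
// Back-of-envelope latency estimate using the rough numbers from above:
// ~20 hydrants per segment, ~1 second each. The pool size of 10 is a
// hypothetical choice, not a proposed default.
public class HydrantLatencyEstimate {
    public static void main(String[] args) {
        int hydrants = 20;
        double perHydrantSec = 1.0;
        int parallelism = 10;

        // Sequential: per-hydrant latencies add up.
        double sequentialSec = hydrants * perHydrantSec;
        // Parallel: latency is bounded by the number of waves of work.
        double parallelSec = Math.ceil((double) hydrants / parallelism) * perHydrantSec;

        System.out.println("sequential=" + sequentialSec + "s parallel=" + parallelSec + "s");
    }
}
```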

Proposed changes

The goal is to parallelize the processing of hydrants. There are many ways to do this. One consideration is to let most queries make progress just as before. For example, suppose there are 10 threads in the processing pool and two queries, each querying 5 segments. Currently, each of the two queries would have 5 segments progressing in 5 processing threads, with each thread working through all the hydrants of a specific segment for a specific query sequentially. The point is that 10 segments are making progress at the same time.

Thus, we propose to introduce a hydrant-level processing pool for each thread in the current processing pool. In the above example, with 10 threads in the processing pool, we would have 10 hydrant-level processing pools. Each segment would use one hydrant-level thread pool to process its hydrants in parallel.
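A minimal sketch of this two-level design follows. All names, pool sizes, and the stand-in "hydrant query" are illustrative assumptions, not Druid APIs; the real change would live in SinkQuerySegmentWalker / mergeRunners, which this does not reproduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the proposed two-level pools: the existing
// segment-level pool, plus one hydrant-level pool per segment worker.
public class TwoLevelPools {
    public static void main(String[] args) throws Exception {
        int segmentThreads = 2;      // existing processing pool size
        int hydrantParallelism = 4;  // new per-segment hydrant parallelism

        ExecutorService segmentPool = Executors.newFixedThreadPool(segmentThreads);
        // One hydrant-level pool per segment-level thread. Daemon threads so
        // the JVM can exit without an explicit shutdown in this sketch.
        ThreadLocal<ExecutorService> hydrantPool = ThreadLocal.withInitial(
            () -> Executors.newFixedThreadPool(hydrantParallelism, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            }));

        List<Future<Integer>> perSegment = new ArrayList<>();
        for (int seg = 0; seg < 4; seg++) {
            final int segmentId = seg;
            perSegment.add(segmentPool.submit(() -> {
                // Fan this segment's hydrants out to the hydrant-level pool.
                List<Callable<Integer>> hydrants = new ArrayList<>();
                for (int h = 0; h < 8; h++) {
                    final int hydrantId = h;
                    // Stand-in for running the query against one hydrant.
                    hydrants.add(() -> segmentId * 100 + hydrantId);
                }
                int merged = 0;
                for (Future<Integer> f : hydrantPool.get().invokeAll(hydrants)) {
                    merged += f.get(); // merge hydrant results, as mergeRunners would
                }
                return merged;
            }));
        }

        int total = 0;
        for (Future<Integer> f : perSegment) {
            total += f.get();
        }
        System.out.println("total=" + total);
        segmentPool.shutdown();
    }
}
```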

Rationale

There are several possible solutions, for example:
1/ use the same processing pool to parallelize the hydrant-level processing
2/ use another shared processing pool to parallelize the hydrant-level processing

Note that if we maintain two levels of processing:

  • segment
  • hydrant

we need to parallelize the hydrant-level processing to reduce latency. Hydrant-level processing is file-I/O bound, so we cannot use a Unix "select"-style async pool the way Netty does for socket processing. Put another way, we have to use more threads to gain I/O throughput, i.e., use a thread pool to speed it up.
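The "more threads for blocking I/O" point can be demonstrated with a small timing experiment. The blocking file read is simulated here with Thread.sleep; the absolute timings are machine-dependent, only the sequential-vs-parallel relationship matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Why blocking (file) I/O needs more threads for throughput: each
// "hydrant read" is simulated with a 200 ms sleep. Sequentially the
// latencies add; with a thread pool the wall time is roughly the max.
public class BlockingIoThroughput {
    static final int TASKS = 4;
    static final long IO_MS = 200;

    static void fakeHydrantRead() throws InterruptedException {
        Thread.sleep(IO_MS); // stand-in for a blocking file read
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.nanoTime();
        for (int i = 0; i < TASKS; i++) {
            fakeHydrantRead();
        }
        long sequentialMs = (System.nanoTime() - t0) / 1_000_000;

        ExecutorService pool = Executors.newFixedThreadPool(TASKS);
        List<Callable<Void>> work = new ArrayList<>();
        for (int i = 0; i < TASKS; i++) {
            work.add(() -> { fakeHydrantRead(); return null; });
        }
        long t1 = System.nanoTime();
        pool.invokeAll(work); // blocks until all simulated reads finish
        long parallelMs = (System.nanoTime() - t1) / 1_000_000;
        pool.shutdown();

        // Expect roughly 800 ms vs 200 ms; exact values vary by machine.
        System.out.println("sequentialMs=" + sequentialMs + " parallelMs=" + parallelMs);
    }
}
```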

The main reason for the above proposal is that it maintains the invariant that the same number of segments make progress before and after the change, for "fairness" considerations.

The potential con of this approach is that we may have too many threads, and thus too much thread scheduling/context-switch overhead. This may not be a big issue, for two reasons:

  • The segment processing time is in the hundreds of milliseconds, which is far higher than the context-switch time on current Linux.
  • Threads in the thread pool can be reclaimed after being idle for a configured time. If context-switch or thread start-up time becomes an issue, we can tune this parameter.
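The idle-reclaim behavior described above maps directly onto standard JDK pool settings. A configuration sketch follows; the sizes and timeout are illustrative values, not proposed defaults.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical hydrant-level pool whose idle threads are reclaimed after a
// tunable keep-alive, so the thread count shrinks when load drops.
public class ReclaimablePool {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4, 4,                  // core == max: fixed parallelism under load
            30, TimeUnit.SECONDS,  // tunable idle timeout
            new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // let even core threads be reclaimed
        System.out.println("keepAliveSec=" + pool.getKeepAliveTime(TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```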

Operational impact

Not much operational impact; the change is backward compatible.

Test plan (optional)

An optional discussion of how the proposed changes will be tested. This section should focus on higher level system test strategy and not unit tests (as UTs will be implementation dependent).

Future work (optional)

An optional discussion of things that you believe are out of scope for the particular proposal but would be nice follow-ups. It helps show where a particular change could be leading us. There isn't any commitment that the proposal author will actually work on the items discussed in this section.

@kaisun2000
Contributor Author

In some initial small-scale testing, the parallel processing approach reduced query latency to between 1/2 and 1/5 of the current approach, depending on the phase of ingestion progress. The closer to segment handoff, the greater the latency reduction.
