Cancellation #25

Open
mattmassicotte opened this issue Jun 3, 2024 · 6 comments

Comments

@mattmassicotte
Collaborator

I was starting to think a bit about cancellation and withTaskCancellationHandler. I'm not certain exactly which section this belongs in, but I think it is an important topic.

@GangWoon

GangWoon commented Jun 14, 2024

Hello!

It seems like many people misunderstand how cancellation works in structured concurrency, so I believe it's essential to explain how to use cancellation properly. I noticed that one of Apple's examples doesn't handle cancellation correctly with respect to structured concurrency, so some tasks are not canceled when they should be.

Is there a way I can contribute to addressing this issue? If you can guide me, I would be happy to prepare and contribute. :)


In the final code listing of [Section2 - Step 8], even if the task running the async quakeLocation method is canceled, the Task created inside the method is not canceled. This is because that inner Task is unstructured: it is not a child of the calling task, so it does not inherit the caller's cancellation. To propagate cancellation as described above, you need to wrap the await in withTaskCancellationHandler and cancel the inner Task from its onCancel handler.
Tutorial Link
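A minimal sketch of the difference, assuming hypothetical functions `slowWork`, `unstructuredLoad`, and `forwardingLoad` that stand in for the tutorial's download-and-decode step (this is not the tutorial's code). Only the second version forwards the caller's cancellation to the inner Task:

```swift
// Hypothetical stand-in for the tutorial's download + decode work.
// Task.sleep throws CancellationError if its task is cancelled.
func slowWork() async throws -> Int {
    try await Task.sleep(nanoseconds: 200_000_000)
    return 42
}

// Unstructured: cancelling the caller does NOT cancel `task`,
// because Task.init does not inherit cancellation.
func unstructuredLoad() async throws -> Int {
    let task = Task { try await slowWork() }
    return try await task.value
}

// Forwarding: the handler runs when the caller is cancelled
// (or immediately, if it was already cancelled) and cancels `task`.
func forwardingLoad() async throws -> Int {
    let task = Task { try await slowWork() }
    return try await withTaskCancellationHandler {
        try await task.value
    } onCancel: {
        task.cancel()
    }
}
```

Cancelling a task that calls `unstructuredLoad()` still waits for and returns `42`, while cancelling one that calls `forwardingLoad()` ends with `CancellationError`.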

@mattmassicotte
Collaborator Author

mattmassicotte commented Jun 14, 2024

Thank you for pointing this out! I was just looking at the tutorial example. Cancellation of caching behaviors is tricky! I'm including the code listing here just for reference:

```swift
func quakeLocation(from url: URL) async throws -> QuakeLocation {
    if let cached = quakeCache[url] {
        switch cached {
        case .ready(let location):
            return location
        case .inProgress(let task):
            return try await task.value
        }
    }
    let task = Task<QuakeLocation, Error> {
        let data = try await downloader.httpData(from: url)
        let location = try decoder.decode(QuakeLocation.self, from: data)
        return location
    }
    quakeCache[url] = .inProgress(task)
    do {
        let location = try await task.value
        quakeCache[url] = .ready(location)
        return location
    } catch {
        quakeCache[url] = nil
        throw error
    }
}
```

Consider this:

A) client 1 calls quakeLocation(from url: URL), creating the underlying Task
B) client 2 calls quakeLocation(from url: URL)
C) client 3 calls quakeLocation(from url: URL)
D) client 1 cancels

At this point, if QuakeClient cancelled the underlying Task, I believe clients 2 and 3 would also end up being cancelled. I have been referring to this as the "special first request problem". In my opinion, the existing behavior may actually be optimal: I'm not sure you want the first request to be special.
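A small sketch of steps A to D, assuming a hypothetical `NaiveCache` actor that shares one Task per key (like the `.inProgress` case above) and forwards cancellation to it only from the caller that created it. The names and the sleep-based work are illustrative, not QuakeClient's code:

```swift
// Hypothetical cache: one shared Task per key, like QuakeClient's .inProgress case.
actor NaiveCache {
    private var inFlight: [String: Task<Int, Error>] = [:]

    func value(for key: String) async throws -> Int {
        let task: Task<Int, Error>
        let createdHere: Bool
        if let existing = inFlight[key] {
            task = existing
            createdHere = false
        } else {
            // Stand-in for the download + decode work.
            task = Task {
                try await Task.sleep(nanoseconds: 200_000_000)
                return key.count
            }
            inFlight[key] = task
            createdHere = true
        }
        // Only the "special" first caller forwards its cancellation.
        return try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            if createdHere { task.cancel() }
        }
    }
}
```

Because every client awaits the same Task, cancelling client 1 makes clients 2 and 3 fail with `CancellationError` too. Removing the forwarding restores the tutorial's current behavior, where no request is special.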

What do you think @hborla, @ktoso?

@FranzBusch
Member

So I think we need to separate documentation around cancellation into two sections:

  1. How cancellation works in structured concurrency, using task groups and async let
  2. How cancellation works with unstructured concurrency, e.g. Task.init and Task.detached. In particular, how you cancel a Task, and calling out that having to do this manually is a big reason to avoid unstructured concurrency
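A compact sketch contrasting the two cases, assuming a hypothetical `child()` function that sleeps and reports whether it observed cancellation. `withGroup`, `withAsyncLet`, and `withUnstructured` are illustrative names:

```swift
// Hypothetical child work: sleeps, then reports whether it was cancelled.
func child() async -> Bool {
    do {
        try await Task.sleep(nanoseconds: 300_000_000)
        return false
    } catch {
        return Task.isCancelled
    }
}

// 1. Structured: cancelling the parent cancels every child automatically.
func withGroup() async -> [Bool] {
    await withTaskGroup(of: Bool.self) { group in
        for _ in 0..<3 { group.addTask { await child() } }
        var seen: [Bool] = []
        for await wasCancelled in group { seen.append(wasCancelled) }
        return seen
    }
}

func withAsyncLet() async -> [Bool] {
    async let a = child()
    async let b = child()
    return await [a, b]
}

// 2. Unstructured: Task.init does not inherit the caller's cancellation.
// The only way to cancel it is through its handle, e.g. `inner.cancel()`.
func withUnstructured() async -> Bool {
    let inner = Task { await child() }
    return await inner.value
}
```

If the task running each function is cancelled, the group and async let versions report `true` for every child, while the unstructured version runs to completion and reports `false`.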

The Quake example above and your first-request problem are, IMO, a usage pattern for caching that deserves its own article and has nothing to do with cancellation per se. The bigger question is how to handle cancellation when caching. IMO the best solution is, again, to avoid spawning any unstructured task and instead invert control.

```swift
actor Cache<Request, Response> {
  enum RequestState {
    case response(Response)
    case loading([CheckedContinuation<Response, Never>])
  }
  private var cache: [UUID: RequestState]
  private let streamContinuation: AsyncStream<(Request, CheckedContinuation<Response, Never>)>.Continuation
  private let stream: AsyncStream<(Request, CheckedContinuation<Response, Never>)>

  func run() async {
    await withDiscardingTaskGroup { group in
      for await (request, continuation) in self.stream {
        self.cache[request.id] = .loading([continuation])
        group.addTask {
          let response = await request.execute()
          await self.receivedResponse(for: request.id, response: response)
        }
      }
    }
  }

  func receivedResponse(for id: UUID, response: Response) {
    // Resume all continuations from the cache
  }

  func executeRequest(_ request: Request) async -> Response {
    switch self.cache[request.id] {
    case .response(let response):
      return response
    case .loading(var continuations):
      return await withTaskCancellationHandler {
        await withCheckedContinuation { continuation in
          continuations.append(continuation)
          self.cache[request.id] = .loading(continuations)
        }
      } onCancel: {
        self.removeContinuation() // We need to give each continuation an ID by having a counter in the actor
      }
    case .none:
      // Create continuation and yield to stream
    }
  }
}
```

The above is just pseudocode, but it dodges the first-request problem you describe simply by applying structured concurrency and inverting control. I think inverting control is often an important design pattern for keeping the structure in the code.
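A runnable reduction of the inverted-control idea, assuming Swift 5.9+ (`AsyncStream.makeStream` and `withDiscardingTaskGroup`). It deliberately omits caching and per-caller cancellation; `Job`, `Worker`, `submit`, and `shutdown` are hypothetical names, and a `final class` stands in for the actor:

```swift
// Hypothetical request type, standing in for `Request` above.
struct Job: Sendable {
    let key: String
}

final class Worker: Sendable {
    typealias Entry = (Job, CheckedContinuation<Int, Never>)
    private let stream: AsyncStream<Entry>
    private let input: AsyncStream<Entry>.Continuation

    init() {
        let (stream, input) = AsyncStream<Entry>.makeStream()
        self.stream = stream
        self.input = input
    }

    // The long-lived loop owns all the work. Whoever runs it (e.g. a root
    // task) controls its lifetime, so no individual caller is "special".
    func run() async {
        await withDiscardingTaskGroup { group in
            for await (job, continuation) in stream {
                group.addTask {
                    // Stand-in for real work such as a download and decode.
                    continuation.resume(returning: job.key.count)
                }
            }
        }
    }

    // Callers never spawn tasks: they hand work to the loop and wait.
    func submit(_ job: Job) async -> Int {
        await withCheckedContinuation { continuation in
            _ = input.yield((job, continuation))
        }
    }

    // Ends the stream so `run()` returns after in-flight work finishes.
    func shutdown() {
        input.finish()
    }
}
```

Cancelling the task that runs `run()` ends the loop and cancels its children. Letting an individual waiter stop waiting is the harder part, and it is what the onCancel question in this thread is about.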

@mattmassicotte
Collaborator Author

@FranzBusch, this is great!

Do you have any thoughts about how to discuss the complexity/functionality trade-offs here? Do you think the QuakeClient should be changed?

@GangWoon

@FranzBusch
Thank you so much for the pseudocode. It's an approach I hadn't thought of, but it seems really useful. I have an additional question about implementing the code in practice.

When calling self.removeContinuation in the onCancel closure, I get a "Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context" error. Wrapping it in Task.init avoids the compile error, but that doesn't seem like the correct approach. Could you give me some more hints about this code?

```swift
  func executeRequest(_ request: Request) async -> Response {
    switch self.cache[id] {
    case .loading(var continuations):
      await withTaskCancellationHandler {
       ...
      } onCancel: {
        self.removeContinuation() // ‼️ Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context
      }
```

@FranzBusch
Member

> Do you have any thoughts about how to discuss the complexity/functionality trade-offs here? Do you think the QuakeClient should be changed?

I think we need to explain the underlying concepts first, i.e. structuredness and cancellation, with a good explanation of why structuredness is better. Then we can use this cache example as a more advanced demonstration of how to use structuredness.

> When calling self.removeContinuation in the onCancel parameter, I encounter an "Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context" error. Wrapping it with Task.init avoids the compile error, but that doesn't seem like the correct approach. Could you give me some more hints about this code?

Yes, that error is correct. The only way to work around it while Cache is an actor is, again, to spawn an unstructured task. This shows that we have probably hit the limit of what we can do with an actor here, and it might be time to switch to a class that protects its state with a Mutex instead. That way, removing the continuation from onCancel doesn't require an async call.
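A minimal sketch of that suggestion, assuming Swift 6's Synchronization module (`Mutex`, available from macOS 15 / iOS 18). It covers only per-waiter cancellation, not the full cache; `SharedLoad`, `wait()`, and `complete(with:)` are hypothetical names. Each waiter gets an ID from a counter, as suggested in the pseudocode above, so onCancel can remove exactly its own continuation synchronously:

```swift
import Synchronization

// Hypothetical type: many waiters share one result that someone else
// produces (for example, a child task of a long-lived `run()` loop).
final class SharedLoad: Sendable {
    private struct State {
        var nextID = 0
        var waiters: [Int: CheckedContinuation<Int, Error>] = [:]
        var result: Int?

        mutating func makeID() -> Int {
            nextID += 1
            return nextID
        }

        // Resume at once if there is nothing to wait for; otherwise park.
        // The isCancelled check covers cancellation that raced ahead of us.
        mutating func register(_ id: Int, _ continuation: CheckedContinuation<Int, Error>) {
            if let result {
                continuation.resume(returning: result)
            } else if Task.isCancelled {
                continuation.resume(throwing: CancellationError())
            } else {
                waiters[id] = continuation
            }
        }

        mutating func finish(with value: Int) -> [CheckedContinuation<Int, Error>] {
            result = value
            defer { waiters = [:] }
            return Array(waiters.values)
        }
    }

    private let state = Mutex(State())

    func wait() async throws -> Int {
        let id = state.withLock { $0.makeID() }
        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                state.withLock { $0.register(id, continuation) }
            }
        } onCancel: {
            // Synchronous: no actor hop and no unstructured Task required.
            let continuation = state.withLock { $0.waiters.removeValue(forKey: id) }
            continuation?.resume(throwing: CancellationError())
        }
    }

    func complete(with value: Int) {
        let waiters = state.withLock { $0.finish(with: value) }
        for waiter in waiters {
            waiter.resume(returning: value)
        }
    }
}
```

Cancelling one waiter resumes only that waiter with `CancellationError`; the others keep waiting and receive the value once `complete(with:)` is called.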

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants