|
1 | 1 | #lang scribble/doc
|
2 | 2 | @(require scribble/manual
|
3 |
| - scribble/eval |
| 3 | + scribble/examples |
4 | 4 | "guide-utils.rkt"
|
5 | 5 | (for-label racket))
|
6 | 6 |
|
7 | 7 | @(define concurrency-eval (make-base-eval))
|
| 8 | +@(concurrency-eval '(require racket/contract racket/math)) |
8 | 9 |
|
9 | 10 | @(define reference-doc '(lib "scribblings/reference/reference.scrbl"))
|
10 | 11 |
|
@@ -268,7 +269,10 @@ There are other ways to synchronize threads. The @racket[sync] function allows
|
268 | 269 | threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}.
|
269 | 270 | Many values double as events, allowing a uniform way to synchronize threads
|
270 | 271 | using different types. Examples of events include channels, ports, threads,
|
271 |
| -and alarms. |
| 272 | +and alarms. This section builds up a number of examples that show how |
| 273 | +the combination of events, threads, and @racket[sync] (along with recursive functions) |
| 274 | +allow you to implement arbitrarily sophisticated communication protocols |
| 275 | +to coordinate concurrent parts of a program. |
272 | 276 |
|
273 | 277 | In the next example, a channel and an alarm are used as synchronizable events.
|
274 | 278 | The workers @racket[sync] on both so that they can process channel items until the
|
@@ -445,3 +449,153 @@ that its handler is not called in tail position with respect to
|
445 | 449 | @racket[sync]. At the same time, @racket[wrap-evt] disables break
|
446 | 450 | exceptions during its handler's invocation.
|
447 | 451 |
|
| 452 | +@section{Building Your Own Synchronization Patterns} |
| 453 | + |
| 454 | +Events also allow you to encode many different communication |
| 455 | +patterns between multiple concurrent parts of a program. One |
| 456 | +common such pattern is producer-consumer. Here is a way to |
| 457 | +implement on variation on it using the above ideas. Generally |
| 458 | +speaking, these communication patterns are implemented via |
| 459 | +a server loops that uses @racket[sync] to wait for any of |
| 460 | +a number of different possibilities to occur and then |
| 461 | +reacts them, updating some local state. |
| 462 | + |
| 463 | +@examples[ |
| 464 | + #:eval concurrency-eval |
| 465 | + #:label #f |
| 466 | + (eval:no-prompt |
| 467 | + (define/contract (produce x) |
| 468 | + (-> any/c void?) |
| 469 | + (channel-put producer-chan x))) |
| 470 | + |
| 471 | + (eval:no-prompt |
| 472 | + (define/contract (consume) |
| 473 | + (-> any/c) |
| 474 | + (channel-get consumer-chan))) |
| 475 | + |
| 476 | + (code:comment "private state and server loop") |
| 477 | +(eval:no-prompt |
| 478 | + (define producer-chan (make-channel)) |
| 479 | + (define consumer-chan (make-channel)) |
| 480 | + (void |
| 481 | + (thread |
| 482 | + (λ () |
| 483 | + (code:comment "the items variable holds the items that") |
| 484 | + (code:comment "have been produced but not yet consumed") |
| 485 | + (let loop ([items '()]) |
| 486 | + (sync |
| 487 | + |
| 488 | + (code:comment "wait for production") |
| 489 | + (handle-evt |
| 490 | + producer-chan |
| 491 | + (λ (i) |
| 492 | + (code:comment "if that event was chosen,") |
| 493 | + (code:comment "we add an item to our list") |
| 494 | + (code:comment "and go back around the loop") |
| 495 | + (loop (cons i items)))) |
| 496 | + |
| 497 | + (code:comment "wait for consumption, but only") |
| 498 | + (code:comment "if we have something to produce") |
| 499 | + (handle-evt |
| 500 | + (if (null? items) |
| 501 | + never-evt |
| 502 | + (channel-put-evt consumer-chan (car items))) |
| 503 | + (λ (_) |
| 504 | + (code:comment "if that event was chosen,") |
| 505 | + (code:comment "we know that the first item item") |
| 506 | + (code:comment "has been consumed; drop it and") |
| 507 | + (code:comment "and go back around the loop") |
| 508 | + (loop (cdr items)))))))))) |
| 509 | + |
| 510 | + (code:comment "an example (non-deterministic) interaction") |
| 511 | + (void |
| 512 | + (thread (λ () (sleep (/ (random 10) 100)) (produce 1))) |
| 513 | + (thread (λ () (sleep (/ (random 10) 100)) (produce 2)))) |
| 514 | + (list (consume) (consume)) |
| 515 | + ] |
| 516 | + |
| 517 | +It is possible to build up more complex synchronization patterns. Here is |
| 518 | +a silly example where we extend the producer consumer with an operation |
| 519 | +to wait until at least a certain number of items have been produced. |
| 520 | + |
| 521 | +@examples[ |
| 522 | + #:eval concurrency-eval |
| 523 | + #:label #f |
| 524 | + |
| 525 | + (eval:no-prompt |
| 526 | + (define/contract (produce x) |
| 527 | + (-> any/c void?) |
| 528 | + (channel-put producer-chan x)) |
| 529 | + |
| 530 | + (define/contract (consume) |
| 531 | + (-> any/c) |
| 532 | + (channel-get consumer-chan)) |
| 533 | + |
| 534 | + (define/contract (wait-at-least n) |
| 535 | + (-> natural? void?) |
| 536 | + (define c (make-channel)) |
| 537 | + (code:comment "we send a new channel over to the") |
| 538 | + (code:comment "main loop so that we can wait here") |
| 539 | + (channel-put wait-at-least-chan (cons n c)) |
| 540 | + (channel-get c))) |
| 541 | + |
| 542 | + (eval:no-prompt |
| 543 | + (define producer-chan (make-channel)) |
| 544 | + (define consumer-chan (make-channel)) |
| 545 | + (define wait-at-least-chan (make-channel)) |
| 546 | + (void |
| 547 | + (thread |
| 548 | + (λ () |
| 549 | + (let loop ([items '()] |
| 550 | + [total-items-seen 0] |
| 551 | + [waiters '()]) |
| 552 | + (code:comment "instead of waiting on just production/") |
| 553 | + (code:comment "consumption now we wait to learn about") |
| 554 | + (code:comment "threads that want to wait for a certain") |
| 555 | + (code:comment "number of elements to be reached") |
| 556 | + (apply |
| 557 | + sync |
| 558 | + (handle-evt |
| 559 | + producer-chan |
| 560 | + (λ (i) (loop (cons i items) |
| 561 | + (+ total-items-seen 1) |
| 562 | + waiters))) |
| 563 | + (handle-evt |
| 564 | + (if (null? items) |
| 565 | + never-evt |
| 566 | + (channel-put-evt consumer-chan (car items))) |
| 567 | + (λ (_) (loop (cdr items) total-items-seen waiters))) |
| 568 | + |
| 569 | + (code:comment "wait for threads that are interested") |
| 570 | + (code:comment "the number of items produced") |
| 571 | + (handle-evt |
| 572 | + wait-at-least-chan |
| 573 | + (λ (waiter) (loop items total-items-seen (cons waiter waiters)))) |
| 574 | + |
| 575 | + (code:comment "for each thread that wants to wait,") |
| 576 | + (for/list ([waiter (in-list waiters)]) |
| 577 | + (code:comment "we check to see if there has been enough") |
| 578 | + (code:comment "production") |
| 579 | + (cond |
| 580 | + [(>= (car waiter) total-items-seen) |
| 581 | + (code:comment "if so, we send a mesage back on the channel") |
| 582 | + (code:comment "and continue the loop without that item") |
| 583 | + (handle-evt |
| 584 | + (channel-put-evt |
| 585 | + (cdr waiter) |
| 586 | + (void)) |
| 587 | + (λ (_) (loop items total-items-seen (remove waiter waiters))))] |
| 588 | + [else |
| 589 | + (code:comment "otherwise, we just ignore that one") |
| 590 | + never-evt])))))))) |
| 591 | + |
| 592 | + (code:comment "an example (non-deterministic) interaction") |
| 593 | + (define thds |
| 594 | + (for/list ([i (in-range 10)]) |
| 595 | + (thread (λ () |
| 596 | + (produce i) |
| 597 | + (wait-at-least 10) |
| 598 | + (display (format "~a -> ~a\n" i (consume))))))) |
| 599 | + (for ([thd (in-list thds)]) |
| 600 | + (thread-wait thd)) |
| 601 | + ] |
0 commit comments