-
Notifications
You must be signed in to change notification settings - Fork 0
Implement Go-Like Scala channels #34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: base
Are you sure you want to change the base?
Conversation
Also, add an implementation for sending objects through a channel.
* Allow getting pending messages from a closed channel.
* Add non-blocking send and receive * Add foreach()
* Java semaphores support bounded waits on acquire().
…es are received. This can only happen when 'trySend' is used on the Synchronous channel.
…-blocking methods.
…ranteed across several channels waiting on select.
* Fixed a deadlock that happens when select() tries to tale 2 locks at the same time. * Added progress guarantee by randomizing the order of priorities channels have in select().
…ith general time utils.
Release Channels v0.2.0
Release Channels 0.2.1
@coderabbitai review |
✅ Actions performedReview triggered.
|
WalkthroughThis update introduces a new Scala library implementing GoLang-style channels for concurrency, complete with synchronous, buffered, and unbounded channels, selection mechanisms, and time-based utilities. The release includes comprehensive documentation, Apache 2.0 licensing, SBT build and publishing configuration, and an extensive suite of unit tests covering channel operations, transformations, fairness, and performance. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Channel
participant AsyncChannel
participant SyncChannel
participant Selector
participant WaitGroup
participant TimeChannels
User->>Channel: Channel.make / makeUnbounded / make(bufferSize)
User->>Channel: send(value) / recv()
Channel->>AsyncChannel: (if buffered)
Channel->>SyncChannel: (if sync)
User->>Selector: select(sender/recver...)
Selector->>Channel: trySend/tryRecv or register waiter
User->>WaitGroup: add()/done()/await()
User->>TimeChannels: after()/ticker()
TimeChannels->>Channel: send(Instant)/close()
Poem
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
🧹 Nitpick comments (23)
project/plugins.sbt (1)
1-14
: Update plugin header year if needed.
The Apache 2.0 header uses the year 2020; consider updating it to the current year or adding an additional line for ongoing maintenance attribution.src/main/scala/com/github/yruslan/channel/ChannelLike.scala (1)
16-20
: Add Scaladoc for public API.
Consider documenting theChannelLike
trait and itsisClosed
method to clarify their purpose and usage. For example:/** A common interface for channel types to signal closure state. */ trait ChannelLike { /** Returns true if the channel is closed and cannot send or receive further messages. */ def isClosed: Boolean }src/main/scala/com/github/yruslan/channel/impl/Waiter.scala (1)
20-23
: Consider making this a case class for better semantics.The current implementation is correct, but converting to a case class would provide automatic
equals
,hashCode
,toString
, andcopy
methods, which are useful for debugging and potential future use cases.-class Waiter( +case class Waiter( val sem: Semaphore, val threadId: Long )src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala (1)
44-44
: Fix the test name.The test name contains "111" which appears to be a copy-paste error or temporary naming.
Apply this diff to fix the test name:
- "generate a ticker that ticks until closed111" in { + "not block when closed immediately" in {src/main/scala/com/github/yruslan/channel/impl/Selector.scala (1)
20-25
: Fix parameter naming convention and approve the design.The
Selector
abstract class design is solid and properly integrates with the channel selection mechanism. However, there's a naming convention issue.Apply this diff to fix the parameter naming:
- def sendRecv(Opt: Option[Waiter]): Int + def sendRecv(waiterOpt: Option[Waiter]): IntThe parameter name should follow camelCase convention instead of PascalCase.
src/test/scala/com/github/yruslan/channel/TestUtils.scala (1)
18-18
: Consider removing unused import.The
NegativeWaitGroupCounter
import appears to be unused in this file.If this exception is not used in this utility object, consider removing the import:
- import com.github.yruslan.channel.exception.NegativeWaitGroupCounter
However, if it's imported for use by test files that import these utilities, the import can remain.
src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala (2)
25-26
: Complete or remove the incomplete import comment.The comment mentions requiring an import for Scala 2.13 but no import statement follows.
Either complete the import or remove the incomplete comment:
- // This import is required for Scala 2.13 since it has a builtin Channel object. -Or if an import is actually needed:
- // This import is required for Scala 2.13 since it has a builtin Channel object. - + // This import is required for Scala 2.13 since it has a builtin Channel object. + import com.github.yruslan.{channel => ch}
179-185
: Consider making timing assertions more robust.The timing assertions using
Instant.now()
could be flaky in CI environments due to system load and scheduling variations.Consider using more lenient timing assertions or mocking time for deterministic tests:
- assert(java.time.Duration.between(start, finish).toMillis <= 10L) + assert(java.time.Duration.between(start, finish).toMillis <= 50L) // More lenient for CI - assert(java.time.Duration.between(start, finish).toMillis >= 10L) + assert(java.time.Duration.between(start, finish).toMillis >= 5L) // More lenient for CIAlternatively, consider using a test clock or focusing on functional correctness rather than precise timing.
Also applies to: 195-201
README.md (1)
1-581
: Excellent comprehensive documentation for the channel library.The README provides thorough coverage of all channel features with clear examples and explanations. The documentation effectively demonstrates:
- Basic channel operations (send/recv)
- Different channel types (sync, buffered, unbounded)
- Advanced features (select, time channels, WaitGroup)
- Scala-specific idioms (map, filter, for-comprehension)
This will greatly help users understand and adopt the library.
Consider addressing some minor formatting issues identified by static analysis:
- Add language specifiers to fenced code blocks for better syntax highlighting
- Consider using consistent list styles (asterisk vs dash)
- Fix some minor grammar issues like "A message is send" → "A message is sent"
These are minor improvements that would enhance the professional appearance of the documentation.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~10-~10: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ...possible the features provided by GoLang so channels can be used for concurrency co...(COMMA_COMPOUND_SENTENCE_2)
[uncategorized] ~11-~11: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ...res as underlying concurrency primitives so the performance is not expected to matc...(COMMA_COMPOUND_SENTENCE_2)
[typographical] ~24-~24: Consider isolating this adverbial phrase with commas.
Context: ...e these models are similar, but they are in fact very different. Good deescripton on the...(IN_FACT_COMMA_2)
[style] ~27-~27: As an alternative to the over-used intensifier ‘extremely’, consider replacing this phrase.
Context: .../22622880/1038282 CSP channels provide an extremely simple and uniform building block for designin...(EN_WEAK_ADJECTIVE)
[grammar] ~36-~36: Consider using either the past participle “sent” or the present participle “sending” here.
Context: ... and a thread are created. A message is send from the new thread and received in the...(BEEN_PART_AGREEMENT)
[uncategorized] ~41-~41: Possible missing comma found.
Context: ...e Scala-like. Another reason is that in Scala it is not possible to define an infix `...(AI_HYDRA_LEO_MISSING_COMMA)
[formatting] ~69-~69: Consider inserting a comma after ‘that’.
Context: ...tsend()
method returns immediately. To do that you need to specify the maximum number ...(TO_VERB_COMMA)
[style] ~92-~92: Consider a more concise word here.
Context: ...require limited but unknown buffer size in order to satisfy progress guarantees. This port ...(IN_ORDER_TO_PREMIUM)
[duplication] ~178-~178: Possible typo: you repeated a word.
Context: ...` Output:message
### Closing channels Channels can be closed which prevents sending mo...(ENGLISH_WORD_REPEAT_RULE)
[uncategorized] ~242-~242: Possible missing comma found.
Context: ...eral channels at the same time. In this exampleselect()
is used to wait for any of t...(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~243-~243: Consider adding a comma here.
Context: ... an incoming message. Once a message is available it is received. ```scala import com.gi...(IF_THAT_S_NOT_POSSIBLE_COMMA)
[uncategorized] ~319-~319: Possible missing comma found.
Context: ....close() ``` #### WaitGroup Similar to GoLang you can useWaitGroup
to coordinate m...(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~337-~337: It seems that a comma is missing.
Context: ... without any waiting. If the timeout is specified the methods will wait the specified amo...(IF_COMMA)
[formatting] ~338-~338: Consider inserting a comma after an introductory phrase for better readability.
Context: .... Timeout can be set toDuration.Inf
. In this case these methods are equivalent to their b...(IN_THAT_CASE_COMMA)
[style] ~338-~338: Consider using “except” or “except for”
Context: ...e equivalent to their blocking variants with the exception of the type of returned value. * `trySend...(WITH_THE_EXCEPTION_OF)
[uncategorized] ~339-~339: Possible missing article found.
Context: ...iants with the exception of the type of returned value. *trySend()
returns a boolean...(AI_HYDRA_LEO_MISSING_THE)
[typographical] ~341-~341: The word “otherwise” is an adverb that can’t be used like a conjunction, and therefore needs to be separated from the sentence.
Context: ...n. Iftrue
, the message has been sent successfully, otherwise it is failed for whatever reason (ma...(THUS_SENTENCE)
[uncategorized] ~342-~342: Use a comma before “or” if it connects two independent clauses (unless they are closely connected and short).
Context: ... reason (maybe the channel was closed or the buffer is full). *tryRecv()
retu...(COMMA_COMPOUND_SENTENCE_2)
[misspelling] ~343-~343: Use “an” instead of ‘a’ if the following word starts with a vowel sound, e.g. ‘an article’, ‘an hour’.
Context: ... buffer is full). *tryRecv()
returns a optional value. If there are no pending...(EN_A_VS_AN)
[uncategorized] ~343-~343: Possible missing comma found.
Context: ...optional value. If there are no pending messages the method returnsNone
. * `trySelect...(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~380-~380: Consider adding a comma.
Context: ...ters. So ifsclect()
returns a channel there are no guarantees that another thread w...(IF_THERE_COMMA)
[uncategorized] ~425-~425: Possible missing article found.
Context: ...def ``` ### A balancer example Here is example function that balances inputs from two ...(AI_HYDRA_LEO_MISSING_AN)
[grammar] ~457-~457: With the plural noun ‘features’, the verb inflection ‘is’ is not correct.
Context: ...a-specific channel features Since Scala is a functional language, this implementat...(NNS_IN_NNP_VBZ)
[typographical] ~561-~561: Consider adding a comma here.
Context: ...or channels. When several selectors are ready the first one will take precedence. ...(IF_THAT_S_NOT_POSSIBLE_COMMA)
🪛 markdownlint-cli2 (0.17.2)
20-20: Bare URL used
null(MD034, no-bare-urls)
25-25: Bare URL used
null(MD034, no-bare-urls)
40-40: Emphasis style
Expected: underscore; Actual: asterisk(MD049, emphasis-style)
40-40: Emphasis style
Expected: underscore; Actual: asterisk(MD049, emphasis-style)
61-61: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
85-85: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
110-110: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
140-140: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
174-174: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
196-196: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
232-232: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
271-271: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
370-370: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
417-417: Fenced code blocks should have a language specified
null(MD040, fenced-code-language)
547-547: Heading levels should only increment by one level at a time
Expected: h3; Actual: h4(MD001, heading-increment)
547-547: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
547-547: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
548-548: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
548-548: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
549-549: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
549-549: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
551-551: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
551-551: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
552-552: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
552-552: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
553-553: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
553-553: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
554-554: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
554-554: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
556-556: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
556-556: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
557-557: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
557-557: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
559-559: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
559-559: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
560-560: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
560-560: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
561-561: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
561-561: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
562-562: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
562-562: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
564-564: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
564-564: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
565-565: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
565-565: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
566-566: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
566-566: Unordered list indentation
Expected: 4; Actual: 6(MD007, ul-indent)
567-567: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
567-567: Unordered list indentation
Expected: 2; Actual: 4(MD007, ul-indent)
568-568: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
568-568: Unordered list indentation
Expected: 4; Actual: 6(MD007, ul-indent)
570-570: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
570-570: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
571-571: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
571-571: Unordered list indentation
Expected: 2; Actual: 3(MD007, ul-indent)
573-573: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
573-573: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
574-574: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
574-574: Unordered list indentation
Expected: 2; Actual: 3(MD007, ul-indent)
576-576: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
576-576: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
577-577: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
577-577: Unordered list indentation
Expected: 2; Actual: 3(MD007, ul-indent)
579-579: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
579-579: Trailing punctuation in heading
Punctuation: '.'(MD026, no-trailing-punctuation)
580-580: Unordered list style
Expected: asterisk; Actual: dash(MD004, ul-style)
580-580: Unordered list indentation
Expected: 2; Actual: 3(MD007, ul-indent)
src/main/scala/com/github/yruslan/channel/TimeChannels.scala (1)
37-49
: Functional implementation of time-delayed channel.The
after
method correctly implements a one-shot timer channel that emits once and closes. The use ofAsyncChannel
with capacity 1 is appropriate, and the exception handling for channel closure is correct.Consider using
ScheduledExecutorService
instead ofThread.sleep
for better resource efficiency:-Future { - Thread.sleep(duration.toMillis) - try { - channel.sendAndClose(Instant.now()) - } catch { - case _: IllegalStateException => // Ignore if the channel is already closed - } -} +val scheduledExecutor = java.util.concurrent.Executors.newScheduledThreadPool(1) +scheduledExecutor.schedule(new Runnable { + def run(): Unit = { + try { + channel.sendAndClose(Instant.now()) + } catch { + case _: IllegalStateException => // Ignore if the channel is already closed + } + } +}, duration.toMillis, java.util.concurrent.TimeUnit.MILLISECONDS)This avoids blocking threads in the execution context thread pool.
project/ScalacOptions.scala (1)
31-31
: Fix typo in compiler option.There's a typo in the inlining package specification.
- "-opt-inline-from:com.github.yruslan.channel.**" // Enable inlining within the livrary package + "-opt-inline-from:com.github.yruslan.channel.**" // Enable inlining within the library packagesrc/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala (2)
20-20
: Test class name typo – keep naming consistent with the class under test
SimpleLinkListSuite
is missing the “ed” in Linked. Renaming improves IDE navigation and searchability.-class SimpleLinkListSuite extends AnyWordSpec { +class SimpleLinkedListSuite extends AnyWordSpec {
110-110
: Typo in test title
"remove 1ll elements from the list"
→"remove all elements from the list"
.src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala (1)
21-36
: Null-safety guard for the mapping function
f
is a public constructor parameter that might benull
. A defensive check keeps the decorator fail-fast and avoids an NPE buried deep in a worker thread.-class ChannelDecoratorMap[T, U](inputChannel: ReadChannel[T], f: T => U) ... +class ChannelDecoratorMap[T, U](inputChannel: ReadChannel[T], f: T => U) + extends ChannelDecorator[T](inputChannel) with ReadChannel[U] { + + require(f != null, "Mapping function must not be null")src/test/scala/com/github/yruslan/channel/ChannelSuite.scala (2)
43-46
: Executor service is shut down but not awaited
ex.shutdown()
allows tasks to keep running; the JVM may terminate the suite prematurely or leak threads in long-running builds. Prefershutdown()
+awaitTermination
(orshutdownNow
) to ensure deterministic teardown.override def afterAll(): Unit = { - ex.shutdown() + ex.shutdown() + ex.awaitTermination(30, TimeUnit.SECONDS) super.afterAll() }
98-112
: Small sleep-based timing windows can introduce flaky testsThe test relies on
Thread.sleep(10)
versus a50 ms
Await timeout.
On a loaded CI box 10 ms sleeps often overshoot, producing sporadic failures.Consider using proper coordination primitives (
Promise
/WaitGroup
) instead ofsleep
, or increase the margin substantially.Example refactor (conceptual):
val barrier = Promise[Unit]() val f = Future { barrier.future.foreach(_ => ()) ch.close() } ch.send(1) barrier.success(())src/main/scala/com/github/yruslan/channel/ReadChannel.scala (1)
42-46
:toList
blocks indefinitely until the channel closes – document or redesign
foreach
only returns whenisClosed
becomes true inside the underlying implementation, thereforetoList
never completes for a long-lived channel.
Callers expecting “snapshot” semantics will hang.Consider one of:
- Rename to
drainToList
and document the blocking behaviour explicitly.- Add a bounded/timeout overload (
toList(timeout: Duration)
).- Implement non-blocking collection using repeated
tryRecv()
.Failing at least to document this can lead to subtle production deadlocks.
src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala (1)
194-195
: Avoid println in unit tests – use assertions or test‐framework logging instead
println
adds noise to CI logs and slows large test suites. Prefer:info(s"Single consumer throughput: ${throughput.round} rps, latency: ${latency.round} μs")
info
is provided by ScalaTest and will only display when a test fails or when run in verbose mode.src/main/scala/com/github/yruslan/channel/SyncChannel.scala (2)
218-231
: SimplifyhasCapacity
for readability & micro-optimisationThe current imperative scan could be turned into a more idiomatic Scala expression and avoids a mutable flag:
- var foundOtherThread = false - readWaiters.foreach(waiter => if (waiter.threadId != myThreadId) foundOtherThread = true) - - foundOtherThread + readWaiters.findNot(myThreadId).isDefined(
findNot
already exists inSimpleLinkedList
.)This reduces branching and improves maintainability.
250-268
: Fairness issue innotifySyncReaders
Rotating through the wait-queue until a waiter with a different
threadId
is found may starve single-threaded “ping-pong” scenarios (e.g. only two threads, one always sending).
A simpler and fairer approach is to dequeue the first waiter that is not the sender and, if none found, fall back tocrd.signal()
so that a blocking reader (even on the same thread) can check the state.Consider:
val waiterOpt = readWaiters.findNot(sender) waiterOpt.foreach(_.sem.release())This keeps rotation O(n) worst-case but avoids indefinite cycling.
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (1)
44-61
:send
still callsnotifyReaders()
even when the channel just closedInside
send
, after the blocking loop we enqueue the element only if!closed
, yetnotifyReaders()
is unconditionally executed.
This wakes up waiting readers although there is no new data, causing an extra context switch.- if (!closed) { - q.enqueue(value) - } - notifyReaders() + if (!closed) { + q.enqueue(value) + notifyReaders() // move inside the block + }A tiny optimisation, but worth the change given the method is on a hot path.
src/main/scala/com/github/yruslan/channel/Channel.scala (2)
320-322
: Typo in parameter name propagates through public API
trySelect(timout: Duration, …)
– “timeout” is misspelled in method name and parameter, leaking into user-facing APIs.
Renaming before the library is released will avoid a breaking change later.-def trySelect(timout: Duration, isPriorityOrdered: Boolean, ... +def trySelect(timeout: Duration, isPriorityOrdered: Boolean, ...Same typo exists in the overload on lines 314-322 and in Scaladoc comments (“non-bloaking”).
Please fix consistently.
337-341
: Documentation typo – “non-bloaking”Minor: s/“non-bloaking action”/“non-blocking action”/ on line 337.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (35)
.github/workflows/build.yml
(1 hunks).gitignore
(1 hunks)LICENSE
(1 hunks)README.md
(1 hunks)build.sbt
(1 hunks)project/ScalacOptions.scala
(1 hunks)project/build.properties
(1 hunks)project/plugins.sbt
(1 hunks)publish.sbt
(1 hunks)src/main/scala/com/github/yruslan/channel/AsyncChannel.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/Channel.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/ChannelLike.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/ReadChannel.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/SyncChannel.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/TimeChannels.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/WaitGroup.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/WriteChannel.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/impl/Selector.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala
(1 hunks)src/main/scala/com/github/yruslan/channel/impl/Waiter.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/ChannelMapSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/ChannelSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/GuaranteesSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/TestUtils.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala
(1 hunks)src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala
(1 hunks)version.sbt
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (13)
src/main/scala/com/github/yruslan/channel/ChannelLike.scala (3)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (2)
channel
(212-228)isClosed
(180-189)src/main/scala/com/github/yruslan/channel/SyncChannel.scala (1)
isClosed
(206-215)src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala (1)
isClosed
(19-19)
src/main/scala/com/github/yruslan/channel/impl/Selector.scala (1)
src/main/scala/com/github/yruslan/channel/Channel.scala (8)
Channel
(26-230)Channel
(232-491)sendRecv
(80-97)sendRecv
(107-125)sendRecv
(413-415)afterAction
(99-99)afterAction
(127-127)afterAction
(417-417)
src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala (1)
src/main/scala/com/github/yruslan/channel/TimeChannels.scala (3)
ticker
(61-75)TimeChannels
(25-76)after
(37-49)
src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala (3)
src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala (1)
NegativeWaitGroupCounter
(18-18)src/test/scala/com/github/yruslan/channel/TestUtils.scala (3)
TestUtils
(22-38)createThread
(23-29)setUncaughtExceptionHandler
(31-37)src/main/scala/com/github/yruslan/channel/WaitGroup.scala (5)
WaitGroup
(27-89)WaitGroup
(91-93)add
(39-53)done
(61-64)await
(74-88)
src/main/scala/com/github/yruslan/channel/impl/Waiter.scala (1)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (1)
channel
(212-228)
src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala (1)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (1)
AsyncChannel
(23-229)
src/test/scala/com/github/yruslan/channel/ChannelSuite.scala (9)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (9)
channel
(212-228)send
(43-65)recv
(115-136)close
(28-41)tryRecv
(138-155)tryRecv
(157-178)trySend
(67-84)trySend
(86-113)isClosed
(180-189)src/main/scala/com/github/yruslan/channel/Channel.scala (14)
Channel
(26-230)Channel
(232-491)select
(284-286)make
(243-245)make
(255-263)makeUnbounded
(271-273)foreach
(51-74)fornew
(42-49)sender
(78-101)recver
(103-129)default
(411-419)trySelect
(309-311)trySelect
(320-322)trySelect
(345-409)src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala (3)
AsyncChannelSpy
(20-24)numOfReaders
(21-21)numOfWriters
(23-23)src/test/scala/com/github/yruslan/channel/TestUtils.scala (3)
TestUtils
(22-38)createThread
(23-29)setUncaughtExceptionHandler
(31-37)src/main/scala/com/github/yruslan/channel/SyncChannel.scala (8)
send
(48-74)recv
(132-157)close
(26-46)tryRecv
(159-181)tryRecv
(183-204)trySend
(76-94)trySend
(96-130)isClosed
(206-215)src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala (6)
recv
(22-23)tryRecv
(25-25)tryRecv
(27-28)foreach
(34-35)fornew
(32-32)recver
(30-30)src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala (6)
recv
(25-35)tryRecv
(37-46)tryRecv
(48-68)foreach
(74-75)fornew
(72-72)recver
(70-70)src/main/scala/com/github/yruslan/channel/WaitGroup.scala (6)
apply
(92-92)WaitGroup
(27-89)WaitGroup
(91-93)add
(39-53)done
(61-64)await
(74-88)src/main/scala/com/github/yruslan/channel/ReadChannel.scala (3)
toList
(42-46)map
(38-38)filter
(39-39)
src/test/scala/com/github/yruslan/channel/TestUtils.scala (1)
src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala (1)
NegativeWaitGroupCounter
(18-18)
src/main/scala/com/github/yruslan/channel/TimeChannels.scala (1)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (3)
channel
(212-228)isClosed
(180-189)send
(43-65)
src/main/scala/com/github/yruslan/channel/WaitGroup.scala (1)
src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala (1)
NegativeWaitGroupCounter
(18-18)
src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala (1)
src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala (8)
isEmpty
(41-43)size
(49-49)append
(29-39)head
(51-57)remove
(69-92)clear
(116-120)foreach
(122-128)returnHeadAndRotate
(59-67)
src/main/scala/com/github/yruslan/channel/Channel.scala (9)
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (9)
channel
(212-228)tryRecv
(138-155)tryRecv
(157-178)hasMessages
(197-199)isClosed
(180-189)fetchValueOpt
(202-209)trySend
(67-84)trySend
(86-113)hasCapacity
(192-194)src/main/scala/com/github/yruslan/channel/impl/Selector.scala (1)
channel
(20-25)src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala (2)
Awaiter
(24-63)await
(28-40)src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala (7)
SimpleLinkedList
(22-148)nonEmpty
(45-47)foreach
(122-128)append
(29-39)isEmpty
(41-43)returnHeadAndRotate
(59-67)remove
(69-92)src/main/scala/com/github/yruslan/channel/impl/Waiter.scala (1)
Waiter
(20-23)src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala (5)
fornew
(32-32)tryRecv
(25-25)tryRecv
(27-28)foreach
(34-35)recver
(30-30)src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala (5)
fornew
(72-72)tryRecv
(37-46)tryRecv
(48-68)foreach
(74-75)recver
(70-70)src/main/scala/com/github/yruslan/channel/SyncChannel.scala (8)
tryRecv
(159-181)tryRecv
(183-204)hasMessages
(234-236)isClosed
(206-215)fetchValueOpt
(239-247)trySend
(76-94)trySend
(96-130)hasCapacity
(218-231)src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala (1)
isClosed
(19-19)
src/main/scala/com/github/yruslan/channel/ReadChannel.scala (5)
src/main/scala/com/github/yruslan/channel/impl/Selector.scala (1)
channel
(20-25)src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala (6)
recv
(22-23)tryRecv
(25-25)tryRecv
(27-28)recver
(30-30)fornew
(32-32)foreach
(34-35)src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala (6)
recv
(25-35)tryRecv
(37-46)tryRecv
(48-68)recver
(70-70)fornew
(72-72)foreach
(74-75)src/main/scala/com/github/yruslan/channel/Channel.scala (3)
recver
(103-129)fornew
(42-49)foreach
(51-74)src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala (1)
foreach
(122-128)
🪛 LanguageTool
README.md
[uncategorized] ~10-~10: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ...possible the features provided by GoLang so channels can be used for concurrency co...
(COMMA_COMPOUND_SENTENCE_2)
[uncategorized] ~11-~11: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ...res as underlying concurrency primitives so the performance is not expected to matc...
(COMMA_COMPOUND_SENTENCE_2)
[typographical] ~24-~24: Consider isolating this adverbial phrase with commas.
Context: ...e these models are similar, but they are in fact very different. Good deescripton on the...
(IN_FACT_COMMA_2)
[style] ~27-~27: As an alternative to the over-used intensifier ‘extremely’, consider replacing this phrase.
Context: .../22622880/1038282 CSP channels provide an extremely simple and uniform building block for designin...
(EN_WEAK_ADJECTIVE)
[grammar] ~36-~36: Consider using either the past participle “sent” or the present participle “sending” here.
Context: ... and a thread are created. A message is send from the new thread and received in the...
(BEEN_PART_AGREEMENT)
[uncategorized] ~41-~41: Possible missing comma found.
Context: ...e Scala-like. Another reason is that in Scala it is not possible to define an infix `...
(AI_HYDRA_LEO_MISSING_COMMA)
[formatting] ~69-~69: Consider inserting a comma after ‘that’.
Context: ...t send()
method returns immediately. To do that you need to specify the maximum number ...
(TO_VERB_COMMA)
[style] ~92-~92: Consider a more concise word here.
Context: ...require limited but unknown buffer size in order to satisfy progress guarantees. This port ...
(IN_ORDER_TO_PREMIUM)
[duplication] ~178-~178: Possible typo: you repeated a word.
Context: ...` Output: message
### Closing channels Channels can be closed which prevents sending mo...
(ENGLISH_WORD_REPEAT_RULE)
[uncategorized] ~242-~242: Possible missing comma found.
Context: ...eral channels at the same time. In this example select()
is used to wait for any of t...
(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~243-~243: Consider adding a comma here.
Context: ... an incoming message. Once a message is available it is received. ```scala import com.gi...
(IF_THAT_S_NOT_POSSIBLE_COMMA)
[uncategorized] ~319-~319: Possible missing comma found.
Context: ....close() ``` #### WaitGroup Similar to GoLang you can use WaitGroup
to coordinate m...
(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~337-~337: It seems that a comma is missing.
Context: ... without any waiting. If the timeout is specified the methods will wait the specified amo...
(IF_COMMA)
[formatting] ~338-~338: Consider inserting a comma after an introductory phrase for better readability.
Context: .... Timeout can be set to Duration.Inf
. In this case these methods are equivalent to their b...
(IN_THAT_CASE_COMMA)
[style] ~338-~338: Consider using “except” or “except for”
Context: ...e equivalent to their blocking variants with the exception of the type of returned value. * `trySend...
(WITH_THE_EXCEPTION_OF)
[uncategorized] ~339-~339: Possible missing article found.
Context: ...iants with the exception of the type of returned value. * trySend()
returns a boolean...
(AI_HYDRA_LEO_MISSING_THE)
[typographical] ~341-~341: The word “otherwise” is an adverb that can’t be used like a conjunction, and therefore needs to be separated from the sentence.
Context: ...n. If true
, the message has been sent successfully, otherwise it is failed for whatever reason (ma...
(THUS_SENTENCE)
[uncategorized] ~342-~342: Use a comma before “or” if it connects two independent clauses (unless they are closely connected and short).
Context: ... reason (maybe the channel was closed or the buffer is full). * tryRecv()
retu...
(COMMA_COMPOUND_SENTENCE_2)
[misspelling] ~343-~343: Use “an” instead of ‘a’ if the following word starts with a vowel sound, e.g. ‘an article’, ‘an hour’.
Context: ... buffer is full). * tryRecv()
returns a optional value. If there are no pending...
(EN_A_VS_AN)
[uncategorized] ~343-~343: Possible missing comma found.
Context: ...optional value. If there are no pending messages the method returns None
. * `trySelect...
(AI_HYDRA_LEO_MISSING_COMMA)
[typographical] ~380-~380: Consider adding a comma.
Context: ...ters. So if sclect()
returns a channel there are no guarantees that another thread w...
(IF_THERE_COMMA)
[uncategorized] ~425-~425: Possible missing article found.
Context: ...def ``` ### A balancer example Here is example function that balances inputs from two ...
(AI_HYDRA_LEO_MISSING_AN)
[grammar] ~457-~457: With the plural noun ‘features’, the verb inflection ‘is’ is not correct.
Context: ...a-specific channel features Since Scala is a functional language, this implementat...
(NNS_IN_NNP_VBZ)
[typographical] ~561-~561: Consider adding a comma here.
Context: ...or channels. When several selectors are ready the first one will take precedence. ...
(IF_THAT_S_NOT_POSSIBLE_COMMA)
🪛 markdownlint-cli2 (0.17.2)
README.md
20-20: Bare URL used
null
(MD034, no-bare-urls)
25-25: Bare URL used
null
(MD034, no-bare-urls)
40-40: Emphasis style
Expected: underscore; Actual: asterisk
(MD049, emphasis-style)
40-40: Emphasis style
Expected: underscore; Actual: asterisk
(MD049, emphasis-style)
61-61: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
85-85: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
110-110: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
140-140: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
174-174: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
196-196: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
232-232: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
271-271: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
370-370: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
417-417: Fenced code blocks should have a language specified
null
(MD040, fenced-code-language)
547-547: Heading levels should only increment by one level at a time
Expected: h3; Actual: h4
(MD001, heading-increment)
547-547: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
547-547: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
548-548: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
548-548: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
549-549: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
549-549: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
551-551: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
551-551: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
552-552: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
552-552: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
553-553: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
553-553: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
554-554: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
554-554: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
556-556: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
556-556: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
557-557: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
557-557: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
559-559: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
559-559: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
560-560: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
560-560: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
561-561: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
561-561: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
562-562: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
562-562: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
564-564: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
564-564: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
565-565: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
565-565: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
566-566: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
566-566: Unordered list indentation
Expected: 4; Actual: 6
(MD007, ul-indent)
567-567: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
567-567: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
568-568: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
568-568: Unordered list indentation
Expected: 4; Actual: 6
(MD007, ul-indent)
570-570: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
570-570: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
571-571: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
571-571: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
573-573: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
573-573: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
574-574: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
574-574: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
576-576: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
576-576: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
577-577: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
577-577: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
579-579: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
579-579: Trailing punctuation in heading
Punctuation: '.'
(MD026, no-trailing-punctuation)
580-580: Unordered list style
Expected: asterisk; Actual: dash
(MD004, ul-style)
580-580: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
🔇 Additional comments (38)
LICENSE (1)
1-203
: Confirm license transition to Apache 2.0.
The project license has been updated from MIT to Apache License 2.0; ensure all contributors and downstream users are aware of the change and that documentation, NOTICE files, and any legal references have been updated accordingly.project/build.properties (1)
1-1
: SBt version configuration is correct.
Thesbt.version
is set to1.9.9
, aligning with the new build setup. Verify your local and CI environments are using this version to avoid inconsistencies.version.sbt (1)
1-1
: Snapshot version is updated.
The build version is now0.2.2-SNAPSHOT
. Confirm that this matches your release strategy and that documentation or release notes reflect this new version.project/plugins.sbt (1)
16-19
: Plugin declarations look solid.
The SBT plugins (sbt-scoverage
,sbt-pgp
,sbt-header
,sbt-release
) and specified versions are appropriate for coverage, signing, header management, and release automation.src/main/scala/com/github/yruslan/channel/exception/NegativeWaitGroupCounter.scala (1)
18-18
: LGTM! Clean exception implementation.The exception class follows Scala best practices with a descriptive name and clear error message. The fixed message approach is appropriate for this specific error condition in WaitGroup operations.
src/main/scala/com/github/yruslan/channel/ChannelDecorator.scala (1)
18-20
: Excellent decorator pattern implementation.The abstract base class correctly implements the decorator pattern by wrapping a
ReadChannel[T]
and delegating theisClosed
behavior to the wrapped channel. This provides a clean foundation for channel transformations like map and filter operations.src/test/scala/com/github/yruslan/channel/ChannelMapSuite.scala (9)
24-25
: Good practice handling Scala 2.13 compatibility.The explicit import to avoid conflicts with Scala 2.13's builtin Channel object is a thoughtful approach for cross-version compatibility.
27-30
: Well-configured test execution context.The ExecutionContext with a fixed thread pool of 10 threads is appropriate for testing concurrent channel operations and ensures consistent test behavior.
32-46
: Comprehensive test coverage for recv() method.The test correctly verifies that mapped channels transform values properly while maintaining the blocking receive semantics.
48-62
: Good coverage of non-blocking tryRecv() behavior.The test properly validates both empty channel (returns None) and available value (returns transformed value) scenarios.
64-78
: Appropriate timeout testing with tryRecv(duration).The test uses short timeouts (2ms) which is suitable for unit testing while avoiding flaky test behavior.
80-92
: Correct testing of selection mechanism with recver().The test properly integrates with the Channel.select() API to verify that mapped channels work correctly in selection operations.
94-106
: Good coverage of fornew() method behavior.The test verifies that the fornew() method (likely for receiving new values) correctly applies the mapping transformation.
108-121
: Proper testing of foreach() iteration.The test confirms that foreach() correctly iterates over mapped channel values until the channel is closed.
123-139
: Excellent test for method chaining.The chained map operations test (int → string → int+1) verifies that the decorator pattern correctly composes multiple transformations, which is crucial for functional programming patterns.
src/test/scala/com/github/yruslan/channel/TimeChannelsSuite.scala (2)
27-42
: LGTM! Well-structured timing test.The test correctly validates the ticker functionality by measuring timing between ticks and uses reasonable upper bounds (500ms) to account for system timing variations.
56-66
: LGTM! Comprehensive single-shot channel test.The test properly validates both the timing behavior (at least 10ms delay) and the closure semantics (channel is closed after emitting) of the
after
method..github/workflows/build.yml (1)
1-28
: LGTM! Well-configured CI workflow.The workflow is properly structured with:
- Appropriate triggers for push and pull requests
- Good matrix strategy covering relevant Scala versions (2.11.12, 2.12.19, 2.13.13)
- Current GitHub Actions versions (checkout@v4, setup-java@v4)
- Proper Java 8 setup with caching for sbt dependencies
- Correct test command using the matrix Scala version
This provides solid continuous integration for the channel library across supported Scala versions.
src/test/scala/com/github/yruslan/channel/WaitGroupSuite.scala (3)
9-20
: LGTM! Solid concurrency test.The test correctly validates the core WaitGroup functionality by:
- Creating 10 concurrent threads that each call
add()
and laterdone()
- Using
await()
to block until all threads complete- Using TestUtils for Scala 2.11-compatible thread creation
The 1-second sleep provides sufficient time for proper concurrency testing.
23-30
: LGTM! Proper exception testing.The test correctly validates that calling
done()
when the counter is already zero throwsNegativeWaitGroupCounter
, which matches the expected behavior from the WaitGroup implementation.
34-52
: LGTM! Comprehensive negative counter test.The test properly validates that:
- The thread calling
add(-3)
(making counter negative) gets the exception via uncaught exception handler- The main thread calling
await()
also receives theNegativeWaitGroupCounter
exception- Uses proper thread synchronization with
join()
This thoroughly tests the exception propagation behavior documented in the WaitGroup implementation.
src/test/scala/com/github/yruslan/channel/mocks/AsyncChannelSpy.scala (1)
20-24
: LGTM! Clean test spy implementation.The
AsyncChannelSpy
properly extendsAsyncChannel
and exposes the internalreaders
andwriters
counters as public methods. This is a common and effective testing pattern for observing internal concurrency state during test execution.The implementation is minimal, focused, and follows the existing codebase patterns.
build.sbt (2)
18-36
: LGTM! Well-configured build setup.The build configuration is well-structured with:
- Multi-version Scala support (2.11, 2.12, 2.13)
- Proper cross-compilation setup
- Appropriate test dependencies
- Release automation with PGP signing
The configuration follows Scala project best practices.
16-16
:✅ Verification successful
Verify that ScalacOptions object exists.
The import statement references
ScalacOptions._
but this object is not visible in the provided context.
🏁 Script executed:
#!/bin/bash # Description: Verify that ScalacOptions object exists in the project # Expected: Should find ScalacOptions.scala file with scalacOptionsFor method fd -t f "ScalacOptions.scala" project/Length of output: 66
ScalacOptions import verified as present
The file
project/ScalacOptions.scala
exists, so theimport ScalacOptions._
statement is valid. No changes required.src/test/scala/com/github/yruslan/channel/TestUtils.scala (1)
22-38
: LGTM! Excellent Scala 2.11 compatibility utilities.The utility methods are well-implemented:
- Explicit Scala 2.11 compatibility with clear comments
- Proper thread creation using
new Runnable
pattern- Correct uncaught exception handler setup
- Clear warnings to prevent "modernization" that would break compatibility
These utilities will ensure the test suite works correctly across all supported Scala versions.
src/test/scala/com/github/yruslan/channel/ChannelFilterSuite.scala (1)
31-296
: Excellent comprehensive test coverage!This test suite provides outstanding coverage of channel filtering functionality:
- Tests all receive methods (recv, tryRecv, tryRecv with timeout, recver, fornew, foreach)
- Covers edge cases (zero timeout, empty channels, non-matching filters)
- Tests chaining operations (filter + filter, filter + map)
- Validates both positive and negative scenarios
- Uses appropriate test data and assertions
The test structure is clear, well-organized, and follows good testing practices.
.gitignore (1)
1-40
: LGTM! Comprehensive .gitignore configuration.The .gitignore file properly covers all essential patterns for Scala development including IDE files, build artifacts, and temporary files. The
syntax: glob
directive is correctly used and the patterns are well-organized by category.src/main/scala/com/github/yruslan/channel/impl/Awaiter.scala (2)
24-40
: Well-designed timeout utility with solid implementation.The
Awaiter
class provides a clean abstraction for timeout management on condition variables. The logic correctly handles the three timeout scenarios:
Duration.Zero
: Returns immediately without waiting- Finite duration: Waits with calculated remaining time
- Infinite duration: Waits indefinitely
The timing calculations using
Instant
and the thread-safety considerations are well implemented.
58-61
: Consider edge case handling for negative time calculations.The
timeLeft()
method correctly handles negative elapsed time by returning 0L, which ensures thatcond.await(0, TimeUnit.MILLISECONDS)
will return immediately if the timeout has already expired. This is the correct behavior.publish.sbt (1)
1-47
: LGTM! Publishing configuration is well-structured.The publishing configuration follows Maven Central best practices with proper SCM information, developer metadata, and Sonatype OSS Nexus repository settings. All URLs and configurations appear correct.
src/main/scala/com/github/yruslan/channel/WaitGroup.scala (1)
62-64
: LGTM! Clean implementation of done method.The
done()
method correctly delegates toadd(-1)
, maintaining consistency with the Go WaitGroup API.src/main/scala/com/github/yruslan/channel/WriteChannel.scala (1)
22-34
: LGTM! Well-designed channel interface.The
WriteChannel
trait properly uses contravariant type parameter[-T]
for write-only operations, includes comprehensive send methods (blocking, non-blocking, timed), and integrates well with the selector mechanism. The interface design follows good functional programming practices.project/ScalacOptions.scala (2)
52-63
: LGTM! Robust version detection and configuration.The
scalacOptionsFor
method correctly usesCrossVersion.partialVersion
for version detection and provides appropriate fallbacks. The debug output is helpful for troubleshooting compiler option issues.
19-50
:✅ Verification successful
Verify compiler options compatibility across Scala versions.
The compiler options look reasonable, but some optimization flags may not be available in all specified Scala versions.
🏁 Script executed:
#!/bin/bash # Description: Check if the specified compiler options are valid for each Scala version # Expected: All options should be recognized by their respective Scala versions echo "Checking Scala compiler options compatibility..." # Check if common optimization flags are available in Scala 2.11 echo "=== Checking Scala 2.11 compatibility ===" echo "Note: -opt-warnings and -opt:l:inline flags were introduced in Scala 2.12" echo "Scala 2.11 may not recognize these optimization flags" # Check if JVM target options are correct echo "=== Checking JVM target options ===" echo "Scala 2.11: Uses -target:jvm-1.8 (deprecated syntax)" echo "Scala 2.12+: Uses -release:8 (preferred syntax)"Length of output: 774
No changes needed – compiler options are correctly scoped per Scala version
All optimization flags are only applied to Scala 2.12 and 2.13, and Scala 2.11 uses only-target:jvm-1.8
; Scala 2.12+ correctly uses-release:8
. No compatibility issues detected.src/main/scala/com/github/yruslan/channel/SyncChannel.scala (1)
60-70
:❌ Incorrect review comment
send
returns before the value is actually consumedFor synchronous (unbuffered) channels the send operation should complete only after a receiver has taken the value.
HeretrySend
places the value intosyncValue
, notifies readers and returns immediately – the receiver may not yet have consumed it.If exact Go-style semantics are required, the sender must wait until
syncValue
becomesNone
again.syncValue = Option(value) sender = Thread.currentThread().getId notifySyncReaders() -true +while (syncValue.nonEmpty && !closed) { + awaitWriters() +} +notifyWriters() +truePlease double-check the expected contract; if “immediate fire-and-forget” is intended, consider renaming the method (e.g.
offer
) to avoid confusion with Go’sselect { case ch <- v: … }
semantics.
I’d like to confirm how
trySend
/send
is implemented end-to-end inSyncChannel.scala
—specifically whether it still returns immediately or now truly blocks until the reader consumes the value. Can you help me pull in the surrounding code?
🏁 Script executed:
#!/bin/bash # Locate the SyncChannel source file file=$(fd SyncChannel.scala) echo "== $file ==" # Show the class and its send/trySend definitions with context rg -n -C3 'class SyncChannel|def trySend|def send' "$file"Length of output: 1019
I’d like to see the full body of
trySend
to confirm whether it already blocks after notifying readers. Let’s pull lines 45–110 ofSyncChannel.scala
:
🏁 Script executed:
#!/bin/bash sed -n '45,110p' src/main/scala/com/github/yruslan/channel/SyncChannel.scalaLength of output: 1496
No change needed:
send
already blocks until the value is consumed;trySend
is non-blocking by designThe
send
method’s loopwhile (syncValue.nonEmpty && !closed) awaitWriters()ensures that it does not return until a reader has taken the value. In contrast,
trySend
is explicitly designed as a non-blocking “offer” and should return immediately. If you need blocking, callsend
; if you want a non-blocking offer, usetrySend
.Please disregard the suggested diff.
Likely an incorrect or invalid review comment.
src/main/scala/com/github/yruslan/channel/AsyncChannel.scala (1)
180-189
:isClosed
may mislead callers
isClosed
returnsfalse
while there are buffered messages, even though no further sends are allowed.
This deviates from Go’schannel
behaviour where<-ch
succeeds until the buffer is drained andok == false
flags the closed state.Consider exposing both states – “closed” and “drained” – or documenting clearly that
isClosed
means “closed and empty”.src/main/scala/com/github/yruslan/channel/Channel.scala (2)
468-478
: Potential waiter leak on early success
removeWaiters
is only invoked for indices< numberOfWaiters
.
IfsendRecv(Some(waiter))
succeeds immediately for index 0, the waiter is never appended to other channels, but you still attempt to remove it for them, which is fine.
However, if success happens for indexk > 0
, waiters for selectorsk+1 … n
have not been appended yet and therefore never removed, but they were never added either. All good.This logic is correct but subtle—consider adding a short comment explaining the rationale to future maintainers.
42-49
: 🛠️ Refactor suggestion
fornew
busy-loops when the channel is empty
tryRecv()
is called in a tight loop with no blocking or back-off, causing 100 % CPU when the channel is empty but open.Replace with a blocking
recv()
or an adaptive back-off:- var valueOpt = tryRecv() - while (valueOpt.nonEmpty) { + var valueOpt = recv() // blocks + while (true) { valueOpt.foreach(v => f(v)) - valueOpt = tryRecv() + valueOpt = recv() }(or introduce a small
Thread.onSpinWait()
/sleep
if a non-blocking variant is desirable).Likely an incorrect or invalid review comment.
def ticker(duration: Duration)(implicit executor: ExecutionContext): Channel[Instant] = { | ||
val channel = new SyncChannel[Instant] | ||
Future { | ||
while (!channel.isClosed) { | ||
Thread.sleep(duration.toMillis) | ||
try { | ||
channel.send(Instant.now()) | ||
} catch { | ||
case _: IllegalStateException => // Ignore if the channel is already closed | ||
} | ||
} | ||
} | ||
|
||
channel | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Verify the ticker implementation handles resource cleanup.
The ticker implementation correctly uses a SyncChannel
for synchronous behavior and handles channel closure appropriately. The exception handling prevents crashes when the channel is closed during operation.
The ticker method uses Thread.sleep
in a loop, which could consume thread pool resources if many tickers are created. Additionally, verify that the Future completes properly when the channel is closed.
🏁 Script executed:
#!/bin/bash
# Description: Check if there are other time-based utilities in the codebase that might use different patterns
# Search for scheduled executor usage
rg -A 5 -B 5 "ScheduledExecutor|Timer|schedule"
# Search for Thread.sleep usage patterns
rg -A 3 -B 3 "Thread\.sleep"
# Check for Future cleanup patterns
rg -A 5 -B 5 "Future.*complete|Future.*cancel"
Length of output: 19555
Wrap blocking calls in ticker
with scala.concurrent.blocking
To avoid starving the default ExecutionContext
when sleeping in a loop, wrap the Thread.sleep
call in scala.concurrent.blocking
. The Future
will naturally complete once channel.isClosed
becomes true (due to the while
guard), but signalling blocking to the thread pool improves resource management. For very high‐frequency or many concurrent tickers, you may also consider a dedicated ScheduledExecutorService
or a separate blocking execution context.
• File: src/main/scala/com/github/yruslan/channel/TimeChannels.scala
(lines 61–76)
Replace:
Thread.sleep(duration.toMillis)
With:
scala.concurrent.blocking {
Thread.sleep(duration.toMillis)
}
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/TimeChannels.scala around lines 61
to 76, the ticker method uses Thread.sleep inside a Future without wrapping it
in scala.concurrent.blocking, which can lead to starvation of the default
ExecutionContext threads. To fix this, wrap the Thread.sleep(duration.toMillis)
call inside scala.concurrent.blocking { ... } to properly signal the blocking
operation to the thread pool and improve resource management.
def await(): Unit = { | ||
lock.lock() | ||
try { | ||
while (counter.get() > 0) { | ||
cond.await() | ||
} | ||
if (counter.get() < 0) { | ||
throw new NegativeWaitGroupCounter | ||
} | ||
} finally { | ||
lock.unlock() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Update await method to check error state consistently.
The await method should also check the error state flag to ensure consistent exception behavior across all waiting threads.
def await(): Unit = {
lock.lock()
try {
- while (counter.get() > 0) {
+ while (counter.get() > 0 && !errorState) {
cond.await()
}
- if (counter.get() < 0) {
+ if (errorState) {
throw new NegativeWaitGroupCounter
}
} finally {
lock.unlock()
}
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/WaitGroup.scala between lines 76
and 88, the await method currently only checks if the counter is negative to
throw an exception. Update the await method to also check the error state flag
inside the waiting loop so that all waiting threads consistently detect and
respond to error conditions by throwing the appropriate exception. This ensures
uniform exception behavior across threads waiting on the condition.
def add(delta: Int = 1): Unit = { | ||
val after = counter.addAndGet(delta) | ||
|
||
if (after <= 0) { | ||
lock.lock() | ||
try { | ||
cond.signalAll() | ||
if (after < 0) | ||
throw new NegativeWaitGroupCounter | ||
} finally { | ||
lock.unlock() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Potential race condition in exception handling.
The current implementation has a subtle race condition: after signaling all waiters when the counter becomes negative, another thread could call add()
with a positive value, making the counter positive again before waiting threads wake up and check for the negative state. This could cause some waiters to miss the error condition.
Consider using a separate boolean flag to track the error state:
class WaitGroup {
private val lock = new ReentrantLock()
private val cond = lock.newCondition()
private val counter: AtomicInteger = new AtomicInteger(0)
+ @volatile private var errorState: Boolean = false
def add(delta: Int = 1): Unit = {
val after = counter.addAndGet(delta)
if (after <= 0) {
lock.lock()
try {
- cond.signalAll()
- if (after < 0)
+ if (after < 0) {
+ errorState = true
+ }
+ cond.signalAll()
+ if (errorState)
throw new NegativeWaitGroupCounter
} finally {
lock.unlock()
}
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def add(delta: Int = 1): Unit = { | |
val after = counter.addAndGet(delta) | |
if (after <= 0) { | |
lock.lock() | |
try { | |
cond.signalAll() | |
if (after < 0) | |
throw new NegativeWaitGroupCounter | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
class WaitGroup { | |
private val lock = new ReentrantLock() | |
private val cond = lock.newCondition() | |
private val counter: AtomicInteger = new AtomicInteger(0) | |
+ @volatile private var errorState: Boolean = false | |
def add(delta: Int = 1): Unit = { | |
val after = counter.addAndGet(delta) | |
if (after <= 0) { | |
lock.lock() | |
try { | |
- cond.signalAll() | |
- if (after < 0) | |
- throw new NegativeWaitGroupCounter | |
+ if (after < 0) { | |
+ errorState = true | |
+ } | |
+ cond.signalAll() | |
+ if (errorState) | |
+ throw new NegativeWaitGroupCounter | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
// ... rest of class unchanged | |
} |
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/WaitGroup.scala around lines 40 to
53, the current add() method signals all waiting threads before checking for a
negative counter, which can cause a race condition where waiters miss the
negative state if the counter becomes positive again. To fix this, introduce a
separate boolean flag to track the negative error state, set this flag before
signaling, and have waiters check this flag to reliably detect the error
condition regardless of counter changes after signaling.
|
||
import org.scalatest.wordspec.AnyWordSpec | ||
|
||
private class SimpleLinkListSuite extends AnyWordSpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Top-level private
class will not compile
In Scala a top-level definition cannot be private
. The compiler will raise:
illegal modifier for the top-level definition
Remove the modifier (or wrap the suite in an object
if you really need encapsulation).
-private class SimpleLinkListSuite extends AnyWordSpec {
+class SimpleLinkListSuite extends AnyWordSpec {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private class SimpleLinkListSuite extends AnyWordSpec { | |
class SimpleLinkListSuite extends AnyWordSpec { |
🤖 Prompt for AI Agents
In src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala at
line 20, the top-level class is declared as private, which is not allowed in
Scala and causes a compilation error. Remove the private modifier from the class
declaration to fix the issue. If encapsulation is needed, wrap the class inside
an object instead.
"remove an element in the middle of the list" in { | ||
val lst = new SimpleLinkedList[Int] | ||
|
||
lst.append(1) | ||
lst.append(2) | ||
lst.append(3) | ||
|
||
lst.remove(1) | ||
|
||
assert(!lst.isEmpty) | ||
assert(lst.size == 2) | ||
assert(lst.head == 2) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
“Middle-element” test removes the head – assertion isn’t exercising the intended scenario
The description says remove an element in the middle, but the test removes 1
, which is the head.
Remove 2
instead so the head/tail logic is both exercised and the description matches the behaviour.
- lst.remove(1)
+ lst.remove(2)
- assert(!lst.isEmpty)
- assert(lst.size == 2)
- assert(lst.head == 2)
+ assert(!lst.isEmpty)
+ assert(lst.size == 2)
+ assert(lst.head == 1) // head should stay unchanged
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
"remove an element in the middle of the list" in { | |
val lst = new SimpleLinkedList[Int] | |
lst.append(1) | |
lst.append(2) | |
lst.append(3) | |
lst.remove(1) | |
assert(!lst.isEmpty) | |
assert(lst.size == 2) | |
assert(lst.head == 2) | |
} | |
"remove an element in the middle of the list" in { | |
val lst = new SimpleLinkedList[Int] | |
lst.append(1) | |
lst.append(2) | |
lst.append(3) | |
lst.remove(2) | |
assert(!lst.isEmpty) | |
assert(lst.size == 2) | |
assert(lst.head == 1) // head should stay unchanged | |
} |
🤖 Prompt for AI Agents
In src/test/scala/com/github/yruslan/channel/impl/SimpleLinkListSuite.scala
between lines 65 and 77, the test labeled "remove an element in the middle of
the list" incorrectly removes the head element (1) instead of a middle element.
Update the test to remove the element with value 2 to correctly test removal of
a middle element, ensuring the assertions reflect this change and the test
description matches the behavior.
def isEmpty: Boolean = { | ||
first == null | ||
} | ||
|
||
def nonEmpty: Boolean = { | ||
first != null | ||
} | ||
|
||
def size: Int = count | ||
|
||
def head: T = { | ||
if (first == null) { | ||
throw new NoSuchElementException | ||
} else { | ||
first.el | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Unsynchronized read-only helpers can return stale data or race with writers
isEmpty
, nonEmpty
, size
, and especially head
read first
/ count
without the synchronized
guard that protects all mutating operations.
Under concurrent access a thread may:
- Observe a non-null
first
, then immediately after another thread callsclear()
, causinghead
to dereference anull
. - Read a stale
count
producing an incorrectsize
. - See
isEmpty == false
and immediately afterwardfirst == null
, breaking caller invariants.
Wrap these helpers in the same monitor or mark the state fields @volatile
. A minimal fix:
- def isEmpty: Boolean = {
- first == null
- }
+ def isEmpty: Boolean = this.synchronized { first == null }
- def nonEmpty: Boolean = {
- first != null
- }
+ def nonEmpty: Boolean = this.synchronized { first != null }
- def size: Int = count
+ def size: Int = this.synchronized { count }
- def head: T = {
- if (first == null) {
- throw new NoSuchElementException
- } else {
- first.el
- }
- }
+ def head: T = this.synchronized {
+ if (first == null) throw new NoSuchElementException
+ first.el
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def isEmpty: Boolean = { | |
first == null | |
} | |
def nonEmpty: Boolean = { | |
first != null | |
} | |
def size: Int = count | |
def head: T = { | |
if (first == null) { | |
throw new NoSuchElementException | |
} else { | |
first.el | |
} | |
} | |
def isEmpty: Boolean = this.synchronized { first == null } | |
def nonEmpty: Boolean = this.synchronized { first != null } | |
def size: Int = this.synchronized { count } | |
def head: T = this.synchronized { | |
if (first == null) throw new NoSuchElementException | |
first.el | |
} |
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/impl/SimpleLinkedList.scala around
lines 41 to 57, the methods isEmpty, nonEmpty, size, and head access shared
mutable state without synchronization, risking stale reads or race conditions.
To fix this, wrap the bodies of these methods in the same synchronized block or
monitor used by mutating operations to ensure consistent visibility and
atomicity. Alternatively, mark the shared state variables first and count as
@volatile to guarantee visibility across threads.
val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L | ||
val startInstant = Instant.now() | ||
var elapsedTime = 0L | ||
|
||
while (elapsedTime <= timeoutMilli) { | ||
val newTimeout = Duration(timeoutMilli - elapsedTime, MILLISECONDS) | ||
val valueOpt = inputChannel.tryRecv(newTimeout) | ||
val found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) | ||
elapsedTime = java.time.Duration.between(startInstant, now).toMillis | ||
if (found) { | ||
return valueOpt | ||
} | ||
} | ||
None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Infinite timeout is handled as zero – causes premature return
When timeout
is infinite (Duration.Inf
), timeout.isFinite
is false
, so timeoutMilli
becomes 0L
.
The loop condition elapsedTime <= timeoutMilli
is satisfied only for the very first iteration; after that the method returns None
even though it should keep waiting.
- val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L
+ val timeoutMilli = if (timeout.isFinite) timeout.toMillis else Long.MaxValue
(or delegate the Duration.Inf
case straight to inputChannel.tryRecv(Duration.Inf)
).
Without this fix callers relying on an indefinite wait will spuriously miss messages.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L | |
val startInstant = Instant.now() | |
var elapsedTime = 0L | |
while (elapsedTime <= timeoutMilli) { | |
val newTimeout = Duration(timeoutMilli - elapsedTime, MILLISECONDS) | |
val valueOpt = inputChannel.tryRecv(newTimeout) | |
val found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) | |
elapsedTime = java.time.Duration.between(startInstant, now).toMillis | |
if (found) { | |
return valueOpt | |
} | |
} | |
None | |
val timeoutMilli = if (timeout.isFinite) timeout.toMillis else Long.MaxValue | |
val startInstant = Instant.now() | |
var elapsedTime = 0L | |
while (elapsedTime <= timeoutMilli) { | |
val newTimeout = Duration(timeoutMilli - elapsedTime, MILLISECONDS) | |
val valueOpt = inputChannel.tryRecv(newTimeout) | |
val found = valueOpt.isEmpty || valueOpt.forall(v => pred(v)) | |
elapsedTime = java.time.Duration.between(startInstant, now).toMillis | |
if (found) { | |
return valueOpt | |
} | |
} | |
None |
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/ChannelDecoratorFilter.scala around
lines 54 to 67, the code incorrectly treats infinite timeout as zero, causing
premature return. Fix this by explicitly checking if the timeout is infinite; if
so, call inputChannel.tryRecv with an infinite duration directly. Otherwise,
proceed with the existing logic using finite timeout values. This ensures that
callers expecting indefinite waits do not miss messages.
final override def close(): Unit = { | ||
lock.lock() | ||
try { | ||
if (!closed) { | ||
closed = true | ||
readWaiters.foreach(w => w.sem.release()) | ||
writeWaiters.foreach(w => w.sem.release()) | ||
crd.signalAll() | ||
cwr.signalAll() | ||
|
||
writers += 1 | ||
while (syncValue.nonEmpty) { | ||
awaitWriters() | ||
} | ||
writers -= 1 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential dead-lock when closing a non-empty channel
close()
waits inside a while (syncValue.nonEmpty)
loop until some receiver drains the last message.
If no thread ever calls recv()
after close()
is invoked (e.g. the producer is the last live thread), the method blocks forever, effectively dead-locking the closer.
- writers += 1
- while (syncValue.nonEmpty) {
- awaitWriters()
- }
- writers -= 1
+ // Drop the remaining message to avoid indefinite blocking.
+ syncValue = None // safe because no new messages can arrive after `closed = true`
+ sender = -1
Consider documenting the intended semantics (drop vs. force-delivery) and implementing either the quick drop (shown above) or a bounded wait with timeout to prevent permanent blocking.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/main/scala/com/github/yruslan/channel/SyncChannel.scala around lines 27
to 42, the close() method can deadlock by waiting indefinitely for syncValue to
become empty if no receiver consumes messages after close is called. To fix
this, clarify the intended behavior in comments—whether to drop remaining
messages immediately or wait for delivery. Then either modify close() to drop
messages without waiting or implement a bounded wait with a timeout to avoid
permanent blocking.
Summary by CodeRabbit
New Features
Build & Chores
Tests