Skip to content

Commit

Permalink
v25 (Refinement): Implement BlockingQueueSplit spec in Java and C.
Browse files Browse the repository at this point in the history
  • Loading branch information
lemmy committed Sep 25, 2024
1 parent eadcdb4 commit 12b6e11
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 30 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ This tutorial is work in progress. More chapters will be added in the future. In

--------------------------------------------------------------------------

### v25 (Refinement): Implement BlockingQueueSplit spec in Java and C.

Knowing that ```BlockingQueueSplit``` refines ```BlockingQueue``` and thus is deadlock-free, we shift our attention to the Java (and C) program. Instead of Java's ```synchronized``` statement, we implement BlockingQueueSplit with the help of the low-level synchronization primitive ```java.util.concurrent.locks.ReentrantLock``` and ```java.util.concurrent.locks.Condition```. Executing the new program for a couple of hours with configuration p2c1b1 reveals no deadlock (the broken version of the program deadlocked within seconds). This, and the fact that Java's own [```ArrayBlockingQueue```](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ArrayBlockingQueue.html) has a similar implementation, should silence our concerns to put the program into production.

### v24 (Refinement): Prove refinement mapping of BlockingQueueSplit.

Below, TLC checked the refinement mapping for a finite model/particular configuration, which gives for ```BlockingQueueSplit``` sufficient confidence that the refinement mapping is correct. The fact that the refinement mapping is straight forward indicates that a TLAPS prove is likely straight forward too. So let's give in to the academic in us and prove the correctness of the refinement mapping. To prove that ```BlockingQueueSplit``` implements ```BlockingQueue```, we first prove ```TypeInv``` inductive with the now know invariance proof rule. Once we have proven this LEMMA, we reuse it and the [proof rule for refinement (section 4.2)](https://members.loria.fr/SMerz/papers/tla+logic2008.pdf) to prove ```THEOREM Implements == Spec => A!Spec```.
Expand Down
15 changes: 8 additions & 7 deletions impl/producer_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ uint32_t *buffer;
uint32_t fillIndex, useIndex, count = 0;

// See https://stackoverflow.com/a/2087046/6291195 to relate this to the Java impl.
pthread_cond_t modify;
pthread_cond_t empty, full;
pthread_mutex_t mutex;

void append(uint32_t value) {
Expand All @@ -35,15 +35,15 @@ void *producer (void * arg) {
printf("w#p#-1\n"); fflush(stdout);
#endif

pthread_cond_wait(&modify, &mutex);
pthread_cond_wait(&empty, &mutex);
}

append(rand() % (10)); // produce!
#ifdef TRACE
printf("e#p#-1\n"); fflush(stdout);
#endif

pthread_cond_signal(&modify); // broadcast that the buffer is full
pthread_cond_signal(&full); // broadcast that the buffer is full
pthread_mutex_unlock(&mutex); // release the lock
}
}
Expand All @@ -59,15 +59,15 @@ void *consumer (void * arg) {
printf("w#c#%d\n", id); fflush(stdout);
#endif

pthread_cond_wait(&modify, &mutex); // wait for the buffer to be filled
pthread_cond_wait(&full, &mutex); // wait for the buffer to be filled
}

head(); // consume (we don't care about the value)!
#ifdef TRACE
printf("d#c#%d\n", id); fflush(stdout);
#endif

pthread_cond_signal(&modify); // signal that the buffer is empty
pthread_cond_signal(&empty); // signal that the buffer is empty
pthread_mutex_unlock(&mutex); // release the lock

#ifndef TRACE
Expand Down Expand Up @@ -96,7 +96,8 @@ int main(int argc, char * argv[]) {
#endif

pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&modify, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&full, NULL);

/* Allocate space for the buffer */
buffer = malloc(sizeof(int) * buff_size);
Expand Down
70 changes: 47 additions & 23 deletions impl/src/org/kuppe/BlockingQueue.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.kuppe;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.kuppe.App2TLA.BufferDeqEvent;
import org.kuppe.App2TLA.BufferEnqEvent;
import org.kuppe.App2TLA.BufferWaitEvent;
Expand All @@ -8,13 +11,22 @@ public final class BlockingQueue<E> {

private final E[] store;

private final ReentrantLock lock;
private final Condition waitC;
private final Condition waitP;

private int head;
private int tail;
private int size;

@SuppressWarnings("unchecked")
public BlockingQueue(final int capacity) {
this.store = (E[]) new Object[capacity];

// see BlockingQueueSplit.tla
this.lock = new ReentrantLock();
this.waitC = lock.newCondition();
this.waitP = lock.newCondition();
}

/**
Expand All @@ -23,18 +35,24 @@ public BlockingQueue(final int capacity) {
*
* @see {@link BlockingQueue#take()}.
*/
public synchronized void put(final E e) throws InterruptedException {
while (isFull()) {
new BufferWaitEvent("p").commit();
System.out.println("Buffer full; P waits");
wait();
System.out.println("P notified");
public void put(final E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (isFull()) {
new BufferWaitEvent("p").commit();
System.out.println("Buffer full; P waits");
waitP.await();
System.out.println("P notified");
}
waitC.signal();

// Add e and do bookkeeping.
new BufferEnqEvent().commit();
append(e);
} finally {
lock.unlock();
}
notify();

// Add e and do bookkeeping.
new BufferEnqEvent().commit();
append(e);
}

/**
Expand All @@ -43,18 +61,24 @@ public synchronized void put(final E e) throws InterruptedException {
*
* @see {@link BlockingQueue#put(Object)}.
*/
public synchronized E take() throws InterruptedException {
while (isEmpty()) {
new BufferWaitEvent("c").commit();
System.out.println("Buffer empty; C waits");
wait();
System.out.println("C notified");
}
notify();

// Remove e and do bookkeeping.
new BufferDeqEvent().commit();
return head();
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (isEmpty()) {
new BufferWaitEvent("c").commit();
System.out.println("Buffer empty; C waits");
waitC.await();
System.out.println("C notified");
}
waitP.signal();

// Remove e and do bookkeeping.
new BufferDeqEvent().commit();
return head();
} finally {
lock.unlock();
}
}


Expand Down

0 comments on commit 12b6e11

Please sign in to comment.