Synchronization Locks for D

Locks is a library of synchronization constructs for the D programming language, based on the concurrent locks library by Doug Lea. For more information about D, see the DigitalMars D home page. The library can be downloaded here or as part of MinTL. For more information about the Java library, see JSR-166. For an initial port, see dsource.

This library is in the public domain. Portions were written by Ben Hinkle, 2004; other portions were ported from code written by Doug Lea. Email comments and bug reports to ben.hinkle@gmail.com.

Overview

The D language has built-in support for defining critical sections using the synchronized statement but does not include POSIX synchronization constructs like locks and condition variables. The purpose of the Locks library is to extend the built-in D capabilities to support not only POSIX constructs but also latches, barriers and exchangers. Concurrent containers like queues, stacks and associative arrays are in the MinTL library in the package mintl.concurrent. When using concurrent algorithms, be careful to use the volatile statement to ensure data is properly updated in all the threads.

The primary interface of the Locks library is the Lock interface. It defines two methods, lock and unlock, that acquire and release the lock. In general, the Lock interface makes no guarantee that a thread can lock a lock that it already owns. The ReentrantLock class, which implements Lock, does guarantee that the thread that holds the lock can call lock again without blocking. If a thread calls lock and the lock is held by another thread, the calling thread is parked until the lock is released. The tryLock functions attempt to acquire the lock immediately or within a specified time interval. For example, a typical class X that uses a ReentrantLock to control access to function m uses a try-finally block to ensure the lock is released before the function returns:

  class X {
    private ReentrantLock lock;
    // ...
    this() { 
      lock = new ReentrantLock; 
    }
    void m() { 
      lock.lock();  // block until lock is available
      try {
        // ... method body
      } finally {
        lock.unlock();
      }
    }
  }

A ScopedLock can simplify the code around managing locks. The class X could instead use a ScopedLock in m:

  class X {
    private ReentrantLock lock;
    // ...
    this() { 
      lock = new ReentrantLock; 
    }
    void m() { 
      auto ScopedLock slock = new ScopedLock(lock);
      // ... method body
    }
  }

The only difference between the two implementations is that the ScopedLock version, as written, allocates memory from the GC each time m is called.
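
The timed form of tryLock is useful when a thread should give up rather than park indefinitely. The following is a minimal sketch, assuming the signatures listed in the API reference and assuming (as in the Java original) that tryLock returns true when the lock was acquired. The class name Y, the method name tryM and the 50 millisecond timeout are illustrative and not part of the library:

```d
import locks.lock;
import locks.reentrantlock;
import locks.timeunit;

class Y {
  private ReentrantLock lock;
  this() {
    lock = new ReentrantLock;
  }
  // Returns true if the body ran, false if the lock was not acquired in time.
  bool tryM() {
    // wait at most 50 milliseconds for the lock (illustrative value)
    if (!lock.tryLock(50, TimeUnit.MilliSeconds))
      return false;   // lock not acquired, so there is nothing to unlock
    try {
      // ... method body
    } finally {
      lock.unlock();
    }
    return true;
  }
}
```

Note that unlock is only reached when tryLock succeeded; calling unlock after a failed tryLock would release a lock the thread does not hold.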

The Condition interface defines a condition variable for a given lock. A condition variable allows two or more threads to hand off ownership of the lock atomically by calling wait and notify. If a thread owns the lock and calls wait on a condition variable, the thread releases the lock and blocks until it is notified through the condition variable. Once notified, the thread attempts to reacquire the lock and, once successful, continues execution. The wait function also accepts a timeout value to stop blocking after a certain amount of time. A thread whose wait times out must still reacquire the lock before proceeding. A typical use of condition variables is to signal when an event has happened. The function Lock.newCondition creates and returns a Condition instance. For example, the condition below signals when the data variable has been set:

  int data;
  bool ready = false;  // guarded by lock; true once data has been set
  Thread getter, setter;
  ReentrantLock lock = new ReentrantLock;
  Condition is_ready = lock.newCondition;
  setter = new Thread(
    delegate int() {
      lock.lock();
      try {
        data = 10;
        ready = true;
        is_ready.notify();
      } finally {
        lock.unlock();
      }
      return 0;
    });
  getter = new Thread(
    delegate int() {
      lock.lock();
      try {
        // check the flag so a notify sent before this wait is not missed
        while (!ready)
          is_ready.wait();
        printf("%d\n",data);
      } finally {
        lock.unlock();
      }
      return 0;
    });
  getter.start();
  setter.start();
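
The timed wait can be combined with a deadline so that the total waiting time stays bounded even if the thread is woken more than once. The sketch below is one way to do this under stated assumptions: the helper name waitUntilReady and the isReady delegate are hypothetical, and the return value of the timed wait is deliberately ignored because its meaning is not described here.

```d
import locks.condition;
import locks.platformutils;
import locks.reentrantlock;
import locks.timeunit;

// Hypothetical helper: wait at most maxMillis for isReady() to become true.
// The caller must not hold the lock; it is acquired and released here.
bool waitUntilReady(ReentrantLock lock, Condition cond,
                    bool delegate() isReady, long maxMillis) {
  long deadline = currentTimeMillis() + maxMillis;
  lock.lock();
  try {
    while (!isReady()) {
      long left = deadline - currentTimeMillis();
      if (left <= 0)
        break;  // deadline passed without the event happening
      // releases the lock while blocked and reacquires it before returning
      cond.wait(left, TimeUnit.MilliSeconds);
    }
    return isReady();  // still evaluated under the lock
  } finally {
    lock.unlock();
  }
}
```

The condition passed in must have been created by newCondition on the same lock, and isReady should only read state that is guarded by that lock.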

To start several threads and have them wait for a signal from a coordinating thread, use a CountDownLatch with a count of 1. For example,

  CountDownLatch go = new CountDownLatch(1);
  Thread[4] t;
  for (int i=0; i < 4; i++) {
    t[i] = new Thread(
      delegate int() {
        go.wait(); // wait for signal from main thread
        // ... do something interesting ...
        return 0;
      });
    t[i].start();
  }
  go.countDown(); // let worker threads go

Conversely, to signal a coordinating thread that the worker threads are finished, have each worker thread decrement another CountDownLatch:

  CountDownLatch go = new CountDownLatch(1);
  CountDownLatch allDone = new CountDownLatch(4);
  Thread[4] t;
  for (int i=0; i < 4; i++) {
    t[i] = new Thread(
      delegate int() {
        go.wait(); // wait for signal from main thread
        // ... do something interesting ...
        allDone.countDown();
        return 0;
      });
    t[i].start();
  }
  go.countDown(); // let worker threads go
  allDone.wait(); // wait for all workers to finish

A CyclicBarrier is similar to a CountDownLatch, except that a cyclic barrier is used without a controlling thread. A thread that reaches the barrier waits until the barrier count is exhausted, that is, until all parties have arrived, before continuing. Once the barrier is tripped it optionally runs a function and resets its count of waiting threads to zero so that it can be used again. Continuing the example from the previous paragraph, the worker threads might need to rendezvous at a certain point midway through their task:

  CountDownLatch go = new CountDownLatch(1);
  CountDownLatch allDone = new CountDownLatch(4);
  CyclicBarrier barrier = new CyclicBarrier(4);
  Thread[4] t;
  for (int i=0; i < 4; i++) {
    t[i] = new Thread(
      delegate int() {
        go.wait(); // wait for signal from main thread
        // ... do something interesting ...
        barrier.wait(); // wait for all workers to get to barrier
        // ... do something else interesting ...
        allDone.countDown();
        return 0;
      });
    t[i].start();
  }
  go.countDown(); // let worker threads go
  allDone.wait(); // wait for all workers to finish
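
The optional function that runs when the barrier trips is passed as the second constructor argument. The sketch below assumes the constructor signature from the API reference and reuses the same barrier for two rendezvous points. The function name runPhases and the trips counter are illustrative only:

```d
import locks.countdown;
import locks.cyclicbarrier;
import std.thread;

void runPhases() {
  int trips = 0;  // illustrative; modified only by the barrier action
  CountDownLatch allDone = new CountDownLatch(4);
  CyclicBarrier barrier = new CyclicBarrier(4,
    delegate int() {
      trips++;    // invoked when the barrier is tripped
      return 0;
    });
  Thread[4] t;
  for (int i=0; i < 4; i++) {
    t[i] = new Thread(
      delegate int() {
        // ... phase one ...
        barrier.wait(); // first rendezvous
        // ... phase two ...
        barrier.wait(); // the barrier has reset, so it can be used again
        allDone.countDown();
        return 0;
      });
    t[i].start();
  }
  allDone.wait(); // keep this stack frame alive until all workers finish
}
```

Waiting on allDone before returning matters here because the delegates refer to local variables of runPhases.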

A Semaphore maintains a given number of permits. When a thread acquires a permit, the semaphore decrements the number of available permits, and when a thread releases a permit (any thread can release a permit), the semaphore increments the number of available permits. A semaphore has no concept of threads owning permits; it only hands out and takes back permits atomically. A typical use case for semaphores is to manage access by multiple threads to a fixed collection of objects.
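
As a rough illustration of that use case, the sketch below guards a fixed set of three slots with a semaphore. The class SlotPool and its methods are hypothetical. The semaphore only counts free slots, so a separate synchronized block protects the table that records which slot is in use:

```d
import locks.semaphore;

// Hypothetical pool of 3 slots; callers receive a slot index.
class SlotPool {
  private Semaphore permits;
  private bool[3] inUse;

  this() {
    permits = new Semaphore(3);  // one permit per slot
  }
  int take() {
    permits.acquire();           // blocks while all slots are taken
    synchronized (this) {
      for (int i=0; i < 3; i++) {
        if (!inUse[i]) {
          inUse[i] = true;
          return i;
        }
      }
    }
    assert(0);                   // not reached: a permit implies a free slot
    return -1;
  }
  void give(int slot) {
    synchronized (this) {
      inUse[slot] = false;       // mark the slot free first
    }
    permits.release();           // then let a waiting thread proceed
  }
}
```

Releasing the permit only after the slot is marked free keeps the invariant that a thread holding a permit can always find a free slot.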

API Reference

This section lists the public classes, interfaces and functions in the library without detailed explanation. For more information, see the documentation preceding each function or class in the source file. The API is organized by module:
locks.condition
locks.countdown
locks.cyclicbarrier
locks.exchanger
locks.lock
locks.platformutils
locks.readwritelock
locks.reentrantlock
locks.semaphore
locks.timeunit

locks.condition

interface Condition
A condition variable

void wait()
Cause current thread to wait until notified
long waitNanos(long nanosTimeout)
Cause current thread to wait until notified or time expires
bool wait(long time, TimeUnit unit)
Cause current thread to wait until notified or time expires
void notify()
Wake up one waiting thread
void notifyAll()
Wake up all waiting threads

locks.countdown

class CountDownLatch
Allow one or more threads to wait for a set of other threads.

this(int count)
Construct the latch with the given count before releasing
void wait()
Causes the current thread to wait until the count reaches zero
void wait(long timeout, TimeUnit unit)
Causes the current thread to wait until the count reaches zero or time expires
void countDown()
Decrement count
long count
Get the current count
char[] toString
Return a string summary of the latch

locks.cyclicbarrier

class CyclicBarrier
Allow a fixed group of threads to wait for each other

this(int parties, int delegate() barrierAction = null)
Construct the barrier with given number of parties and concluding action
int parties
Return number of parties for this barrier
int wait()
Causes the current thread to wait for all parties to reach the barrier
int wait(long timeout, TimeUnit unit)
Causes the current thread to wait only for the specified time
bool isBroken()
Returns true if the barrier has been broken
void reset()
Break the barrier for waiting parties and reset to initial state
int getNumberWaiting
Get the current number of waiting parties

locks.exchanger

class Exchanger(Value)
Allow two threads to safely exchange values.

this()
Construct the exchanger
Value exchange(Value v)
Offer v for exchange and wait for response
Value exchange(Value v, long timeout, TimeUnit unit)
Offer v for exchange and wait for response with possible timeout
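
A small sketch of two threads swapping integers through an Exchanger, assuming the template is instantiated as Exchanger!(int). The function name swapDemo and the variables gotA and gotB are illustrative only:

```d
import locks.countdown;
import locks.exchanger;
import std.c.stdio;
import std.thread;

void swapDemo() {
  Exchanger!(int) ex = new Exchanger!(int);
  CountDownLatch done = new CountDownLatch(2);
  int gotA, gotB;
  Thread a = new Thread(
    delegate int() {
      gotA = ex.exchange(1);  // offer 1, receive the value offered by b
      done.countDown();
      return 0;
    });
  Thread b = new Thread(
    delegate int() {
      gotB = ex.exchange(2);  // offer 2, receive the value offered by a
      done.countDown();
      return 0;
    });
  a.start();
  b.start();
  done.wait();                // both exchanges have completed
  printf("a got %d, b got %d\n", gotA, gotB);
}
```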

locks.lock

interface Lock
The interface for all lock implementations.

void lock()
Acquires the lock
bool tryLock()
Acquires the lock only if it is free at the time of invocation
bool tryLock(long time, TimeUnit unit)
Acquires the lock if it is free within the given waiting time
void unlock()
Releases the lock
Condition newCondition
Returns a new Condition instance that is bound to this lock instance

auto final class ScopedLock
An auto class for acquiring and releasing a lock in a scope

this(Lock lock)
Initializes the ScopedLock and acquires the supplied lock
~this()
Release the lock

locks.platformutils

bit compareAndSet32(void* mem, void* expect, void* update)
Compare the 32-bit value expect with the value at *mem and, if equal, set *mem to update and return true. This assumes a pointer is 32 bits.
bit compareAndSet32(void* mem, int expect, int update)
Convenience overload of compareAndSet32 for when the data are integers instead of pointers
bit compareAndSet64(void* mem, void* expect, void* update)
Compare the 64-bit value at *expect with the value at *mem and, if equal, set *mem to *update and return true.
int atomicAdd32(int* val, int x)
Atomically add x to *val and return previous value of *val
int atomicExchange32(int* val, int x)
Atomically store x to *val and return previous value of *val
void atomicInc32(int* val)
Atomically increment *val
void atomicDec32(int* val)
Atomically decrement *val
long currentTimeMillis()
Return the current system time in milliseconds
long currentTimeNanos()
Return the current system time in nanoseconds
void sleepNanos(long duration)
Sleep the current thread for the specified duration in nanoseconds
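
The compare-and-set functions are typically used in a retry loop: read the current value, compute the new one, and try to install it, repeating if another thread got there first. The following is a sketch of that pattern using the integer overload of compareAndSet32. The helper atomicMax32 is hypothetical and not part of the library:

```d
import locks.platformutils;

// Hypothetical helper: atomically raise *val to at least x.
void atomicMax32(int* val, int x) {
  while (true) {
    int seen;
    volatile seen = *val;   // volatile statement: force a fresh read each try
    if (seen >= x)
      return;               // already large enough, nothing to do
    if (compareAndSet32(val, seen, x))
      return;               // *val still held seen, and x was installed
    // another thread changed *val between the read and the update; retry
  }
}
```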

locks.readwritelock

interface ReadWriteLock
A pair of read-write locks

Lock readLock()
Return the read lock
Lock writeLock()
Return the write lock

class ReentrantReadWriteLock : ReadWriteLock
A pair of reentrant read-write locks

this(bool fair = false)
Construct the lock with specified fairness policy
Lock readLock()
Return the read lock
Lock writeLock()
Return the write lock
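
A sketch of how the two locks might be used together, assuming the usual read-write semantics of the Java original (several readers may hold the read lock at once, while the write lock is exclusive). The Settings class and its field are illustrative only:

```d
import locks.lock;
import locks.readwritelock;

// Hypothetical holder for a value that is read often and written rarely.
class Settings {
  private ReadWriteLock rw;
  private int timeout;

  this() {
    rw = new ReentrantReadWriteLock;
  }
  int getTimeout() {
    Lock r = rw.readLock();
    r.lock();        // taken for the duration of the read
    try {
      return timeout;
    } finally {
      r.unlock();
    }
  }
  void setTimeout(int t) {
    Lock w = rw.writeLock();
    w.lock();        // taken for the duration of the update
    try {
      timeout = t;
    } finally {
      w.unlock();
    }
  }
}
```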

locks.reentrantlock

class ReentrantLock : Lock
A reentrant mutual exclusive lock with condition variables

this(bool fair = false)
Construct the lock with specified fairness policy
void lock()
Acquires the lock
bool tryLock()
Acquires the lock only if it is free at the time of invocation
bool tryLock(long time, TimeUnit unit)
Acquires the lock if it is free within the given waiting time
void unlock()
Releases the lock
Condition newCondition
Returns a new Condition instance that is bound to this lock instance
int getHoldCount()
Get the number of holds on this lock by the current thread
bool isHeldByCurrentThread()
Query if the lock is held by the current thread
bool isLocked()
Query if the lock is held by any thread
bool isFair()
Query if the lock is fair
char[] toString()
Return a string representation of the lock
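
The query functions can be used to check reentrant behavior. In the sketch below, the function name nested is illustrative, and the asserted values follow from the descriptions above rather than from a test run:

```d
import locks.reentrantlock;

void nested(ReentrantLock lock) {
  lock.lock();
  try {
    lock.lock();      // same thread already owns the lock: does not block
    try {
      assert(lock.isHeldByCurrentThread());
      assert(lock.getHoldCount() == 2);  // one hold per lock() call
    } finally {
      lock.unlock();  // back to a single hold
    }
  } finally {
    lock.unlock();    // last hold released; other threads may now acquire
  }
}
```

Each call to lock must be paired with a call to unlock before another thread can acquire the lock.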

locks.semaphore

class Semaphore
A counting semaphore for maintaining a set of permits

this(int permits, bool fair = false)
Construct the semaphore with the given number of permits and fairness policy
void acquire(int permits = 1)
Acquires the given number of permits, blocking until all are available
bool tryAcquire(int permits = 1)
Acquires the given number of permits only if they are all immediately available
bool tryAcquire(long timeout, TimeUnit unit, int permits = 1)
Attempts to acquire the given number of permits within the specified time interval
void release(int permits = 1)
Releases the given number of permits
int availablePermits
Get the current number of available permits
bool isFair()
Return true if the semaphore is fair
char[] toString
Return a string summary of the semaphore

locks.timeunit

enum TimeUnit
Time units common in synchronization
NanoSeconds = 0
MicroSeconds
MilliSeconds
Seconds

long convert(long duration, TimeUnit fromUnit, TimeUnit toUnit)
Convert the given duration from fromUnit to toUnit.
long toNanos(long duration, TimeUnit fromUnit)
Convert to nanoseconds.
long toMicros(long duration, TimeUnit fromUnit)
Convert to microseconds.
long toMillis(long duration, TimeUnit fromUnit)
Convert to milliseconds.
long toSeconds(long duration, TimeUnit fromUnit)
Convert to seconds.