Synchronization Locks for D
Locks is a library of synchronization constructs for the D
programming language based on the concurrent locks library
by Doug Lea.
For more info about D see the DigitalMars D home page. The library can be
downloaded here or as part of MinTL. For more information about the Java
library see JSR-166. For an initial port see dsource.
This library is in the public domain.
Portions written by Ben Hinkle, 2004, portions ported from code
written by Doug Lea.
Email comments and bug reports to ben.hinkle@gmail.com
Overview
The D language has built-in support for defining critical sections
using the synchronized statement but does not include
POSIX synchronization constructs like locks and condition variables.
The purpose of the Locks library is to extend the built-in D
capabilities to support not only POSIX constructs but also
latches, barriers and exchangers. Concurrent containers like queues,
stacks and associative arrays are in the MinTL library in the
package mintl.concurrent. When using concurrent algorithms,
be careful to use the volatile statement to ensure data
is properly updated in all the threads.
The primary interface of the Locks library is the Lock interface. It
defines two methods, lock and unlock, that acquire
and release the lock. In general the Lock interface makes no
guarantee that a thread can lock a lock that it already
owns. The ReentrantLock class, which implements
Lock, does guarantee that the thread that holds the lock can
call lock again without blocking. If a thread calls lock
and the lock is held by another thread then the calling thread is
parked until the lock is released. The tryLock functions
attempt to acquire the lock immediately or within a specified time interval.
For example, a typical
class X that uses a ReentrantLock to control access
to function m uses try-finally blocks to ensure the lock is
released before the function returns:
    class X {
        private ReentrantLock lock;
        // ...
        this() {
            lock = new ReentrantLock;
        }
        void m() {
            lock.lock(); // block until lock is available
            try {
                // ... method body
            } finally {
                lock.unlock(); // always release, even if the body throws
            }
        }
    }
A ScopedLock can simplify the code around managing locks.
The class X could instead use a ScopedLock
in m:
    class X {
        private ReentrantLock lock;
        // ...
        this() {
            lock = new ReentrantLock;
        }
        void m() {
            auto ScopedLock slock = new ScopedLock(lock);
            // ... method body
        }
    }
The only difference between the two implementations is that the
ScopedLock version, as written, will allocate memory from the GC each
time m is called.
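The tryLock functions mentioned earlier can be used in the same try-finally
pattern. The following is only a sketch, written in the same D dialect as the
examples above: the class Counter and its methods are hypothetical names, the
module names are taken from the API reference below, and it assumes that
tryLock returns true when the lock was acquired.

```d
import locks.reentrantlock;
import locks.timeunit;

// Hypothetical example class, not part of the library.
class Counter {
    private ReentrantLock lock;
    private int value;

    this() {
        lock = new ReentrantLock;
    }

    // Increment only if the lock is free right now; never blocks.
    bool tryIncrement() {
        if (!lock.tryLock())
            return false;   // assumed: false means the lock was not acquired
        try {
            value++;
        } finally {
            lock.unlock();
        }
        return true;
    }

    // Increment if the lock becomes free within the given number of milliseconds.
    bool tryIncrement(long millis) {
        if (!lock.tryLock(millis, TimeUnit.MilliSeconds))
            return false;   // gave up after the waiting time
        try {
            value++;
        } finally {
            lock.unlock();
        }
        return true;
    }

    int get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```

Only unlock after a successful tryLock; a failed attempt leaves the lock untouched.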
The Condition interface defines a condition variable for a given lock.
A condition variable allows two or more threads to hand off ownership
of the lock atomically by calling wait and notify.
If a thread owns the lock and calls wait
on a condition variable then the thread releases the lock and
blocks until notified by the condition variable.
Once notified the thread attempts to reacquire the lock and, once successful,
continues execution. The wait function accepts timeout values to stop
blocking after a certain amount of time. A thread whose wait times out
still must reacquire the lock before proceeding. A typical use of condition
variables is to signal when an event has happened.
The function Lock.newCondition creates and returns a Condition
instance.
For example, the
condition below signals when the data variable has been set:
    int data;
    bool ready = false; // guards against notify arriving before wait
    Thread getter, setter;
    ReentrantLock lock = new ReentrantLock;
    Condition is_ready = lock.newCondition;
    setter = new Thread(
        delegate int() {
            lock.lock();
            try {
                data = 10;
                ready = true;
                is_ready.notify();
            } finally {
                lock.unlock();
            }
            return 0;
        });
    getter = new Thread(
        delegate int() {
            lock.lock();
            try {
                while (!ready)      // skip the wait if data is already set
                    is_ready.wait();
                printf("%d\n",data);
            } finally {
                lock.unlock();
            }
            return 0;
        });
    getter.start();
    setter.start();
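A timed wait can be combined with a deadline so that the total waiting time
stays bounded. The sketch below is an illustration only: the class Flag and
its methods are hypothetical, the module names come from the API reference
below, and because the meaning of the value returned by the timed wait is not
described here, the code ignores it and rechecks the clock with
currentTimeMillis instead.

```d
import locks.condition;
import locks.reentrantlock;
import locks.platformutils;
import locks.timeunit;

// Hypothetical example class, not part of the library.
class Flag {
    private ReentrantLock lock;
    private Condition changed;
    private bool ready;

    this() {
        lock = new ReentrantLock;
        changed = lock.newCondition;
    }

    // Set the flag and wake every thread waiting for it.
    void set() {
        lock.lock();
        try {
            ready = true;
            changed.notifyAll();
        } finally {
            lock.unlock();
        }
    }

    // Wait until the flag is set or timeoutMillis has elapsed.
    // Returns true if the flag was set in time.
    bool waitFor(long timeoutMillis) {
        long deadline = currentTimeMillis() + timeoutMillis;
        lock.lock();
        try {
            while (!ready) {
                long remaining = deadline - currentTimeMillis();
                if (remaining <= 0)
                    return false;   // deadline passed, flag still unset
                changed.wait(remaining, TimeUnit.MilliSeconds);
            }
            return true;
        } finally {
            lock.unlock();          // the lock is held again after wait returns
        }
    }
}
```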
To start several threads and have them wait for a signal from a
coordinating thread, use a CountDownLatch with a count of
1. For example,
    CountDownLatch go = new CountDownLatch(1);
    Thread[4] t;
    for (int i=0; i < 4; i++) {
        t[i] = new Thread(
            delegate int() {
                go.wait(); // wait for signal from main thread
                // ... do something interesting ...
                return 0;
            });
        t[i].start();
    }
    go.countDown(); // let worker threads go
Conversely, to signal a coordinating thread that the worker threads
are finished, have each worker thread decrement another
CountDownLatch:
    CountDownLatch go = new CountDownLatch(1);
    CountDownLatch allDone = new CountDownLatch(4);
    Thread[4] t;
    for (int i=0; i < 4; i++) {
        t[i] = new Thread(
            delegate int() {
                go.wait(); // wait for signal from main thread
                // ... do something interesting ...
                allDone.countDown();
                return 0;
            });
        t[i].start();
    }
    go.countDown(); // let worker threads go
    allDone.wait(); // wait for all workers to finish
A CyclicBarrier is similar to a CountDownLatch
except that the cyclic barrier is used without a controlling thread.
A thread that reaches the barrier waits until the barrier count
is exhausted before continuing. Once the barrier is tripped it
optionally runs a function and resets to its initial state so that it
can be used again. Continuing the
example from the previous paragraph, the worker threads might need
to rendezvous at a certain point midway through their task:
    CountDownLatch go = new CountDownLatch(1);
    CountDownLatch allDone = new CountDownLatch(4);
    CyclicBarrier barrier = new CyclicBarrier(4);
    Thread[4] t;
    for (int i=0; i < 4; i++) {
        t[i] = new Thread(
            delegate int() {
                go.wait(); // wait for signal from main thread
                // ... do something interesting ...
                barrier.wait(); // wait for all workers to get to barrier
                // ... do something else interesting ...
                allDone.countDown();
                return 0;
            });
        t[i].start();
    }
    go.countDown(); // let worker threads go
    allDone.wait(); // wait for all workers to finish
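The optional function run when the barrier trips is passed to the
CyclicBarrier constructor. The following sketch reuses one barrier for two
phases and counts how often it trips. The function name runPhases is
hypothetical, the module names come from the API reference below, and the
code assumes the barrier action runs once per trip, before the waiting
threads continue.

```d
import std.thread;
import locks.cyclicbarrier;

// Hypothetical helper: runs three workers through two barrier phases
// and returns how many times the barrier action was run.
int runPhases() {
    int trips = 0;
    CyclicBarrier barrier = new CyclicBarrier(3,
        delegate int() {
            trips++;    // assumed to run once each time the barrier trips
            return 0;
        });
    Thread[3] t;
    for (int i = 0; i < 3; i++) {
        t[i] = new Thread(
            delegate int() {
                // ... phase one work ...
                barrier.wait();  // first rendezvous
                // ... phase two work ...
                barrier.wait();  // same barrier, reused after it reset
                return 0;
            });
        t[i].start();
    }
    for (int i = 0; i < 3; i++)
        t[i].wait();    // std.thread: block until the worker has finished
    return trips;
}
```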
A Semaphore maintains a given number of permits. When a
thread acquires a permit the semaphore decrements the number of
available permits, and when a thread releases a permit (any thread
can release a permit) the semaphore increments the number of
available permits. Semaphores have no concept of threads
owning permits; a semaphore only gives out and receives permits atomically.
A typical use case for semaphores is to manage access by multiple
threads to a fixed collection of objects.
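As an illustration of that use case, the sketch below hands out slots from a
fixed-size pool. The class Pool, the constant POOL_SIZE and the method names
are hypothetical; the Semaphore calls follow the signatures listed in the API
reference below. The semaphore only counts free slots, so a synchronized
block still protects the bookkeeping array.

```d
import locks.semaphore;

const int POOL_SIZE = 4;    // hypothetical pool size

// Hypothetical example class, not part of the library.
class Pool {
    private Semaphore permits;
    private bool[POOL_SIZE] inUse;

    this() {
        permits = new Semaphore(POOL_SIZE);
    }

    // Block until a slot is free, then return its index.
    int checkOut() {
        permits.acquire();
        int slot = -1;
        synchronized (this) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (!inUse[i]) {
                    inUse[i] = true;
                    slot = i;
                    break;
                }
            }
        }
        return slot;    // a permit was held, so a free slot existed
    }

    // Return a slot to the pool; any thread may do this.
    void checkIn(int slot) {
        synchronized (this) {
            if (!inUse[slot])
                return;     // not checked out, nothing to release
            inUse[slot] = false;
        }
        permits.release();
    }

    int freeSlots() {
        return permits.availablePermits;
    }
}
```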
API Reference
This section lists the public classes, interfaces and functions in the library
without detailed explanation. For more information see the documentation before
the function or class in the source file.
The API is organized by module:
- locks.condition
- locks.countdown
- locks.cyclicbarrier
- locks.exchanger
- locks.lock
- locks.platformutils
- locks.readwritelock
- locks.reentrantlock
- locks.semaphore
- locks.timeunit
locks.condition
- interface Condition
- A condition variable
- void wait()
- Cause current thread to wait until notified
- long waitNanos(long nanosTimeout)
- Cause current thread to wait until notified or time expires
- bool wait(long time, TimeUnit unit)
- Cause current thread to wait until notified or time expires
- void notify()
- Wake up one waiting thread
- void notifyAll()
- Wake up all waiting threads
locks.countdown
- class CountDownLatch
- Allow one or more threads to wait for a set of other threads.
- this(int count)
- Construct the latch with the given count before releasing
- void wait()
- Causes the current thread to wait until the count reaches zero
- void wait(long timeout, TimeUnit unit)
- Causes the current thread to wait until the count reaches zero or time expires
- void countDown()
- Decrement count
- long count
- Get the current count
- char[] toString
- Return a string summary of the latch
locks.cyclicbarrier
- class CyclicBarrier
- Allow a fixed group of threads to wait for each other
- this(int parties, int delegate() barrierAction = null)
- Construct the barrier with given number of parties and concluding action
- int parties
- Return number of parties for this barrier
- int wait()
- Causes the current thread to wait for all parties to reach the barrier
- int wait(long timeout, TimeUnit unit)
- Causes the current thread to wait only for the specified time
- bool isBroken()
- Returns true if the barrier has been broken
- void reset()
- Break the barrier for waiting parties and reset to initial state
- int getNumberWaiting
- Get the current number of waiting parties
locks.exchanger
- class Exchanger(Value)
- Allow two threads to safely exchange values.
- this()
- Construct the exchanger
- Value exchange(Value v)
- Offer v for exchange and wait for response
- Value exchange(Value v, long timeout, TimeUnit unit)
- Offer v for exchange and wait for response with possible timeout
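A minimal sketch of an exchange between two threads, assuming the template
class is instantiated as Exchanger!(int). The function name swapDemo is
hypothetical.

```d
import std.thread;
import locks.exchanger;

// Hypothetical helper: the main thread offers 2, a worker offers 1.
// Stores what each side received in the out parameters.
void swapDemo(out int mainGot, out int workerGot) {
    Exchanger!(int) ex = new Exchanger!(int);
    int fromMain = 0;
    Thread worker = new Thread(
        delegate int() {
            fromMain = ex.exchange(1);  // offer 1, wait for the other side
            return 0;
        });
    worker.start();
    int fromWorker = ex.exchange(2);    // offer 2, wait for the worker
    worker.wait();                      // worker has stored its result by now
    mainGot = fromWorker;
    workerGot = fromMain;
}
```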
locks.lock
- interface Lock
- The interface for all lock implementations.
- void lock()
- Acquires the lock
- bool tryLock()
- Acquires the lock only if it is free at the time of invocation
- bool tryLock(long time, TimeUnit unit)
- Acquires the lock if it is free within the given waiting time
- void unlock()
- Releases the lock
- Condition newCondition
- Returns a new Condition instance that is bound to this lock instance
- auto final class ScopedLock
- An auto class for acquiring and releasing a lock in a scope
- this(Lock lock)
- Initializes the ScopedLock and acquires the supplied lock
- ~this()
- Release the lock
locks.platformutils
- bit compareAndSet32(void* mem, void* expect, void* update)
- Compare the 32 bit value expect with the value at *mem and if equal
set to update and return true. This assumes a pointer is 32 bits.
- bit compareAndSet32(void* mem, int expect, int update)
- Convenience overload for compareAndSet32 when the data are integers
instead of pointers
- bit compareAndSet64(void* mem, void* expect, void* update)
- Compare the 64 bit value at *expect with the value at *mem and if equal
set to *update and return true.
- int atomicAdd32(int* val, int x)
- Atomically add x to *val and return previous value of *val
- int atomicExchange32(int* val, int x)
- Atomically store x to *val and return previous value of *val
- void atomicInc32(int* val)
- Atomically increment *val
- void atomicDec32(int* val)
- Atomically decrement *val
- long currentTimeMillis()
- Return the current system time in milliseconds
- long currentTimeNanos()
- Return the current system time in nanoseconds
- void sleepNanos(long duration)
- Sleep the current thread for the specified duration in nanoseconds
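The compare-and-set functions are typically used in a retry loop. The sketch
below uses the integer overload of compareAndSet32 and atomicAdd32 as listed
above; the function names atomicMax and nextTicket are hypothetical, and the
volatile statement follows the advice in the overview.

```d
import locks.platformutils;

// Hypothetical helper: atomically raise *max to at least candidate.
void atomicMax(int* max, int candidate) {
    while (true) {
        int seen;
        volatile seen = *max;       // reread memory on every attempt
        if (candidate <= seen)
            return;                 // current value is already large enough
        if (compareAndSet32(max, seen, candidate))
            return;                 // *max still held seen and was replaced
        // another thread changed *max in between: retry
    }
}

// Hypothetical helper: hand out consecutive ticket numbers.
int nextTicket(int* counter) {
    return atomicAdd32(counter, 1); // returns the value before the add
}
```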
locks.readwritelock
- interface ReadWriteLock
- A pair of read-write locks
- Lock readLock()
- Return the read lock
- Lock writeLock()
- Return the write lock
- class ReentrantReadWriteLock : ReadWriteLock
- A pair of reentrant read-write locks
- this(bool fair = false)
- Construct the lock with specified fairness policy
- Lock readLock()
- Return the read lock
- Lock writeLock()
- Return the write lock
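A sketch of how the two locks of a ReentrantReadWriteLock might guard a
shared table. The class Settings and its methods are hypothetical. The code
assumes, as in the Java original, that the read lock may be shared by several
readers while the write lock is exclusive.

```d
import locks.lock;
import locks.readwritelock;

// Hypothetical example class, not part of the library.
class Settings {
    private ReentrantReadWriteLock rw;
    private int[char[]] values;

    this() {
        rw = new ReentrantReadWriteLock;
    }

    // Look up a key under the read lock.
    int get(char[] key, int fallback) {
        Lock r = rw.readLock();
        r.lock();
        try {
            int* p = key in values;
            return (p is null) ? fallback : *p;
        } finally {
            r.unlock();
        }
    }

    // Store a value under the write lock.
    void set(char[] key, int v) {
        Lock w = rw.writeLock();
        w.lock();
        try {
            values[key] = v;
        } finally {
            w.unlock();
        }
    }
}
```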
locks.reentrantlock
- class ReentrantLock : Lock
- A reentrant mutual exclusive lock with condition variables
- this(bool fair = false)
- Construct the lock with specified fairness policy
- void lock()
- Acquires the lock
- bool tryLock()
- Acquires the lock only if it is free at the time of invocation
- bool tryLock(long time, TimeUnit unit)
- Acquires the lock if it is free within the given waiting time
- void unlock()
- Releases the lock
- Condition newCondition
- Returns a new Condition instance that is bound to this lock instance
- int getHoldCount()
- Get the number of holds on this lock by the current thread
- bool isHeldByCurrentThread()
- Query if the lock is held by the current thread
- bool isLocked()
- Query if the lock is held by any thread
- bool isFair()
- Query if the lock is fair
- char[] toString()
- Return a string representation of the lock
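The query functions can be exercised from a single thread. This sketch
assumes that each nested call to lock adds one hold and each unlock removes
one; the function name nestedHolds is hypothetical.

```d
import locks.reentrantlock;

// Hypothetical helper: returns the hold count seen inside two nested locks.
int nestedHolds() {
    ReentrantLock lock = new ReentrantLock;
    int holds = 0;
    lock.lock();
    try {
        lock.lock();    // does not block: this thread already owns the lock
        try {
            if (lock.isHeldByCurrentThread())
                holds = lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    } finally {
        lock.unlock();  // one unlock per lock call
    }
    return holds;
}
```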
locks.semaphore
- class Semaphore
- A counting semaphore for maintaining a set of permits
- this(int permits, bool fair = false)
- Construct the semaphore with the given number of permits and fairness policy
- void acquire(int permits = 1)
- Acquires n permits, blocking until all are available
- bool tryAcquire(int permits = 1)
- Acquires n permits from this semaphore only if they are immediately available
- bool tryAcquire(long timeout, TimeUnit unit, int permits = 1)
- Attempt acquiring n permits within the specified time interval
- void release(int permits = 1)
- Release n permits
- int availablePermits
- Get the current number of available permits
- bool isFair()
- Return true if the semaphore is fair
- char[] toString
- Return a string summary of the semaphore
locks.timeunit
- enum TimeUnit
- Time units common in synchronization
- NanoSeconds = 0
- MicroSeconds
- MilliSeconds
- Seconds
- long convert(long duration, TimeUnit fromUnit, TimeUnit toUnit)
- Convert the given duration from fromUnit to toUnit.
- long toNanos(long duration, TimeUnit fromUnit)
- Convert to nanoseconds.
- long toMicros(long duration, TimeUnit fromUnit)
- Convert to microseconds.
- long toMillis(long duration, TimeUnit fromUnit)
- Convert to milliseconds.
- long toSeconds(long duration, TimeUnit fromUnit)
- Convert to seconds.
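A short sketch of the conversion functions, assuming they are module-level
functions in locks.timeunit with the signatures listed above. The function
name timeoutNanos is hypothetical.

```d
import locks.timeunit;

// Hypothetical helper: combine a seconds part and a milliseconds part
// into a single nanosecond timeout, e.g. for Condition.waitNanos.
long timeoutNanos(long seconds, long millis) {
    return toNanos(seconds, TimeUnit.Seconds)
         + toNanos(millis, TimeUnit.MilliSeconds);
}

// Hypothetical helper: express a duration given in seconds as milliseconds.
long secondsToMillis(long seconds) {
    return convert(seconds, TimeUnit.Seconds, TimeUnit.MilliSeconds);
}
```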