Skip to content

Concurrency (std.async)

The std.async module provides threading primitives built on POSIX threads (pthreads).

nic
import std.async;

Threads

Creating Threads

The Thread class wraps pthread creation and joining:

nic
import std.async.thread(Thread);

// Thread entry point: prints a greeting. The `arg` pointer is not used here.
fn worker(arg: opaque) -> unit {
    println("Hello from worker thread!");
}

// Runs `worker` on a new thread, waits for it to finish, then frees the
// Thread object. Returns 0 as the exit code.
fn main() -> i32 {
    // The worker takes no data, so a nil pointer is passed as its argument.
    let thread = new Thread(worker, nil as *u8);

    // Do work in main thread...

    thread.join();  // Wait for thread to complete
    release thread; // Free the Thread object only after the join
    
    return 0;
}

Thread with Data

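Pass data to threads via the argument pointer. The data must remain valid until the thread has finished with it, so release it only after join() returns: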

nic
import std.async.thread(Thread);

// Payload handed to the worker thread through its argument pointer.
struct WorkItem {
    // Numeric identifier; process_work prints it with %d.
    id: i32,
    // Text payload; process_work prints it with %s.
    data: string
}

// Thread entry point: recovers the WorkItem behind the opaque argument and
// prints its id and data.
fn process_work(arg: opaque) -> unit {
    // main() passes a *WorkItem cast to *u8; cast it back to the typed pointer.
    let item = arg as *WorkItem;
    printf("Processing item %d: %s\n", item.id, item.data);
}

// Allocates a WorkItem, hands it to a thread running process_work, waits for
// the thread, then frees both the item and the Thread object.
fn main() -> i32 {
    let item = new WorkItem { id: 1, data: "task data" };

    // The item pointer is cast to *u8 to fit the Thread argument type.
    let thread = new Thread(process_work, item as *u8);
    thread.join();

    // Released only after join(), so the worker never sees a freed item.
    release item;
    release thread;

    return 0;
}

Multiple Threads

nic
import std.async.thread(Thread);

// Thread entry point: prints the thread id that main() passed in `arg`.
fn worker(arg: opaque) -> unit {
    // main() encodes the loop index directly in the pointer value
    // (`i as *u8`), so `arg` is an integer in disguise, not an address to
    // dereference. Cast it straight back to i32.
    let id = arg as i32;
    printf("Thread %d running\n", id);
}

// Spawns four worker threads, keeps their handles in a vector, then joins
// and frees each one before freeing the vector itself.
fn main() -> i32 {
    let threads = Vec[*Thread].empty();

    // Spawn 4 threads
    for i in 0..4 {
        // The loop index is smuggled through the pointer argument; the
        // worker casts it back to i32.
        let t = new Thread(worker, i as *u8);
        threads.push(t);
    }

    // Wait for all threads
    for i in 0..threads.len() {
        match threads.get(i) {
            Some(t) => {
                t.join();
                // Free the Thread object once it has been joined.
                release *t;
            },
            // Not expected for an index below len(); nothing to do.
            None => {},
        }
    }

    // Frees the vector; the Thread objects were released in the loop above.
    release threads;
    return 0;
}

Mutex (Mutual Exclusion)

The Mutex class provides mutual-exclusion locking, so that only one thread at a time can run the code between lock() and unlock():

nic
import std.async.mutex(Mutex);

// Integer counter whose reads and writes are all guarded by a mutex, so it
// can be shared between threads.
class Counter {
    // Current count; only accessed while `mutex` is held.
    value: i32,
    // Guards `value`.
    mutex: Mutex

    // Sets the count to zero and initializes the embedded mutex.
    pub init(self) -> unit {
        self.value = 0;
        self.mutex.init();
    }

    // Adds one to the count under the lock.
    pub increment(self) -> unit {
        self.mutex.lock();
        self.value = self.value + 1;
        self.mutex.unlock();
    }

    // Returns the current count.
    pub get(self) -> i32 {
        self.mutex.lock();
        // Copy the value while locked so the lock can be dropped before
        // returning.
        let v = self.value;
        self.mutex.unlock();
        return v;
    }

    // Destroys the embedded mutex.
    pub deinit(self) -> unit {
        self.mutex.deinit();
    }
}

Mutex Methods

nic
// Walk-through of the full Mutex lifecycle: create, initialize, lock/unlock,
// try_lock, destroy, free.
// NOTE(review): `new Thread(...)` elsewhere on this page appears to invoke
// `init` itself; confirm that `new Mutex()` does not already run `init()`,
// otherwise the explicit call below initializes the mutex twice.
let mutex = new Mutex();
mutex.init();

// Blocking lock
mutex.lock();
// Critical section...
mutex.unlock();

// Non-blocking try
if mutex.try_lock() {
    // Got the lock
    mutex.unlock();
} else {
    // Lock was held by another thread
}

// Destroy the mutex before freeing the object that holds it.
mutex.deinit();
release mutex;

Condition Variables

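The CondVar class lets a thread block until another thread signals that a shared condition may have changed. Hold the associated mutex when calling wait(), and re-check the condition in a while loop after it returns, as the example below does: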

nic
import std.async.mutex(Mutex, CondVar);

// Blocking queue of i32 values: push() never blocks on capacity, while pop()
// blocks until an item is available.
class Queue {
    // Stored items; only accessed while `mutex` is held.
    items: *Vec[i32],
    // Guards `items`.
    mutex: Mutex,
    // Signalled by push() whenever an item has been added.
    not_empty: CondVar

    // Allocates the empty item vector and initializes the mutex and the
    // condition variable.
    pub init(self) -> unit {
        self.items = Vec[i32].empty();
        self.mutex.init();
        self.not_empty.init();
    }

    // Adds `item` to the queue and wakes one thread blocked in pop().
    pub push(self, item: i32) -> unit {
        self.mutex.lock();
        self.items.push(item);
        self.not_empty.signal();  // Wake one waiting thread
        self.mutex.unlock();
    }

    // Removes and returns an item, blocking while the queue is empty.
    pub pop(self) -> i32 {
        self.mutex.lock();
        // Re-check emptiness in a loop: being woken does not by itself
        // guarantee that an item is still there.
        while self.items.len() == 0 {
            // wait() releases the mutex while blocked, which lets push()
            // acquire it.
            self.not_empty.wait(&self.mutex);  // Wait for signal
        }
        let item = self.items.pop();
        self.mutex.unlock();
        return item;
    }

    // Destroys the mutex and condition variable, then frees the item vector.
    pub deinit(self) -> unit {
        self.mutex.deinit();
        self.not_empty.deinit();
        release self.items;
    }
}

CondVar Methods

nic
// Walk-through of the CondVar lifecycle: create, initialize, wait, signal,
// broadcast, destroy, free.
// NOTE(review): confirm that `new CondVar()` does not already run `init()`,
// otherwise the explicit call below initializes the condition variable twice.
let cond = new CondVar();
cond.init();

// `mutex` is not defined in this snippet; the other examples on this page
// pass an initialized Mutex that the calling thread has already locked.
// Wait for signal (releases mutex while waiting)
cond.wait(&mutex);

// Wake one waiting thread
cond.signal();

// Wake all waiting threads
cond.broadcast();

// Destroy the condition variable before freeing the object that holds it.
cond.deinit();
release cond;

Complete Example: Producer-Consumer

nic
import std.async.thread(Thread);
import std.async.mutex(Mutex, CondVar);

// State shared between the producer and consumer threads; every field other
// than `mutex` is only touched while `mutex` is held.
struct SharedState {
    // Items produced but not yet consumed; the producer keeps it below 5.
    buffer: *Vec[i32],
    // Guards buffer and done.
    mutex: Mutex,
    // Signalled by the producer after adding an item, and broadcast when done.
    not_empty: CondVar,
    // Signalled by the consumer after removing an item.
    not_full: CondVar,
    // Set to true by the producer once it has pushed its last item.
    done: bool
}

// Thread entry point: pushes the values 0..10 into the shared buffer,
// blocking while it holds 5 items, then marks the state as done.
// `arg` is the *SharedState that main() passed cast to *u8.
fn producer(arg: opaque) -> unit {
    let state = arg as *SharedState;

    for i in 0..10 {
        state.mutex.lock();

        // Wait if buffer is full (max 5 items)
        // Checked in a loop so the capacity test is repeated after each wakeup.
        while state.buffer.len() >= 5 {
            state.not_full.wait(&state.mutex);
        }

        state.buffer.push(i);
        printf("Produced: %d\n", i);

        // Tell a waiting consumer that an item is now available.
        state.not_empty.signal();
        state.mutex.unlock();
    }

    // Publish completion under the lock so a consumer cannot miss it between
    // checking `done` and going to sleep.
    state.mutex.lock();
    state.done = true;
    state.not_empty.broadcast();  // Wake all consumers
    state.mutex.unlock();
}

// Thread entry point: removes and prints items from the shared buffer until
// it is empty and the producer has set `done`.
// `arg` is the *SharedState that main() passed cast to *u8.
fn consumer(arg: opaque) -> unit {
    let state = arg as *SharedState;

    while true {
        state.mutex.lock();

        // Sleep only while there is nothing to consume AND more may still
        // arrive; the loop re-checks both after every wakeup.
        while state.buffer.len() == 0 && !state.done {
            state.not_empty.wait(&state.mutex);
        }

        // Buffer drained and producer finished: unlock before leaving the
        // loop so the mutex is not left held.
        if state.buffer.len() == 0 && state.done {
            state.mutex.unlock();
            break;
        }

        let item = state.buffer.pop();
        printf("Consumed: %d\n", item);

        // Tell the producer that a slot has been freed.
        state.not_full.signal();
        state.mutex.unlock();
    }
}

// Sets up the shared state, runs one producer and one consumer thread to
// completion, then tears everything down. Returns 0 as the exit code.
fn main() -> i32 {
    // The struct literal sets only buffer and done; the mutex and the two
    // condition variables are initialized in place just below.
    let state = new SharedState {
        buffer: Vec[i32].empty(),
        done: false
    };
    state.mutex.init();
    state.not_empty.init();
    state.not_full.init();

    // Both threads receive the same state pointer.
    let prod = new Thread(producer, state as *u8);
    let cons = new Thread(consumer, state as *u8);

    prod.join();
    cons.join();

    // Tear down only after both joins, when no thread can still be using
    // the synchronization objects or the buffer.
    state.mutex.deinit();
    state.not_empty.deinit();
    state.not_full.deinit();
    release state.buffer;
    release state;
    release prod;
    release cons;

    return 0;
}

API Reference

Thread Class

nic
/// Handle to a thread; wraps pthread creation and joining.
pub class Thread {
    /// Create and start a new thread
    /// `start_routine` is the function the thread runs; `arg` is the pointer
    /// handed to it as its single argument.
    pub init(self, start_routine: *fn(bare, opaque)->unit, arg: *u8) -> unit;

    /// Wait for thread to complete
    pub join(self) -> unit;
}

Mutex Class

nic
/// Mutual-exclusion lock. Call init() before any other method and deinit()
/// when the mutex is no longer needed.
pub class Mutex {
    /// Initialize the mutex
    pub init(self) -> unit;

    /// Acquire the lock (blocks if held)
    pub lock(self) -> unit;

    /// Release the lock
    pub unlock(self) -> unit;

    /// Try to acquire lock without blocking
    /// Returns true if lock acquired, false if already held
    pub try_lock(self) -> bool;

    /// Destroy the mutex
    pub deinit(self) -> unit;
}

CondVar Class

nic
/// Condition variable used together with a Mutex to block a thread until
/// another thread signals it. Call init() before any other method and
/// deinit() when it is no longer needed.
pub class CondVar {
    /// Initialize the condition variable
    pub init(self) -> unit;

    /// Wait for signal (atomically releases mutex while waiting)
    /// `mutex` is the lock guarding the condition being waited on.
    pub wait(self, mutex: *Mutex) -> unit;

    /// Wake one waiting thread
    pub signal(self) -> unit;

    /// Wake all waiting threads
    pub broadcast(self) -> unit;

    /// Destroy the condition variable
    pub deinit(self) -> unit;
}

Notes

  • Thread functions must have the signature fn(bare, opaque) -> unit; the examples above declare them as, e.g., fn worker(arg: opaque) -> unit
  • Always pair lock() with unlock() on every code path, including early exits from a loop or function, to avoid deadlocks
  • Call init() before using Mutex or CondVar
  • Call deinit() on a Mutex or CondVar once no thread is using it any more, to clean up its resources
  • The implementation uses POSIX threads (pthreads) on Unix systems

See Also

Released under the MIT License.