Channels
2026-03-25
Channels are Raku's thread-safe message-passing mechanism. They let separate threads communicate by sending and receiving values through a shared queue. If you have used Go channels or Java's BlockingQueue, the concept will be familiar.
Creating and Using Channels
A Channel is a FIFO queue that multiple threads can write to and read from safely:
my $channel = Channel.new;
$channel.send("hello");
$channel.send("world");
say $channel.receive;    # hello
say $channel.receive;    # world
Producer-Consumer Pattern
The classic use case for channels is the producer-consumer pattern:
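my $ch = Channel.new;

# Producer: send ten values, then close the channel.
my $producer = start {
    for 1..10 -> $n {
        $ch.send($n);
        sleep 0.1;
    }
    $ch.close;
};

# Consumer: the react block exits once the channel is closed and drained.
my $consumer = start {
    react {
        whenever $ch -> $value {
            say "Received: $value";
        }
    }
    say "Channel closed, consumer done.";
};

# Wait for both tasks instead of sleeping for a guessed duration.
await $producer, $consumer;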
Calling .close on a channel signals consumers that no more values will arrive.
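To make the effect of .close concrete, here is a minimal sketch of what a closed channel still allows and what it rejects. It relies on the documented exception types X::Channel::SendOnClosed and X::Channel::ReceiveOnClosed; the variable name and messages are only illustrative.

```raku
my $ch = Channel.new;
$ch.send("queued before close");
$ch.close;

# Values sent before the close are still delivered.
say $ch.receive;                     # queued before close

# Sending on a closed channel throws.
try $ch.send("too late");
say $!.^name;                        # X::Channel::SendOnClosed

# Receiving from a closed channel with nothing left also throws.
try $ch.receive;
say $!.^name;                        # X::Channel::ReceiveOnClosed
```

This is why consumers usually rely on react/whenever or .list rather than calling .receive in a bare loop: both end cleanly when the channel is closed.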
Iterating Over a Channel
A channel can be iterated like a list. The iteration ends once the channel has been closed and all queued values have been consumed:
my $ch = Channel.new;
start {
    $ch.send($_) for 'a'..'f';
    $ch.close;
};
for $ch.list -> $item {
    say "Got: $item";
}
say "Done!";
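The .list method returns a lazy list right away. Iterating it blocks whenever no value is available yet, and the loop finishes only after the channel has been closed and every queued value has been consumed.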
Multiple Producers
Multiple threads can safely send to the same channel:
my $ch = Channel.new;
my @producers = (1..5).map: -> $id {
    start {
        for 1..3 -> $n {
            $ch.send("Producer $id: item $n");
            sleep rand * 0.5;
        }
    }
};
start {
    await @producers;
    $ch.close;
};
for $ch.list -> $msg {
    say $msg;
}
Multiple Consumers
You can also have multiple consumers reading from one channel. Each value is delivered to exactly one consumer:
my $ch = Channel.new;
start {
    $ch.send($_) for 1..20;
    $ch.close;
};
my @consumers = (1..3).map: -> $id {
    start {
        react {
            whenever $ch -> $value {
                say "Worker $id processed: $value";
                sleep 0.1;
            }
        }
    }
};
await @consumers;
say "All work done";
Channel as a Pipeline
You can chain channels together to form processing pipelines:
sub stage(Channel $in, &transform --> Channel) {
    my $out = Channel.new;
    start {
        react {
            whenever $in -> $value {
                $out.send(transform($value));
            }
        }
        $out.close;
    };
    $out
}
my $source = Channel.new;
my $doubled = stage($source, * * 2);
my $final = stage($doubled, * + 10);
start {
    $source.send($_) for 1..5;
    $source.close;
};
for $final.list -> $result {
    say $result;
}
Polling with .poll
Unlike .receive, which blocks, .poll returns immediately. It returns Nil if no value is available, which includes the case where the channel is closed and drained:
my $ch = Channel.new;
start {
    sleep 1;
    $ch.send("ready!");
};
loop {
    my $val = $ch.poll;
    if $val.defined {
        say "Got: $val";
        last;
    }
    say "Nothing yet...";
    sleep 0.3;
}
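One caveat with the loop above: because .poll also returns Nil once the channel is closed and drained, a Nil alone does not tell you whether more values may still arrive. The following is a minimal sketch of a non-blocking drain that checks the .closed promise afterwards. The helper name drain-available is hypothetical and not part of Raku, and the sketch assumes that only defined values are sent.

```raku
# Hypothetical helper: take everything currently queued without blocking.
sub drain-available(Channel $ch) {
    my @items;
    loop {
        my $item = $ch.poll;
        last without $item;          # Nil: nothing available right now
        @items.push($item);
    }
    @items
}

my $ch = Channel.new;
$ch.send($_) for 1..3;

say drain-available($ch);            # [1 2 3]
say so $ch.closed;                   # False: still open, more may arrive

$ch.close;
say drain-available($ch);            # []
say so $ch.closed;                   # True: closed and fully drained
```

A polling loop that should stop when the producer is finished can test the .closed promise in this way instead of spinning forever on Nil.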
Error Handling
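If a producer runs into an error, it can pass that error on to consumers with .fail. This closes the channel and queues the error as its final element, so consumers first receive any values sent earlier and then get the exception: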
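my $ch = Channel.new;
start {
    for 1..5 -> $n {
        if $n == 3 {
            # Close the channel with an error instead of a normal .close.
            $ch.fail("Error at item 3!");
            last;
        }
        $ch.send($n);
    }
};

# Prints 1 and 2, then the error surfaces in the loop and CATCH handles it.
try {
    for $ch.list -> $v {
        say $v;
    }
    CATCH {
        default { say "Channel error: {.message}" }
    }
}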
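Because .fail accepts an exception object as well as a plain message, a consumer can also match on the error type. A minimal sketch, assuming a custom exception class X::Producer::BadItem that is defined here purely for illustration:

```raku
# Hypothetical exception type, not part of Raku itself.
class X::Producer::BadItem is Exception {
    has $.item;
    method message { "bad item: $!item" }
}

my $ch = Channel.new;
my $producer = start {
    $ch.send(1);
    $ch.send(2);
    $ch.fail(X::Producer::BadItem.new(item => 3));
}
await $producer;

my @seen;
try {
    @seen.push($_) for $ch.list;
    CATCH {
        # The exception arrives with its original type and attributes.
        when X::Producer::BadItem {
            say "Bad item {.item} after receiving: @seen[]";
        }
    }
}
# Bad item 3 after receiving: 1 2
```

Matching with when instead of default lets unrelated exceptions propagate instead of being swallowed by the channel error handler.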
Buffered vs Unbuffered
Raku channels are unbounded: they accept as many values as you send and never block the sender. If you need backpressure, you have to implement it yourself, for example with a second channel that acts as a pool of tokens:
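my $ch = Channel.new;

# A second channel holds 10 tokens: the producer needs one for every send.
my $semaphore = Channel.new;
$semaphore.send(True) for ^10;

my $producer = start {
    for 1..100 -> $n {
        $semaphore.receive;    # blocks while 10 values are still unprocessed
        $ch.send($n);
    }
    $ch.close;
};

my $consumer = start {
    react {
        whenever $ch -> $value {
            say "Processing: $value";
            sleep 0.1;
            $semaphore.send(True);    # hand the token back
        }
    }
};

# Wait for both tasks instead of sleeping for a guessed duration.
await $producer, $consumer;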
Practical Example: Parallel File Search
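sub parallel-search(IO::Path $dir, Str $pattern --> Channel) {
    my $results = Channel.new;
    start {
        # Search every .txt file in $dir, one task per file.
        my @files = $dir.dir(test => /\.txt$/);
        my @searchers = @files.map: -> $file {
            start {
                for $file.lines.kv -> $num, $line {
                    if $line.contains($pattern) {
                        # .kv indexes from 0, so add 1 to get a line number.
                        $results.send("{$file}:{$num + 1}: $line");
                    }
                }
            }
        };
        await @searchers;
        $results.close;
    };
    $results
}

# Example call; the directory and pattern are placeholders.
for parallel-search('.'.IO, 'TODO').list -> $hit {
    say $hit;
}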
Channels are the right choice when you need explicit control over how data flows between threads. For more reactive, event-driven patterns, look into Supplies, which we will cover next.