
Channels

2026-03-25

Channels are Raku's thread-safe message-passing mechanism. They let separate threads communicate by sending and receiving values through a shared queue. If you have used Go channels or Java's BlockingQueue, the concept will be familiar.

Creating and Using Channels

A Channel is a FIFO queue that multiple threads can write to and read from safely:
    my $channel = Channel.new;

    # Send values
    $channel.send("hello");
    $channel.send("world");

    # Receive values (blocks if empty)
    say $channel.receive;  # hello
    say $channel.receive;  # world

Producer-Consumer Pattern

The classic use case for channels is the producer-consumer pattern:
    my $ch = Channel.new;

    # Producer thread
    my $producer = start {
        for 1..10 -> $n {
            $ch.send($n);
            sleep 0.1;
        }
        $ch.close;  # Signal that no more values are coming
    };

    # Consumer thread
    my $consumer = start {
        react {
            whenever $ch -> $value {
                say "Received: $value";
            }
        }
        say "Channel closed, consumer done.";
    };

    await $producer, $consumer;  # Wait for both threads to finish
Calling .close on a channel signals consumers that no more values will arrive.
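As a rough sketch of what closing means in practice: values queued before the close are still delivered, and only then do further operations fail. The example relies on the documented X::Channel::ReceiveOnClosed and X::Channel::SendOnClosed exceptions; the @drained and @errors arrays are names introduced here purely for illustration.

```raku
my $ch = Channel.new;
$ch.send($_) for 1..3;
$ch.close;

# Values that were queued before the close can still be received.
my @drained;
@drained.push($ch.receive) for ^3;
say @drained;   # [1 2 3]

my @errors;     # illustrative: records which exceptions were raised

# Receiving from a closed, fully drained channel throws.
{
    $ch.receive;
    CATCH {
        when X::Channel::ReceiveOnClosed { @errors.push: .^name }
    }
}

# Sending to a closed channel throws as well.
{
    $ch.send(4);
    CATCH {
        when X::Channel::SendOnClosed { @errors.push: .^name }
    }
}

.say for @errors;
```

A consumer built on react/whenever never sees these exceptions in the normal case, because the whenever block simply finishes when the channel closes.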

Iterating Over a Channel

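A channel can be iterated like a list. Iteration ends once the channel has been closed and drained:
    my $ch = Channel.new;

    start {
        $ch.send($_) for 'a'..'f';
        $ch.close;
    };

    # .list hands out values as they arrive and stops once the channel is closed
    for $ch.list -> $item {
        say "Got: $item";
    }
    say "Done!";
The .list method returns a list whose items are pulled from the channel as you iterate. Iteration blocks while it waits for the next value and finishes only after the channel has been closed and every queued value has been consumed.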
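A small sketch, assuming .list hands out values as they arrive rather than all at once: the producer below refuses to close the channel until the consumer confirms it has seen the first item. The $seen-first promise and the @got array are names made up for this illustration.

```raku
my $ch = Channel.new;
my $seen-first = Promise.new;   # illustrative handshake between the threads

my $producer = start {
    $ch.send('first');
    await $seen-first;          # do not continue until 'first' was consumed
    $ch.send('second');
    $ch.close;
};

my @got;
for $ch.list -> $item {
    @got.push: $item;
    # Tell the producer it may carry on and eventually close the channel.
    $seen-first.keep if $item eq 'first';
}

await $producer;
say @got;
```

If iteration only started after the close, this program could not finish, because the close itself waits on the consumer.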

Multiple Producers

Multiple threads can safely send to the same channel:
    my $ch = Channel.new;

    # Spawn 5 producer threads
    my @producers = (1..5).map: -> $id {
        start {
            for 1..3 -> $n {
                $ch.send("Producer $id: item $n");
                sleep rand * 0.5;
            }
        }
    };

    # Wait for all producers to finish, then close the channel
    start {
        await @producers;
        $ch.close;
    };

    # Single consumer
    for $ch.list -> $msg {
        say $msg;
    }

Multiple Consumers

You can also have multiple consumers reading from one channel. Each value is delivered to exactly one consumer:
    my $ch = Channel.new;

    # Producer
    start {
        $ch.send($_) for 1..20;
        $ch.close;
    };

    # Three consumer threads
    my @consumers = (1..3).map: -> $id {
        start {
            react {
                whenever $ch -> $value {
                    say "Worker $id processed: $value";
                    sleep 0.1;  # Simulate work
                }
            }
        }
    };

    await @consumers;
    say "All work done";
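To check the "exactly one consumer" claim for yourself, one option is to have every worker report what it handled on a second channel and then compare the totals. This is only a sketch; the $jobs and $handled channels and the pair layout (worker id => value) are choices made for this illustration.

```raku
my $jobs    = Channel.new;
my $handled = Channel.new;   # illustrative: collects (worker id => value) pairs

start {
    $jobs.send($_) for 1..20;
    $jobs.close;
};

my @workers = (1..3).map: -> $id {
    start {
        react {
            whenever $jobs -> $value {
                # Report the value together with the worker that received it.
                $handled.send(($id => $value));
            }
        }
    }
};

await @workers;
$handled.close;

my @pairs  = $handled.list;
my @values = @pairs.map(*.value).sort;

say "Values handled: {@values.elems}";          # Values handled: 20
say "Distinct values: {@values.unique.elems}";  # Distinct values: 20
```

How the 20 values are split between the three workers varies from run to run; only the totals are stable.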

Channel as a Pipeline

You can chain channels together to form processing pipelines:
    sub stage(Channel $in, &transform --> Channel) {
        my $out = Channel.new;
        start {
            react {
                whenever $in -> $value {
                    $out.send(transform($value));
                }
            }
            $out.close;
        };
        $out
    }

    my $source = Channel.new;

    # Build pipeline: source -> double -> add 10 -> output
    my $doubled = stage($source, * * 2);
    my $final   = stage($doubled, * + 10);

    # Feed the pipeline
    start {
        $source.send($_) for 1..5;
        $source.close;
    };

    # Read results
    for $final.list -> $result {
        say $result;  # 12, 14, 16, 18, 20
    }
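When a pipeline has more than a couple of stages, wiring each intermediate channel by hand gets repetitive. The sketch below folds a list of transforms into a chain of stages. The pipeline helper is hypothetical (it is not part of Raku), and stage is repeated from the example above so the snippet runs on its own.

```raku
sub stage(Channel $in, &transform --> Channel) {
    my $out = Channel.new;
    start {
        react {
            whenever $in -> $value {
                $out.send(transform($value));
            }
        }
        $out.close;   # Propagate the close to the next stage
    };
    $out
}

# Hypothetical helper: connect one stage per transform, in order.
sub pipeline(Channel $source, *@transforms --> Channel) {
    my $current = $source;
    for @transforms -> &t {
        $current = stage($current, &t);
    }
    $current
}

my $source = Channel.new;
my $out    = pipeline($source, -> $n { $n * 2 }, -> $n { $n + 10 });

start {
    $source.send($_) for 1..5;
    $source.close;
};

my @results = $out.list;
say @results;   # [12 14 16 18 20]
```

Because every stage here runs a single react block, values leave the pipeline in the order they entered it.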

Polling with .poll

Unlike .receive, which blocks, .poll returns immediately. It returns Nil if no value is available, which includes the case where the channel is closed and empty:
    my $ch = Channel.new;

    start {
        sleep 1;
        $ch.send("ready!");
    };

    # Non-blocking check
    loop {
        my $val = $ch.poll;
        if $val.defined {
            say "Got: $val";
            last;
        }
        say "Nothing yet...";
        sleep 0.3;
    }
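Another common use of .poll is to grab whatever is queued right now and move on. The sketch below does that with a hypothetical drain-available helper (the name is made up for this example). It assumes you only send defined values, since an undefined value cannot be told apart from the Nil that .poll returns for an empty channel.

```raku
# Hypothetical helper: take everything currently queued without blocking.
sub drain-available(Channel $channel) {
    my @batch;
    loop {
        my $value = $channel.poll;
        last unless $value.defined;   # Nil: nothing queued, or channel closed
        @batch.push: $value;
    }
    @batch
}

my $ch = Channel.new;
$ch.send($_) for <a b c>;

say drain-available($ch);   # [a b c]
say drain-available($ch);   # []
```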

Error Handling

If a producer runs into an error, it can propagate it through the channel with .fail, which closes the channel and enqueues the error as its final element:
    my $ch = Channel.new;

    start {
        for 1..5 -> $n {
            if $n == 3 {
                $ch.fail("Error at item 3!");
                last;
            }
            $ch.send($n);
        }
    };

    sleep 0.5;

    try {
        for $ch.list -> $v {
            say $v;
        }
        CATCH {
            default { say "Channel error: {.message}" }
        }
    }
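The same idea extends to real exceptions: a CATCH block in the producer can forward whatever went wrong to .fail, and the consumer can then match on the exception type. The following is a sketch under a few assumptions: X::BadRecord is an exception class invented for this example, and the consumer reads with .receive, which is documented to throw the error passed to .fail.

```raku
# Illustrative exception type, not part of Raku.
class X::BadRecord is Exception {
    has $.record;
    method message { "Cannot process record: $!record" }
}

my $ch = Channel.new;

start {
    for <10 20 oops 40> -> $raw {
        die X::BadRecord.new(record => $raw) unless $raw ~~ /^ \d+ $/;
        $ch.send($raw * 2);
    }
    $ch.close;
    CATCH {
        # Forward any failure to whoever is reading from the channel.
        default { $ch.fail($_) }
    }
};

my @doubled;
my $failure;
{
    # The loop ends when .receive throws the forwarded exception.
    loop { @doubled.push: $ch.receive }
    CATCH {
        when X::BadRecord { $failure = .message }
    }
}

say @doubled;   # [20 40]
say $failure;   # Cannot process record: oops
```

Values sent before the failure are still delivered first, so the consumer sees 20 and 40 before the error surfaces.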

Buffered vs Unbuffered

Raku channels are unbounded by default. They will accept as many values as you send without blocking the sender. If you need backpressure, you can implement it yourself:
    my $ch = Channel.new;
    my $semaphore = Channel.new;

    # Pre-fill semaphore with N tokens for bounded buffer
    $semaphore.send(True) for ^10;

    # Producer -- waits for a token before sending
    my $producer = start {
        for 1..100 -> $n {
            $semaphore.receive;  # Block if buffer is "full"
            $ch.send($n);
        }
        $ch.close;
    };

    # Consumer -- returns token after processing
    my $consumer = start {
        react {
            whenever $ch -> $value {
                say "Processing: $value";
                sleep 0.1;
                $semaphore.send(True);  # Release a slot
            }
        }
    };

    await $producer, $consumer;  # Wait until all 100 values are processed

Practical Example: Parallel File Search

    sub parallel-search(IO::Path $dir, Str $pattern --> Channel) {
        my $results = Channel.new;
        start {
            my @files = $dir.dir(test => /\.txt$/);
            # Search every file in its own task
            my @searchers = @files.map: -> $file {
                start {
                    # .kv counts from 0, so add 1 to report 1-based line numbers
                    for $file.lines.kv -> $index, $line {
                        if $line.contains($pattern) {
                            $results.send("{$file}:{$index + 1}: $line");
                        }
                    }
                }
            };
            await @searchers;
            $results.close;
        };
        $results
    }

    # Usage:
    # for parallel-search("/path/to/dir".IO, "ERROR").list -> $match {
    #     say $match;
    # }
Channels are the right choice when you need explicit control over how data flows between threads. For more reactive, event-driven patterns, look into Supplies, which we will cover next.