Procesy BEAM i Elixir: ogromna współbieżność bez współdzielonych wątków
BEAM może wykonywać miliony izolowanych, lekkich procesów: każdy proces ma własną stertę, komunikuje się wyłącznie za pomocą wiadomości i nie udostępnia stanu. Dowiedz się dlaczego ten wzorzec eliminuje warunki wyścigu z założenia i sposób, w jaki przekłada się to na prawdziwy kod Elixir.
Model konkurencji BEAM
Współbieżność w językach tradycyjnych opiera się na wątkach systemu operacyjnego którzy dzielą pamięć. Model ten nieuchronnie prowadzi do warunków wyścigowych, zakleszczenia i błędy trudne do odtworzenia. Język musi zapewniać muteksy, semafory, lock — mechanizmy koordynujące współdzielony dostęp.
BEAM wybrał radykalnie inne podejście: model Actor. Procesy BEAM to całkowicie izolowane byty, które się komunikują wyłącznie poprzez wiadomości asynchroniczne. Nie ma pamięci współdzielonej. Nie ma żadnych zamków. Warunki wyścigowe nie są możliwe z założenia, a nie z powodu dyscypliny programisty.
Czego się nauczysz
- Różnica między procesami BEAM a wątkami systemu operacyjnego
- spawn i spawn_link: twórz izolowane procesy
- wysyłaj i odbieraj: przesyłanie wiadomości między procesami
- Skrzynka pocztowa: Kolejka komunikatów każdego procesu
- Monitorowanie procesu za pomocą Process.monitor
- Self() i pid: tożsamość procesu
- Zadanie: Abstrakcja wysokiego poziomu dla współbieżnego spawnu
Procesy BEAM a system operacyjny Thread
Proces BEAM nie jest wątkiem systemu operacyjnego. To harmonogram Poziom eliksiru zaimplementowany w maszynie wirtualnej. Każdy węzeł Erlang/Elixir zazwyczaj używa jednego wątku OS na rdzeń procesora, ale może zaplanować setki tysięcy (lub miliony) procesów BEAM w tych wątkach.
# Dimensioni a confronto (approssimative)
# Thread OS (Linux):
# - Stack: 8MB default
# - Context switch: ~1-10 microsecond (syscall kernel)
# - Max per processo: centinaia
# Processo BEAM:
# - Stack iniziale: ~300 parole (pochi KB, cresce dinamicamente)
# - Context switch: sub-microsecondo (nello spazio utente)
# - Max per nodo: milioni (default: 262.144, configurabile)
# Verificare i limiti:
iex> :erlang.system_info(:process_limit)
# 262144 (default, configurabile con +P flag)
iex> :erlang.system_info(:process_count)
# Numero di processi attualmente in esecuzione
# BEAM usa preemptive scheduling basato su "reduction counts"
# Ogni processo ha un budget di "reduction" (operazioni base)
# Quando esaurito, lo scheduler puo' passare a un altro processo
# Questo garantisce fairness anche con processi CPU-intensive
spawn: Twórz procesy
# spawn: crea un processo che esegue una funzione anonima
pid = spawn(fn ->
IO.puts("Sono un processo! PID: #{inspect(self())}")
:timer.sleep(1000)
IO.puts("Fine processo")
end)
IO.puts("Processo padre, PID figlio: #{inspect(pid)}")
# Il padre continua senza aspettare il figlio
# Output (ordine non garantito):
# Processo padre, PID figlio: #PID<0.108.0>
# Sono un processo! PID: #PID<0.108.0>
# Fine processo
# spawn con modulo e funzione
defmodule Worker do
def run(task) do
IO.puts("Worker #{inspect(self())} processing: #{task}")
:timer.sleep(100)
{:done, task}
end
end
pid = spawn(Worker, :run, ["task-1"])
# self(): PID del processo corrente
IO.puts("Main process: #{inspect(self())}")
# Verifica se un processo e' ancora in vita
Process.alive?(pid) # true o false
wysyłaj i odbieraj: przekazywanie wiadomości
Procesy komunikowane są poprzez wiadomości wysyłane na skrzynkę pocztową. Skrzynka pocztowa jest
jedna kolejka FIFO na proces. receive wyodrębnia pierwszą pasującą wiadomość
jeden ze wzorców — jeśli żaden komunikat nie pasuje, proces wstrzymuje oczekiwanie.
# Comunicazione padre-figlio tramite messaggi
defmodule PingPong do
def start do
parent = self()
# Crea processo figlio passando il PID del padre
child = spawn(fn -> child_loop(parent) end)
# Invia messaggio al figlio
send(child, {:ping, "hello"})
# Ricevi risposta
receive do
{:pong, message} ->
IO.puts("Parent received: #{message}")
{:error, reason} ->
IO.puts("Error: #{reason}")
after 5000 ->
IO.puts("Timeout: no response in 5 seconds")
end
end
defp child_loop(parent_pid) do
receive do
{:ping, message} ->
IO.puts("Child received: #{message}")
send(parent_pid, {:pong, "world"})
child_loop(parent_pid) # Ricomincia il ciclo
:stop ->
IO.puts("Child stopping")
# Il processo termina quando la funzione ritorna
end
end
end
PingPong.start()
# Child received: hello
# Parent received: world
# Pattern: processo server con stato via ricorsione
defmodule Counter do
def start(initial_value \\ 0) do
spawn(__MODULE__, :loop, [initial_value])
end
def increment(pid), do: send(pid, {:increment, 1})
def add(pid, n), do: send(pid, {:increment, n})
def get(pid) do
send(pid, {:get, self()})
receive do
{:value, n} -> n
after 5000 -> {:error, :timeout}
end
end
def stop(pid), do: send(pid, :stop)
# Loop interno: mantiene lo stato tramite ricorsione di coda
def loop(count) do
receive do
{:increment, n} ->
loop(count + n) # Ricorsione: nuovo stato
{:get, caller} ->
send(caller, {:value, count})
loop(count) # Stato invariato
:stop ->
IO.puts("Counter stopping at #{count}")
# Non ricorre: il processo termina
end
end
end
# Uso
counter = Counter.start(0)
Counter.increment(counter)
Counter.increment(counter)
Counter.add(counter, 10)
IO.puts(Counter.get(counter)) # 12
Counter.stop(counter)
spawn_link i propagacja awarii
spawn_link tworzy proces potomny i ustanawia a połączyć
dwukierunkowy: jeśli jeden z dwóch procesów ulegnie awarii, propagowany jest sygnał wyjścia
do drugiego. Jest to podstawa zasady „niech się rozbije” firmy OTP.
# spawn vs spawn_link
defmodule CrashDemo do
def demo_spawn do
# spawn: il crash del figlio NON impatta il padre
child = spawn(fn ->
:timer.sleep(100)
raise "Crash nel figlio!"
end)
IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
:timer.sleep(500)
IO.puts("Padre ancora vivo nonostante crash figlio")
# Process.alive?(child) == false (il figlio e' morto)
end
def demo_spawn_link do
parent = self()
# spawn_link: il crash del figlio PROPAGA al padre
# Il padre riceve un exit signal {:EXIT, child_pid, reason}
child = spawn_link(fn ->
:timer.sleep(100)
raise "Crash nel figlio con link!"
end)
IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
:timer.sleep(500)
# Se il padre non e' un Supervisor, anche lui crashera'
# Questo e' intenzionale: il fallimento si propaga verso l'alto
# finche' un Supervisor non lo gestisce
end
def demo_trap_exit do
# trap_exit: intercetta gli exit signal invece di crashare
Process.flag(:trap_exit, true)
child = spawn_link(fn ->
:timer.sleep(100)
raise "Crash!"
end)
receive do
{:EXIT, ^child, reason} ->
IO.puts("Figlio crashato: #{inspect(reason)}")
# Il padre puo' decidere come gestire il crash
end
end
end
Process.monitor: Monitorowanie jednokierunkowe
# Process.monitor: ricevi notifica quando un processo muore
# (senza il link bidirezionale di spawn_link)
defmodule ProcessMonitor do
def watch(target_pid) do
ref = Process.monitor(target_pid)
IO.puts("Monitoring #{inspect(target_pid)}, ref: #{inspect(ref)}")
receive do
{:DOWN, ^ref, :process, ^target_pid, reason} ->
IO.puts("Process #{inspect(target_pid)} down: #{inspect(reason)}")
after 10_000 ->
IO.puts("Timeout: process still running after 10s")
Process.demonitor(ref)
end
end
end
# Crea un processo che si ferma dopo 500ms
worker = spawn(fn ->
:timer.sleep(500)
IO.puts("Worker done")
end)
# Monitora in background
spawn(fn -> ProcessMonitor.watch(worker) end)
# Attendi
:timer.sleep(1000)
# Output:
# Worker done
# Process #PID<...> down: :normal
Zadanie: Abstrakcja wysokiego poziomu
Formularz Task zapewnia wygodniejszą abstrakcję spawn
dla typowych wzorców: wykonaj operacje asynchroniczne i poczekaj na wyniki.
# Task.async e Task.await: parallelismo strutturato
defmodule ParallelFetcher do
def fetch_all(urls) do
urls
|> Enum.map(fn url ->
# Lancia un task per ogni URL in parallelo
Task.async(fn -> fetch(url) end)
end)
|> Enum.map(fn task ->
# Attende il risultato di ciascun task (max 5 secondi)
Task.await(task, 5_000)
end)
end
defp fetch(url) do
# Simula una chiamata HTTP
:timer.sleep(Enum.random(100..500))
{:ok, "Response from #{url}"}
end
end
# 3 richieste in parallelo: tempo totale ~max(latenze)
urls = ["https://api1.example.com", "https://api2.example.com", "https://api3.example.com"]
results = ParallelFetcher.fetch_all(urls)
# [{:ok, "Response from ..."}, ...]
# Task.async_stream: parallelo con concorrenza limitata
def process_items(items, max_concurrency \\ 10) do
items
|> Task.async_stream(
fn item -> expensive_operation(item) end,
max_concurrency: max_concurrency,
timeout: 10_000,
on_timeout: :kill_task,
)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} ->
IO.puts("Task failed: #{inspect(reason)}")
nil
end)
|> Enum.reject(&is_nil/1)
end
# Task.start: fire-and-forget (non attendi il risultato)
Task.start(fn ->
send_notification_email(user) # Background job, non bloccante
end)
Rejestr procesów: nazewnictwo procesów
# Process Registry: registra un processo con un nome atom
defmodule NamedCounter do
def start do
pid = spawn(__MODULE__, :loop, [0])
# Registra con nome globale (atom)
Process.register(pid, :main_counter)
pid
end
def increment do
# Invia a nome simbolico invece di PID
send(:main_counter, {:increment, 1})
end
def get do
send(:main_counter, {:get, self()})
receive do
{:value, n} -> n
after 1_000 -> nil
end
end
def loop(n) do
receive do
{:increment, delta} -> loop(n + delta)
{:get, caller} ->
send(caller, {:value, n})
loop(n)
end
end
end
NamedCounter.start()
NamedCounter.increment()
NamedCounter.increment()
IO.puts(NamedCounter.get()) # 2
# Trova il PID da un nome registrato
pid = Process.whereis(:main_counter)
IO.puts(inspect(pid)) # #PID<0.120.0>
Wnioski
Model aktora BEAM sprawia, że Elixir jest wyjątkowy. Każdy proces jest izolowanym wszechświatem z własną pamięcią i skrzynką pocztową. Tworzenie milionów procesów jest tanie. Komunikacja poprzez wiadomości eliminuje potrzebę jawnej synchronizacji. Propagacja awarii via link to mechanizm, na którym OTP buduje odporność na awarie. Następny artykuł omawia GenServer, podstawowy element, z którego zbudowany jest serwer GenServer przenosi to wszystko na wyższy poziom abstrakcji.
Nadchodzące artykuły z serii Elixir
- Artykuł 3: OTP i GenServer — stan rozproszony z wbudowaną odpornością na błędy
- Artykuł 4: Drzewa nadzorców — projektowanie systemów, które nigdy nie umierają
- Artykuł 5: Ecto — komponowalne mapowanie zapytań i schematów dla PostgreSQL







