BEAM Yarışma Modeli

Geleneksel dillerdeki eşzamanlılık, işletim sistemi iş parçacıklarına dayanır hafızayı paylaşanlar. Bu model kaçınılmaz olarak yarış koşullarına yol açıyor, yeniden üretilmesi zor olan kilitlenmeler ve hatalar. Dil muteksleri, semaforları sağlamalıdır. kilit — paylaşılan erişimi koordine etmeye yönelik mekanizmalar.

BEAM tamamen farklı bir yaklaşım seçmiştir: Aktör modeli. Süreçler BEAM'in tamamı iletişim kuran izole varlıklardır. münhasıran asenkron mesajlar aracılığıyla. Paylaşılan hafıza yok. Kilit yok. Yarış koşulları programcı disiplini ile değil tasarımla mümkün değildir.

Ne Öğreneceksiniz

  • BEAM işlemleri ve işletim sistemi iş parçacıkları arasındaki fark
  • yumurtlama ve yumurtlama_bağlantısı: yalıtılmış süreçler oluşturun
  • gönder ve al: işlemler arasında ileti geçişi
  • Posta Kutusu: Her işlemin mesaj kuyruğu
  • Process.monitor ile süreç izleme
  • Self() ve pid: bir sürecin kimliği
  • Görev: Eşzamanlı üreme için yüksek seviyeli soyutlama

BEAM Süreçleri ve İş Parçacığı İşletim Sistemi

BEAM işlemi bir işletim sistemi iş parçacığı değildir. Bu bir zamanlayıcı VM'de iksir düzeyi uygulandı. Her Erlang/İksir düğümü genellikle bir iş parçacığı kullanır CPU çekirdeği başına işletim sistemi, ancak yüzbinlerce (veya milyonlarca) işlemi planlayabilir Bu konular üzerinde BEAM.

# Dimensioni a confronto (approssimative)

# Thread OS (Linux):
# - Stack: 8MB default
# - Context switch: ~1-10 microsecond (syscall kernel)
# - Max per processo: centinaia

# Processo BEAM:
# - Stack iniziale: ~300 parole (pochi KB, cresce dinamicamente)
# - Context switch: sub-microsecondo (nello spazio utente)
# - Max per nodo: milioni (default: 262.144, configurabile)

# Verificare i limiti:
iex> :erlang.system_info(:process_limit)
# 262144 (default, configurabile con +P flag)

iex> :erlang.system_info(:process_count)
# Numero di processi attualmente in esecuzione

# BEAM usa preemptive scheduling basato su "reduction counts"
# Ogni processo ha un budget di "reduction" (operazioni base)
# Quando esaurito, lo scheduler puo' passare a un altro processo
# Questo garantisce fairness anche con processi CPU-intensive

yumurtlama: Süreçler Oluştur

# spawn: crea un processo che esegue una funzione anonima
pid = spawn(fn ->
  IO.puts("Sono un processo! PID: #{inspect(self())}")
  :timer.sleep(1000)
  IO.puts("Fine processo")
end)

IO.puts("Processo padre, PID figlio: #{inspect(pid)}")

# Il padre continua senza aspettare il figlio
# Output (ordine non garantito):
# Processo padre, PID figlio: #PID<0.108.0>
# Sono un processo! PID: #PID<0.108.0>
# Fine processo

# spawn con modulo e funzione
defmodule Worker do
  def run(task) do
    IO.puts("Worker #{inspect(self())} processing: #{task}")
    :timer.sleep(100)
    {:done, task}
  end
end

pid = spawn(Worker, :run, ["task-1"])

# self(): PID del processo corrente
IO.puts("Main process: #{inspect(self())}")

# Verifica se un processo e' ancora in vita
Process.alive?(pid)    # true o false

gönderme ve alma: Mesaj İletme

Süreçler posta kutusuna gönderilen mesajlar aracılığıyla iletilir. Posta kutusu işlem başına bir FIFO kuyruğu. receive eşleşen ilk mesajı çıkarır kalıplardan biri — hiçbir mesaj eşleşmezse süreç beklemeyi duraklatır.

# Comunicazione padre-figlio tramite messaggi
defmodule PingPong do
  def start do
    parent = self()

    # Crea processo figlio passando il PID del padre
    child = spawn(fn -> child_loop(parent) end)

    # Invia messaggio al figlio
    send(child, {:ping, "hello"})

    # Ricevi risposta
    receive do
      {:pong, message} ->
        IO.puts("Parent received: #{message}")

      {:error, reason} ->
        IO.puts("Error: #{reason}")

    after 5000 ->
      IO.puts("Timeout: no response in 5 seconds")
    end
  end

  defp child_loop(parent_pid) do
    receive do
      {:ping, message} ->
        IO.puts("Child received: #{message}")
        send(parent_pid, {:pong, "world"})
        child_loop(parent_pid)  # Ricomincia il ciclo

      :stop ->
        IO.puts("Child stopping")
        # Il processo termina quando la funzione ritorna
    end
  end
end

PingPong.start()
# Child received: hello
# Parent received: world
# Pattern: processo server con stato via ricorsione
defmodule Counter do
  def start(initial_value \\ 0) do
    spawn(__MODULE__, :loop, [initial_value])
  end

  def increment(pid), do: send(pid, {:increment, 1})
  def add(pid, n), do: send(pid, {:increment, n})

  def get(pid) do
    send(pid, {:get, self()})
    receive do
      {:value, n} -> n
    after 5000 -> {:error, :timeout}
    end
  end

  def stop(pid), do: send(pid, :stop)

  # Loop interno: mantiene lo stato tramite ricorsione di coda
  def loop(count) do
    receive do
      {:increment, n} ->
        loop(count + n)     # Ricorsione: nuovo stato

      {:get, caller} ->
        send(caller, {:value, count})
        loop(count)         # Stato invariato

      :stop ->
        IO.puts("Counter stopping at #{count}")
        # Non ricorre: il processo termina
    end
  end
end

# Uso
counter = Counter.start(0)

Counter.increment(counter)
Counter.increment(counter)
Counter.add(counter, 10)

IO.puts(Counter.get(counter))   # 12

Counter.stop(counter)

Spawn_link ve Çökme Yayılımı

spawn_link bir alt süreç yaratır ve bir bağlantı çift yönlü: iki işlemden biri çökerse çıkış sinyali yayılır diğerine. Bu, OTP'nin "bırak çöksün" ilkesinin temelidir.

# spawn vs spawn_link
defmodule CrashDemo do
  def demo_spawn do
    # spawn: il crash del figlio NON impatta il padre
    child = spawn(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    IO.puts("Padre ancora vivo nonostante crash figlio")
    # Process.alive?(child) == false (il figlio e' morto)
  end

  def demo_spawn_link do
    parent = self()

    # spawn_link: il crash del figlio PROPAGA al padre
    # Il padre riceve un exit signal {:EXIT, child_pid, reason}
    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash nel figlio con link!"
    end)

    IO.puts("Padre #{inspect(self())} vivo, figlio: #{inspect(child)}")
    :timer.sleep(500)
    # Se il padre non e' un Supervisor, anche lui crashera'
    # Questo e' intenzionale: il fallimento si propaga verso l'alto
    # finche' un Supervisor non lo gestisce
  end

  def demo_trap_exit do
    # trap_exit: intercetta gli exit signal invece di crashare
    Process.flag(:trap_exit, true)

    child = spawn_link(fn ->
      :timer.sleep(100)
      raise "Crash!"
    end)

    receive do
      {:EXIT, ^child, reason} ->
        IO.puts("Figlio crashato: #{inspect(reason)}")
        # Il padre puo' decidere come gestire il crash
    end
  end
end

Process.monitor: Tek yönlü izleme

# Process.monitor: ricevi notifica quando un processo muore
# (senza il link bidirezionale di spawn_link)
defmodule ProcessMonitor do
  def watch(target_pid) do
    ref = Process.monitor(target_pid)
    IO.puts("Monitoring #{inspect(target_pid)}, ref: #{inspect(ref)}")

    receive do
      {:DOWN, ^ref, :process, ^target_pid, reason} ->
        IO.puts("Process #{inspect(target_pid)} down: #{inspect(reason)}")

    after 10_000 ->
      IO.puts("Timeout: process still running after 10s")
      Process.demonitor(ref)
    end
  end
end

# Crea un processo che si ferma dopo 500ms
worker = spawn(fn ->
  :timer.sleep(500)
  IO.puts("Worker done")
end)

# Monitora in background
spawn(fn -> ProcessMonitor.watch(worker) end)

# Attendi
:timer.sleep(1000)
# Output:
# Worker done
# Process #PID<...> down: :normal

Görev: Yüksek Düzey Soyutlama

form Task daha uygun bir soyutlama sağlar spawn ortak modeller için: eşzamansız işlemler gerçekleştirin ve sonuçları bekleyin.

# Task.async e Task.await: parallelismo strutturato
defmodule ParallelFetcher do
  def fetch_all(urls) do
    urls
    |> Enum.map(fn url ->
      # Lancia un task per ogni URL in parallelo
      Task.async(fn -> fetch(url) end)
    end)
    |> Enum.map(fn task ->
      # Attende il risultato di ciascun task (max 5 secondi)
      Task.await(task, 5_000)
    end)
  end

  defp fetch(url) do
    # Simula una chiamata HTTP
    :timer.sleep(Enum.random(100..500))
    {:ok, "Response from #{url}"}
  end
end

# 3 richieste in parallelo: tempo totale ~max(latenze)
urls = ["https://api1.example.com", "https://api2.example.com", "https://api3.example.com"]
results = ParallelFetcher.fetch_all(urls)
# [{:ok, "Response from ..."}, ...]

# Task.async_stream: parallelo con concorrenza limitata
def process_items(items, max_concurrency \\ 10) do
  items
  |> Task.async_stream(
    fn item -> expensive_operation(item) end,
    max_concurrency: max_concurrency,
    timeout: 10_000,
    on_timeout: :kill_task,
  )
  |> Enum.map(fn
    {:ok, result} -> result
    {:exit, reason} ->
      IO.puts("Task failed: #{inspect(reason)}")
      nil
  end)
  |> Enum.reject(&is_nil/1)
end

# Task.start: fire-and-forget (non attendi il risultato)
Task.start(fn ->
  send_notification_email(user)  # Background job, non bloccante
end)

Süreç Kaydı: Süreçleri Adlandırma

# Process Registry: registra un processo con un nome atom
defmodule NamedCounter do
  def start do
    pid = spawn(__MODULE__, :loop, [0])
    # Registra con nome globale (atom)
    Process.register(pid, :main_counter)
    pid
  end

  def increment do
    # Invia a nome simbolico invece di PID
    send(:main_counter, {:increment, 1})
  end

  def get do
    send(:main_counter, {:get, self()})
    receive do
      {:value, n} -> n
    after 1_000 -> nil
    end
  end

  def loop(n) do
    receive do
      {:increment, delta} -> loop(n + delta)
      {:get, caller} ->
        send(caller, {:value, n})
        loop(n)
    end
  end
end

NamedCounter.start()
NamedCounter.increment()
NamedCounter.increment()
IO.puts(NamedCounter.get())  # 2

# Trova il PID da un nome registrato
pid = Process.whereis(:main_counter)
IO.puts(inspect(pid))   # #PID<0.120.0>

Sonuçlar

BEAM'in oyuncu modeli Elixir'i eşsiz kılan şeydir. Her süreç, kendi belleği ve posta kutusuyla izole edilmiş bir evrendir. Milyonlarca işlemin üretilmesi ucuzdur. Mesajlar yoluyla iletişim açık senkronizasyon ihtiyacını ortadan kaldırır. Kilitlenme yayılımı via link, OTP'nin hata toleransını geliştirdiği mekanizmadır. Bir sonraki makale, temel yapı taşı olan GenServer'ı araştırıyor tüm bunları daha yüksek bir soyutlama düzeyine taşıyor.

İksir Serisinde Gelecek Makaleler

  • Madde 3: OTP ve GenServer — Yerleşik Hata Toleransı ile Dağıtılmış Durum
  • Madde 4: Denetleyici Ağaçlar — Asla Ölmeyen Sistemler Tasarlamak
  • Madde 5: Ecto — PostgreSQL için Şekillendirilebilir Sorgu ve Şema Eşleme