OTP ve GenServer: Entegre Hata Toleransı ile Dağıtılmış Durum Yönetimi
GenServer, OTP'nin temel yapı taşıdır: bir sunucunun nasıl uygulanacağı Handle_call/handle_cast ile durum bilgisi olan, değişmez durumu yönet, işlemleri kaydet adı ve nedeni ile "bırakın çöksün" bir tasarım felsefesidir, bir mazeret değil.
OTP: Açık Telekom Platformu
OTP (Açık Telekom Platformu) bir dizi kütüphane, tasarım modeli ve Erlang'ın Ericsson'daki geçmişinden gelen mimari ilkeler. Bu bir web çerçevesi değil; inşa etmek için bir çerçeve Hataya dayanıklı sistemler. Yapı taşları (GenServer, Supervisor, GenStage, Kayıt Defteri) eşzamanlı süreçleri yapılandırmanız için size standartlaştırılmış bir yol sunar güvenilirlik garantileriyle.
Önceki bölümde durum bilgisi olan bir sunucunun nasıl uygulanacağını gördük. kuyruk özyinelemesi ve manuel alım. GenServer da aynı fikir kurallar, araçlar ve entegrasyonla standart bir OTP davranışı içinde kapsüllenmiştir denetim ağacıyla.
Ne Öğreneceksiniz
- GenServer davranışı: zorunlu ve isteğe bağlı geri aramalar
- hand_call: eşzamanlı istekler (istemci yanıt bekler)
- hand_cast: eşzamansız istekler (ateşle ve unut)
- hand_info: OTP olmayan mesajlar (zamanlayıcı, monitör kesintileri)
- Şu adla kayıt: {:local, :name} ve {:via, module, key}
- başlatma/1 ve sonlandırma/2: sunucu yaşam döngüsü
- “Bırakın çöksün”: hataların İŞLENMEMESİ gerektiğinde
Bir GenServer'ın Anatomisi
# GenServer: struttura base
defmodule MyApp.Counter do
use GenServer
# --- Client API (chiamato da altri processi) ---
@doc "Avvia il server Counter."
def start_link(initial_value \\ 0) do
# Il nome :my_counter permette di riferirsi al server senza PID
GenServer.start_link(__MODULE__, initial_value, name: :my_counter)
end
@doc "Incrementa il contatore di n (default 1). Sincrono."
def increment(n \\ 1) do
GenServer.call(:my_counter, {:increment, n})
end
@doc "Decrementa in modo asincrono (non aspetta conferma)."
def decrement_async(n \\ 1) do
GenServer.cast(:my_counter, {:decrement, n})
end
@doc "Legge il valore corrente."
def get_value do
GenServer.call(:my_counter, :get_value)
end
@doc "Reset asincrono."
def reset do
GenServer.cast(:my_counter, :reset)
end
# --- Server Callbacks (eseguiti nel processo GenServer) ---
@impl true
def init(initial_value) do
{:ok, initial_value}
# Oppure: {:ok, state, timeout_ms} -- handle_info(:timeout, ...) dopo ms
# Oppure: {:stop, reason} -- non avviare il server
end
# handle_call: sincrono, risponde con {:reply, reply, new_state}
@impl true
def handle_call({:increment, n}, _from, state) do
new_state = state + n
{:reply, new_state, new_state}
# {:reply, risposta_al_client, nuovo_stato}
end
def handle_call(:get_value, _from, state) do
{:reply, state, state}
# Lo stato non cambia, ma rispondiamo con il valore corrente
end
# handle_cast: asincrono, NON risponde
@impl true
def handle_cast({:decrement, n}, state) do
{:noreply, state - n}
# {:noreply, nuovo_stato}
end
def handle_cast(:reset, _state) do
{:noreply, 0}
end
# handle_info: messaggi non-OTP (es. :timer.send_after, Process.send_after)
@impl true
def handle_info(:log_state, state) do
IO.puts("[Counter] Current value: #{state}")
{:noreply, state}
end
# Catch-all per messaggi non gestiti (evita crash per messaggi inaspettati)
def handle_info(msg, state) do
IO.puts("Unexpected message: #{inspect(msg)}")
{:noreply, state}
end
# terminate: chiamato prima dello stop (cleanup)
@impl true
def terminate(reason, state) do
IO.puts("Counter stopping. Reason: #{inspect(reason)}, final value: #{state}")
:ok
end
end
# Utilizzo del Counter GenServer
{:ok, _pid} = MyApp.Counter.start_link(0)
MyApp.Counter.increment() # 1
MyApp.Counter.increment(5) # 6
MyApp.Counter.decrement_async(2) # Async: ritorna :ok immediatamente
:timer.sleep(10) # Aspetta che il cast venga processato
MyApp.Counter.get_value() # 4
# Test:
iex> MyApp.Counter.get_value()
4
# Invia un messaggio direttamente al processo
send(:my_counter, :log_state)
# [Counter] Current value: 4
Yapılandırılmış Durum: basit değerlerin ötesinde
Gerçek uygulamalarda GenServer'ın durumu genellikle bir yapıdır. veya birden fazla alanı olan bir harita. Yapı kullanmak kodu daha açık hale getirir ve geri aramalarda durumla ilgili model eşleşmesine izin verir.
# GenServer con stato strutturato
defmodule MyApp.RateLimiter do
use GenServer
defstruct [
:window_ms,
:max_requests,
requests: %{}, # user_id => list of timestamps
]
# --- Client API ---
def start_link(opts \\ []) do
window_ms = Keyword.get(opts, :window_ms, 60_000)
max_requests = Keyword.get(opts, :max_requests, 100)
GenServer.start_link(__MODULE__, {window_ms, max_requests}, name: __MODULE__)
end
@doc "Controlla se la richiesta e' permessa. Ritorna {:ok, remaining} | {:error, :rate_limited}"
def check(user_id) do
GenServer.call(__MODULE__, {:check, user_id})
end
def reset_user(user_id) do
GenServer.cast(__MODULE__, {:reset_user, user_id})
end
# --- Server Callbacks ---
@impl true
def init({window_ms, max_requests}) do
# Avvia cleanup periodico ogni 30 secondi
:timer.send_interval(30_000, :cleanup)
state = %__MODULE__{
window_ms: window_ms,
max_requests: max_requests,
}
{:ok, state}
end
@impl true
def handle_call({:check, user_id}, _from, state) do
now = System.monotonic_time(:millisecond)
window_start = now - state.window_ms
# Recupera timestamps delle richieste precedenti, filtra scadute
user_requests =
Map.get(state.requests, user_id, [])
|> Enum.filter(&(&1 > window_start))
if length(user_requests) >= state.max_requests do
# Rate limited: non aggiorno lo stato
remaining = 0
retry_after = hd(user_requests) + state.window_ms - now
{:reply, {:error, :rate_limited, retry_after}, state}
else
# Permesso: aggiungo il timestamp corrente
new_requests = [now | user_requests]
new_state = put_in(state.requests[user_id], new_requests)
remaining = state.max_requests - length(new_requests)
{:reply, {:ok, remaining}, new_state}
end
end
@impl true
def handle_cast({:reset_user, user_id}, state) do
new_state = update_in(state.requests, &Map.delete(&1, user_id))
{:noreply, new_state}
end
@impl true
def handle_info(:cleanup, state) do
now = System.monotonic_time(:millisecond)
window_start = now - state.window_ms
# Rimuovi utenti senza richieste recenti
cleaned_requests =
state.requests
|> Enum.reject(fn {_user, timestamps} ->
Enum.all?(timestamps, &(&1 <= window_start))
end)
|> Map.new()
{:noreply, %{state | requests: cleaned_requests}}
end
end
"Bırakın Çöksün": OTP Felsefesi
"Bırak çöksün" Erlang/İksir'in en yanlış anlaşılan ilkesidir. Bu "hataları görmezden gelmek" anlamına gelmez; iş kodu anlamına gelir öngörülemeyen herhangi bir durum için savunma hatası işleme yüküne maruz bırakılmamalıdır. Bunun yerine kurtarma, süreçleri otomatik olarak yeniden başlatan Süpervizöre devredilir. o kaza.
# Let it crash: codice senza defensive programming eccessivo
defmodule MyApp.OrderProcessor do
use GenServer
# Approccio SBAGLIATO: defensive programming eccessivo
def process_order_bad(order) do
try do
case validate_order(order) do
{:ok, valid_order} ->
case save_to_db(valid_order) do
{:ok, saved} ->
case send_confirmation(saved) do
{:ok, _} -> {:ok, saved}
{:error, e} -> handle_email_error(e)
end
{:error, e} -> handle_db_error(e)
end
{:error, e} -> handle_validation_error(e)
end
rescue
e -> handle_unexpected_error(e)
end
end
# Approccio CORRETTO: gestisci solo gli errori attesi, crash per il resto
def process_order(order) do
with {:ok, valid_order} <- validate_order(order),
{:ok, saved} <- save_to_db(valid_order),
{:ok, _} <- send_confirmation(saved) do
{:ok, saved}
else
{:error, :invalid_data} = error ->
Logger.warning("Invalid order data: #{inspect(order)}")
error # Errore previsto: gestito
{:error, :duplicate_order} = error ->
Logger.info("Duplicate order ignored: #{order.id}")
{:ok, :duplicate} # Caso atteso: ritorna ok
end
# Per tutto il resto (bug nel codice, db down, etc.):
# Il crash propaghera' al Supervisor che riavviera' il processo
# in uno stato pulito
end
# Errori che NON gestisci:
# - Bug nel codice (FunctionClauseError, MatchError)
# - Dipendenze irraggiungibili (db crash totale)
# - Situazioni che non dovrebbero mai accadere
# Il Supervisor le gestisce riavviando il processo
end
Kayıt İşlemi: Via ve Kayıt
# Registrazione con nome atom globale (semplice ma unico per nodo)
GenServer.start_link(MyServer, args, name: :global_name)
# {:via, module, key}: Registry distribuito (piu' flessibile)
defmodule MyApp.UserSession do
use GenServer
# Avvia con via Registry per supportare piu' istanze
def start_link(user_id) do
GenServer.start_link(
__MODULE__,
%{user_id: user_id},
name: via_tuple(user_id)
)
end
# Helper per costruire il via tuple
defp via_tuple(user_id) do
{:via, Registry, {MyApp.Registry, "user_session:#{user_id}"}}
end
# Client API usa via_tuple invece del PID
def get_session(user_id) do
GenServer.call(via_tuple(user_id), :get_session)
end
def update_session(user_id, updates) do
GenServer.cast(via_tuple(user_id), {:update, updates})
end
# Server callbacks
@impl true
def init(state), do: {:ok, state}
@impl true
def handle_call(:get_session, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast({:update, updates}, state) do
{:noreply, Map.merge(state, updates)}
end
end
# Setup in application.ex
# children = [
# {Registry, keys: :unique, name: MyApp.Registry},
# ...
# ]
# Uso
{:ok, _} = MyApp.UserSession.start_link("user-1001")
MyApp.UserSession.get_session("user-1001")
# %{user_id: "user-1001"}
MyApp.UserSession.update_session("user-1001", %{last_seen: DateTime.utc_now()})
:via ve DynamicSupervisor özellikli GenServer
# Application setup per piu' GenServer dello stesso tipo
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Registry per lookup per chiave
{Registry, keys: :unique, name: MyApp.SessionRegistry},
# DynamicSupervisor: supervisore per figli creati dinamicamente
{DynamicSupervisor, name: MyApp.SessionSupervisor, strategy: :one_for_one},
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
# Avvia sessioni on-demand
defmodule MyApp.SessionManager do
def start_session(user_id) do
spec = {MyApp.UserSession, user_id}
DynamicSupervisor.start_child(MyApp.SessionSupervisor, spec)
end
def stop_session(user_id) do
case Registry.lookup(MyApp.SessionRegistry, "user_session:#{user_id}") do
[{pid, _}] -> DynamicSupervisor.terminate_child(MyApp.SessionSupervisor, pid)
[] -> {:error, :not_found}
end
end
def active_sessions do
DynamicSupervisor.which_children(MyApp.SessionSupervisor)
|> length()
end
end
GenServer'ı test etme
# Test di un GenServer con ExUnit
defmodule MyApp.CounterTest do
use ExUnit.Case, async: true
setup do
# Avvia una nuova istanza per ogni test (senza nome, usa PID)
{:ok, pid} = GenServer.start_link(MyApp.Counter, 0)
{:ok, counter: pid}
end
test "starts at 0", %{counter: pid} do
assert GenServer.call(pid, :get_value) == 0
end
test "increments correctly", %{counter: pid} do
GenServer.call(pid, {:increment, 5})
GenServer.call(pid, {:increment, 3})
assert GenServer.call(pid, :get_value) == 8
end
test "resets to zero", %{counter: pid} do
GenServer.call(pid, {:increment, 10})
GenServer.cast(pid, :reset)
# Piccola attesa per il cast asincrono
:timer.sleep(10)
assert GenServer.call(pid, :get_value) == 0
end
test "handles concurrent increments safely", %{counter: pid} do
# Spawna 100 processi che incrementano contemporaneamente
tasks = for _ <- 1..100 do
Task.async(fn ->
GenServer.call(pid, {:increment, 1})
end)
end
Enum.each(tasks, &Task.await/1)
assert GenServer.call(pid, :get_value) == 100
end
end
Sonuçlar
GenServer, İksir süreç teorisinin buluştuğu yerdir pratik. OTP davranışı yapı, kurallar ve entegrasyon sağlar denetim ağacıyla. eşzamanlılık için tanıtıcı_çağrı, eş zamanlılık için tanıtıcı_cast ateşle ve unut, harici mesajlar içinhandle_info - bu üç geri arama kullanım durumlarının %95'ini kapsarlar. "Bırakın çöksün" tembellik değil: öyle Çevreyi kirletmeden dayanıklı sistemler kurmamızı sağlayan felsefe her uç durum için savunma programlamalı iş kodu. Bir sonraki makale bunu Süpervizörler ile bir sonraki seviyeye taşıyor.
İksir Serisinde Gelecek Makaleler
- Madde 4: Denetleyici Ağaçları — birimiz birimiz için, hepimizimiz için birimiz ve birimiz için dinlenme stratejileri
- Madde 5: Ecto — PostgreSQL için Şekillendirilebilir Sorgu ve Şema Eşleme
- Madde 6: Phoenix Çerçevesi — Yönlendirici, Denetleyici, Görünüm ve JSON API







