Concorrenza e Multithreading in Java
La programmazione concorrente permette di eseguire più operazioni contemporaneamente, sfruttando al meglio le CPU multi-core moderne. Java offre strumenti potenti per gestire thread, sincronizzazione e parallelismo.
Cosa Imparerai
- Creare e gestire Thread
- Runnable vs Callable
- ExecutorService e Thread Pool
- Sincronizzazione e thread-safety
- Lock, semafori e barriere
- Classi atomiche e ConcurrentCollections
- CompletableFuture per async
Thread: Concetti Fondamentali
Un thread è un flusso di esecuzione indipendente all'interno di un programma. Java supporta il multithreading nativamente fin dalla prima versione.
// METODO 1: Estendere Thread
class MioThread extends Thread {
private String nome;
public MioThread(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
try {
Thread.sleep(100); // Pausa di 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// METODO 2: Implementare Runnable (preferito)
class MioRunnable implements Runnable {
private String nome;
public MioRunnable(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
}
}
}
// Uso
public class ThreadDemo {
public static void main(String[] args) {
// Metodo 1
MioThread t1 = new MioThread("Thread-1");
t1.start(); // start() avvia il thread
// Metodo 2
Thread t2 = new Thread(new MioRunnable("Thread-2"));
t2.start();
// Metodo 3: Lambda (Java 8+)
Thread t3 = new Thread(() -> {
System.out.println("Thread Lambda in esecuzione");
});
t3.start();
System.out.println("Main thread continua...");
}
}
Thread vs Runnable
| Aspetto | Estendere Thread | Implementare Runnable |
|---|---|---|
| Ereditarietà | Non puoi estendere altre classi | Puoi estendere altre classi |
| Riusabilità | Meno riusabile | Più flessibile e riusabile |
| Separazione | Logica legata a Thread | Logica separata da gestione thread |
Stati di un Thread
public class StatoThread {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Stato iniziale: " + thread.getState()); // NEW
thread.start();
System.out.println("Dopo start: " + thread.getState()); // RUNNABLE
Thread.sleep(100);
System.out.println("Durante sleep: " + thread.getState()); // TIMED_WAITING
thread.join(); // Aspetta che il thread termini
System.out.println("Dopo join: " + thread.getState()); // TERMINATED
}
}
// Stati possibili:
// NEW - Thread creato ma non ancora avviato
// RUNNABLE - In esecuzione o pronto per essere eseguito
// BLOCKED - In attesa di un lock
// WAITING - In attesa indefinita (wait, join senza timeout)
// TIMED_WAITING - In attesa con timeout (sleep, wait con timeout)
// TERMINATED - Esecuzione completata
Sincronizzazione
Quando più thread accedono alle stesse risorse, possono verificarsi race condition. La sincronizzazione garantisce l'accesso esclusivo.
// PROBLEMA: Race condition
class ContoBancario {
private int saldo = 1000;
// SENZA sincronizzazione - PERICOLOSO!
public void prelievoNonSicuro(int importo) {
if (saldo >= importo) {
// Un altro thread potrebbe modificare saldo qui!
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// CON sincronizzazione - SICURO
public synchronized void prelievoSicuro(int importo) {
if (saldo >= importo) {
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// Alternativa: blocco synchronized
public void deposito(int importo) {
synchronized (this) {
saldo += importo;
System.out.println("Depositato: " + importo + ", Saldo: " + saldo);
}
}
public synchronized int getSaldo() {
return saldo;
}
}
// Test concorrente
public class TestConcorrenza {
public static void main(String[] args) throws InterruptedException {
ContoBancario conto = new ContoBancario();
Runnable operazione = () -> {
for (int i = 0; i < 10; i++) {
conto.prelievoSicuro(50);
}
};
Thread t1 = new Thread(operazione);
Thread t2 = new Thread(operazione);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Saldo finale: " + conto.getSaldo());
}
}
Lock Espliciti
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ContoBancarioConLock {
private int saldo = 1000;
private final Lock lock = new ReentrantLock();
public void prelievo(int importo) {
lock.lock(); // Acquisisce il lock
try {
if (saldo >= importo) {
saldo -= importo;
System.out.println(Thread.currentThread().getName() +
" ha prelevato " + importo + ", saldo: " + saldo);
}
} finally {
lock.unlock(); // Rilascia SEMPRE il lock nel finally
}
}
// tryLock: non bloccante
public boolean tentaPrelievo(int importo) {
if (lock.tryLock()) {
try {
if (saldo >= importo) {
saldo -= importo;
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
}
// ReadWriteLock: letture parallele, scritture esclusive
class CacheConReadWriteLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String valore;
public String leggi() {
readLock.lock();
try {
return valore;
} finally {
readLock.unlock();
}
}
public void scrivi(String nuovo) {
writeLock.lock();
try {
valore = nuovo;
} finally {
writeLock.unlock();
}
}
}
ExecutorService e Thread Pool
Creare thread manualmente è inefficiente. Gli ExecutorService gestiscono pool di thread riutilizzabili per migliori prestazioni.
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class ExecutorDemo {
public static void main(String[] args) throws Exception {
// 1. Fixed Thread Pool: numero fisso di thread
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 2. Cached Thread Pool: crea thread on-demand
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. Single Thread: un solo thread
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 4. Scheduled: per task pianificati
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// Esempio con FixedThreadPool
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " eseguito da " +
Thread.currentThread().getName());
});
}
// Shutdown corretto
fixedPool.shutdown(); // Non accetta nuovi task
try {
// Aspetta max 60 secondi per completamento
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow(); // Forza chiusura
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Callable e Future
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
// Callable: come Runnable ma può ritornare un valore
class CalcolaFattoriale implements Callable<Long> {
private final int numero;
public CalcolaFattoriale(int numero) {
this.numero = numero;
}
@Override
public Long call() throws Exception {
long risultato = 1;
for (int i = 2; i <= numero; i++) {
risultato *= i;
Thread.sleep(100); // Simula lavoro
}
return risultato;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// submit() ritorna un Future
Future<Long> future = executor.submit(new CalcolaFattoriale(10));
// Altre operazioni mentre il calcolo è in corso...
System.out.println("Calcolo in corso...");
// get() blocca fino a quando il risultato è pronto
Long risultato = future.get(); // Può lanciare ExecutionException
System.out.println("10! = " + risultato);
// get() con timeout
Future<Long> future2 = executor.submit(new CalcolaFattoriale(20));
try {
Long risultato2 = future2.get(2, TimeUnit.SECONDS);
System.out.println("20! = " + risultato2);
} catch (TimeoutException e) {
System.out.println("Timeout! Cancellazione task...");
future2.cancel(true); // Interrompe il task
}
// invokeAll: esegue tutti i task
List<Callable<Long>> tasks = new ArrayList<>();
tasks.add(new CalcolaFattoriale(5));
tasks.add(new CalcolaFattoriale(6));
tasks.add(new CalcolaFattoriale(7));
List<Future<Long>> futures = executor.invokeAll(tasks);
for (Future<Long> f : futures) {
System.out.println("Risultato: " + f.get());
}
executor.shutdown();
}
}
CompletableFuture
CompletableFuture (Java 8+) permette programmazione asincrona più elegante con concatenazione di operazioni e gestione errori.
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// Esecuzione asincrona senza valore
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("Task asincrono in background");
});
// Esecuzione asincrona con valore
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Risultato dal futuro";
});
// Concatenazione: thenApply, thenAccept, thenRun
CompletableFuture<Integer> pipeline = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(s -> Integer.parseInt(s)) // String -> Integer
.thenApply(n -> n * 2); // Integer -> Integer
System.out.println("Risultato pipeline: " + pipeline.get());
// Gestione errori
CompletableFuture<Integer> conErrore = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Errore simulato!");
return 42;
})
.exceptionally(ex -> {
System.out.println("Gestione errore: " + ex.getMessage());
return -1; // Valore di default
});
System.out.println("Con gestione errore: " + conErrore.get());
// Combinare più CompletableFuture
CompletableFuture<String> futureA = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture
.supplyAsync(() -> "World");
// thenCombine: combina due future
CompletableFuture<String> combinato = futureA
.thenCombine(futureB, (a, b) -> a + " " + b);
System.out.println(combinato.get()); // Hello World
// allOf: aspetta che tutti completino
CompletableFuture<Void> tutti = CompletableFuture.allOf(futureA, futureB);
tutti.get();
// anyOf: restituisce appena uno completa
CompletableFuture<Object> primo = CompletableFuture.anyOf(futureA, futureB);
System.out.println("Primo completato: " + primo.get());
}
}
Classi Atomiche e Collections Concorrenti
Le classi Atomic* e le ConcurrentCollections forniscono thread-safety senza bisogno di sincronizzazione esplicita.
import java.util.concurrent.atomic.*;
public class AtomicDemo {
// AtomicInteger: operazioni atomiche su int
private static AtomicInteger contatore = new AtomicInteger(0);
// AtomicLong, AtomicBoolean, AtomicReference
private static AtomicLong contatoreGrande = new AtomicLong(0);
private static AtomicBoolean flag = new AtomicBoolean(false);
private static AtomicReference<String> riferimento =
new AtomicReference<>("iniziale");
public static void main(String[] args) throws InterruptedException {
// Operazioni atomiche
contatore.incrementAndGet(); // ++contatore
contatore.decrementAndGet(); // --contatore
contatore.addAndGet(10); // contatore += 10
contatore.getAndAdd(5); // return contatore; contatore += 5
// Compare and Set (CAS)
boolean successo = contatore.compareAndSet(15, 20);
System.out.println("CAS riuscito: " + successo);
// updateAndGet con funzione
contatore.updateAndGet(n -> n * 2);
// Test concorrente
Runnable incrementa = () -> {
for (int i = 0; i < 1000; i++) {
contatore.incrementAndGet();
}
};
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(incrementa);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
// Con atomic: sempre 10000
System.out.println("Contatore finale: " + contatore.get());
}
}
Concurrent Collections
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// ConcurrentHashMap: HashMap thread-safe
ConcurrentMap<String, Integer> mappa = new ConcurrentHashMap<>();
// Operazioni atomiche
mappa.put("A", 1);
mappa.putIfAbsent("B", 2); // Solo se assente
mappa.compute("A", (k, v) -> v + 10); // Aggiorna atomicamente
mappa.merge("C", 1, Integer::sum); // Inserisce o unisce
// CopyOnWriteArrayList: ottima per molte letture, poche scritture
CopyOnWriteArrayList<String> lista = new CopyOnWriteArrayList<>();
lista.add("elemento1");
lista.add("elemento2");
// Sicuro iterare durante modifiche
for (String s : lista) {
System.out.println(s);
lista.add("nuovo"); // Non causa ConcurrentModificationException
}
// BlockingQueue: per pattern producer-consumer
BlockingQueue<String> coda = new LinkedBlockingQueue<>(10);
// Producer
new Thread(() -> {
try {
coda.put("messaggio"); // Blocca se piena
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer
new Thread(() -> {
try {
String msg = coda.take(); // Blocca se vuota
System.out.println("Ricevuto: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// ConcurrentSkipListMap/Set: TreeMap/TreeSet thread-safe
ConcurrentSkipListMap<Integer, String> skipMap =
new ConcurrentSkipListMap<>();
skipMap.put(1, "uno");
skipMap.put(2, "due");
}
}
Sincronizzatori Avanzati
import java.util.concurrent.*;
public class SincronizzatoriDemo {
// CountDownLatch: aspetta che N eventi si verifichino
public static void countDownLatchDemo() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("Worker " + id + " completato");
latch.countDown(); // Decrementa il contatore
}).start();
}
latch.await(); // Blocca fino a quando count = 0
System.out.println("Tutti i worker hanno finito!");
}
// CyclicBarrier: sincronizza N thread in un punto
public static void cyclicBarrierDemo() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("Tutti pronti, si parte!");
});
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " in attesa alla barriera");
barrier.await(); // Aspetta gli altri
System.out.println("Thread " + id + " continua");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
// Semaphore: limita accesso a risorsa
public static void semaphoreDemo() {
Semaphore semaforo = new Semaphore(2); // Max 2 accessi
for (int i = 0; i < 5; i++) {
final int id = i;
new Thread(() -> {
try {
semaforo.acquire(); // Richiede permesso
System.out.println("Thread " + id + " usa la risorsa");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaforo.release(); // Rilascia permesso
System.out.println("Thread " + id + " rilascia");
}
}).start();
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== CountDownLatch ===");
countDownLatchDemo();
Thread.sleep(1000);
System.out.println("\n=== CyclicBarrier ===");
cyclicBarrierDemo();
Thread.sleep(1000);
System.out.println("\n=== Semaphore ===");
semaphoreDemo();
}
}
Pattern Producer-Consumer
import java.util.concurrent.*;
class Produttore implements Runnable {
private final BlockingQueue<Integer> coda;
private final int maxProdotti;
public Produttore(BlockingQueue<Integer> coda, int maxProdotti) {
this.coda = coda;
this.maxProdotti = maxProdotti;
}
@Override
public void run() {
try {
for (int i = 0; i < maxProdotti; i++) {
int prodotto = i;
coda.put(prodotto); // Blocca se coda piena
System.out.println("Prodotto: " + prodotto +
" [coda: " + coda.size() + "]");
Thread.sleep(100);
}
coda.put(-1); // Segnale di fine
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumatore implements Runnable {
private final BlockingQueue<Integer> coda;
public Consumatore(BlockingQueue<Integer> coda) {
this.coda = coda;
}
@Override
public void run() {
try {
while (true) {
Integer prodotto = coda.take(); // Blocca se coda vuota
if (prodotto == -1) {
coda.put(-1); // Propaga segnale di fine
break;
}
System.out.println("Consumato: " + prodotto);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> coda = new ArrayBlockingQueue<>(5);
Thread produttore = new Thread(new Produttore(coda, 10));
Thread consumatore1 = new Thread(new Consumatore(coda));
Thread consumatore2 = new Thread(new Consumatore(coda));
produttore.start();
consumatore1.start();
consumatore2.start();
}
}
Best Practices
Regole per Codice Thread-Safe
- Minimizza stato condiviso: preferisci oggetti immutabili
- Usa classi concurrent: ConcurrentHashMap, AtomicInteger...
- Preferisci ExecutorService: evita creazione manuale di thread
- Rilascia sempre i lock: usa try-finally
- Evita deadlock: ordina acquisizione lock, usa timeout
- Non ignorare InterruptedException: propaga o ripristina flag
- Documenta thread-safety: indica se una classe è thread-safe







