Concurrency and Multithreading in Java
Concurrent programming allows multiple operations to execute simultaneously, making full use of modern multi-core CPUs. Java offers powerful tools for managing threads, synchronization, and parallelism.
What You'll Learn
- Creating and managing Threads
- Runnable vs Callable
- ExecutorService and Thread Pools
- Synchronization and thread-safety
- Locks, semaphores, and barriers
- Atomic classes and ConcurrentCollections
- CompletableFuture for async programming
Threads: Fundamental Concepts
A thread is an independent flow of execution within a program. Java has supported multithreading natively since its first version.
// METHOD 1: Extend Thread
class MyThread extends Thread {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + ": iteration " + i);
try {
Thread.sleep(100); // Pause for 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// METHOD 2: Implement Runnable (preferred)
class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + ": iteration " + i);
}
}
}
// Usage
public class ThreadDemo {
public static void main(String[] args) {
// Method 1
MyThread t1 = new MyThread("Thread-1");
t1.start(); // start() launches the thread
// Method 2
Thread t2 = new Thread(new MyRunnable("Thread-2"));
t2.start();
// Method 3: Lambda (Java 8+)
Thread t3 = new Thread(() -> {
System.out.println("Lambda Thread running");
});
t3.start();
System.out.println("Main thread continues...");
}
}
Thread vs Runnable
| Aspect | Extending Thread | Implementing Runnable |
|---|---|---|
| Inheritance | Cannot extend other classes | Can extend other classes |
| Reusability | Less reusable | More flexible and reusable |
| Separation | Logic tied to Thread | Logic separated from thread management |
Thread States
public class ThreadState {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Initial state: " + thread.getState()); // NEW
thread.start();
System.out.println("After start: " + thread.getState()); // RUNNABLE
Thread.sleep(100);
System.out.println("During sleep: " + thread.getState()); // TIMED_WAITING
thread.join(); // Wait for thread to finish
System.out.println("After join: " + thread.getState()); // TERMINATED
}
}
// Possible states:
// NEW - Thread created but not yet started
// RUNNABLE - Running or ready to run
// BLOCKED - Waiting for a lock
// WAITING - Indefinite wait (wait, join without timeout)
// TIMED_WAITING - Wait with timeout (sleep, wait with timeout)
// TERMINATED - Execution completed
Synchronization
When multiple threads access the same resources, race conditions can occur. Synchronization ensures exclusive access.
// PROBLEM: Race condition
class BankAccount {
private int balance = 1000;
// WITHOUT synchronization - DANGEROUS!
public void unsafeWithdraw(int amount) {
if (balance >= amount) {
// Another thread could modify balance here!
balance -= amount;
System.out.println("Withdrawn: " + amount + ", Balance: " + balance);
}
}
// WITH synchronization - SAFE
public synchronized void safeWithdraw(int amount) {
if (balance >= amount) {
balance -= amount;
System.out.println("Withdrawn: " + amount + ", Balance: " + balance);
}
}
// Alternative: synchronized block
public void deposit(int amount) {
synchronized (this) {
balance += amount;
System.out.println("Deposited: " + amount + ", Balance: " + balance);
}
}
public synchronized int getBalance() {
return balance;
}
}
// Concurrent test
public class ConcurrencyTest {
public static void main(String[] args) throws InterruptedException {
BankAccount account = new BankAccount();
Runnable operation = () -> {
for (int i = 0; i < 10; i++) {
account.safeWithdraw(50);
}
};
Thread t1 = new Thread(operation);
Thread t2 = new Thread(operation);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final balance: " + account.getBalance());
}
}
Explicit Locks
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class BankAccountWithLock {
private int balance = 1000;
private final Lock lock = new ReentrantLock();
public void withdraw(int amount) {
lock.lock(); // Acquire the lock
try {
if (balance >= amount) {
balance -= amount;
System.out.println(Thread.currentThread().getName() +
" withdrew " + amount + ", balance: " + balance);
}
} finally {
lock.unlock(); // ALWAYS release lock in finally
}
}
// tryLock: non-blocking
public boolean tryWithdraw(int amount) {
if (lock.tryLock()) {
try {
if (balance >= amount) {
balance -= amount;
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
}
// ReadWriteLock: parallel reads, exclusive writes
class CacheWithReadWriteLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String value;
public String read() {
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void write(String newValue) {
writeLock.lock();
try {
value = newValue;
} finally {
writeLock.unlock();
}
}
}
ExecutorService and Thread Pools
Creating threads manually is inefficient. ExecutorService manages pools of reusable threads for better performance.
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class ExecutorDemo {
public static void main(String[] args) throws Exception {
// 1. Fixed Thread Pool: fixed number of threads
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 2. Cached Thread Pool: creates threads on-demand
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. Single Thread: one thread only
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 4. Scheduled: for scheduled tasks
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// Example with FixedThreadPool
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " executed by " +
Thread.currentThread().getName());
});
}
// Proper shutdown
fixedPool.shutdown(); // Don't accept new tasks
try {
// Wait max 60 seconds for completion
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow(); // Force shutdown
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Callable and Future
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
// Callable: like Runnable but can return a value
class CalculateFactorial implements Callable<Long> {
private final int number;
public CalculateFactorial(int number) {
this.number = number;
}
@Override
public Long call() throws Exception {
long result = 1;
for (int i = 2; i <= number; i++) {
result *= i;
Thread.sleep(100); // Simulate work
}
return result;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// submit() returns a Future
Future<Long> future = executor.submit(new CalculateFactorial(10));
// Other operations while calculation is in progress...
System.out.println("Calculation in progress...");
// get() blocks until result is ready
Long result = future.get(); // Can throw ExecutionException
System.out.println("10! = " + result);
// get() with timeout
Future<Long> future2 = executor.submit(new CalculateFactorial(20));
try {
Long result2 = future2.get(2, TimeUnit.SECONDS);
System.out.println("20! = " + result2);
} catch (TimeoutException e) {
System.out.println("Timeout! Cancelling task...");
future2.cancel(true); // Interrupt the task
}
// invokeAll: execute all tasks
List<Callable<Long>> tasks = new ArrayList<>();
tasks.add(new CalculateFactorial(5));
tasks.add(new CalculateFactorial(6));
tasks.add(new CalculateFactorial(7));
List<Future<Long>> futures = executor.invokeAll(tasks);
for (Future<Long> f : futures) {
System.out.println("Result: " + f.get());
}
executor.shutdown();
}
}
CompletableFuture
CompletableFuture (Java 8+) enables more elegant asynchronous programming with operation chaining and error handling.
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// Async execution without value
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("Async task in background");
});
// Async execution with value
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result from future";
});
// Chaining: thenApply, thenAccept, thenRun
CompletableFuture<Integer> pipeline = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(s -> Integer.parseInt(s)) // String -> Integer
.thenApply(n -> n * 2); // Integer -> Integer
System.out.println("Pipeline result: " + pipeline.get());
// Error handling
CompletableFuture<Integer> withError = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Simulated error!");
return 42;
})
.exceptionally(ex -> {
System.out.println("Error handling: " + ex.getMessage());
return -1; // Default value
});
System.out.println("With error handling: " + withError.get());
// Combining multiple CompletableFutures
CompletableFuture<String> futureA = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture
.supplyAsync(() -> "World");
// thenCombine: combine two futures
CompletableFuture<String> combined = futureA
.thenCombine(futureB, (a, b) -> a + " " + b);
System.out.println(combined.get()); // Hello World
// allOf: wait for all to complete
CompletableFuture<Void> all = CompletableFuture.allOf(futureA, futureB);
all.get();
// anyOf: returns as soon as one completes
CompletableFuture<Object> first = CompletableFuture.anyOf(futureA, futureB);
System.out.println("First completed: " + first.get());
}
}
Atomic Classes and Concurrent Collections
Atomic* classes and ConcurrentCollections provide thread-safety without explicit synchronization.
import java.util.concurrent.atomic.*;
public class AtomicDemo {
// AtomicInteger: atomic operations on int
private static AtomicInteger counter = new AtomicInteger(0);
// AtomicLong, AtomicBoolean, AtomicReference
private static AtomicLong bigCounter = new AtomicLong(0);
private static AtomicBoolean flag = new AtomicBoolean(false);
private static AtomicReference<String> reference =
new AtomicReference<>("initial");
public static void main(String[] args) throws InterruptedException {
// Atomic operations
counter.incrementAndGet(); // ++counter
counter.decrementAndGet(); // --counter
counter.addAndGet(10); // counter += 10
counter.getAndAdd(5); // return counter; counter += 5
// Compare and Set (CAS)
boolean success = counter.compareAndSet(15, 20);
System.out.println("CAS succeeded: " + success);
// updateAndGet with function
counter.updateAndGet(n -> n * 2);
// Concurrent test
Runnable increment = () -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
};
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(increment);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
// With atomic: always 10000
System.out.println("Final counter: " + counter.get());
}
}
Concurrent Collections
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// ConcurrentHashMap: thread-safe HashMap
ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
// Atomic operations
map.put("A", 1);
map.putIfAbsent("B", 2); // Only if absent
map.compute("A", (k, v) -> v + 10); // Update atomically
map.merge("C", 1, Integer::sum); // Insert or merge
// CopyOnWriteArrayList: great for many reads, few writes
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("element1");
list.add("element2");
// Safe to iterate during modifications
for (String s : list) {
System.out.println(s);
list.add("new"); // Doesn't cause ConcurrentModificationException
}
// BlockingQueue: for producer-consumer pattern
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// Producer
new Thread(() -> {
try {
queue.put("message"); // Blocks if full
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer
new Thread(() -> {
try {
String msg = queue.take(); // Blocks if empty
System.out.println("Received: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// ConcurrentSkipListMap/Set: thread-safe TreeMap/TreeSet
ConcurrentSkipListMap<Integer, String> skipMap =
new ConcurrentSkipListMap<>();
skipMap.put(1, "one");
skipMap.put(2, "two");
}
}
Advanced Synchronizers
import java.util.concurrent.*;
public class SynchronizersDemo {
// CountDownLatch: waits for N events to occur
public static void countDownLatchDemo() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("Worker " + id + " completed");
latch.countDown(); // Decrements the counter
}).start();
}
latch.await(); // Blocks until count = 0
System.out.println("All workers finished!");
}
// CyclicBarrier: synchronizes N threads at a point
public static void cyclicBarrierDemo() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All ready, let's go!");
});
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " waiting at barrier");
barrier.await(); // Wait for others
System.out.println("Thread " + id + " continues");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
// Semaphore: limits access to resource
public static void semaphoreDemo() {
Semaphore semaphore = new Semaphore(2); // Max 2 accesses
for (int i = 0; i < 5; i++) {
final int id = i;
new Thread(() -> {
try {
semaphore.acquire(); // Request permit
System.out.println("Thread " + id + " using resource");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // Release permit
System.out.println("Thread " + id + " released");
}
}).start();
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== CountDownLatch ===");
countDownLatchDemo();
Thread.sleep(1000);
System.out.println("\n=== CyclicBarrier ===");
cyclicBarrierDemo();
Thread.sleep(1000);
System.out.println("\n=== Semaphore ===");
semaphoreDemo();
}
}
Producer-Consumer Pattern
import java.util.concurrent.*;
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int maxProducts;
public Producer(BlockingQueue<Integer> queue, int maxProducts) {
this.queue = queue;
this.maxProducts = maxProducts;
}
@Override
public void run() {
try {
for (int i = 0; i < maxProducts; i++) {
int product = i;
queue.put(product); // Blocks if queue is full
System.out.println("Produced: " + product +
" [queue: " + queue.size() + "]");
Thread.sleep(100);
}
queue.put(-1); // End signal
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer product = queue.take(); // Blocks if queue is empty
if (product == -1) {
queue.put(-1); // Propagate end signal
break;
}
System.out.println("Consumed: " + product);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Thread producer = new Thread(new Producer(queue, 10));
Thread consumer1 = new Thread(new Consumer(queue));
Thread consumer2 = new Thread(new Consumer(queue));
producer.start();
consumer1.start();
consumer2.start();
}
}
Best Practices
Rules for Thread-Safe Code
- Minimize shared state: prefer immutable objects
- Use concurrent classes: ConcurrentHashMap, AtomicInteger...
- Prefer ExecutorService: avoid manual thread creation
- Always release locks: use try-finally
- Avoid deadlocks: order lock acquisition, use timeouts
- Don't ignore InterruptedException: propagate or restore flag
- Document thread-safety: indicate if a class is thread-safe







