The concept of Java concurrency has been around since the early versions of the language. It was introduced to perform multiple tasks simultaneously within a single program. Therefore, previous developers primarily wrote single-threaded programs because most systems at the time had single-core processors. With the rise of multi-core processors and high-performance computing in computer science, the need for effective Java concurrency became more critical. As a result, we can observe numerous developments in the concurrent application development domain carried out by Java development companies.

In this Java concurrency tutorial, we’ll deep dive into all the important concepts associated with concurrent programming with proper code implementations.

1. Understanding Concurrency in Java

Concurrency in Java allows programs to perform multiple tasks simultaneously by managing their execution through concurrency APIs. This doesn’t necessarily mean that the tasks run at the exact same instant, especially on single-core processors. Instead, their execution overlaps in time, giving the illusion of parallelism. The package java.util.concurrent is used for concurrent programming in Java. Thread is the base of Java concurrency, which we’ll learn in the upcoming section.

2. Understanding Threads in Java

A thread is a lightweight process, i.e., containing a small set of instructions that can run in parallel with other threads within the same program. Threads are subprocesses that access the same memory space and resources of their parent process but execute independently. Java applications begin with a single thread of execution referred to as the main thread, created by the JVM. This main thread is responsible for executing the main() method.

3. Java Concurrency: Thread Creation

Threads are instances of the java.lang.Thread class. There are mainly two approaches to creating threads in Java to achieve concurrency:

3.1 Extending the Thread Class

Here, we’ll create a new class, MyThread, that inherits from the Thread class. This makes MyThread itself a type of thread. This will allow us to override the run() method of the Thread class in our subclass. Define any tasks you want the thread to perform inside the run() method. This method is the entry point for the thread’s execution. As soon as the thread starts, the run() method gets invoked, and the code within this method will be executed.

package com.tatvasoft.concurrency; 
class MyThread extends Thread { 
    @Override  
    public void run() { 
        System.out.println("run method of Thread class"); 
    } 
}  
public class ThreadExampleWithThread { 
    public static void main(String[] args) { 
        //Thread using Thread class 
        MyThread thread1 = new MyThread(); 
        thread1.start(); 
    }  
}

3.2 Implementing the Runnable Interface

The runnable interface in Java is a functional interface that contains a single abstract method called run(). The Java class needs to implement this interface and override the run() method. This method will include the tasks to be performed by the thread during execution.

package com.tatvasoft.concurrency;  
class MyRunnable implements Runnable { 
    @Override 
    public void run() { 
        System.out.println("Run method of Runnable interface"); 
    } 
}  
public class ThreadExampleWithRunnable {  
    public static void main(String[] args) {  
        //Thread using Runnable interface 
        MyRunnable task = new MyRunnable(); 
        Thread thread2 = new Thread(task); 
        thread2.start();  
    } 
}

In the above code:

  • class MyRunnable implements Runnable: We create a class called MyRunnable that implements the Runnable interface, which means it includes the code or task that should run independently in a separate thread.  
  • @Override public void run(): We define the run() method to include the specific logic or actions the thread should perform when executed.  
  • Thread thread2=new Thread(task): We create a new Thread instance and provide it with an object of MyRunnable, which contains the task to be executed. Therefore, the thread is linked to the run() method defined in MyRunnable, and when the thread starts, it knows exactly what code to run. 
  • thread2.start(): This method is invoked on a Thread instance, thread2, to launch a new thread. Once started, the thread begins execution by automatically calling the run() method defined in its associated Runnable task. 

In modern Java development, using the Runnable interface is considered the best practice for creating threads. It provides better flexibility and allows for cleaner separation of logic, making your code more maintainable. Since Java supports single inheritance, implementing Runnable avoids the restriction of extending the Thread class, which can limit your class design. Extending the Thread is typically only necessary when you need to significantly alter its internal behavior, which is uncommon in most cases.

4. Life Cycle of Java Threads

A thread’s lifecycle starts from the moment it is created and goes through several distinct states, such as New, Runnable, Blocked, Waiting, Timed Waiting, and Terminated, until it completes its execution. Each of these states represents a specific stage in the thread’s operation. The following section will explore how a thread transitions between these states during its lifetime.

Life Cycle of Java Threads

4.1 Thread States

  1. New: This is the beginning state when a thread is created but hasn’t been started yet.
  2. Runnable: Once you call the start() method, the thread moves to the runnable state. It’s ready to run, but might wait for the CPU as the thread scheduler controls execution.
  3. Blocked: A thread is blocked when it’s waiting to gain access to a synchronized resource that’s currently locked by another thread.
  4. Waiting: This state depicts a thread waiting for an infinite time for another thread to perform a specific action, such as sending a notification to continue.
  5. Timed Waiting: Here, the thread waits for a set amount of time for example, during calls to sleep(), join(), or wait(timeout).
  6. Terminated: A thread enters this final state once it finishes execution or is stopped due to an error or external interruption.

4.2 Thread Methods

Java provides several methods to control thread behavior. These methods help start, pause, interrupt, and manage threads’ lifecycles, as well as ensure proper synchronization and communication between threads.

  1. start(): Initiates the execution of a thread by calling its run() method in a separate call stack.
  2. sleep(long millis): Puts the currently executing thread on hold for a specified number of milliseconds, temporarily pausing its execution.
  3. join(): Makes the current thread wait until the other thread on which join() is invoked has completed its execution.
  4. interrupt(): Sends an interrupt signal to a thread, which can stop it if it’s in a blocked or sleeping state.
  5. setPriority(int priority): Assigns a priority level to a thread, ranging from 1 (lowest) to 10 (highest), with 5 being the normal/default level.
  6. getName(): Retrieves the name assigned to a thread.
  7. getPriority(): Fetches the current priority value of a thread.

4.3 Thread Communication

Threads many times require communicating and coordinating with one another to perform concurrent tasks effectively.

In earlier versions of Java, methods like suspend(), stop(), and resume() were used to control threads. However, these methods have been deprecated because they can lead to serious problems such as deadlocks, race conditions, and inconsistent thread behavior. While methods like wait(), notify(), and notifyAll() from the object class are still available and commonly used for thread communication, they require careful handling and synchronization to avoid issues.

Let’s discuss these three methods in detail:

  1. wait(): Tells the current thread to release the lock and go into a waiting state until another thread notifies it.
  2. notify(): Wakes up only one thread that is waiting on the object’s monitor.
  3. notifyAll(): Wakes up all threads that are waiting for their turn on the object’s monitor.

The following diagram illustrates how inter-thread communication works in Java:

  1. Multiple threads want to acquire a shared resource by attempting to acquire its lock.
  2. Only a single thread can hold the lock and execute the synchronized code at a particular time.
  3. If the thread calls the wait() method on the object, it releases the lock and moves into a waiting state, pausing its execution until further notice. If wait() is not called, the thread simply releases the lock after finishing its task.
  4. When another thread calls notify() or notifyAll() on the object, the waiting thread(s) are awakened and move back to the runnable (ready) state.
  5. Once notified, the thread tries to reacquire the lock to resume execution.
  6. After its operation, the thread releases the lock and exits the monitor, letting other threads acquire the lock and continue their execution.
Thread Communication

In modern Java development, it’s recommended to use the more robust and flexible tools, such as the ExecutorService interface for managing thread pools, Lock and ReentrantLock classes for more precise control over synchronization, and Condition objects for thread signaling.

package com.tatvasoft.concurrency;
import java.util.LinkedList;
import java.util.Queue;
public class ThreadCommunication {
private static final Queue<Integer> queue = new LinkedList<>();
private static final int QUEUE_CAPACITY = 10;
private static final Runnable thread1 = new Runnable() {
    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                // Wait till the queue is full
                while (queue.size() == QUEUE_CAPACITY) {
                    try {
                        System.out.println("Queue is full, and the thread 
                        goes into waiting state");
                        queue.wait();
                    } catch (InterruptedException e) {
                        System.out.println("An error occurred during 
                        processing: " + e.getMessage());
                    }
                }
                // Add items to the queue
                queue.add(10);
                System.out.println("10 added to the queue");
                // Notify all waiting threads
                queue.notifyAll();
 
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.out.println("An error occurred during sleep: " + 
                    e.getMessage());
                }
            }
        }
    }
};
private static final Runnable thread2 = new Runnable() {
    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                // Wait till the queue is not empty
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty");
                        queue.wait();
                    } catch (InterruptedException e) {
                        System.out.println("An error occurred during 
                        processing: " + e.getMessage());
                    }
                }
                // Remove items from the queue
                queue.remove();
                System.out.println("Removed 10 from the queue");
                // Notify all waiting threads
                queue.notifyAll();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.out.println("An error occurred during sleep: " + 
                    e.getMessage());
                }
            }
        }
    }
};
 
public static void main(String[] args) {
    System.out.println("Main thread started");
    Thread myThread1 = new Thread(thread1);
    Thread myThread2 = new Thread(thread2);
    myThread1.start();
    System.out.println("Thread1 starts");
    myThread2.start();
    System.out.println("Thread2 starts");
    System.out.println("Main thread exit");
}       
}

Output

Output

4.4 Thread Priority

Java, as an object-oriented language, operates in a multithreaded environment where multiple threads can run concurrently. At the time of thread creation, a priority number is assigned to every thread. You can determine the significance of a particular thread over the other with the help of thread priority. The thread scheduler determines which thread gets processor time, primarily based on the thread’s priority. This priority can either be set automatically by the Java Virtual Machine (JVM) or manually defined by the developer.

Higher-priority threads are generally given preference by the scheduler, although the exact behavior may vary depending on the underlying platform. These priority levels are represented by numbers from 1 to 10. Here, 1 denotes the lowest priority while 10 denotes the highest.

public static int NORM_PRIORITYSets the default priority for the Thread. (Priority: 5)
public static int MIN_PRIORITYSets the Minimum Priority for the Thread. (Priority: 1)
public static int MAX_PRIORITYSets the Maximum Priority for the Thread. (Priority: 10)
  • The thread with the maximum priority amongst all will be executed first. 
  • By default, the priority of the main thread is 5, which you can change in the future. 
  • Let’s suppose two threads have the same priority, who will execute first and why? The thread scheduler’s algorithm, such as Round-Robin, First Come First Serve, etc., decides the order of execution.
package com.tatvasoft.concurrency;
public class ThreadPriority extends Thread {
    @Override
    public void run() {
        System.out.println("Priority of the thread " 
                + Thread.currentThread().getName() 
                + " is " + Thread.currentThread().getPriority());
    }
    public static void main(String[] args) {
        // Creating multiple threads
        ThreadPriority thread1 = new ThreadPriority();
        ThreadPriority thread2 = new ThreadPriority();
        ThreadPriority thread3 = new ThreadPriority();
        System.out.println("Default priority of thread1 is: " + 
        thread1.getPriority());
        System.out.println("Default priority of thread2 is: " + 
        thread2.getPriority());
        System.out.println("Default priority of thread3 is: " + 
        thread3.getPriority());
        // Explicitly setting the priorities
        // You can set it between 1 and 10, otherwise it will throw an error
        thread1.setPriority(1);
        thread2.setPriority(10);
        thread3.setPriority(6);
        System.out.println("Priority of thread1 is: " + 
        thread1.getPriority());
        System.out.println("Priority of thread2 is: " + 
        thread2.getPriority());
        System.out.println("Priority of thread3 is: " + 
        thread3.getPriority());
        // Starting the threads
        thread1.start();
        thread2.start();
        thread3.start();
    }
}

Output

Output

4.5 Thread Synchronization

In a multi-threaded environment, synchronization ensures that only one thread gets exclusive access to a critical section of code or a shared resource at a particular point in time. This helps maintain data integrity and reliability throughout the execution of a Java program.

package com.tatvasoft.concurrency;
public class ThreadSynchronization {
    private static int c = 0;
    public static void increment() {
        for(int i = 0; i < 1000; i++){
            c++;
        }
    }
    public static void main(String[] args) {
       Thread thread1 = new Thread(ThreadSynchronization::increment);
       Thread thread2 = new Thread(ThreadSynchronization::increment);
        thread1.start();
        thread2.start();
        try{
            thread1.join();
            thread2.join();
        }catch (InterruptedException e){
            System.out.println("An error occurred during processing : 
            "+e.getMessage());
        }
        System.out.println("Final counter value is :" + 
        ThreadSynchronization.c);
    }
}

When you run this program, the output will vary each time, typically falling within the range of 1000 to 2000, due to the asynchronous nature of thread execution.

In Java, synchronization is based on the intrinsic lock or monitor lock mechanism. Every Java object has an associated lock. When a thread needs exclusive and consistent access to an object’s data, it must first acquire that object’s lock. Once the thread has finished working with the object, it must release the lock so other threads can access it. This locking mechanism monitors threads entering critical sections and ensures that only a single thread can access that part of the code at a time, helping to prevent issues like race conditions and maintaining data integrity in concurrent applications.

The synchronized keyword, when applied to methods or blocks of code, enforces synchronization.

public static synchronized void increment() {
        for (int i = 0; i < 1000; i++) {
            c++;
        }
    }

4.6 Thread Pool

A thread pool in Java is a group of pre-created threads that are kept ready to execute tasks. Instead of creating a new thread every time a task needs to run, you can borrow a thread from the pool, use it, and then return it to the pool for reuse. This approach saves the overhead of generating and destroying threads repeatedly, making applications more efficient, especially when handling a large number of tasks.

The ExecutorService interface and classes like ThreadPoolExecutor in the java.util.concurrent package gives built-in support for thread pools.

Key Components of a Thread Pool

  • Core Pool Size: This refers to the minimum number of threads that are always kept alive in the pool, even when they are performing any task. Threads up to the core size are created when tasks arrive and remain ready for future tasks.
  • Maximum Pool Size: This indicates the maximum number of threads that can be active in the pool at any given time. If the task queue is full and all core threads are occupied, new threads are created up to this maximum size.
  • Task Queue: This is a queue that temporarily holds tasks awaiting execution by the threads in the pool. It serves as a buffer between incoming tasks and busy threads.
  • Keep-Alive Time: The maximum duration that excess (non-core) threads can remain idle before being terminated is referred to as the keep-alive time. If a thread stays inactive for longer than the keep-alive time, it is shut down.

Types of Thread Pools in Java

  1. Fixed Thread Pool: As the name suggests, this is a thread pool with a definite number of threads. New tasks will wait in a queue if all threads are busy.
  2. Cached Thread Pool: It is a flexible thread pool that creates new threads as needed but reuses existing ones when available. It is beneficial for executing numerous short-lived asynchronous tasks.
  3. Single-Thread Executor: A thread pool consisting of only one thread to execute tasks. Tasks are executed in a consecutive order of their submission.
  4. Scheduled Thread Pool: It schedules tasks to run after a wait period or executes tasks periodically at specific intervals. It’s useful for time-based operations like setting timers, reminders, or repeated background tasks.

5. Concurrency Utilities

Java provides a comprehensive set of concurrency utilities, i.e., classes and interfaces in the java.util.concurrent package to help manage multithreading and synchronization easily and efficiently. These utilities make it safer and easier to build concurrent applications.

The following is a list of important components of the Java concurrency tutorial:

  • Executor Framework
  • CountDownLatch
  • CyclicBarrier
  • Phaser
  • Semaphore
  • Lock
  • ReentrantLock
  • Callable
  • Future
  • ThreadFactory
  • Exchanger
  • Concurrent Collections

We’ll now study all these components in detail.

5.1 Executor Framework

The Java Executor framework is a multithreading management system that handles the thread creation, management, and execution without the overhead of manually using the Thread class.

Key Constituents of the Java Executor Framework:

  1. Executor Interface: It is the central interface having a single method, execute(Runnable command), that takes and submits a task to the thread pool for execution.
  2. ExecutorService Interface: It extends the Executor interface and provides additional methods for submitting tasks, managing their lifecycle, and retrieving return values.
  3. ThreadPoolExecutor: It manages and reuses a pool of worker threads for concurrent programming.
  4. ScheduledExecutorService: It extends ExecutorService and schedules tasks to execute periodically with fixed-rate or fixed-delay execution.

5.2 CountDownLatch

CountDownLatch is a powerful synchronization tool in concurrent programming that enables one or more threads to pause execution until a specified number of events, carried out by other threads, are completed. It keeps track of a countdown and releases the waiting threads as soon as the count becomes zero.

Key Methods of CountDownLatch:

  1. countDown(): Threads completing their task call the countDown() method. Each time it’s called, the count is reduced by one. As soon as the count becomes zero, all threads waiting on await() are set free.
  2. await(): It blocks the present thread until the latch’s count becomes zero. If the count is already zero when the method is called, the thread does not block and continues immediately.
  3. getCount(): It returns the current count value of the latch to check how many tasks are still pending.
package com.tatvasoft.concurrency;
 
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
    public static void main(String[] args) {
        int totalThreads = 3;
        CountDownLatch latch = new CountDownLatch(totalThreads);
        for(int i = 0; i < totalThreads; i++){
            new Thread(new Worker(latch)).start();
        }
        try{
            latch.await();
            System.out.println("All threads have finished their tasks");
        }catch (InterruptedException e){
            System.out.println("Main thread interrupted : "+ 
            e.getMessage());
        }
    }
    static class Worker implements Runnable{
        private final CountDownLatch latch;
        public Worker(CountDownLatch latch){
            this.latch = latch;
        }
        @Override
        public void run(){
            try{
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName() + " has 
                finished its task.");
            }catch (InterruptedException e){
                Thread.currentThread().interrupt();
            }finally {
                latch.countDown();
            }
        }
    }
}

5.3 CyclicBarrier

The CyclicBarrier class helps a group of threads reach a common barrier point by making them wait for each other. Once all threads have reached the barrier, they are all released to continue execution. Unlike CountDownLatch, it can be reused. It’s primarily used for coordinating threads working in parallel on different tasks and synchronizing at certain checkpoints.

Key Methods of CyclicBarrier:

  • await(): Each thread calls the await() method to notify that it has reached the designated barrier point. If a thread arrives before the others, it will be paused until all threads reach the barrier. This method may throw an InterruptedException if the thread is interrupted, or a BrokenBarrierException if the barrier is broken before all threads arrive.
  • getNumberWaiting(): This method returns the count of threads currently waiting at the barrier. You can track how many threads are still waiting for others to reach the barrier before proceeding.
  • reset(): It resets the barrier, restoring it to its initial state. It’s useful when you want to reuse the barrier after it has been tripped, allowing you to start a new round of synchronization with a fresh count.
  • isBroken(): It checks if the barrier has been broken. A barrier is considered broken when a thread is waiting, but the barrier is reset or interrupted prematurely. If the barrier is broken, any subsequent calls to await() will result in a BrokenBarrierException.
package com.tatvasoft.concurrency;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int totalThreads = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThreads, () ->   
        {
            System.out.println("All threads reached the barrier. Starting 
              the next phase ");
        });
        for(int i = 0; i < totalThreads; i++){
            new Thread(new Worker(cyclicBarrier)).start();
        }
    }
    static class Worker implements Runnable{
        private final CyclicBarrier cyclicBarrier;
        public Worker(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run(){
            try{
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName() + " has 
                   completed the task");
                cyclicBarrier.await();
            }catch (Exception e){
                System.out.println("An error occurred: " + e.getMessage());
            }
        }
    }
}

5.4 Phaser

Phaser is quite advanced synchronization mechanism than CyclicBarrier and CountDownLatch. It is designed to handle scenarios where threads need to coordinate and synchronize their execution across multiple phases or stages. Unlike other synchronization aids, the Phaser is highly flexible and allows for more dynamic thread coordination, supporting not only a fixed number of threads but also enabling threads to join or leave the phase during execution.

How Phaser Works?

  • Phase: A phase represents a particular stage or checkpoint in thread execution. Threads are synchronized at these phases, and once all threads reach a phase, they can proceed to the next phase. Multiple phases allow threads to work through several stages of a task.
  • Parties: Parties refer to the threads or tasks involved in a given phase. When a Phaser is initialized, it is set up with a certain number of parties, which is typically the number of threads. This number can change dynamically as threads can be registered or deregistered during execution.
  • Advance: To move from one phase to the next, all threads in the current phase must reach the synchronization point. This is done using the arriveAndAwaitAdvance() method, which blocks the threads until all participating threads arrive at the phase. Other methods like arrive() or arriveAndDeregister() can be used when a thread wants to signal its arrival without waiting for others or to deregister itself from further synchronization.

Key Methods of Phaser:

  • register(): Adds a new thread (also called a party) to the phaser. This increases the total number of threads that the phaser will wait for before moving to the next phase.
  • arrive(): Signals that a thread has reached the current synchronization point, but doesn’t wait for others. The thread proceeds immediately, and the phaser updates the arrival count.
  • arriveAndAwaitAdvance(): It tells the phaser the thread has arrived and then makes it wait until all other registered threads also reach that point. Once all are ready, the phase advances.
  • arriveAndDeregister(): Similar to arrive(), but also removes the thread from the phaser. This is useful when a thread no longer needs to participate in future synchronization phases.
  • getPhase(): Returns the number that identifies the current phase of execution. This helps track progress across phases.
  • awaitAdvance(int phase): Makes the thread wait until the specified phase is completed by all participating threads.
  • onAdvance(int phase, int registeredParties): A method that can be overridden to add custom logic that runs right before the phase changes. It’s useful for controlling how or when the transition should happen.
  • getRegisteredParties(): Returns the total number of threads that are currently registered and expected to synchronize in this phase.
package com.tatvasoft.concurrency;
import java.util.concurrent.Phaser;
public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        for(int i = 0; i < 3; i++){
            new Thread(new Task(phaser)).start();
        }
    }
    static class Task implements Runnable{
        private final Phaser phaser;
        public Task(Phaser phaser){
            this.phaser = phaser;
        }
        @Override
        public void run() {
            // Wait for other threads at phase1
            phaser.arriveAndAwaitAdvance();
            // Wait for other threads at phase2
            phaser.arriveAndAwaitAdvance();
            //Wait for other threads at phase3
            phaser.arriveAndAwaitAdvance();
        }
    }
}

5.5 Semaphore

Semaphore is a concurrency construct that restricts concurrent access to a resource by multiple threads with the help of permits. Threads need to acquire or release these permits to enter or exit a critical section of concurrent programs. The permit count determines the number of threads accessing the resource simultaneously.

Types of Semaphores:

  1. Counting Semaphore: It lets a particular number of threads concurrently access a resource.
  2. Binary Semaphore (Mutex): Here, the permit count is 0 or 1, i.e., only a single thread can access a resource at any given time. A binary semaphore is also known by another name, i.e., a mutex (mutual exclusion) lock.

Key Methods of Semaphore:

  • acquire(): Threads acquire a permit to enter a critical section. If available, it proceeds; otherwise, it gets blocked.
  • release(): The Threads release permits the semaphore to be acquired by others.
  • availablePermits(): Returns the number of permits available in the semaphore at any given time.
  • tryAcquire(): It checks if a permit is available and takes it if possible, returning true; if not, it returns false instantly without waiting.
  • reducePermits(int reduction): This method decreases the number of available permits by a given value, limiting future acquisitions.
package com.tatvasoft.concurrency;
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
 
    private static final Semaphore semaphore = new Semaphore(3);
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new Worker(i)).start();
        }
    }
    static class Worker implements Runnable {
        private final int id;
        public Worker(int id) {
            this.id = id;
        }
        @Override
        public void run() {
            try {
                System.out.println("Worker " + id + " is waiting for a 
                       permit");
                semaphore.acquire();
                System.out.println("Worker " + id + " has acquired a 
                       permit");
                Thread.sleep(2000);
                System.out.println("Worker " + id + " has released a 
                       permit");
                semaphore.release();
            } catch (InterruptedException e) {
                System.out.println("Worker " + id + " was interrupted: " + 
                       e.getMessage());
            }
        }
    }
}
Output

5.6 Lock

Locks see to it that only a single thread accesses a specific resource at a particular instance. The java.util.concurrent.locks package provides the Lock interface and its implementations.

ReentrantLock

ReentrantLock is the most popular implementation of the Lock interface. Here, the term “reentrant” signifies that a thread can acquire and release the same lock multiple times. When a thread first acquires a ReentrantLock, a hold count is automatically set to one. The hold count increments every time the same thread re-enters the lock. The lock is released only when the hold count reaches zero.

package com.tatvasoft.concurrency;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
    private static final Lock lock = new ReentrantLock();
    public static void main(String[] args) {
        Thread thread1 = new Thread(new Task());
        Thread thread2 = new Thread(new Task());
        thread1.start();
        thread2.start();
    }
    static class Task implements Runnable{
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() +
 " has acquired a lock");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() +
 " released the lock");
            }catch (InterruptedException e){
                Thread.currentThread().interrupt();
            }finally {
                lock.unlock();
            }
        }
    }
}

Output

Output

5.7 Callable

The Callable interface differs from the Runnable interface in one aspect, i.e., its call() method can return a result in the form of any object and can throw both checked and unchecked exceptions. The call() method defines the return type using generics. Callable is used in asynchronous programming to fetch a result or handle any exceptions that may occur while executing the program.

The callable interface contains only the call() method that needs to be implemented when you define a Callable task.

package com.tatvasoft.concurrency;
import java.util.concurrent.Callable;
public class CallableExample implements Callable {
    @Override
    public Object call() throws Exception {
        return String.valueOf(12);
    }    
}

The ExecutorService framework provides methods to submit Callable tasks and retrieve results. The ExecutorService will give you a Future object on submitting a Callable task. 

When you submit a Callable task to an ExecutorService, it returns a Future object.

5.8 Future

In Java concurrency, the Future interface represents the result of an asynchronous operation. It is commonly used with ExecutorService to manage and retrieve the outcome of tasks that run in a separate thread. We read above that ExecutorService returns a Future object on submitting a Callable task. This object acts like a placeholder for the result, allowing the main thread to continue executing other code while the task executes in the background. Future interface provides methods to check the task’s status, retrieve the result, and handle exceptions that occur during execution. After completion of the task, the result can be retrieved using the get() method.

Implementation of the Future:

  • FutureTask: This is a concrete class that implements the Future interface and can be used to represent a task that an ExecutorService can run. It acts as a wrapper around both Runnable and Callable, allowing the task to be executed either synchronously or asynchronously.
  • ScheduledFuture: This is a specialized form of Future used with the ScheduledExecutorService. It represents tasks that are scheduled to run after a delay or periodically, and allows retrieval of the result once the task completes.
  • CompletableFuture: A more advanced implementation of Future, CompletableFuture supports building complex asynchronous workflows. It allows chaining, combining multiple tasks, and handling exceptions efficiently, all while promoting non-blocking programming.  

Key Methods of the Future Interface:

  • get(): Retrieves the computation result. It blocks the thread until the result is out and throws an ExecutionException if an exception occurs during computation.
  • get(Long timeout, TimeUnit unit): Unlike get(), this method waits for the result until a specific time interval. It throws a TimeoutException on the unavailability of the result within the specified duration.
  • cancel(boolean mayInterruptIfRunning): This method tries to cancel a running task. If the task is already completed, canceled, or hasn’t started yet, it returns false. If the task is still running and mayInterruptIfRunning is true, it attempts to interrupt the task and returns true if successful.
  • isCancelled(): Returns true if the task was cancelled before or after its normal completion.
  • isDone(): This method returns true on the completion of the task, whether normally, via exception, or cancelled.
package com.tatvasoft.concurrency;
import java.util.concurrent.*;
public class FutureTaskExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Callable<String> task = () -> {
            Thread.sleep(2000);
            return "Task Completed";
        };
        Future<String> future = executorService.submit(task);
        try{
            //Result from the task
            future.get();
            future.get(2, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }
    }
}

5.9 CompletableFuture

CompletableFuture, introduced in Java 8, implements the Future and CompletionStage interfaces. It provides a flexible non-blocking composition API for asynchronous programming.

Key Methods of CompletableFuture:

  • supplyAsync(Supplier<U> supplier): Runs a task asynchronously that produces a result, typically used for background computations.
  • runAsync(Runnable runnable): Executes a task asynchronously without returning any result, useful for operations that perform actions rather than calculations.
  • completedFuture(T value): Creates a CompletableFuture that is already terminated with a given result.
  • thenApply(): Used to apply a function to the result of the future once it’s available, transforming the output.
  • thenAccept(): It takes the result of the future once it’s available, but doesn’t return any value.
  • thenCompose(): Allows chaining of tasks where the upcoming task is dependent on the outcome of the preceding one and returns another CompletableFuture.
  • thenCombine(): Combines two independent CompletableFuture tasks and merges their results.
  • allOf(): Waits for multiple CompletableFuture instances to complete before proceeding.
  • complete(): This lets you manually set a value to complete a future task, even if the actual task hasn’t been finished.

CompletableFuture also Provides Methods for Error Handling, Such as:

  • exceptionally(): Catches any exceptions during the asynchronous operation and allows you to return an alternate result as a fallback.
  • handle(): Processes both the successful result and any exception that may have occurred, giving you full control to handle both outcomes in one place.
  • whenComplete(): Runs a specific action after the task finishes, regardless of whether it completed successfully or encountered an error.
package com.tatvasoft.concurrency;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException,
 InterruptedException {
        CompletableFuture<Integer> supplyFuture = 
          CompletableFuture.supplyAsync(() -> {
            System.out.println("Executing async task");
            return 1;
        });
        CompletableFuture<Integer> transformedFuture = 
          supplyFuture.thenApply(result -> {
            System.out.println("Transformed result: " + result);
            return result;
        });
        transformedFuture.thenAccept(result -> {
            System.out.println("Final result after transformation: " + 
                 result);
        });
        CompletableFuture<Integer> exceptionalFuture = 
           CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Something went wrong!");
            }
            return 5;
        }).exceptionally(ex -> {
            System.out.println("Handled exception: " + ex.getMessage());
            return -1;
        });
        CompletableFuture<Integer> future1 = 
                CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> future2 = 
                CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, 
            future2);
        allFuture.thenRun(() -> {
            System.out.println("Both futures are completed");
        });
 
        supplyFuture.get();
        transformedFuture.get();
        exceptionalFuture.get();
        allFuture.get();
    }
}

Output

Output

5.10 ThreadFactory

The ThreadFactory interface provides a way to customize how new threads are created, especially when working with thread pools and executors. Instead of relying on the default thread creation process, ThreadFactory allows developers to define specific behaviors or configurations for threads, such as naming conventions, priorities, or daemon status. The newThread() method creates and returns a new thread.

When using an ExecutorService, you can supply a custom ThreadFactory to control how threads are generated in the pool.

Tasks of ThreadFactory:

  1. Naming a Thread.
  2. Set the Daemon Status.
  3. Assign thread priority.
  4. Assign the thread to a given thread group.
  5. Set a handler for handling uncaught exceptions that may be thrown.
package com.tatvasoft.concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class ThreadFactoryExample implements ThreadFactory {
    private final String prefix;
    private int counter = 0;
    public ThreadFactoryExample(String prefix){
        this.prefix = prefix;
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(prefix + "-" + counter++);
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false);
        return thread;
    }
    public static void main(String[] args) {
        ThreadFactoryExample threadFactory = new  
               ThreadFactoryExample("Test");
        ExecutorService executorService = Executors.newFixedThreadPool(2, 
                 threadFactory);
        for(int i = 0; i < 5; i++){
            int taskId = i;
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is 
                executing task "+ taskId);
            });
        }
        executorService.shutdown();
    }
}

Output

Output

5.11 Exchanger

An exchanger is a concurrency construct that facilitates data exchange between two concurrent tasks. It lets you define a synchronization point where two threads arrive and swap the data structure, enabling communication between them at a specific time during their execution.

Exchanger

An Exchanger is designed for pairwise data exchange between two threads. When one thread calls the exchange() method, it waits until another thread also calls exchange(). Once both threads reach the exchange point, they swap data, and each thread receives the other’s object. If only one thread arrives, it will wait until the partner thread also arrives.

You can declare the Exchanger class in the following manner:

Exchanger<V>

The Exchanger, a generic class, provides two main methods that enable two threads to swap data with each other:

  1. exchange(T x): It lets one thread pass an object to another thread while also receiving the object the other thread provides. Both threads must call this method to complete the exchange.
  2. exchange(T x, long timeout, TimeUnit unit): This version works the same way as the first, but with a time limit. If the second thread doesn’t arrive within the specified time, the method throws a TimeoutException. This helps avoid indefinite blocking in cases where the partner thread might be delayed or never arrive.
package com.tatvasoft.concurrency;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ExchangerExample {
 
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread thread1 = new Thread(() -> {
            try{
                //Thread 1 is exchanging data
                String data = "Hello from thread 1";
                System.out.println("Thread 1 is exchanging data: " + data);
                //Thread 1 received response
                String response = exchanger.exchange(data, 2, 
                         TimeUnit.SECONDS);
                System.out.println("Thread 1 received response: "+response);
            }catch (InterruptedException | TimeoutException e){
                System.out.println("Exchange timed out or interrupted");
            }
        });
        Thread thread2 = new Thread(() -> {
            try{
                //Thread 2 is exchanging data
                String data = "Hello from thread 2";
                System.out.println("Thread 2 is exchanging data: " + data);
                //For timeout
                Thread.sleep(3000);
 
                //Thread 2 received response
                String response = exchanger.exchange(data);
                System.out.println("Thread 2 received response: "+response);
            }catch (InterruptedException e){
                System.out.println("An error occurred: " + e.getMessage());
            }
        });
        thread1.start();
        thread2.start();
    }
}

Output

  1. Without timeout
    Without timeout
  2. With timeout
    With timeout

5.12 Concurrent Collections

Concurrent collections in Java are specially built thread-safe data structures that safely support many threads accessing or modifying them concurrently without causing data inconsistency. These collections are optimized for performance by using advanced techniques like fine-grained locking or lock-free algorithms, which reduce contention among threads. This means multiple threads can perform operations in parallel without unnecessarily waiting for each other. They are especially useful in common multithreading scenarios such as task scheduling, thread synchronization, and data sharing.

Types of Concurrent Collections in Java:

  • CopyOnWriteArrayList
  • CopyOnWriteArraySet
  • ConcurrentHashMap
  • BlockingQueue Interface and Its Implementations
  • ConcurrentLinkedQueue and ConcurrentLinkedDeque

5.13 Parallelism

Parallelism in Java means executing two or more tasks simultaneously, typically by using multiple CPU cores. This approach enhances performance and accelerates processing, especially for tasks capable of breaking into isolated small units. Parallelism is particularly advantageous in applications that need to handle large datasets or perform complex computations rapidly.

Java Supports Parallelism through Several Built-in Tools and Frameworks:

  • ForkJoinPool: This is a special thread pool built to easily execute tasks that can be split into smaller subtasks, called forking. It uses a “divide-and-conquer” strategy where tasks are recursively broken down and executed in parallel, then combined to produce the final result. The ForkJoinPool handles a worker thread pool, and each thread can steal tasks from other threads if it finishes its own work early. This is known as the work-stealing algorithm.
    Create a subclass of RecursiveTask<T> or RecursiveAction according to the result of the task. So, the following are the two choices:
    • RecursiveTask: If the task returns a result.
    • RecursiveAction: If the task does not return a result.

    package com.tatvasoft.concurrency;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.RecursiveTask;
    public class ForkJoinPoolExample {
        private static final ForkJoinPool forkPool = new ForkJoinPool();
        private static final Long[] memo = new Long[101];
        static class Fibonacci extends RecursiveTask < Long > {
            private final long n;
            public Fibonacci(long n) {
                this.n = n;
            }
            @Override
            protected Long compute() {
                if (n <= 1) {
                    return n;
                }
                if (memo[(int) n] != null) {
                    return memo[(int) n];
                }
                Fibonacci f1 = new Fibonacci(n - 1);
                Fibonacci f2 = new Fibonacci(n - 2);
     
                f1.fork();
                Long resultF2 = f2.fork().join();
                Long resultF1 = f1.join();
                Long result = resultF1 + resultF2;
                memo[(int) n] = result;
                return result;
            }
        }
        static class SumArray extends RecursiveAction {
            private static final int threshold = 10;
            private final int[] array;
            private final int start;
            private final int end;
            public SumArray(int[] array, int start, int end) {
                this.array = array;
                this.start = start;
                this.end = end;
            }
            @Override
            protected void compute() {
                if (end - start <= threshold) {
                    int sum = 0;
                    for (int i = start; i < end; i++) {
                        sum += array[i];
                    }
                    System.out.println("Sum of range " + start + " to " +
                        (end - 1) + ": " + sum);
                } else {
                    int middle = (start + end) / 2;
                    SumArray array1 = new SumArray(array, start, middle);
                    SumArray array2 = new SumArray(array, middle, end);
                    invokeAll(array1, array2);
                }
            }
        }
        public static void main(String[] args) {
            Fibonacci question = new Fibonacci(10);
            Long result = forkPool.invoke(question);
            System.out.println("Fibonacci of 10: " + result);
            int[] array = new int[100];
            for (int i = 0; i < 100; i++) {
                array[i] = i + 1;
            }
     
            SumArray array1 = new SumArray(array, 0, array.length);
            forkPool.invoke(array1);
            forkPool.shutdown();
        }
    }

    Output

    Output
  • Java Streams API: Java 8 introduced parallel streams, which enable developers to process collections in parallel with simple syntax. Using .parallelStream(), tasks like filtering, mapping, or reducing elements in a collection are automatically distributed across multiple threads.

    package com.tatvasoft.concurrency;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
     
    public class ParallelStreamExample {
        public static void main(String[] args) {
            List < Integer > numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            //make a square of each value using parallel stream
            numbers.parallelStream()
                .forEach(i - > {
                    System.out.println(Thread.currentThread().getName() + " - " + i);
                });
        }
    }

    Output: You can observe that different worker threads process different elements owing to the parallel stream.

    Output

6. Real World Example of Java Concurrency

We saw almost all the important concepts in this Java concurrency tutorial necessary for writing concurrent programs on the Java platform. Let us now implement our understanding in a practical scenario, i.e., parallel data processing in an e-commerce application.

Let us suppose that our e-commerce app has to process massive customer orders, performing crucial tasks like:

  1. Inventory verification of ordered items
  2. Accurate price calculation, including any discounts or promotions
  3. Payment processing
  4. Database updation after the order gets placed successfully

The above tasks are independent as they don’t depend directly on other tasks, even though a particular task may require the result of some other task. Therefore, we can apply the concurrency model in such a scenario. We’ll divide the order processing task into the above four subtasks and use threads and parallel streams for concurrent programming.

The following are the three steps to split the above tasks into smaller parts and go for their concurrent execution to build a responsive and scalable application.

  1. Concurrency in verifying inventory: While one thread checks if an item is in stock, another thread can check the availability of a different item.
  2. Parallel processing of price calculations: Multiple threads can calculate prices for different items or orders.
  3. Parallel database updates: Each order can be processed and updated in parallel.
package com.tatvasoft.concurrency;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class OrderProcessor {
    private final ExecutorService executorService;
    public OrderProcessor(int numberOfThreads) {
      this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }
    public void processOrder(Order order) {
        Future<Boolean> inventoryCheck = executorService.submit(() -> 
                        checkInventory(order));
        Future<Double> priceCalculation = executorService.submit(() -> 
                        calculatePrice(order));
        Future<Boolean> paymentProcessing = executorService.submit(() -> 
                        processPayment(order));
        try {
            boolean isInventoryAvailable = inventoryCheck.get();
            double finalPrice = priceCalculation.get();
            boolean isPaymentSuccessFull = paymentProcessing.get();
            if (isInventoryAvailable && isPaymentSuccessFull) {
                updateDatabase(order, finalPrice);
            } else {
                System.out.println("Order processing failed for Order: " + 
                order.id());
            }
        }catch (InterruptedException | ExecutionException e){
            System.out.println("An error occurred during process");
        }
    }
    private void updateDatabase(Order order, double finalPrice) {
        System.out.println("Modifying database to store the order details 
               for order id: "+ order.getId());
        //Order details are stored in the database
    }
    private Boolean checkInventory(Order order)  {
        System.out.println("Checking availability of an inventory for order 
                  id: "+ order.getId());
 
        //Assume inventory is available
        return true;
    }
    private Double calculatePrice(Order order) {
        System.out.println("Calculating price for an order id: " + 
          order.getId());
        //return fix value
        return 100.00;
    }
    private Boolean processPayment(Order order) {
        System.out.println("Processing payment for order id: " + 
            order.getId());
 
        //Assume payment is done;
        return true;
    }
    public void shutDown(){
        executorService.shutdown();
    }
    public static void main(String[] args) {
        OrderProcessor orderProcessor = new OrderProcessor(3);
        Order order1 = new Order(1, "Bike");
        Order order2 = new Order(2, "Car");
        orderProcessor.processOrder(order1);
        orderProcessor.processOrder(order2);
        orderProcessor.shutDown();
    }
    record Order(int id, String order) {
        public int getId(){
            return id;
        }
    }
}

Output

Output

7. Benefits of Java Concurrency

The following are the key advantages of Java Concurrency:

  • Enhanced Application Performance: Applications can complete operations faster, resulting in better performance, responsiveness, and reduced waiting times.
  • Efficient Background Processing: Concurrent programming lets businesses utilize the main thread for core tasks and free it from resource-intensive and time-consuming tasks by assigning them to background threads.  
  • Increased Responsiveness: In interactive applications, concurrency ensures that long-running tasks like file downloads don’t block the main thread, keeping the application responsive to user actions.
  • Easier Modeling: Game engines and simulations involve parallel processes, which are easy to implement through Java concurrency mechanisms. 
  • Efficient Resource Utilization: Java concurrency allows programs to utilize system resources like CPU and memory by running multiple threads at once, especially on multi-core processors. 
  • Scalability: Concurrency allows applications to scale effectively by utilizing multi-core processors and distributed systems.

8. Final Words

Java concurrency is advantageous in modern-day application development scenarios. Multithreading and parallel computing are the needs of the era to build reliable and scalable applications. However, Java developers must be mindful of concurrency problems such as deadlocks and race conditions. Therefore, it’s important to ensure thread safety when multiple threads access shared data and resources. This can be achieved by correctly implementing thread-safe synchronization mechanisms and concurrency control strategies. Go through this Java concurrency tutorial to grasp the advanced techniques to build concurrent systems based on design principles like the actor model for concurrent programming.

profile-image
Rakshit Toke

Rakshit Toke is a Java technology innovator and has been managing various Java teams for several years to deliver high-quality software products at TatvaSoft. His profound intelligence and comprehensive technological expertise empower the company to provide innovative solutions and stand out in the marketplace.

Comments

Leave a message...