Saturday, January 31, 2015

BufferedLatch

Multithreading can be hard. When you have several threads modifying and contending for shared objects, variables, and resources... many things can go wrong if you are not deliberate and careful.

In Java, that is why it is critical to utilize synchronizers like CountDownLatch, Semaphore, and CyclicBarrier (in addition to other multithreading tools like the volatile and final keywords). These all help ensure thread safety or prevent threads from proceeding until certain conditions are met and it is safe to proceed.

An analogy that I find really helpful is comparing this to the Kentucky Derby. The race horses (the "threads") walk up to the gate (the "synchronizer"), but the gate is closed so they cannot proceed any further (in a state of "await()"). Once a certain condition is cleared, the gate opens ("notifyAll()") and the horses are free to charge forward.

The CountDownLatch is probably the synchronizer I use the most. Its policy is simple and effective. It starts at a specified number on construction, and decrements on each call to "countDown()". When it reaches zero, the CountDownLatch lets any other threads waiting on it pass through.

If you have three tasks each running on a separate thread, but you don't want the main thread to proceed until these three tasks are done, create a CountDownLatch and let each task countDown() it when they are done. Have the main thread await() on the CountDownLatch, and when all three tasks are done the main thread will proceed.
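Here is a minimal sketch of that three-task scenario. The class name, the runTasks() helper, and the AtomicInteger standing in for real work are my own placeholders for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CountDownLatchDemo {

    // Runs taskCount tasks on a pool and blocks until every task has counted down.
    static int runTasks(int taskCount) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        completed.incrementAndGet(); // placeholder for the real work
                    } finally {
                        latch.countDown(); // count down even if the work throws
                    }
                });
            }
            latch.await(); // the calling thread waits here until the count reaches zero
        } finally {
            executor.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks completed: " + runTasks(3));
    }
}
```

Putting countDown() in a finally block is a defensive choice: if a task fails, the main thread still gets released instead of waiting forever.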

The BufferedLatch

However, what if we don't know what the count will be in advance? We have no number to start the CountDownLatch at, and we will only learn it later in the process. I ran into this issue a lot when dealing with buffered streams from database queries. As I looped through each database record, I wanted to kick it off into a task and submit it to a fixed thread pool. The problem is that once all the records are looped through, how do I pause until all the runnables processing those records are complete?

ResultSet rs = //issue some query
ExecutorService executor = //create a fixed thread pool

while (rs.next()) { 
 FinanceDay financeDay = convertRecordToFinanceDay(rs);
 executor.submit(() -> addToConcurrentHashMap(financeDay));
}

//I want to wait for the executor to finish all the runnables here
rs.close();
executor.shutdown();

Looking at the conceptual code above, we are looping through each record in the query as each record comes in. But we do not know how many records there will be until rs.next() returns false and we finish looping. This is problematic if we are going to convert each record into an object (in this case "FinanceDay"), and then pass that object to a Runnable that runs on an entirely separate thread. If we don't find a synchronization solution, we have an enormous risk of creating a race condition where this method will finish prematurely. Upon completion, the FinanceDay objects may still be getting processed and are not ready for the application to use.

Introducing the only synchronizer I ever wrote... the BufferedLatch

public final class BufferedLatch {
 private int recordCount;
 private int processedRecordCount;
 private boolean iterationComplete = false;
 
 public synchronized void incrementRecordCount() { 
  if (iterationComplete) { 
   throw new RuntimeException("Cannot increment record count after iteration is flagged complete!");
  }
  else { 
   recordCount++;
  }
 }
 public synchronized void incrementProcessedRecordCount() { 
  processedRecordCount++;
  if (iterationComplete && recordCount == processedRecordCount) { 
   this.notifyAll();
  }
 }
 public synchronized void setIterationComplete() { 
  iterationComplete = true;
  if (recordCount == processedRecordCount) { 
   this.notifyAll();
  }
 }
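 // Must be synchronized: wait() throws IllegalMonitorStateException unless the caller holds this object's monitor
 public synchronized void await() throws InterruptedException { 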
  while (! (iterationComplete && recordCount == processedRecordCount)) { 
   this.wait();
  }
 }
}

The BufferedLatch solves this problem. Its purpose is very similar to the CountDownLatch, except it is for cases where the countdown number is unknown, and will not be known until later mid-process.

There are three methods that control the state of the synchronizer (incrementRecordCount(), incrementProcessedRecordCount(), and setIterationComplete()) as well as a method for waiting threads (await()).

incrementRecordCount() is called every time a record is iterated.

incrementProcessedRecordCount() is called every time a runnable of that record is completed.

setIterationComplete() is only called once by the looping thread to flag that the iteration is complete.

Any threads waiting for the runnables to complete will need to call await(). In my uses, this always has been the thread that does the iteration and calls setIterationComplete(), and then it calls await() and sits until all the runnables are done.

For our example above, this is how BufferedLatch would be implemented.

ResultSet rs = //issue some query
ExecutorService executor = //create a fixed thread pool
BufferedLatch latch = new BufferedLatch();

while (rs.next()) { 
 FinanceDay financeDay = convertRecordToFinanceDay(rs);
 executor.submit(() -> {
  addToConcurrentHashMap(financeDay);
  latch.incrementProcessedRecordCount();
 });
 latch.incrementRecordCount();
}
rs.close();
latch.setIterationComplete();
latch.await();
executor.shutdown();


The way this works now is a BufferedLatch is created before any recordset iteration starts. After a record is iterated, converted to a FinanceDay object, and passed off as a Runnable to the executor, incrementRecordCount() is called.

When a Runnable (the lambda passed to the executor submit() method) finishes processing the FinanceDay, it calls incrementProcessedRecordCount().

After the entire ResultSet is looped through, setIterationComplete() is called to flag that no more records are coming in. The query has been iterated through completely. If incrementRecordCount() is called after that, a RuntimeException will be thrown, because iterating records should not happen after setIterationComplete() is called.

Finally, the original thread will come to the latch's await() method and will pause until the runnables complete by calling incrementProcessedRecordCount() enough times to match the recordCount. After that, every record has been iterated and processed, and the original thread is now free to shut down the executor and move on.
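To make the flow above something you can run without a database, here is a self-contained sketch. It assumes a plain List of integers in place of the ResultSet and a squaring step in place of the FinanceDay processing. The processAll() helper and the class name are hypothetical. The nested latch is the same one shown earlier, with await() declared synchronized so that wait() is called while holding the monitor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BufferedLatchDemo {

    // Same state machine as the BufferedLatch in this post
    static final class BufferedLatch {
        private int recordCount;
        private int processedRecordCount;
        private boolean iterationComplete = false;

        synchronized void incrementRecordCount() {
            if (iterationComplete) {
                throw new RuntimeException("Cannot increment record count after iteration is flagged complete!");
            }
            recordCount++;
        }
        synchronized void incrementProcessedRecordCount() {
            processedRecordCount++;
            if (iterationComplete && recordCount == processedRecordCount) {
                notifyAll();
            }
        }
        synchronized void setIterationComplete() {
            iterationComplete = true;
            if (recordCount == processedRecordCount) {
                notifyAll();
            }
        }
        synchronized void await() throws InterruptedException {
            while (!(iterationComplete && recordCount == processedRecordCount)) {
                wait(); // releases the monitor while waiting, reacquires it on wake-up
            }
        }
    }

    // Hypothetical stand-in for the ResultSet loop: squares each record on a pool thread
    static Map<Integer, Integer> processAll(List<Integer> records) throws InterruptedException {
        Map<Integer, Integer> results = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        BufferedLatch latch = new BufferedLatch();
        try {
            for (Integer record : records) {
                latch.incrementRecordCount(); // one more record handed off
                executor.submit(() -> {
                    try {
                        results.put(record, record * record);
                    } finally {
                        latch.incrementProcessedRecordCount(); // runs even if the work throws
                    }
                });
            }
            latch.setIterationComplete(); // no more records are coming
            latch.await();                // block until every submitted task has reported in
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

The try/finally inside the lambda is my addition. Without it, a task that throws would never report in and the waiting thread would hang.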

Conclusions

My only regret is this latch does add some boilerplate to the client code by having four different methods that need to be called, where CountDownLatch typically only has two (countDown() and await()). If anybody has suggestions I am very willing to hear them. But I have not found a latch that accomplishes anything like this, perhaps because this problem is somewhat niche.
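One candidate worth comparing against is java.util.concurrent.Phaser, which lets parties register dynamically after construction. The following is only a sketch of how the same "unknown count" problem might be expressed with it, not a drop-in replacement for BufferedLatch. The processAll() helper, the class name, and the string records are hypothetical. Note that a Phaser allows at most 65535 registered parties at once, so this may not suit loops that queue far more unfinished tasks than that.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserDemo {

    // Sums the lengths of all records on pool threads and waits for every task to finish.
    static int processAll(List<String> records) {
        AtomicInteger totalLength = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // one party for the iterating thread itself
        try {
            for (String record : records) {
                phaser.register(); // add one party per submitted task
                executor.submit(() -> {
                    try {
                        totalLength.addAndGet(record.length()); // placeholder for real work
                    } finally {
                        phaser.arriveAndDeregister(); // task is done, remove its party
                    }
                });
            }
            // The iterating thread arrives last and waits until every registered task has arrived
            phaser.arriveAndAwaitAdvance();
        } finally {
            executor.shutdown();
        }
        return totalLength.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a", "bb", "ccc")));
    }
}
```

Because the iterating thread holds its own party until the loop ends, the phase cannot advance early, which plays the same role as setIterationComplete() does in BufferedLatch.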

One disclaimer: like any multithreading decision... first evaluate if it is even worth multithreading the task in question. Test and ensure there will be performance gains over a single-threaded approach. If your database query is quick, it might be worth importing all the data first before doing anything with it. But if you have worked with painfully slow data connections like me, or are issuing an intensive query, you may want to utilize that idle CPU time and use the solution above.





Tuesday, January 20, 2015

LazyProperty


Encapsulating Lazy Initialization With Suppliers


Now on GitHub
package com.nield.utilities;

import java.util.function.Supplier;

public final class LazyProperty<T> {
    private volatile T value;
    private final Supplier<T> supplier;
    
    private LazyProperty(Supplier<T> supplier) { 
        this.supplier = supplier;
    }
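    public T get() { 
        T result = value; // read the volatile field once so a concurrent reset() cannot null it between the check and the return
        if (result == null) { 
            synchronized(this) { 
                result = value;
                if (result == null) { 
                    value = result = supplier.get();
                }
            }
        }
        return result;
    }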
    public void reset() {
        if (value != null) { 
            synchronized(this) { 
                if (value != null) { 
                    this.value = null;
                }
            }
        }
    }
    public static <B> LazyProperty<B> forSupplier(Supplier<B> supplier) { 
        return new LazyProperty<B>(supplier);
    }
}

Lazy initialization is deferring calculation of a value until it is needed, and then caching it for all uses thereafter. It is often used to hold off an expensive calculation and then save the result for future uses. A typical lazy initialization would look something like this...
 
import java.math.BigDecimal;
import java.util.Date;

public final class FinanceDay {
 private final Date date = new Date();
 private BigDecimal balance;
 
 public Date getDate() { 
  return date;
 }
 public BigDecimal getBalance() { 
  if (balance == null) { 
      balance = BalanceCalculator.forDate(date);
  }
  return balance;
 }
}
The "balance" variable is initially null. When the getBalance() method is called, it first checks if it is null and calculates and assigns the "balance" value before returning it. After the initial call, it will not have to calculate it again as the "if (balance == null)" condition will return false.

Lazy initialization is not optimal for most cases. You should always strive to initialize values on construction. Even better, you should make them final and immutable if possible.

But there are certainly cases where lazy initialization is applicable, especially where properties are either a) expensive to build or b) dependent on the object already being constructed. I have found lazy initialization to be greatly needed in business decision process applications that go through a lot of decision calculations. This is not only expensive but also requires the business objects to be constructed and fully aware of themselves before they can engage with any algorithms. This is a good use of lazy initialization.

However, lazy initialization gets even more complicated when you multithread your application (which is inevitable for any reasonably complex application). To prevent race conditions or contention between threads, you have to synchronize on a lock to protect the "balance" value, check the null condition twice (once outside the lock and once inside it), and declare the field volatile so other threads reliably see the assigned value.
 
import java.math.BigDecimal;
import java.util.Date;

public final class FinanceDay {
 private final Date date = new Date();
 private volatile BigDecimal balance; // volatile is required for double-checked locking to be safe
 
 public Date getDate() { 
  return date;
 }
 public BigDecimal getBalance() { 
  if (balance == null) { 
   synchronized(this) { 
    if (balance == null) { 
     balance = BalanceCalculator.forDate(date);
    }
   }
  }
  return balance;
 }
}
If you have five properties that use lazy initialization in a multithreaded environment, your class getter methods can get messy very quickly like the one above. This pattern is very repetitive, and therefore is a good candidate to be encapsulated into a utility class. With a supplier lambda expression, it becomes even easier.


Here is LazyProperty, a solution to fulfill that need using all the means described.
 

import java.util.function.Supplier;

public final class LazyProperty<T> {
    private volatile T value;
    private final Supplier<T> supplier;
    
    private LazyProperty(Supplier<T> supplier) { 
        this.supplier = supplier;
    }
    public T get() { 
        if (value == null) { 
            synchronized(this) { 
                if (value == null) { 
                    value = supplier.get();
                }
            }
        }
        return value;
    }
    public static <B> LazyProperty<B> forSupplier(Supplier<B> supplier) { 
        return new LazyProperty<B>(supplier);
    }
}

LazyProperty takes care of this entire pattern and is generic so it works with any type. It also uses a lambda supplier to provide instructions on how to construct the value. Invoking LazyProperty is simple. Call LazyProperty.forSupplier() and pass a supplier lambda to the static method. It will return a new LazyProperty which will create and cache the value once called. It will also manage all thread synchronization so it is threadsafe. Use the get() method to extract the value out of LazyProperty.

 
import java.math.BigDecimal;
import java.util.Date;

public final class FinanceDay {
 private final Date date = new Date();
 private final LazyProperty<BigDecimal> balance = 
            LazyProperty.forSupplier(() -> BalanceCalculator.forDate(date));
 
 public Date getDate() { 
  return date;
 }
 public BigDecimal getBalance() { 
  return balance.get();
 }
}
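
To check the "create and cache the value once" claim, here is a small sketch that hammers a LazyProperty from several threads and counts how often the supplier runs. The demo class, the supplierCallsAcrossThreads() helper, and the counter are hypothetical. The nested LazyProperty is a copy of the class above so the example compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyPropertyDemo {

    // Copy of the LazyProperty from this post, nested so the example is self-contained
    static final class LazyProperty<T> {
        private volatile T value;
        private final Supplier<T> supplier;

        private LazyProperty(Supplier<T> supplier) {
            this.supplier = supplier;
        }
        T get() {
            if (value == null) {
                synchronized (this) {
                    if (value == null) {
                        value = supplier.get();
                    }
                }
            }
            return value;
        }
        static <B> LazyProperty<B> forSupplier(Supplier<B> supplier) {
            return new LazyProperty<>(supplier);
        }
    }

    // Starts threadCount threads that all call get() at once; returns how many times the supplier ran.
    static int supplierCallsAcrossThreads(int threadCount) throws InterruptedException {
        AtomicInteger supplierCalls = new AtomicInteger();
        LazyProperty<String> lazy = LazyProperty.forSupplier(() -> {
            supplierCalls.incrementAndGet();
            return "expensive value";
        });
        CountDownLatch startGate = new CountDownLatch(1); // holds all threads until released together
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    startGate.await();
                    lazy.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        startGate.countDown(); // open the gate
        for (Thread thread : threads) {
            thread.join();
        }
        return supplierCalls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Supplier calls: " + supplierCallsAcrossThreads(8));
    }
}
```

The second null check inside the synchronized block is what keeps the count at one, even when several threads pass the first check at the same time.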

Just make sure you do not create a stack overflow or null pointer exception by making the supplier call the get() method of the LazyProperty it is populating. To my understanding, you can safely reference "this" in the supplier. The lambda captures an ordinary reference to the enclosing object, which may still be in a partially constructed state when the lambda is created, but the supplier does not run until get() is called. If implemented properly, get() will not be called until the entire class is constructed.

Optionally, you can implement a reset() method to clear the cached value so it can be recalculated again.

package com.nield.utilities;

import java.util.function.Supplier;

public final class LazyProperty<T> {
    private volatile T value;
    private final Supplier<T> supplier;
    
    private LazyProperty(Supplier<T> supplier) { 
        this.supplier = supplier;
    }
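    public T get() { 
        T result = value; // read the volatile field once so a concurrent reset() cannot null it between the check and the return
        if (result == null) { 
            synchronized(this) { 
                result = value;
                if (result == null) { 
                    value = result = supplier.get();
                }
            }
        }
        return result;
    }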
    public void reset() {
        if (value != null) { 
            synchronized(this) { 
                if (value != null) { 
                    this.value = null;
                }
            }
        }
    }
    public static <B> LazyProperty<B> forSupplier(Supplier<B> supplier) { 
        return new LazyProperty<B>(supplier);
    }
}

The reset() method is also synchronized in similar fashion to get(), allowing only one thread at a time to check and change the value. Some die-hard immutable-ists like me could argue this introduces undesirable mutability and may present opportunity for bad mutable designs, but I cannot see the harm since the supplier is final. Assuming the supplier is designed correctly, it should return either the same value every time or the most up-to-date value. Abuse could come from excessive calls to reset(), which may hurt performance. It could also encourage bad designs by allowing reuse of objects rather than creating new ones once properties change. But if used intelligently, it is nice to have reset() available for certain solutions.
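
As a quick illustration of what reset() does to the cached value, here is a single-threaded sketch. The demo class, the observe() helper, and the counting supplier are hypothetical. The nested LazyProperty is a copy of the class above in which get() reads the volatile field into a local variable, an adjustment I am assuming here so that a concurrent reset() cannot make get() return null.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyPropertyResetDemo {

    static final class LazyProperty<T> {
        private volatile T value;
        private final Supplier<T> supplier;

        private LazyProperty(Supplier<T> supplier) {
            this.supplier = supplier;
        }
        T get() {
            T result = value; // single volatile read on the fast path
            if (result == null) {
                synchronized (this) {
                    result = value;
                    if (result == null) {
                        value = result = supplier.get();
                    }
                }
            }
            return result;
        }
        void reset() {
            if (value != null) {
                synchronized (this) {
                    if (value != null) {
                        value = null; // the next get() will call the supplier again
                    }
                }
            }
        }
        static <B> LazyProperty<B> forSupplier(Supplier<B> supplier) {
            return new LazyProperty<>(supplier);
        }
    }

    // Returns the values seen by get(), get(), then get() after a reset().
    static int[] observe() {
        AtomicInteger supplierCalls = new AtomicInteger();
        // The supplier returns how many times it has been invoked so far
        LazyProperty<Integer> lazy = LazyProperty.forSupplier(supplierCalls::incrementAndGet);
        int first = lazy.get();   // supplier runs for the first time
        int second = lazy.get();  // cached, supplier does not run
        lazy.reset();
        int third = lazy.get();   // supplier runs again
        return new int[] { first, second, third };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [1, 1, 2]
    }
}
```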

Hope you find this utility helpful!