Bulk loading a Coherence cache from an Oracle Database

Recently, I’ve worked on a couple of projects that required pre-loading data from an Oracle Database table into an Oracle Coherence cache. There are many ways to accomplish this task, but what I’ve found to work well is to distribute the load processing to the Coherence grid itself. To do this, I use the Coherence Invocation Service.

To get started, let’s look at the SQL query that will be used to retrieve all rows from the underlying table (all columns have type VARCHAR2):

SELECT id, customer_id, zip5, zip4 FROM customer_zip

The class definition for the objects to be cached looks like:

public class CustomerZip
        implements ExternalizableLite, PortableObject
    {
    // ----- constructors ----------------------------------------------------

    public CustomerZip()
        {
        }

    public CustomerZip(String sId, String sCustomerId, String sZip5, String sZip4)
        {
        m_sId = sId;
        m_sCustomerId = sCustomerId;
        m_sZip5 = sZip5;
        m_sZip4 = sZip4;
        }

    // ----- accessors/mutators ----------------------------------------------

    /* removed for brevity */

    // ----- ExternalizableLite interface ------------------------------------

    /* removed for brevity */

    // ----- PortableObject interface ----------------------------------------

    /* removed for brevity */

    // ----- data members ----------------------------------------------------

    private String m_sZip5;
    private String m_sZip4;
    private String m_sCustomerId;
    private String m_sId;
    }
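
The serialization methods are omitted above for brevity. As a rough illustration only, the PortableObject half might look something like the following sketch; the POF indexes (0 through 3) are assumptions for this sketch, not the actual indexes used in the project.

// a sketch of the PortableObject methods omitted above; the POF indexes
// used here (0-3) are assumed purely for illustration
public void readExternal(PofReader in)
        throws IOException
    {
    m_sId         = in.readString(0);
    m_sCustomerId = in.readString(1);
    m_sZip5       = in.readString(2);
    m_sZip4       = in.readString(3);
    }

public void writeExternal(PofWriter out)
        throws IOException
    {
    out.writeString(0, m_sId);
    out.writeString(1, m_sCustomerId);
    out.writeString(2, m_sZip5);
    out.writeString(3, m_sZip4);
    }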

The first approach I tried involved pulling back all of the ids from the table, running them through PartitionedService#getKeyOwner, and then submitting a task to each member with the set of ids for that member to load. This method leverages Coherence’s data partitioning to distribute the rows among the loading members. It worked fine in my testing with a small number of rows, but when I applied it to the full data set of over 13 million rows, I quickly ran out of memory trying to query and process all of the ids. Even before memory became an issue, simply querying and processing that many ids added significant time to the load.
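
For reference, a rough sketch of that first approach might have looked like the code below. It is illustrative only: queryAllIds() is a hypothetical helper that selects every id from customer_zip, and each per-member id set would then be sent to its owning member via an Invocable.

// rough sketch of the abandoned key-owner approach (illustrative only);
// queryAllIds() is a hypothetical helper that runs "SELECT id FROM customer_zip"
NamedCache cache = CacheFactory.getCache("CustomerZipCache");
PartitionedService service = (PartitionedService) cache.getCacheService();
Map<Member, Set<String>> mapIdsByMember = new HashMap<Member, Set<String>>();
for (String sId : queryAllIds())
    {
    // determine which storage-enabled member currently owns this key
    Member member = service.getKeyOwner(sId);
    Set<String> setIds = mapIdsByMember.get(member);
    if (setIds == null)
        {
        mapIdsByMember.put(member, setIds = new HashSet<String>());
        }
    setIds.add(sId);
    }
// each member would then be issued a task containing its set of ids to load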

The second, and final, approach I tried involved pulling back only the row count. Dividing the rows up among the loading members then becomes a simple matter of establishing the first and last rows each member should load. Each member can then use the Oracle ROWNUM pseudocolumn to execute the following query:

SELECT * FROM
  (SELECT a.*, ROWNUM r FROM
    (SELECT id, customer_id, zip5, zip4 FROM customer_zip ORDER BY id) a
  WHERE ROWNUM <= ?)
WHERE r >= ?

This query allows each loading member to specify the last and first rows to load (bound in that order) and lets the database filter out all of the rows outside that range. In my testing, I found that range sizes beyond a certain threshold started performing dramatically slower (perhaps a DB tuning issue, but IANADBA ;-) ). You could easily run into this scenario with a large number of rows and a small number of loading members.

To handle this situation, I further broke down each member’s range into smaller sub-ranges and had each member execute multiple queries. For example, a member assigned rows 1 through 1,000,000 with a maximum query range of 100,000 would issue ten queries. Processing the results of these queries and performing bulk puts into the cache requires breaking the results into batches as well. Here’s a look at the code that actually executes the queries and inserts the entries into Coherence (to be executed on each loading member). This code is actually part of a CacheLoader implementation that is used for read-through as well; co-locating the read-through and pre-load logic lets me share database properties (connection information, SQL statements, etc.).

public void preload(NamedCache cache, int iFirstRow, int iLastRow,
        int cFetchSize, int cMaxQueryRange)
    {
    String sSqlQuery = ...; // see above
    String sCacheName = cache.getCacheName();
    Connection con = null;
    PreparedStatement stmtPrep = null;
    ResultSet rs = null;
    try
        {
        con = getConnection();
        stmtPrep = con.prepareStatement(sSqlQuery);
        stmtPrep.setFetchSize(cFetchSize);

        // break the query up into batches based on cMaxQueryRange
        int cRows = (iLastRow - iFirstRow) + 1;
        int cBatches = cRows / cMaxQueryRange;
        int cRemaining = cRows % cMaxQueryRange;
        // add additional batch to handle any remainder
        cBatches += cRemaining == 0 ? 0 : 1;

        Map mapBuffer = new HashMap(cFetchSize);
        int iBatchFirstRow;
        int iBatchLastRow = iFirstRow - 1;
        int cRowsLoadedTotal = 0;
        log("Executing preload query in " + cBatches + " batches");
        for (int i = 0; i < cBatches; ++i)
            {
            iBatchFirstRow = iBatchLastRow + 1;
            // last row for the batch or the entire range
            iBatchLastRow = Math.min(iLastRow, iBatchFirstRow + (cMaxQueryRange - 1));
            stmtPrep.setInt(1, iBatchLastRow);
            stmtPrep.setInt(2, iBatchFirstRow);
            rs = stmtPrep.executeQuery();

            // process cFetchSize rows at a time
            while (processResults(rs, mapBuffer, cFetchSize))
                {
                cache.putAll(mapBuffer);
                mapBuffer.clear();
                }
            rs.close();
            }
        }
    catch (SQLException e)
        {
        log(e);
        throw new RuntimeException(e);
        }
    finally
        {
        close(con, stmtPrep, rs);
        }
    }

protected boolean processResults(ResultSet rs, Map mapResults, int cFetchSize)
        throws SQLException
    {
    for (int i = 0; i < cFetchSize && rs.next(); ++i)
        {
        // create domain object from single row
        CustomerZip customerZip = createCustomerZip(rs);
        mapResults.put(customerZip.getId(), customerZip);
        }
    return mapResults.size() > 0;
    }
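
For completeness, createCustomerZip() simply maps a single row of the preload query to a domain object. A minimal sketch, assuming lookups by the column names from the SELECT, looks like this:

// minimal sketch: build a CustomerZip from the current row of the result set
protected CustomerZip createCustomerZip(ResultSet rs)
        throws SQLException
    {
    return new CustomerZip(rs.getString("id"), rs.getString("customer_id"),
            rs.getString("zip5"), rs.getString("zip4"));
    }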

The next piece of required code generates the row range for each member and issues each member a task to execute. As I mentioned earlier, I use the Coherence Invocation Service to asynchronously execute a task on each loading member. For my use case, the set of loading members is simply every member running the Invocation Service, except for the member issuing the tasks:

protected Map<Member, PreloadTask> generateTasks(Set<Member> setMembers, int cRows)
    {
    Map<Member, PreloadTask> mapTasks =
            new HashMap<Member, PreloadTask>(setMembers.size());

    if (cRows <= m_cFetchSize)
        {
        // for small number of rows, just send the load to one member
        Member member = setMembers.iterator().next();
        PreloadTask task = new PreloadTask(m_sCacheName, 1, cRows,
                m_cFetchSize, m_cMaxQueryRange);
        mapTasks.put(member, task);
        }
    else
        {
        int cMembers = setMembers.size();
        int cMinRowsPerMember = cRows / cMembers;
        int cRemainingRows = cRows % cMembers;

        int iFirstRow;
        int iLastRow = 0;
        for (Member member : setMembers)
            {
            iFirstRow = iLastRow + 1;
            iLastRow = iFirstRow + cMinRowsPerMember +
                    (cRemainingRows-- > 0 ? 1 : 0) - 1;
            PreloadTask task = new PreloadTask(m_sCacheName, iFirstRow,
                    iLastRow, m_cFetchSize, m_cMaxQueryRange);
            mapTasks.put(member, task);
            }
        }

    return mapTasks;
    }
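
I haven’t shown PreloadTask itself. As a hedged sketch, it is essentially an Invocable that carries the range parameters to the receiving member and delegates to the CacheLoader’s preload method shown earlier; the CustomerZipCacheLoader name and its construction below are hypothetical stand-ins for however the real loader is obtained.

// a hedged sketch of PreloadTask (extends com.tangosol.net.AbstractInvocable);
// in a real deployment the task would typically also implement PortableObject
public class PreloadTask
        extends AbstractInvocable
    {
    public PreloadTask(String sCacheName, int iFirstRow, int iLastRow,
            int cFetchSize, int cMaxQueryRange)
        {
        m_sCacheName     = sCacheName;
        m_iFirstRow      = iFirstRow;
        m_iLastRow       = iLastRow;
        m_cFetchSize     = cFetchSize;
        m_cMaxQueryRange = cMaxQueryRange;
        }

    public void run()
        {
        NamedCache cache = CacheFactory.getCache(m_sCacheName);
        // hypothetical: obtain the same CacheLoader used for read-through
        CustomerZipCacheLoader loader = new CustomerZipCacheLoader();
        loader.preload(cache, m_iFirstRow, m_iLastRow, m_cFetchSize, m_cMaxQueryRange);
        }

    // range accessors used by the client when logging the submitted task
    public int getFirstKey()
        {
        return m_iFirstRow;
        }

    public int getLastKey()
        {
        return m_iLastRow;
        }

    private String m_sCacheName;
    private int    m_iFirstRow;
    private int    m_iLastRow;
    private int    m_cFetchSize;
    private int    m_cMaxQueryRange;
    }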

The final step is to asynchronously invoke each member’s task, and then wait for all of them to finish. I use a CountDownLatch and an InvocationObserver to track the completion of all tasks:

public void preloadCache()
    {
    final String sCacheName = "CustomerZipCache";
    int cRows = getRowCount();
    InvocationService serviceInv = (InvocationService)
            CacheFactory.getService("InvocationService");
    long ldtStart = System.currentTimeMillis();
    Set<Member> setLoadingMembers = getLoadingMembers(serviceInv);
    Map<Member, PreloadTask> mapMemberTasks = generateTasks(setLoadingMembers, cRows);

    // prepare the invocation observer
    int cTasks = mapMemberTasks.size();
    final CountDownLatch latch = new CountDownLatch(cTasks);
    InvocationObserver observer = new InvocationObserver()
        {
        public void memberCompleted(Member member, Object oResult)
            {
            latch.countDown();
            log(String.format("%s: load finished on %s", sCacheName, member.toString()));
            }

        public void memberFailed(Member member, Throwable eFailure)
            {
            // TODO: resubmit tasks due to transient failures
            latch.countDown();
            log(String.format("%s: load failed on %s", sCacheName, member.toString()));
            CacheFactory.log(eFailure.toString(), CacheFactory.LOG_ERR);
            }

        public void memberLeft(Member member)
            {
            // TODO: resubmit to a member that is up
            latch.countDown();
            log(String.format("%s: member left before load finished (%s)", sCacheName, member.toString()));
            }

        public void invocationCompleted()
            {
            log(String.format("%s: invocation has completed", sCacheName));
            }
        };

    // asynchronously execute each member's task
    for (Map.Entry<Member, PreloadTask> entry : mapMemberTasks.entrySet())
        {
        Member member = entry.getKey();
        Set setTaskMembers = Collections.singleton(member);
        PreloadTask task = entry.getValue();
        serviceInv.execute(task, setTaskMembers, observer);
        log(String.format("%s: rows %d-%d sent to %s",
                sCacheName, task.getFirstKey(), task.getLastKey(), member.toString()));
        }

    // wait for all tasks to finish
    try
        {
        latch.await();
        }
    catch (InterruptedException e)
        {
        // preserve the interrupt status if the wait is interrupted
        Thread.currentThread().interrupt();
        }
    long lDurationMillis = System.currentTimeMillis() - ldtStart;
    log(String.format("%s: pre-loaded %d rows in %.3f secs (%.3f rows/sec)",
            sCacheName, cRows, lDurationMillis / 1000.0,
            cRows / (lDurationMillis / 1000.0)));
    NamedCache cache = CacheFactory.getCache(sCacheName);
    log(String.format("%s: final size is %d", sCacheName, cache.size()));
    }
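
For reference, here are hedged sketches of the two small helpers used above. getRowCount() is assumed to live alongside the same JDBC helpers (getConnection and close) used by the CacheLoader, and getLoadingMembers() simply excludes the member issuing the tasks, as described earlier:

// hedged sketch: count the rows to be divided among the loading members
protected int getRowCount()
    {
    Connection con = null;
    PreparedStatement stmtPrep = null;
    ResultSet rs = null;
    try
        {
        con = getConnection();
        stmtPrep = con.prepareStatement("SELECT COUNT(*) FROM customer_zip");
        rs = stmtPrep.executeQuery();
        return rs.next() ? rs.getInt(1) : 0;
        }
    catch (SQLException e)
        {
        log(e);
        throw new RuntimeException(e);
        }
    finally
        {
        close(con, stmtPrep, rs);
        }
    }

// hedged sketch: every member running the Invocation Service except
// the member issuing the tasks
protected Set<Member> getLoadingMembers(InvocationService serviceInv)
    {
    Set<Member> setMembers =
            new HashSet<Member>(serviceInv.getInfo().getServiceMembers());
    setMembers.remove(CacheFactory.getCluster().getLocalMember());
    return setMembers;
    }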

If you’re reading carefully, you’ll see that I am actually issuing database queries from two logical places: the grid client that generates the tasks and the grid members executing the load. I mentioned earlier that I’m sharing database parameters between read-through and pre-load by using a CacheLoader. I will have to save the details of how I achieve that sharing for another post.
