Bulk loading a Coherence cache from an Oracle Database

Recently, I’ve worked on a couple of projects that required pre-loading data from an Oracle Database table into an Oracle Coherence cache. There are many ways to accomplish this task, but what I’ve found to work well is to distribute the load processing to the Coherence grid itself. To do this, I use the Coherence Invocation Service.

To get started, let’s look at the SQL query that will be used to retrieve all rows from the underlying table (all columns have type VARCHAR2):

SELECT id, customer_id, zip5, zip4 FROM customer_zip

The class definition for the objects to be cached looks like:

public class CustomerZip
        implements ExternalizableLite, PortableObject
    {
    // ----- constructors ----------------------------------------------------

    public CustomerZip()
        {
        }

    public CustomerZip(String sId, String sCustomerId, String sZip5, String sZip4)
        {
        m_sId = sId;
        m_sCustomerId = sCustomerId;
        m_sZip5 = sZip5;
        m_sZip4 = sZip4;
        }

    // ----- accessors/mutators ----------------------------------------------

    /* removed for brevity */

    // ----- ExternalizableLite interface ------------------------------------

    /* removed for brevity */

    // ----- PortableObject interface ----------------------------------------

    /* removed for brevity */

    // ----- data members ----------------------------------------------------

    private String m_sZip5;
    private String m_sZip4;
    private String m_sCustomerId;
    private String m_sId;
    }

The first approach I tried involved pulling back all of the ids from the table, running them through PartitionedService#getKeyOwner, and then submitting a task to each member with the set of ids for that member to load. This method leverages Coherence’s data partitioning to distribute the rows among the loading members. It worked fine in my testing with a small number of rows, but when I applied it to the full data set of over 13 million rows, I quickly ran out of memory trying to query and process all of the ids. On top of that, querying and processing that many ids takes significant time.
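For reference, here’s a minimal sketch of that first approach, where queryAllIds() is a hypothetical helper that runs SELECT id FROM customer_zip and pulls every id back to the client (the step that exhausted the heap):

protected Map<Member, Set<String>> mapIdsByOwner(NamedCache cache)
    {
    PartitionedService service = (PartitionedService) cache.getCacheService();
    Map<Member, Set<String>> mapMemberIds = new HashMap<Member, Set<String>>();
    for (String sId : queryAllIds()) // hypothetical: SELECT id FROM customer_zip
        {
        // ask the partitioned service which member owns this key
        Member member = service.getKeyOwner(sId);
        Set<String> setIds = mapMemberIds.get(member);
        if (setIds == null)
            {
            setIds = new HashSet<String>();
            mapMemberIds.put(member, setIds);
            }
        setIds.add(sId);
        }
    // each member is then sent a task containing its set of ids to load
    return mapMemberIds;
    }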

The second, and final, approach I tried involved pulling back only the row count. Dividing the rows up among the loading members then becomes simply a matter of establishing the first and last rows each member should load. Each member can then use the Oracle pseudocolumn ROWNUM to execute the following query:

SELECT * FROM
  (SELECT a.*, ROWNUM r FROM
    (SELECT id, customer_id, zip5, zip4 FROM customer_zip ORDER BY id) a
  WHERE ROWNUM <= ?)
WHERE r >= ?

This query lets each loading member specify the last and first rows to load (bound in that order) and lets the database filter out all of the rows outside that range. In my testing, I found that queries over ranges beyond a certain size started performing dramatically slower (perhaps a DB tuning issue, but IANADBA ;-) ). You could easily run into this scenario with a large number of rows and a small number of loading members. To handle it, I further broke each member’s range down into smaller ranges and had each member execute multiple queries. Processing the results of these queries and performing bulk puts into the cache requires breaking the results into batches as well.

Here’s a look at the code that actually executes the query and inserts the entries into Coherence (to be executed on each loading member). This code is actually part of a CacheLoader implementation that is used for read-through as well. Co-locating the read-through and pre-load logic allows me to share database properties (connection information, SQL statements, etc…).

public void preload(NamedCache cache, int iFirstRow, int iLastRow,
        int cFetchSize, int cMaxQueryRange)
    {
    String sSqlQuery = ...; // see above
    String sCacheName = cache.getCacheName();
    Connection con = null;
    PreparedStatement stmtPrep = null;
    ResultSet rs = null;
    try
        {
        con = getConnection();
        stmtPrep = con.prepareStatement(sSqlQuery);
        stmtPrep.setFetchSize(cFetchSize);

        // break the query up into batches based on cMaxQueryRange
        int cRows = (iLastRow - iFirstRow) + 1;
        int cBatches = cRows / cMaxQueryRange;
        int cRemaining = cRows % cMaxQueryRange;
        // add additional batch to handle any remainder
        cBatches += cRemaining == 0 ? 0 : 1;

        Map mapBuffer = new HashMap(cFetchSize);
        int iBatchFirstRow;
        int iBatchLastRow = iFirstRow - 1;
        log("Executing preload query in " + cBatches + " batches");
        for (int i = 0; i < cBatches; ++i)
            {
            iBatchFirstRow = iBatchLastRow + 1;
            // last row for the batch or the entire range
            iBatchLastRow = Math.min(iLastRow, iBatchFirstRow + (cMaxQueryRange - 1));
            stmtPrep.setInt(1, iBatchLastRow);
            stmtPrep.setInt(2, iBatchFirstRow);
            rs = stmtPrep.executeQuery();

            // process cFetchSize rows at a time
            while (processResults(rs, mapBuffer, cFetchSize))
                {
                cache.putAll(mapBuffer);
                mapBuffer.clear();
                }
            rs.close();
            }
        }
    catch (SQLException e)
        {
        log(e);
        throw new RuntimeException(e);
        }
    finally
        {
        close(con, stmtPrep, rs);
        }
    }

protected boolean processResults(ResultSet rs, Map mapResults, int cFetchSize)
        throws SQLException
    {
    for (int i = 0; i < cFetchSize && rs.next(); ++i)
        {
        // create domain object from single row
        CustomerZip customerZip = createCustomerZip(rs);
        mapResults.put(customerZip.getId(), customerZip);
        }
    return mapResults.size() > 0;
    }
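The createCustomerZip helper isn’t shown above; here’s a minimal sketch, assuming the column names from the preload query:

protected CustomerZip createCustomerZip(ResultSet rs)
        throws SQLException
    {
    // column names match the SELECT list of the preload query
    return new CustomerZip(rs.getString("id"),
                           rs.getString("customer_id"),
                           rs.getString("zip5"),
                           rs.getString("zip4"));
    }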

The next piece of required code generates the ranges for each member and issues each member a task to execute. As I mentioned earlier, I use the Coherence Invocation Service to asynchronously execute a task on each loading member. For my use case, the set of loading members is simply every member running the Invocation Service, except for the member issuing the tasks:

protected Map<Member, PreloadTask> generateTasks(Set<Member> setMembers, int cRows)
    {
    Map<Member, PreloadTask> mapTasks =
            new HashMap<Member, PreloadTask>(setMembers.size());

    if (cRows <= m_cFetchSize)
        {
        // for small number of rows, just send the load to one member
        Member member = setMembers.iterator().next();
        PreloadTask task = new PreloadTask(m_sCacheName, 1, cRows,
                m_cFetchSize, m_cMaxQueryRange);
        mapTasks.put(member, task);
        }
    else
        {
        int cMembers = setMembers.size();
        int cMinRowsPerMember = cRows / cMembers;
        int cRemainingRows = cRows % cMembers;

        int iFirstRow;
        int iLastRow = 0;
        for (Member member : setMembers)
            {
            iFirstRow = iLastRow + 1;
            iLastRow = iFirstRow + cMinRowsPerMember +
                    (cRemainingRows-- > 0 ? 1 : 0) - 1;
            PreloadTask task = new PreloadTask(m_sCacheName, iFirstRow,
                    iLastRow, m_cFetchSize, m_cMaxQueryRange);
            mapTasks.put(member, task);
            }
        }

    return mapTasks;
    }
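One piece I haven’t shown is PreloadTask itself. Here’s a minimal sketch under my assumptions: it extends AbstractInvocable, carries the range and batching parameters to the loading member, and delegates to the CacheLoader’s preload(). Both lookupCacheLoader() and the CustomerZipCacheLoader type are hypothetical stand-ins; the comment thread at the end of this post shows one way to dig the loader out of the ReadWriteBackingMap:

public class PreloadTask
        extends AbstractInvocable
    {
    public PreloadTask()
        {
        }

    public PreloadTask(String sCacheName, int iFirstRow, int iLastRow,
            int cFetchSize, int cMaxQueryRange)
        {
        m_sCacheName     = sCacheName;
        m_iFirstRow      = iFirstRow;
        m_iLastRow       = iLastRow;
        m_cFetchSize     = cFetchSize;
        m_cMaxQueryRange = cMaxQueryRange;
        }

    public void run()
        {
        NamedCache cache = CacheFactory.getCache(m_sCacheName);
        // lookupCacheLoader() is a hypothetical helper that retrieves the
        // CacheLoader owning preload(); see the comment thread below for
        // one way to pull it out of the ReadWriteBackingMap
        CustomerZipCacheLoader loader = lookupCacheLoader(cache);
        loader.preload(cache, m_iFirstRow, m_iLastRow,
                m_cFetchSize, m_cMaxQueryRange);
        }

    public int getFirstRow()
        {
        return m_iFirstRow;
        }

    public int getLastRow()
        {
        return m_iLastRow;
        }

    // ----- data members ----------------------------------------------------

    private String m_sCacheName;
    private int    m_iFirstRow;
    private int    m_iLastRow;
    private int    m_cFetchSize;
    private int    m_cMaxQueryRange;
    }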

The final step is to asynchronously invoke each member’s task, and then wait for all of them to finish. I use a CountDownLatch and an InvocationObserver to track the completion of all tasks:

public void preloadCache()
    {
    final String sCacheName = "CustomerZipCache";
    int cRows = getRowCount();
    InvocationService serviceInv = (InvocationService)
            CacheFactory.getService("InvocationService");
    long ldtStart = System.currentTimeMillis();
    Set<Member> setLoadingMembers = getLoadingMembers(serviceInv);
    Map<Member, PreloadTask> mapMemberTasks = generateTasks(setLoadingMembers, cRows);

    // prepare the invocation observer
    int cTasks = mapMemberTasks.size();
    final CountDownLatch latch = new CountDownLatch(cTasks);
    InvocationObserver observer = new InvocationObserver()
        {
        public void memberCompleted(Member member, Object oResult)
            {
            latch.countDown();
            log(String.format("%s: load finished on %s", sCacheName, member.toString()));
            }

        public void memberFailed(Member member, Throwable eFailure)
            {
            // TODO: resubmit tasks due to transient failures
            latch.countDown();
            log(String.format("%s: load failed on %s", sCacheName, member.toString()));
            CacheFactory.log(eFailure);
            }

        public void memberLeft(Member member)
            {
            // TODO: resubmit to a member that is up
            latch.countDown();
            log(String.format("%s: member left before load finished (%s)", sCacheName, member.toString()));
            }

        public void invocationCompleted()
            {
            log(String.format("%s: invocation has completed", sCacheName));
            }
        };

    // asynchronously execute each member's task
    for (Map.Entry<Member, PreloadTask> entry : mapMemberTasks.entrySet())
        {
        Member member = entry.getKey();
        Set setTaskMembers = Collections.singleton(member);
        PreloadTask task = entry.getValue();
        serviceInv.execute(task, setTaskMembers, observer);
        log(String.format("%s: rows %d-%d sent to %s",
                sCacheName, task.getFirstRow(), task.getLastRow(), member.toString()));
        }

    // wait for all tasks to finish
    try
        {
        latch.await();
        }
    catch (InterruptedException e)
        {
        // restore the interrupt status if interrupted while waiting
        Thread.currentThread().interrupt();
        }
    long lDurationMillis = System.currentTimeMillis() - ldtStart;
    log(String.format("%s: pre-loaded %d rows in %.3f secs (%.3f rows/sec)",
            sCacheName, cRows, lDurationMillis / 1000.0,
            cRows / (lDurationMillis / 1000.0)));
    NamedCache cache = CacheFactory.getCache(sCacheName);
    log(String.format("%s: final size is %d", sCacheName, cache.size()));
    }
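For completeness, here are minimal sketches of two helpers used above that I haven’t shown: getRowCount() issues a simple COUNT query (reusing the loader’s getConnection() and close() helpers), and getLoadingMembers() applies the rule described earlier, every member running the Invocation Service except the one issuing the tasks:

protected int getRowCount()
    {
    Connection con = null;
    PreparedStatement stmtPrep = null;
    ResultSet rs = null;
    try
        {
        con = getConnection();
        stmtPrep = con.prepareStatement("SELECT COUNT(*) FROM customer_zip");
        rs = stmtPrep.executeQuery();
        rs.next();
        return rs.getInt(1);
        }
    catch (SQLException e)
        {
        throw new RuntimeException(e);
        }
    finally
        {
        close(con, stmtPrep, rs);
        }
    }

protected Set<Member> getLoadingMembers(InvocationService serviceInv)
    {
    // all members running the service, minus the member issuing the tasks
    Set<Member> setMembers =
            new HashSet<Member>(serviceInv.getInfo().getServiceMembers());
    setMembers.remove(serviceInv.getCluster().getLocalMember());
    return setMembers;
    }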

If you’re reading carefully, you’ll see that I am actually issuing database queries from two logical places: the grid client issues the row-count query when generating the tasks, and the grid members issue the range queries while executing the load. I mentioned earlier that I’m sharing database parameters between read-through and pre-load by using a CacheLoader. I will have to save the details of how I achieve that sharing for another post.

Comments

  1. I currently have the same need; this post turned up when I googled, and it made things pretty clear except for one little detail.

    PreloadTask

    As I understand it, PreloadTask should extend AbstractInvocable… but the question is: how do I call the preloadCache() method?
    Or, how do I get an EntityManager from this PreloadTask?

    Thanks in advance.

    • Andy Nguyen says:

      I’m not sure I understand your question. Yes, PreloadTask is an Invocable, so it’s designed to execute via the InvocationService. The preloadCache() method is used to generate PreloadTasks and send them off via the InvocationService. Who calls preloadCache() depends on how you want to initiate a preload. You could have a web service call preloadCache(), which means you would initiate a preload by making a web service call. Another option is to just call preloadCache() from main(), which allows you to initiate a preload via command line.

      • Whoops, I made a mistake. In my question I was referring to the “preload” method, not to “preloadCache”.

        If I’ve understood properly, and mixing your scenario with mine:

        REST_SERVICE --calls--> preloadCache()

        preloadCache() --instantiates--> PreloadTask

        PreloadTask.run() --??--> preload()

        Currently I assume that preload() is a method implemented inside a CacheStore (in my case JpaCacheStore), and that it should be called from PreloadTask.run() by instantiating a new JpaCacheStore with the required functions.

        Am I right?

        Thanks in advance.

        • Andy Nguyen says:

          That makes more sense.

          Yes, the preload() method is part of a CacheStore/CacheLoader. To get a reference to this CacheStore from PreloadTask, you need to retrieve it from the ReadWriteBackingMap. Something like this:

          public void run()
              {
              NamedCache cache = CacheFactory.getCache(m_sCacheName);
              DefaultConfigurableCacheFactory.Manager manager =
                      (DefaultConfigurableCacheFactory.Manager)
                              cache.getCacheService().getBackingMapManager();
              Map mapBack = manager.getBackingMap(m_sCacheName);
              ReadWriteBackingMap rwbm = (ReadWriteBackingMap) mapBack;
              MyCacheStore store = (MyCacheStore)
                      rwbm.getCacheStore().getCacheStore();
              store.preload(cache, m_iFirstRow, m_iLastRow, m_cFetchSize, m_cMaxQueryRange);
              }
          
          • Thank you very much.

            Finally I managed to get it working.

            In order to retrieve the CacheStore with Coherence 12.1.3 I had to slightly change the methods, as DefaultConfigurableCacheFactory is deprecated.

            NamedCache cache = CacheFactory.getCache(cacheName);
            ExtensibleConfigurableCacheFactory.Manager manager =
                    (ExtensibleConfigurableCacheFactory.Manager)
                            cache.getCacheService().getBackingMapManager();
            @SuppressWarnings("rawtypes")
            Map mapBack = manager.getBackingMap(cacheName);
            ReadWriteBackingMap rwbm = (ReadWriteBackingMap) mapBack;
            cacheStore = (SuperFancyCacheStore)
                    rwbm.getCacheStore().getStore();
