View Javadoc

1   /* $HeadURL::                                                                            $
2    * $Id$
3    *
4    * Copyright (c) 2009-2010 DuraSpace
5    * http://duraspace.org
6    *
7    * In collaboration with Topaz Inc.
8    * http://www.topazproject.org
9    *
10   * Licensed under the Apache License, Version 2.0 (the "License");
11   * you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software
17   * distributed under the License is distributed on an "AS IS" BASIS,
18   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19   * See the License for the specific language governing permissions and
20   * limitations under the License.
21   */
22  
23  package org.akubraproject.txn.derby;
24  
25  import java.io.IOException;
26  import java.net.URI;
27  import java.sql.Connection;
28  import java.sql.ResultSet;
29  import java.sql.SQLException;
30  import java.sql.Statement;
31  import java.sql.PreparedStatement;
32  import java.util.Collections;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Map;
36  import java.util.Set;
37  
38  import javax.sql.XAConnection;
39  import javax.transaction.Transaction;
40  
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  import org.apache.derby.jdbc.EmbeddedXADataSource;
44  import org.apache.derby.tools.sysinfo;
45  
46  import org.akubraproject.BlobStore;
47  import org.akubraproject.BlobStoreConnection;
48  import org.akubraproject.txn.AbstractTransactionalStore;
49  
50  /**
51   * A simple transactional store using Derby db for the transaction logging and id mappings. It
52   * provides snapshot isolation with fail-fast semantics, meaning it will immediately throw a
53   * {@link org.akubraproject.txn.ConcurrentBlobUpdateException ConcurrentBlobUpdateException}
54   * if a transaction tries to modify (insert, delete, or overwrite) a blob which was modified by
55   * another transaction since the start of the first transaction (even if the change by the other
56   * transaction hasn't been committed yet). The assumption is that rollbacks are rare and that it is
57   * better to be notified of a conflict immediately rather than wasting time uploading large amounts
58   * of data that will just have to be deleted again.
59   *
60   * <p>In general a transaction must be considered failed and should be rolled back after any
61   * exception occurred.
62   *
63   * <p>This store must be configured with exactly one underlying blob-store. It supports arbitrary
64   * application-ids and maps them to the underlying blob-store's id's; it currently requires that
65   * the underlying blob-store be capable of generating ids.
66   *
67   * <p>Snapshot isolation is implemented using a MVCC design as follows. A name-map holds a list of
68   * versioned id mappings which maps application-ids to underlying store-ids; in addition, each
69   * mapping has two flags indicating whether the mapping has been deleted and whether it has been
70   * committed. When a transaction starts it is given a read version number (these increase
71   * monotonically); only committed map entries with a version less than this read version or
72   * uncommitted entries with a version the same as the read version will be read; if there are
73   * multiple such entries for a given app-id, then the one with the highest version is used. If the
74   * transaction makes a change (adding, removing, replacing, etc), a new entry in recorded in the
75   * map with the version set to the read-version and with the committed flag set to false. On commit
76   * the transaction is assigned a write version number (which is higher than any previously issued
77   * read version numbers) and which it then sets on all entries written as part of this transaction;
78   * it also sets the committed flag to true on these entries.
79   *
80   * <p>Old entries (and the underlying blobs) are cleaned out as they become unreferenced, i.e. when
81   * no active transaction could refer to them anymore. In order to speed up the discovery of such
82   * entries, a separate deleted-list is kept into which an entry is made each time an entry in the
83   * main map is marked as deleted and each time a blob is marked as deleted. This list is processed
84   * at the end of every transaction and upon startup (on startup the list is completely cleared as
85   * there are no active transactions).
86   *
87   * <p><em>A note on locking</em>: Derby, even in read-uncommitted mode, likes to acquire exclusive
88   * locks on rows when doing inserts, deletes, and updates. This would be ok, except that it
89   * sometimes attempts to lock rows it won't change. This can lead to deadlocks. The way around this
90   * that I've found is to ensure Derby always uses an index when searching for the rows to update or
91   * delete. This is accomplished by giving the optimizer explicit instructions via the
92   * <var>DERBY-PROPERTIES</var> directive in the queries. Since this directive is only supported in
93   * select statements, all updates and deletes are done via updatable queries (result-sets). This
94   * actually performs about the same as a direct update or delete statement. See also the thread <a
95   * href="http://mail-archives.apache.org/mod_mbox/db-derby-user/200903.mbox/%3c20090330092451.GD26813@innovation.ch%3e">disabling locking</a> (<a
96   * href="http://mail-archives.apache.org/mod_mbox/db-derby-user/200904.mbox/%3c20090401001750.GB5281@innovation.ch%3e">continued</a>),
97   * or at <a href="http://news.gmane.org/find-root.php?message_id=%3c20090330092451.GD26813%40innovation.ch%3e">gmane</a>.
98   * Unfortunately, however, this does not seem to be sufficient: Derby may still lock other rows, as
99   * documented in <a
100  * href="http://db.apache.org/derby/docs/10.4/devguide/rdevconcepts8424.html">Scope of locks</a>
101  * in Derbys's developer guide. When this happens, the wait for the lock will eventually time out
102  * and an exception will be thrown. However, I have not enountered this issue so far. But a related
103  * issue is present in 10.4 and earlier, namely <a
104  * href="https://issues.apache.org/jira/browse/DERBY-2991">DERBY-2991</a>; testing with 10.5
105  * indicates this issue has been resolved. For these reasons a flag is provided to restrict the
106  * number of concurrent write-transactions to one, and the
107  * {@link #TransactionalStore(URI, BlobStore, String) three-argument-constructor} will set this
108  * single-writer flag to true for derby 10.4 and earlier.
109  *
110  * @author Ronald Tschalär
111  */
112 public class TransactionalStore extends AbstractTransactionalStore {
113   /** The SQL table used by this store to hold the name mappings */
114   public static final String NAME_TABLE = "NAME_MAP";
115   /** The SQL table used by this store to hold the list of deleted blobs */
116   public static final String DEL_TABLE  = "DELETED_LIST";
117 
118   private static final Logger logger = LoggerFactory.getLogger(TransactionalStore.class);
119 
120   private final EmbeddedXADataSource dataSource;
121   private final Set<Long>            activeTxns = new HashSet<Long>();
122   private final Set<URI>             uriLocks = new HashSet<URI>();
123   private final boolean              singleWriter;
124   private       long                 nextVersion;
125   private       long                 writeVersion = -1;
126   private       long                 writeLockHolder = -1;
127   private       boolean              purgeInProgress = false;
128   private       int                  numPurgesDelayed = 0;
129   private       boolean              started = false;
130 
131   /**
132    * Create a new transactional store. The single-writer flag will be determined automatically
133    * depending on the version of derby being used.
134    *
135    * @param id           the id of this store
136    * @param wrappedStore the wrapped non-transactional store
137    * @param dbDir        the directory to use to store the transaction information
138    * @throws IOException if there was an error initializing the db
139    */
140   public TransactionalStore(URI id, BlobStore wrappedStore, String dbDir) throws IOException {
141     this(id, wrappedStore, dbDir, needSingleWriter());
142   }
143 
144   private static boolean needSingleWriter() {
145     return sysinfo.getMajorVersion() < 10 ||
146            sysinfo.getMajorVersion() == 10 && sysinfo.getMinorVersion() < 5;
147   }
148 
149   /**
150    * Create a new transactional store.
151    *
152    * @param id           the id of this store
153    * @param wrappedStore the wrapped non-transactional store
154    * @param dbDir        the directory to use to store the transaction information
155    * @param singleWriter if true, serialize all writers to avoid all locking issues with
156    *                     Derby; if false, some transactions may fail sometimes due to
157    *                     locks timing out
158    * @throws IOException if there was an error initializing the db
159    */
160   public TransactionalStore(URI id, BlobStore wrappedStore,
161                             String dbDir, boolean singleWriter) throws IOException {
162     super(id, wrappedStore);
163     this.singleWriter = singleWriter;
164 
165     //TODO: redirect logging to logger
166     //System.setProperty("derby.stream.error.logSeverityLevel", "50000");
167     //System.setProperty("derby.stream.error.file", new File(base, "derby.log").toString());
168     //System.setProperty("derby.language.logStatementText", "true");
169     //System.setProperty("derby.stream.error.method", "java.sql.DriverManager.getLogStream");
170 
171     dataSource = new EmbeddedXADataSource();
172     dataSource.setDatabaseName(dbDir);
173     dataSource.setCreateDatabase("create");
174 
175     Runtime.getRuntime().addShutdownHook(new Thread() {
176       @Override
177       public void run() {
178         try {
179           dataSource.setShutdownDatabase("shutdown");
180           dataSource.getXAConnection().getConnection();
181         } catch (Exception e) {
182           logger.warn("Error shutting down derby", e);
183         }
184       }
185     });
186 
187     createTables();
188     nextVersion = findYoungestVersion() + 1;
189     logger.info("TransactionalStore started: dbDir='" + dbDir + "', version=" + nextVersion);
190   }
191 
192   private void createTables() throws IOException {
193     runInCon(new Action<Void>() {
194       public Void run(Connection con) throws SQLException {
195         // test if table exists
196         ResultSet rs = con.getMetaData().getTables(null, null, NAME_TABLE, null);
197         try {
198           if (rs.next())
199             return null;
200         } finally {
201           rs.close();
202         }
203 
204         // nope, so create it
205         logger.info("Creating tables and indexes for name-map");
206 
207         Statement stmt = con.createStatement();
208         try {
209           stmt.execute("CREATE TABLE " + NAME_TABLE +
210                        " (appId VARCHAR(1000) NOT NULL, storeId VARCHAR(1000) NOT NULL, " +
211                        "  version BIGINT NOT NULL, deleted SMALLINT, committed SMALLINT)");
212           stmt.execute("CREATE INDEX " + NAME_TABLE + "_AIIDX ON " + NAME_TABLE + "(appId)");
213           stmt.execute("CREATE INDEX " + NAME_TABLE + "_VIDX ON " + NAME_TABLE + "(version)");
214 
215           stmt.execute("CREATE TABLE " + DEL_TABLE + " (appId VARCHAR(1000) NOT NULL, " +
216                        " storeId VARCHAR(1000), version BIGINT NOT NULL)");
217           stmt.execute("CREATE INDEX " + DEL_TABLE + "_VIDX ON " + DEL_TABLE + "(version)");
218 
219           // ensure Derby never uses table-locks, only row-locks
220           stmt.execute(
221             "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.escalationThreshold', '" +
222             Integer.MAX_VALUE + "')");
223 
224           // we should really never be waiting for a lock let alone deadlock, but just in case
225           stmt.execute(
226             "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.deadlockTimeout', '30')");
227         } finally {
228           stmt.close();
229         }
230 
231         return null;
232       }
233     }, "Failed to create tables");
234   }
235 
236   private long findYoungestVersion() throws IOException {
237     return runInCon(new Action<Long>() {
238       public Long run(Connection con) throws SQLException {
239         Statement stmt = con.createStatement();
240         try {
241           stmt.setMaxRows(1);
242           ResultSet rs =                                // NOPMD
243               stmt.executeQuery("SELECT version FROM " + NAME_TABLE + " ORDER BY version DESC");
244           return rs.next() ? rs.getLong(1) : -1L;
245         } finally {
246           stmt.close();
247         }
248       }
249     }, "Failed to find youngest version");
250   }
251 
252   /**
253    * @throws IllegalStateException if no backing store has been set yet
254    */
255   @Override
256   public BlobStoreConnection openConnection(Transaction tx, Map<String, String> hints)
257       throws IllegalStateException, IOException {
258     long version;
259     synchronized (this) {
260 
261       if (!started) {
262         started = true;
263         purgeOldVersions(0);
264       }
265 
266       while (writeVersion >= 0 && nextVersion == writeVersion) {
267         if (logger.isDebugEnabled())
268           logger.debug("Out of available versions - waiting for write-lock to be released");
269 
270         try {
271           wait();
272         } catch (InterruptedException ie) {
273           throw new IOException("wait for write-lock interrupted", ie);
274         }
275       }
276 
277       version = nextVersion++;
278 
279       boolean isNew = activeTxns.add(version);
280       assert isNew : "duplicate version " + version;
281     }
282 
283     boolean ok = false;
284     try {
285       XAConnection xaCon;
286       Connection   con;
287       synchronized (dataSource) {
288         xaCon = dataSource.getXAConnection();
289         con   = xaCon.getConnection();
290       }
291 
292       con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
293 
294       tx.enlistResource(xaCon.getXAResource());
295 
296       BlobStoreConnection bsc =
297           new TransactionalConnection(this, wrappedStore, xaCon, con, tx, hints, version);
298 
299       if (logger.isDebugEnabled())
300         logger.debug("Opened connection, read-version=" + version);
301 
302       ok = true;
303       return bsc;
304     } catch (IOException ioe) {
305       throw ioe;
306     } catch (Exception e) {
307       throw new IOException("Error connecting to db", e);
308     } finally {
309       if (!ok) {
310         synchronized (this) {
311           activeTxns.remove(version);
312         }
313       }
314     }
315   }
316 
317   boolean singleWriter() {
318     return singleWriter;
319   }
320 
321   /**
322    * Acquire the write lock. This is a simple, re-entrant lock without a lock count. If the lock
323    * is already held this will block until it is free.
324    *
325    * @param version the version acquiring the lock
326    * @throws InterruptedException if waiting for the lock was interrupted
327    */
328   synchronized void acquireWriteLock(long version) throws InterruptedException {
329     while (writeLockHolder >= 0 && writeLockHolder != version)
330       wait();
331 
332     if (logger.isTraceEnabled())
333       logger.trace("Transaction " + version + " acquired write lock");
334 
335     writeLockHolder = version;
336   }
337 
338   /**
339    * Release the write lock. This always completely releases lock no matter how often {@link
340    * #acquireWriteLock} was invoked.
341    *
342    * @param version the version that acquired the lock
343    * @throws IllegalStateException if the lock is not held by <var>version</var>
344    */
345   synchronized void releaseWriteLock(long version) {
346     if (writeLockHolder != version)
347       throw new IllegalStateException("Connection '" + version + "' is not the holder of the " +
348                                       "write lock; '" + writeLockHolder + "' is");
349 
350     if (logger.isTraceEnabled())
351       logger.trace("Transaction " + version + " released write lock");
352 
353     writeLockHolder = -1;
354     notifyAll();
355   }
356 
357   /**
358    * Acquire a lock on the given URI. Each lock for each URI is a simple, non-reentrant lock and
359    * each lock for each URI is independent of the others. If the lock is already held this will
360    * block until it is free.
361    *
362    * @param uri the URI for which to acquire the lock
363    * @throws InterruptedException if waiting for the lock was interrupted
364    */
365   void acquireUriLock(URI uri) throws InterruptedException {
366     synchronized (uriLocks) {
367       while (uriLocks.contains(uri))
368         uriLocks.wait();
369 
370       uriLocks.add(uri);
371     }
372   }
373 
374   /**
375    * Release the lock on the given URI.
376    *
377    * @param uri the URI for which to release the lock
378    * @throws IllegalStateException if the lock was not held
379    */
380   void releaseUriLock(URI uri) throws IllegalStateException {
381     synchronized (uriLocks) {
382       if (!uriLocks.remove(uri))
383         throw new IllegalStateException("Uri lock for <" + uri + "> was not held");
384 
385       uriLocks.notifyAll();
386     }
387   }
388 
389   /**
390    * Prepare the transaction. This acquires the write-lock and hence must always be followed by
391    * {@link #txnComplete} to release it.
392    *
393    * @param numMods the number of modifications made during this transaction; this is used
394    *                to estimate how long the commit might take
395    * @param version the transaction's read-version - used for logging
396    * @return the write version
397    * @throws InterruptedException if interrupted while waiting for the write-lock
398    */
399   synchronized long txnPrepare(int numMods, long version) throws InterruptedException {
400     if (logger.isDebugEnabled())
401       logger.debug("Preparing transaction " + version);
402 
403     acquireWriteLock(version);
404 
405     /* Leave a little space in the version number sequence so other transactions may start while
406      * this one completes. The constant '1/100' is pulled out of thin air, and represents a guess
407      * on the upper bound on how many transactions are likely to be started during the time it
408      * takes this one to complete; if it is too large then we just have larger holes and the
409      * transaction numbers jump more than necessary, which isn't tragic as long as the jumps are
410      * not so large that we run into a real possibility of version number wrap-around; if it is too
411      * small then that just means transactions may be needlessly held up waiting for this one to
412      * complete. Also, we always leave a little extra room to account for the fact that there's a
413      * semi-fixed overhead that a commit will take even if there are only a few changes.
414      */
415     writeVersion = Math.max(nextVersion + numMods / 100, 10);
416 
417     if (logger.isDebugEnabled())
418       logger.debug("Prepared transaction " + version + ", write-version=" + writeVersion);
419 
420     return writeVersion;
421   }
422 
423   /**
424    * Signal that the transaction is complete. This must always be invoked.
425    *
426    * @param committed whether the transaction was committed or rolled back
427    * @param version   the transaction's read-version
428    */
429   synchronized void txnComplete(boolean committed, long version) {
430     if (logger.isDebugEnabled())
431       logger.debug("Transaction " + version + " completed " +
432                    (committed ? "(committed)" : "(rolled back)"));
433 
434     boolean wasActive = activeTxns.remove(version);
435     assert wasActive : "completed unknown transaction " + version +
436                        (committed ? "(committed)" : "(rolled back)");
437 
438     if (writeLockHolder != version)
439       return;           // never prepared (e.g. r/o txn, or rollback)
440 
441     if (committed && writeVersion >= 0)
442       nextVersion = writeVersion + 1;
443     writeVersion = -1;
444 
445     releaseWriteLock(version);
446   }
447 
448   /**
449    * Purge all old versions that are not being used anymore.
450    *
451    * @param lastCompletedVersion the version of the recently completed transaction; if there are
452    *                             other, older transactions still active then the purge can be
453    *                             avoided, i.e. this is just for optimization.
454    */
455   void purgeOldVersions(long lastCompletedVersion) {
456     final long minVers;
457     synchronized (this) {
458       minVers = activeTxns.isEmpty() ? nextVersion : Collections.min(activeTxns);
459       if (minVers < lastCompletedVersion)
460         return;           // we didn't release anything
461 
462       /* Derby has issues trying to run multiple purges in parallel (NPE's, waiting for
463        * locks that should not be held by anybody, and even deadlocks). Also, there isn't
464        * that much point in running multiple purges simultaneously, as the next purge
465        * will clean up stuff too.
466        *
467        * However, just short-circuiting here if a purge is already in progress can cause
468        * the purging to fall seriously behind under load (in a sort of negative feedback
469        * loop: the more it falls behind, the longer it takes to catch up, the more it
470        * falls behind, ...). Hence we keep track of how many times we've skipped the
471        * purge and after some threshhold we start blocking to let the purge catch up.
472        */
473       while (purgeInProgress) {
474         if (numPurgesDelayed < 10) {
475           numPurgesDelayed++;
476           return;
477         }
478 
479         try {
480           wait();
481         } catch (InterruptedException ie) {
482           throw new RuntimeException("Interrupted waiting for purge lock", ie);
483         }
484       }
485 
486       purgeInProgress  = true;
487       numPurgesDelayed = 0;
488     }
489 
490     try {
491       if (singleWriter)
492         acquireWriteLock(lastCompletedVersion);
493 
494       runInCon(new Action<Void>() {
495         public Void run(Connection con) throws SQLException {
496           if (logger.isDebugEnabled())
497             logger.debug("Purging deleted blobs older than revision " + minVers);
498 
499           // clean out stale mapping entries
500           PreparedStatement findOld = con.prepareStatement(
501               "SELECT appId, version FROM " + DEL_TABLE + " WHERE version < ?");
502           findOld.setLong(1, minVers);
503           ResultSet rs = findOld.executeQuery();        // NOPMD
504           int cntM = 0;
505 
506           try {
507             if (!rs.next())
508               return null;
509 
510             PreparedStatement purge = con.prepareStatement(
511                 "SELECT version FROM " + NAME_TABLE + " -- DERBY-PROPERTIES index=NAME_MAP_AIIDX \n" +
512                 " WHERE appId = ? AND (version < ? OR version = ? AND deleted <> 0)",
513                 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
514 
515             do {
516               purge.setString(1, rs.getString(1));
517               purge.setLong(2, rs.getLong(2));
518               purge.setLong(3, rs.getLong(2));
519 
520               ResultSet rs2 = purge.executeQuery();     // NOPMD
521               try {
522                 while (rs2.next()) {
523                   cntM++;
524                   rs2.deleteRow();
525                 }
526               } finally {
527                 rs2.close();
528               }
529             } while (rs.next());
530 
531             purge.close();
532           } finally {
533             try {
534               rs.close();
535             } finally {
536               findOld.close();
537             }
538           }
539 
540           // remove unreferenced blobs
541           findOld = con.prepareStatement(
542               "SELECT storeId FROM " + DEL_TABLE + " WHERE version < ? AND storeId IS NOT NULL");
543           findOld.setLong(1, minVers);
544           rs = findOld.executeQuery();
545           int cntB = 0;
546 
547           try {
548             BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
549             try {
550               while (rs.next()) {
551                 cntB++;
552                 String storeId = rs.getString(1);
553                 if (logger.isTraceEnabled())
554                   logger.trace("Purging deleted blob '" + storeId + "'");
555 
556                 try {
557                   bsc.getBlob(URI.create(storeId), null).delete();
558                 } catch (IOException ioe) {
559                   logger.warn("Error purging blob '" + storeId + "'", ioe);
560                 }
561               }
562             } finally {
563               bsc.close();
564             }
565           } catch (IOException ioe) {
566             logger.warn("Error opening connection to underlying store to purge old versions", ioe);
567           } finally {
568             try {
569               rs.close();
570             } finally {
571               findOld.close();
572             }
573           }
574 
575           // purge processed entries from the delete table
576           String sql = "SELECT version FROM " + DEL_TABLE +
577                        " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE version < ?";
578           PreparedStatement purge =
579               con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
580           purge.setLong(1, minVers);
581           rs = purge.executeQuery();
582           int cntD = 0;
583 
584           try {
585             while (rs.next()) {
586               cntD++;
587               rs.deleteRow();
588             }
589           } finally {
590             try {
591               rs.close();
592             } finally {
593               purge.close();
594             }
595           }
596 
597           // debug log the stats
598           try {
599             int cntL = 0;
600             if (logger.isTraceEnabled()) {
601               BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
602               for (Iterator<URI> iter = bsc.listBlobIds(null); iter.hasNext(); iter.next())
603                 cntL++;
604               bsc.close();
605             }
606 
607             if (logger.isDebugEnabled())
608               logger.debug("purged: " + cntM + " mappings, " + cntB + " blobs, " + cntD +
609                            " deletes" + (logger.isTraceEnabled() ? "; " + cntL + " blobs left" : ""));
610           } catch (Exception e) {
611             e.printStackTrace();
612           }
613 
614           return null;
615         }
616       }, "Error purging old versions");
617     } catch (Exception e) {
618       logger.warn("Error purging old versions", e);
619     } finally {
620       try {
621         if (singleWriter)
622           releaseWriteLock(lastCompletedVersion);
623       } finally {
624         synchronized (this) {
625           purgeInProgress = false;
626           notifyAll();
627         }
628       }
629     }
630   }
631 
632   private <T> T runInCon(Action<T> action, String errMsg) throws IOException {
633     try {
634       XAConnection xaCon;
635       Connection   con;
636       synchronized (dataSource) {
637         xaCon = dataSource.getXAConnection();
638         con   = xaCon.getConnection();
639       }
640 
641       con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
642       con.setAutoCommit(false);
643       boolean committed = false;
644 
645       try {
646         T res = action.run(con);
647         con.commit();
648         committed = true;
649         return res;
650       } finally {
651         if (!committed) {
652           try {
653             con.rollback();
654           } catch (SQLException sqle) {
655             logger.error("Error rolling back after failure", sqle);
656           }
657         }
658         xaCon.close();
659       }
660     } catch (SQLException sqle) {
661       throw new IOException(errMsg, sqle);
662     }
663   }
664 
665   private static interface Action<T> {
666     public T run(Connection con) throws SQLException;
667   }
668 }