1 /* $HeadURL:: $
2 * $Id$
3 *
4 * Copyright (c) 2009-2010 DuraSpace
5 * http://duraspace.org
6 *
7 * In collaboration with Topaz Inc.
8 * http://www.topazproject.org
9 *
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 */
22
23 package org.akubraproject.txn.derby;
24
25 import java.io.IOException;
26 import java.net.URI;
27 import java.sql.Connection;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.sql.Statement;
31 import java.sql.PreparedStatement;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.Map;
36 import java.util.Set;
37
38 import javax.sql.XAConnection;
39 import javax.transaction.Transaction;
40
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import org.apache.derby.jdbc.EmbeddedXADataSource;
44 import org.apache.derby.tools.sysinfo;
45
46 import org.akubraproject.BlobStore;
47 import org.akubraproject.BlobStoreConnection;
48 import org.akubraproject.txn.AbstractTransactionalStore;
49
50 /**
51 * A simple transactional store using Derby db for the transaction logging and id mappings. It
52 * provides snapshot isolation with fail-fast semantics, meaning it will immediately throw a
53 * {@link org.akubraproject.txn.ConcurrentBlobUpdateException ConcurrentBlobUpdateException}
54 * if a transaction tries to modify (insert, delete, or overwrite) a blob which was modified by
55 * another transaction since the start of the first transaction (even if the change by the other
56 * transaction hasn't been committed yet). The assumption is that rollbacks are rare and that it is
57 * better to be notified of a conflict immediately rather than wasting time uploading large amounts
58 * of data that will just have to be deleted again.
59 *
60 * <p>In general a transaction must be considered failed and should be rolled back after any
61 * exception occurred.
62 *
63 * <p>This store must be configured with exactly one underlying blob-store. It supports arbitrary
64 * application-ids and maps them to the underlying blob-store's id's; it currently requires that
65 * the underlying blob-store be capable of generating ids.
66 *
67 * <p>Snapshot isolation is implemented using a MVCC design as follows. A name-map holds a list of
68 * versioned id mappings which maps application-ids to underlying store-ids; in addition, each
69 * mapping has two flags indicating whether the mapping has been deleted and whether it has been
70 * committed. When a transaction starts it is given a read version number (these increase
71 * monotonically); only committed map entries with a version less than this read version or
72 * uncommitted entries with a version the same as the read version will be read; if there are
73 * multiple such entries for a given app-id, then the one with the highest version is used. If the
74 * transaction makes a change (adding, removing, replacing, etc), a new entry is recorded in the
75 * map with the version set to the read-version and with the committed flag set to false. On commit
76 * the transaction is assigned a write version number (which is higher than any previously issued
77 * read version numbers) and which it then sets on all entries written as part of this transaction;
78 * it also sets the committed flag to true on these entries.
79 *
80 * <p>Old entries (and the underlying blobs) are cleaned out as they become unreferenced, i.e. when
81 * no active transaction could refer to them anymore. In order to speed up the discovery of such
82 * entries, a separate deleted-list is kept into which an entry is made each time an entry in the
83 * main map is marked as deleted and each time a blob is marked as deleted. This list is processed
84 * at the end of every transaction and upon startup (on startup the list is completely cleared as
85 * there are no active transactions).
86 *
87 * <p><em>A note on locking</em>: Derby, even in read-uncommitted mode, likes to acquire exclusive
88 * locks on rows when doing inserts, deletes, and updates. This would be ok, except that it
89 * sometimes attempts to lock rows it won't change. This can lead to deadlocks. The way around this
90 * that I've found is to ensure Derby always uses an index when searching for the rows to update or
91 * delete. This is accomplished by giving the optimizer explicit instructions via the
92 * <var>DERBY-PROPERTIES</var> directive in the queries. Since this directive is only supported in
93 * select statements, all updates and deletes are done via updatable queries (result-sets). This
94 * actually performs about the same as a direct update or delete statement. See also the thread <a
95 * href="http://mail-archives.apache.org/mod_mbox/db-derby-user/200903.mbox/%3c20090330092451.GD26813@innovation.ch%3e">disabling locking</a> (<a
96 * href="http://mail-archives.apache.org/mod_mbox/db-derby-user/200904.mbox/%3c20090401001750.GB5281@innovation.ch%3e">continued</a>),
97 * or at <a href="http://news.gmane.org/find-root.php?message_id=%3c20090330092451.GD26813%40innovation.ch%3e">gmane</a>.
98 * Unfortunately, however, this does not seem to be sufficient: Derby may still lock other rows, as
99 * documented in <a
100 * href="http://db.apache.org/derby/docs/10.4/devguide/rdevconcepts8424.html">Scope of locks</a>
101 * in Derby's developer guide. When this happens, the wait for the lock will eventually time out
102 * and an exception will be thrown. However, I have not encountered this issue so far. But a related
103 * issue is present in 10.4 and earlier, namely <a
104 * href="https://issues.apache.org/jira/browse/DERBY-2991">DERBY-2991</a>; testing with 10.5
105 * indicates this issue has been resolved. For these reasons a flag is provided to restrict the
106 * number of concurrent write-transactions to one, and the
107 * {@link #TransactionalStore(URI, BlobStore, String) three-argument-constructor} will set this
108 * single-writer flag to true for derby 10.4 and earlier.
109 *
110 * @author Ronald Tschalär
111 */
112 public class TransactionalStore extends AbstractTransactionalStore {
113 /** The SQL table used by this store to hold the name mappings */
114 public static final String NAME_TABLE = "NAME_MAP";
115 /** The SQL table used by this store to hold the list of deleted blobs */
116 public static final String DEL_TABLE = "DELETED_LIST";
117
118 private static final Logger logger = LoggerFactory.getLogger(TransactionalStore.class);
119
120 private final EmbeddedXADataSource dataSource;
121 private final Set<Long> activeTxns = new HashSet<Long>();
122 private final Set<URI> uriLocks = new HashSet<URI>();
123 private final boolean singleWriter;
124 private long nextVersion;
125 private long writeVersion = -1;
126 private long writeLockHolder = -1;
127 private boolean purgeInProgress = false;
128 private int numPurgesDelayed = 0;
129 private boolean started = false;
130
131 /**
132 * Create a new transactional store. The single-writer flag will be determined automatically
133 * depending on the version of derby being used.
134 *
135 * @param id the id of this store
136 * @param wrappedStore the wrapped non-transactional store
137 * @param dbDir the directory to use to store the transaction information
138 * @throws IOException if there was an error initializing the db
139 */
140 public TransactionalStore(URI id, BlobStore wrappedStore, String dbDir) throws IOException {
141 this(id, wrappedStore, dbDir, needSingleWriter());
142 }
143
144 private static boolean needSingleWriter() {
145 return sysinfo.getMajorVersion() < 10 ||
146 sysinfo.getMajorVersion() == 10 && sysinfo.getMinorVersion() < 5;
147 }
148
/**
 * Create a new transactional store. This sets up the Derby database in <var>dbDir</var>
 * (creating it and the tables if necessary), registers a JVM shutdown hook that shuts the
 * database down again, and initializes the version counter from the versions found in the
 * name-map.
 *
 * @param id           the id of this store
 * @param wrappedStore the wrapped non-transactional store
 * @param dbDir        the directory to use to store the transaction information
 * @param singleWriter if true, serialize all writers to avoid all locking issues with
 *                     Derby; if false, some transactions may fail sometimes due to
 *                     locks timing out
 * @throws IOException if there was an error initializing the db
 */
public TransactionalStore(URI id, BlobStore wrappedStore,
                          String dbDir, boolean singleWriter) throws IOException {
  super(id, wrappedStore);
  this.singleWriter = singleWriter;

  //TODO: redirect logging to logger
  //System.setProperty("derby.stream.error.logSeverityLevel", "50000");
  //System.setProperty("derby.stream.error.file", new File(base, "derby.log").toString());
  //System.setProperty("derby.language.logStatementText", "true");
  //System.setProperty("derby.stream.error.method", "java.sql.DriverManager.getLogStream");

  dataSource = new EmbeddedXADataSource();
  dataSource.setDatabaseName(dbDir);
  dataSource.setCreateDatabase("create");

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      try {
        // with the shutdown attribute set, the next connection attempt shuts the database down
        dataSource.setShutdownDatabase("shutdown");
        dataSource.getXAConnection().getConnection();
      } catch (SQLException sqle) {
        /* Derby reports the successful shutdown of a single database by throwing an
         * SQLException with SQLState 08006, so that one is not an error.
         */
        if ("08006".equals(sqle.getSQLState()))
          logger.debug("Derby database shut down");
        else
          logger.warn("Error shutting down derby", sqle);
      } catch (Exception e) {
        logger.warn("Error shutting down derby", e);
      }
    }
  });

  createTables();
  nextVersion = findYoungestVersion() + 1;
  logger.info("TransactionalStore started: dbDir='" + dbDir + "', version=" + nextVersion);
}
191
192 private void createTables() throws IOException {
193 runInCon(new Action<Void>() {
194 public Void run(Connection con) throws SQLException {
195 // test if table exists
196 ResultSet rs = con.getMetaData().getTables(null, null, NAME_TABLE, null);
197 try {
198 if (rs.next())
199 return null;
200 } finally {
201 rs.close();
202 }
203
204 // nope, so create it
205 logger.info("Creating tables and indexes for name-map");
206
207 Statement stmt = con.createStatement();
208 try {
209 stmt.execute("CREATE TABLE " + NAME_TABLE +
210 " (appId VARCHAR(1000) NOT NULL, storeId VARCHAR(1000) NOT NULL, " +
211 " version BIGINT NOT NULL, deleted SMALLINT, committed SMALLINT)");
212 stmt.execute("CREATE INDEX " + NAME_TABLE + "_AIIDX ON " + NAME_TABLE + "(appId)");
213 stmt.execute("CREATE INDEX " + NAME_TABLE + "_VIDX ON " + NAME_TABLE + "(version)");
214
215 stmt.execute("CREATE TABLE " + DEL_TABLE + " (appId VARCHAR(1000) NOT NULL, " +
216 " storeId VARCHAR(1000), version BIGINT NOT NULL)");
217 stmt.execute("CREATE INDEX " + DEL_TABLE + "_VIDX ON " + DEL_TABLE + "(version)");
218
219 // ensure Derby never uses table-locks, only row-locks
220 stmt.execute(
221 "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.escalationThreshold', '" +
222 Integer.MAX_VALUE + "')");
223
224 // we should really never be waiting for a lock let alone deadlock, but just in case
225 stmt.execute(
226 "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.deadlockTimeout', '30')");
227 } finally {
228 stmt.close();
229 }
230
231 return null;
232 }
233 }, "Failed to create tables");
234 }
235
236 private long findYoungestVersion() throws IOException {
237 return runInCon(new Action<Long>() {
238 public Long run(Connection con) throws SQLException {
239 Statement stmt = con.createStatement();
240 try {
241 stmt.setMaxRows(1);
242 ResultSet rs = // NOPMD
243 stmt.executeQuery("SELECT version FROM " + NAME_TABLE + " ORDER BY version DESC");
244 return rs.next() ? rs.getLong(1) : -1L;
245 } finally {
246 stmt.close();
247 }
248 }
249 }, "Failed to find youngest version");
250 }
251
252 /**
253 * @throws IllegalStateException if no backing store has been set yet
254 */
255 @Override
256 public BlobStoreConnection openConnection(Transaction tx, Map<String, String> hints)
257 throws IllegalStateException, IOException {
258 long version;
259 synchronized (this) {
260
261 if (!started) {
262 started = true;
263 purgeOldVersions(0);
264 }
265
266 while (writeVersion >= 0 && nextVersion == writeVersion) {
267 if (logger.isDebugEnabled())
268 logger.debug("Out of available versions - waiting for write-lock to be released");
269
270 try {
271 wait();
272 } catch (InterruptedException ie) {
273 throw new IOException("wait for write-lock interrupted", ie);
274 }
275 }
276
277 version = nextVersion++;
278
279 boolean isNew = activeTxns.add(version);
280 assert isNew : "duplicate version " + version;
281 }
282
283 boolean ok = false;
284 try {
285 XAConnection xaCon;
286 Connection con;
287 synchronized (dataSource) {
288 xaCon = dataSource.getXAConnection();
289 con = xaCon.getConnection();
290 }
291
292 con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
293
294 tx.enlistResource(xaCon.getXAResource());
295
296 BlobStoreConnection bsc =
297 new TransactionalConnection(this, wrappedStore, xaCon, con, tx, hints, version);
298
299 if (logger.isDebugEnabled())
300 logger.debug("Opened connection, read-version=" + version);
301
302 ok = true;
303 return bsc;
304 } catch (IOException ioe) {
305 throw ioe;
306 } catch (Exception e) {
307 throw new IOException("Error connecting to db", e);
308 } finally {
309 if (!ok) {
310 synchronized (this) {
311 activeTxns.remove(version);
312 }
313 }
314 }
315 }
316
317 boolean singleWriter() {
318 return singleWriter;
319 }
320
321 /**
322 * Acquire the write lock. This is a simple, re-entrant lock without a lock count. If the lock
323 * is already held this will block until it is free.
324 *
325 * @param version the version acquiring the lock
326 * @throws InterruptedException if waiting for the lock was interrupted
327 */
328 synchronized void acquireWriteLock(long version) throws InterruptedException {
329 while (writeLockHolder >= 0 && writeLockHolder != version)
330 wait();
331
332 if (logger.isTraceEnabled())
333 logger.trace("Transaction " + version + " acquired write lock");
334
335 writeLockHolder = version;
336 }
337
338 /**
339 * Release the write lock. This always completely releases lock no matter how often {@link
340 * #acquireWriteLock} was invoked.
341 *
342 * @param version the version that acquired the lock
343 * @throws IllegalStateException if the lock is not held by <var>version</var>
344 */
345 synchronized void releaseWriteLock(long version) {
346 if (writeLockHolder != version)
347 throw new IllegalStateException("Connection '" + version + "' is not the holder of the " +
348 "write lock; '" + writeLockHolder + "' is");
349
350 if (logger.isTraceEnabled())
351 logger.trace("Transaction " + version + " released write lock");
352
353 writeLockHolder = -1;
354 notifyAll();
355 }
356
357 /**
358 * Acquire a lock on the given URI. Each lock for each URI is a simple, non-reentrant lock and
359 * each lock for each URI is independent of the others. If the lock is already held this will
360 * block until it is free.
361 *
362 * @param uri the URI for which to acquire the lock
363 * @throws InterruptedException if waiting for the lock was interrupted
364 */
365 void acquireUriLock(URI uri) throws InterruptedException {
366 synchronized (uriLocks) {
367 while (uriLocks.contains(uri))
368 uriLocks.wait();
369
370 uriLocks.add(uri);
371 }
372 }
373
374 /**
375 * Release the lock on the given URI.
376 *
377 * @param uri the URI for which to release the lock
378 * @throws IllegalStateException if the lock was not held
379 */
380 void releaseUriLock(URI uri) throws IllegalStateException {
381 synchronized (uriLocks) {
382 if (!uriLocks.remove(uri))
383 throw new IllegalStateException("Uri lock for <" + uri + "> was not held");
384
385 uriLocks.notifyAll();
386 }
387 }
388
389 /**
390 * Prepare the transaction. This acquires the write-lock and hence must always be followed by
391 * {@link #txnComplete} to release it.
392 *
393 * @param numMods the number of modifications made during this transaction; this is used
394 * to estimate how long the commit might take
395 * @param version the transaction's read-version - used for logging
396 * @return the write version
397 * @throws InterruptedException if interrupted while waiting for the write-lock
398 */
399 synchronized long txnPrepare(int numMods, long version) throws InterruptedException {
400 if (logger.isDebugEnabled())
401 logger.debug("Preparing transaction " + version);
402
403 acquireWriteLock(version);
404
405 /* Leave a little space in the version number sequence so other transactions may start while
406 * this one completes. The constant '1/100' is pulled out of thin air, and represents a guess
407 * on the upper bound on how many transactions are likely to be started during the time it
408 * takes this one to complete; if it is too large then we just have larger holes and the
409 * transaction numbers jump more than necessary, which isn't tragic as long as the jumps are
410 * not so large that we run into a real possibility of version number wrap-around; if it is too
411 * small then that just means transactions may be needlessly held up waiting for this one to
412 * complete. Also, we always leave a little extra room to account for the fact that there's a
413 * semi-fixed overhead that a commit will take even if there are only a few changes.
414 */
415 writeVersion = Math.max(nextVersion + numMods / 100, 10);
416
417 if (logger.isDebugEnabled())
418 logger.debug("Prepared transaction " + version + ", write-version=" + writeVersion);
419
420 return writeVersion;
421 }
422
423 /**
424 * Signal that the transaction is complete. This must always be invoked.
425 *
426 * @param committed whether the transaction was committed or rolled back
427 * @param version the transaction's read-version
428 */
429 synchronized void txnComplete(boolean committed, long version) {
430 if (logger.isDebugEnabled())
431 logger.debug("Transaction " + version + " completed " +
432 (committed ? "(committed)" : "(rolled back)"));
433
434 boolean wasActive = activeTxns.remove(version);
435 assert wasActive : "completed unknown transaction " + version +
436 (committed ? "(committed)" : "(rolled back)");
437
438 if (writeLockHolder != version)
439 return; // never prepared (e.g. r/o txn, or rollback)
440
441 if (committed && writeVersion >= 0)
442 nextVersion = writeVersion + 1;
443 writeVersion = -1;
444
445 releaseWriteLock(version);
446 }
447
448 /**
449 * Purge all old versions that are not being used anymore.
450 *
451 * @param lastCompletedVersion the version of the recently completed transaction; if there are
452 * other, older transactions still active then the purge can be
453 * avoided, i.e. this is just for optimization.
454 */
455 void purgeOldVersions(long lastCompletedVersion) {
456 final long minVers;
457 synchronized (this) {
458 minVers = activeTxns.isEmpty() ? nextVersion : Collections.min(activeTxns);
459 if (minVers < lastCompletedVersion)
460 return; // we didn't release anything
461
462 /* Derby has issues trying to run multiple purges in parallel (NPE's, waiting for
463 * locks that should not be held by anybody, and even deadlocks). Also, there isn't
464 * that much point in running multiple purges simultaneously, as the next purge
465 * will clean up stuff too.
466 *
467 * However, just short-circuiting here if a purge is already in progress can cause
468 * the purging to fall seriously behind under load (in a sort of negative feedback
469 * loop: the more it falls behind, the longer it takes to catch up, the more it
470 * falls behind, ...). Hence we keep track of how many times we've skipped the
471 * purge and after some threshhold we start blocking to let the purge catch up.
472 */
473 while (purgeInProgress) {
474 if (numPurgesDelayed < 10) {
475 numPurgesDelayed++;
476 return;
477 }
478
479 try {
480 wait();
481 } catch (InterruptedException ie) {
482 throw new RuntimeException("Interrupted waiting for purge lock", ie);
483 }
484 }
485
486 purgeInProgress = true;
487 numPurgesDelayed = 0;
488 }
489
490 try {
491 if (singleWriter)
492 acquireWriteLock(lastCompletedVersion);
493
494 runInCon(new Action<Void>() {
495 public Void run(Connection con) throws SQLException {
496 if (logger.isDebugEnabled())
497 logger.debug("Purging deleted blobs older than revision " + minVers);
498
499 // clean out stale mapping entries
500 PreparedStatement findOld = con.prepareStatement(
501 "SELECT appId, version FROM " + DEL_TABLE + " WHERE version < ?");
502 findOld.setLong(1, minVers);
503 ResultSet rs = findOld.executeQuery(); // NOPMD
504 int cntM = 0;
505
506 try {
507 if (!rs.next())
508 return null;
509
510 PreparedStatement purge = con.prepareStatement(
511 "SELECT version FROM " + NAME_TABLE + " -- DERBY-PROPERTIES index=NAME_MAP_AIIDX \n" +
512 " WHERE appId = ? AND (version < ? OR version = ? AND deleted <> 0)",
513 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
514
515 do {
516 purge.setString(1, rs.getString(1));
517 purge.setLong(2, rs.getLong(2));
518 purge.setLong(3, rs.getLong(2));
519
520 ResultSet rs2 = purge.executeQuery(); // NOPMD
521 try {
522 while (rs2.next()) {
523 cntM++;
524 rs2.deleteRow();
525 }
526 } finally {
527 rs2.close();
528 }
529 } while (rs.next());
530
531 purge.close();
532 } finally {
533 try {
534 rs.close();
535 } finally {
536 findOld.close();
537 }
538 }
539
540 // remove unreferenced blobs
541 findOld = con.prepareStatement(
542 "SELECT storeId FROM " + DEL_TABLE + " WHERE version < ? AND storeId IS NOT NULL");
543 findOld.setLong(1, minVers);
544 rs = findOld.executeQuery();
545 int cntB = 0;
546
547 try {
548 BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
549 try {
550 while (rs.next()) {
551 cntB++;
552 String storeId = rs.getString(1);
553 if (logger.isTraceEnabled())
554 logger.trace("Purging deleted blob '" + storeId + "'");
555
556 try {
557 bsc.getBlob(URI.create(storeId), null).delete();
558 } catch (IOException ioe) {
559 logger.warn("Error purging blob '" + storeId + "'", ioe);
560 }
561 }
562 } finally {
563 bsc.close();
564 }
565 } catch (IOException ioe) {
566 logger.warn("Error opening connection to underlying store to purge old versions", ioe);
567 } finally {
568 try {
569 rs.close();
570 } finally {
571 findOld.close();
572 }
573 }
574
575 // purge processed entries from the delete table
576 String sql = "SELECT version FROM " + DEL_TABLE +
577 " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE version < ?";
578 PreparedStatement purge =
579 con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
580 purge.setLong(1, minVers);
581 rs = purge.executeQuery();
582 int cntD = 0;
583
584 try {
585 while (rs.next()) {
586 cntD++;
587 rs.deleteRow();
588 }
589 } finally {
590 try {
591 rs.close();
592 } finally {
593 purge.close();
594 }
595 }
596
597 // debug log the stats
598 try {
599 int cntL = 0;
600 if (logger.isTraceEnabled()) {
601 BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
602 for (Iterator<URI> iter = bsc.listBlobIds(null); iter.hasNext(); iter.next())
603 cntL++;
604 bsc.close();
605 }
606
607 if (logger.isDebugEnabled())
608 logger.debug("purged: " + cntM + " mappings, " + cntB + " blobs, " + cntD +
609 " deletes" + (logger.isTraceEnabled() ? "; " + cntL + " blobs left" : ""));
610 } catch (Exception e) {
611 e.printStackTrace();
612 }
613
614 return null;
615 }
616 }, "Error purging old versions");
617 } catch (Exception e) {
618 logger.warn("Error purging old versions", e);
619 } finally {
620 try {
621 if (singleWriter)
622 releaseWriteLock(lastCompletedVersion);
623 } finally {
624 synchronized (this) {
625 purgeInProgress = false;
626 notifyAll();
627 }
628 }
629 }
630 }
631
632 private <T> T runInCon(Action<T> action, String errMsg) throws IOException {
633 try {
634 XAConnection xaCon;
635 Connection con;
636 synchronized (dataSource) {
637 xaCon = dataSource.getXAConnection();
638 con = xaCon.getConnection();
639 }
640
641 con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
642 con.setAutoCommit(false);
643 boolean committed = false;
644
645 try {
646 T res = action.run(con);
647 con.commit();
648 committed = true;
649 return res;
650 } finally {
651 if (!committed) {
652 try {
653 con.rollback();
654 } catch (SQLException sqle) {
655 logger.error("Error rolling back after failure", sqle);
656 }
657 }
658 xaCon.close();
659 }
660 } catch (SQLException sqle) {
661 throw new IOException(errMsg, sqle);
662 }
663 }
664
665 private static interface Action<T> {
666 public T run(Connection con) throws SQLException;
667 }
668 }