1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.akubraproject.txn.derby;
24
25 import java.io.IOException;
26 import java.net.URI;
27 import java.sql.Connection;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.sql.Statement;
31 import java.sql.PreparedStatement;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.Map;
36 import java.util.Set;
37
38 import javax.sql.XAConnection;
39 import javax.transaction.Transaction;
40
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import org.apache.derby.jdbc.EmbeddedXADataSource;
44 import org.apache.derby.tools.sysinfo;
45
46 import org.akubraproject.BlobStore;
47 import org.akubraproject.BlobStoreConnection;
48 import org.akubraproject.txn.AbstractTransactionalStore;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public class TransactionalStore extends AbstractTransactionalStore {
113
114 public static final String NAME_TABLE = "NAME_MAP";
115
116 public static final String DEL_TABLE = "DELETED_LIST";
117
118 private static final Logger logger = LoggerFactory.getLogger(TransactionalStore.class);
119
120 private final EmbeddedXADataSource dataSource;
121 private final Set<Long> activeTxns = new HashSet<Long>();
122 private final Set<URI> uriLocks = new HashSet<URI>();
123 private final boolean singleWriter;
124 private long nextVersion;
125 private long writeVersion = -1;
126 private long writeLockHolder = -1;
127 private boolean purgeInProgress = false;
128 private int numPurgesDelayed = 0;
129 private boolean started = false;
130
131
132
133
134
135
136
137
138
139
140 public TransactionalStore(URI id, BlobStore wrappedStore, String dbDir) throws IOException {
141 this(id, wrappedStore, dbDir, needSingleWriter());
142 }
143
144 private static boolean needSingleWriter() {
145 return sysinfo.getMajorVersion() < 10 ||
146 sysinfo.getMajorVersion() == 10 && sysinfo.getMinorVersion() < 5;
147 }
148
149
150
151
152
153
154
155
156
157
158
159
/**
 * Creates a new transactional store. The derby database is created if it does not exist yet,
 * the tables are set up if missing, and the version counter is initialised to one past the
 * highest version found in the name-map table.
 *
 * <p>A JVM shutdown hook is registered that shuts the derby database down.
 *
 * @param id           the id of this store
 * @param wrappedStore the store being wrapped
 * @param dbDir        the derby database name (passed to the data source as database name)
 * @param singleWriter whether purging must hold the write lock while it runs
 * @throws IOException if the tables could not be created or the current version not be read
 */
public TransactionalStore(URI id, BlobStore wrappedStore,
                          String dbDir, boolean singleWriter) throws IOException {
  super(id, wrappedStore);
  this.singleWriter = singleWriter;

  dataSource = new EmbeddedXADataSource();
  dataSource.setDatabaseName(dbDir);
  dataSource.setCreateDatabase("create");

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      try {
        // asking for a connection with the shutdown attribute set performs the shutdown
        dataSource.setShutdownDatabase("shutdown");
        dataSource.getXAConnection().getConnection();
      } catch (SQLException sqle) {
        // derby reports a successful database shutdown by throwing SQLState 08006, so only
        // anything else is an actual problem
        if ("08006".equals(sqle.getSQLState()))
          logger.debug("Derby database shut down");
        else
          logger.warn("Error shutting down derby", sqle);
      } catch (Exception e) {
        logger.warn("Error shutting down derby", e);
      }
    }
  });

  createTables();
  nextVersion = findYoungestVersion() + 1;
  logger.info("TransactionalStore started: dbDir='{}', version={}", dbDir, nextVersion);
}
191
192 private void createTables() throws IOException {
193 runInCon(new Action<Void>() {
194 public Void run(Connection con) throws SQLException {
195
196 ResultSet rs = con.getMetaData().getTables(null, null, NAME_TABLE, null);
197 try {
198 if (rs.next())
199 return null;
200 } finally {
201 rs.close();
202 }
203
204
205 logger.info("Creating tables and indexes for name-map");
206
207 Statement stmt = con.createStatement();
208 try {
209 stmt.execute("CREATE TABLE " + NAME_TABLE +
210 " (appId VARCHAR(1000) NOT NULL, storeId VARCHAR(1000) NOT NULL, " +
211 " version BIGINT NOT NULL, deleted SMALLINT, committed SMALLINT)");
212 stmt.execute("CREATE INDEX " + NAME_TABLE + "_AIIDX ON " + NAME_TABLE + "(appId)");
213 stmt.execute("CREATE INDEX " + NAME_TABLE + "_VIDX ON " + NAME_TABLE + "(version)");
214
215 stmt.execute("CREATE TABLE " + DEL_TABLE + " (appId VARCHAR(1000) NOT NULL, " +
216 " storeId VARCHAR(1000), version BIGINT NOT NULL)");
217 stmt.execute("CREATE INDEX " + DEL_TABLE + "_VIDX ON " + DEL_TABLE + "(version)");
218
219
220 stmt.execute(
221 "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.escalationThreshold', '" +
222 Integer.MAX_VALUE + "')");
223
224
225 stmt.execute(
226 "CALL SYSCS_UTIL.SYSCS_SET_DATABASE_PROPERTY('derby.locks.deadlockTimeout', '30')");
227 } finally {
228 stmt.close();
229 }
230
231 return null;
232 }
233 }, "Failed to create tables");
234 }
235
236 private long findYoungestVersion() throws IOException {
237 return runInCon(new Action<Long>() {
238 public Long run(Connection con) throws SQLException {
239 Statement stmt = con.createStatement();
240 try {
241 stmt.setMaxRows(1);
242 ResultSet rs =
243 stmt.executeQuery("SELECT version FROM " + NAME_TABLE + " ORDER BY version DESC");
244 return rs.next() ? rs.getLong(1) : -1L;
245 } finally {
246 stmt.close();
247 }
248 }
249 }, "Failed to find youngest version");
250 }
251
252
253
254
255 @Override
256 public BlobStoreConnection openConnection(Transaction tx, Map<String, String> hints)
257 throws IllegalStateException, IOException {
258 long version;
259 synchronized (this) {
260
261 if (!started) {
262 started = true;
263 purgeOldVersions(0);
264 }
265
266 while (writeVersion >= 0 && nextVersion == writeVersion) {
267 if (logger.isDebugEnabled())
268 logger.debug("Out of available versions - waiting for write-lock to be released");
269
270 try {
271 wait();
272 } catch (InterruptedException ie) {
273 throw new IOException("wait for write-lock interrupted", ie);
274 }
275 }
276
277 version = nextVersion++;
278
279 boolean isNew = activeTxns.add(version);
280 assert isNew : "duplicate version " + version;
281 }
282
283 boolean ok = false;
284 try {
285 XAConnection xaCon;
286 Connection con;
287 synchronized (dataSource) {
288 xaCon = dataSource.getXAConnection();
289 con = xaCon.getConnection();
290 }
291
292 con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
293
294 tx.enlistResource(xaCon.getXAResource());
295
296 BlobStoreConnection bsc =
297 new TransactionalConnection(this, wrappedStore, xaCon, con, tx, hints, version);
298
299 if (logger.isDebugEnabled())
300 logger.debug("Opened connection, read-version=" + version);
301
302 ok = true;
303 return bsc;
304 } catch (IOException ioe) {
305 throw ioe;
306 } catch (Exception e) {
307 throw new IOException("Error connecting to db", e);
308 } finally {
309 if (!ok) {
310 synchronized (this) {
311 activeTxns.remove(version);
312 }
313 }
314 }
315 }
316
317 boolean singleWriter() {
318 return singleWriter;
319 }
320
321
322
323
324
325
326
327
328 synchronized void acquireWriteLock(long version) throws InterruptedException {
329 while (writeLockHolder >= 0 && writeLockHolder != version)
330 wait();
331
332 if (logger.isTraceEnabled())
333 logger.trace("Transaction " + version + " acquired write lock");
334
335 writeLockHolder = version;
336 }
337
338
339
340
341
342
343
344
345 synchronized void releaseWriteLock(long version) {
346 if (writeLockHolder != version)
347 throw new IllegalStateException("Connection '" + version + "' is not the holder of the " +
348 "write lock; '" + writeLockHolder + "' is");
349
350 if (logger.isTraceEnabled())
351 logger.trace("Transaction " + version + " released write lock");
352
353 writeLockHolder = -1;
354 notifyAll();
355 }
356
357
358
359
360
361
362
363
364
365 void acquireUriLock(URI uri) throws InterruptedException {
366 synchronized (uriLocks) {
367 while (uriLocks.contains(uri))
368 uriLocks.wait();
369
370 uriLocks.add(uri);
371 }
372 }
373
374
375
376
377
378
379
380 void releaseUriLock(URI uri) throws IllegalStateException {
381 synchronized (uriLocks) {
382 if (!uriLocks.remove(uri))
383 throw new IllegalStateException("Uri lock for <" + uri + "> was not held");
384
385 uriLocks.notifyAll();
386 }
387 }
388
389
390
391
392
393
394
395
396
397
398
399 synchronized long txnPrepare(int numMods, long version) throws InterruptedException {
400 if (logger.isDebugEnabled())
401 logger.debug("Preparing transaction " + version);
402
403 acquireWriteLock(version);
404
405
406
407
408
409
410
411
412
413
414
415 writeVersion = Math.max(nextVersion + numMods / 100, 10);
416
417 if (logger.isDebugEnabled())
418 logger.debug("Prepared transaction " + version + ", write-version=" + writeVersion);
419
420 return writeVersion;
421 }
422
423
424
425
426
427
428
429 synchronized void txnComplete(boolean committed, long version) {
430 if (logger.isDebugEnabled())
431 logger.debug("Transaction " + version + " completed " +
432 (committed ? "(committed)" : "(rolled back)"));
433
434 boolean wasActive = activeTxns.remove(version);
435 assert wasActive : "completed unknown transaction " + version +
436 (committed ? "(committed)" : "(rolled back)");
437
438 if (writeLockHolder != version)
439 return;
440
441 if (committed && writeVersion >= 0)
442 nextVersion = writeVersion + 1;
443 writeVersion = -1;
444
445 releaseWriteLock(version);
446 }
447
448
449
450
451
452
453
454
455 void purgeOldVersions(long lastCompletedVersion) {
456 final long minVers;
457 synchronized (this) {
458 minVers = activeTxns.isEmpty() ? nextVersion : Collections.min(activeTxns);
459 if (minVers < lastCompletedVersion)
460 return;
461
462
463
464
465
466
467
468
469
470
471
472
473 while (purgeInProgress) {
474 if (numPurgesDelayed < 10) {
475 numPurgesDelayed++;
476 return;
477 }
478
479 try {
480 wait();
481 } catch (InterruptedException ie) {
482 throw new RuntimeException("Interrupted waiting for purge lock", ie);
483 }
484 }
485
486 purgeInProgress = true;
487 numPurgesDelayed = 0;
488 }
489
490 try {
491 if (singleWriter)
492 acquireWriteLock(lastCompletedVersion);
493
494 runInCon(new Action<Void>() {
495 public Void run(Connection con) throws SQLException {
496 if (logger.isDebugEnabled())
497 logger.debug("Purging deleted blobs older than revision " + minVers);
498
499
500 PreparedStatement findOld = con.prepareStatement(
501 "SELECT appId, version FROM " + DEL_TABLE + " WHERE version < ?");
502 findOld.setLong(1, minVers);
503 ResultSet rs = findOld.executeQuery();
504 int cntM = 0;
505
506 try {
507 if (!rs.next())
508 return null;
509
510 PreparedStatement purge = con.prepareStatement(
511 "SELECT version FROM " + NAME_TABLE + " -- DERBY-PROPERTIES index=NAME_MAP_AIIDX \n" +
512 " WHERE appId = ? AND (version < ? OR version = ? AND deleted <> 0)",
513 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
514
515 do {
516 purge.setString(1, rs.getString(1));
517 purge.setLong(2, rs.getLong(2));
518 purge.setLong(3, rs.getLong(2));
519
520 ResultSet rs2 = purge.executeQuery();
521 try {
522 while (rs2.next()) {
523 cntM++;
524 rs2.deleteRow();
525 }
526 } finally {
527 rs2.close();
528 }
529 } while (rs.next());
530
531 purge.close();
532 } finally {
533 try {
534 rs.close();
535 } finally {
536 findOld.close();
537 }
538 }
539
540
541 findOld = con.prepareStatement(
542 "SELECT storeId FROM " + DEL_TABLE + " WHERE version < ? AND storeId IS NOT NULL");
543 findOld.setLong(1, minVers);
544 rs = findOld.executeQuery();
545 int cntB = 0;
546
547 try {
548 BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
549 try {
550 while (rs.next()) {
551 cntB++;
552 String storeId = rs.getString(1);
553 if (logger.isTraceEnabled())
554 logger.trace("Purging deleted blob '" + storeId + "'");
555
556 try {
557 bsc.getBlob(URI.create(storeId), null).delete();
558 } catch (IOException ioe) {
559 logger.warn("Error purging blob '" + storeId + "'", ioe);
560 }
561 }
562 } finally {
563 bsc.close();
564 }
565 } catch (IOException ioe) {
566 logger.warn("Error opening connection to underlying store to purge old versions", ioe);
567 } finally {
568 try {
569 rs.close();
570 } finally {
571 findOld.close();
572 }
573 }
574
575
576 String sql = "SELECT version FROM " + DEL_TABLE +
577 " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE version < ?";
578 PreparedStatement purge =
579 con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
580 purge.setLong(1, minVers);
581 rs = purge.executeQuery();
582 int cntD = 0;
583
584 try {
585 while (rs.next()) {
586 cntD++;
587 rs.deleteRow();
588 }
589 } finally {
590 try {
591 rs.close();
592 } finally {
593 purge.close();
594 }
595 }
596
597
598 try {
599 int cntL = 0;
600 if (logger.isTraceEnabled()) {
601 BlobStoreConnection bsc = wrappedStore.openConnection(null, null);
602 for (Iterator<URI> iter = bsc.listBlobIds(null); iter.hasNext(); iter.next())
603 cntL++;
604 bsc.close();
605 }
606
607 if (logger.isDebugEnabled())
608 logger.debug("purged: " + cntM + " mappings, " + cntB + " blobs, " + cntD +
609 " deletes" + (logger.isTraceEnabled() ? "; " + cntL + " blobs left" : ""));
610 } catch (Exception e) {
611 e.printStackTrace();
612 }
613
614 return null;
615 }
616 }, "Error purging old versions");
617 } catch (Exception e) {
618 logger.warn("Error purging old versions", e);
619 } finally {
620 try {
621 if (singleWriter)
622 releaseWriteLock(lastCompletedVersion);
623 } finally {
624 synchronized (this) {
625 purgeInProgress = false;
626 notifyAll();
627 }
628 }
629 }
630 }
631
632 private <T> T runInCon(Action<T> action, String errMsg) throws IOException {
633 try {
634 XAConnection xaCon;
635 Connection con;
636 synchronized (dataSource) {
637 xaCon = dataSource.getXAConnection();
638 con = xaCon.getConnection();
639 }
640
641 con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
642 con.setAutoCommit(false);
643 boolean committed = false;
644
645 try {
646 T res = action.run(con);
647 con.commit();
648 committed = true;
649 return res;
650 } finally {
651 if (!committed) {
652 try {
653 con.rollback();
654 } catch (SQLException sqle) {
655 logger.error("Error rolling back after failure", sqle);
656 }
657 }
658 xaCon.close();
659 }
660 } catch (SQLException sqle) {
661 throw new IOException(errMsg, sqle);
662 }
663 }
664
665 private static interface Action<T> {
666 public T run(Connection con) throws SQLException;
667 }
668 }