1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.akubraproject.txn.derby;
24
25 import java.io.IOException;
26 import java.net.URI;
27 import java.sql.Connection;
28 import java.sql.PreparedStatement;
29 import java.sql.ResultSet;
30 import java.sql.SQLException;
31 import java.sql.Statement;
32 import java.util.Iterator;
33 import java.util.Map;
34
35 import javax.sql.XAConnection;
36 import javax.transaction.Status;
37 import javax.transaction.Transaction;
38
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.apache.derby.iapi.services.monitor.Monitor;
42
43 import org.akubraproject.BlobStore;
44 import org.akubraproject.UnsupportedIdException;
45 import org.akubraproject.txn.ConcurrentBlobUpdateException;
46 import org.akubraproject.txn.SQLTransactionalConnection;
47
48
49
50
51
52
53 public class TransactionalConnection extends SQLTransactionalConnection {
/** Logger shared by all instances of this connection class. */
private static final Logger logger = LoggerFactory.getLogger(TransactionalConnection.class);

/** Version number passed to the constructor; bound into the versioned queries below. */
private final long version;
/**
 * Selects (storeId, deleted) from the name table for one appId, restricted to rows that are
 * either committed with a lower version or carry exactly the given version; newest first,
 * limited to one row.
 */
private final PreparedStatement nam_get;
/** Inserts a five-column row into the name table. */
private final PreparedStatement nam_ins;
/** Updatable select of (storeId, deleted) from the name table for one appId and version. */
private final PreparedStatement nam_upd;
/** Inserts a three-column row into the delete table. */
private final PreparedStatement del_ins;
/** Updatable select of storeId from the delete table for one appId and version. */
private final PreparedStatement del_upd;
/**
 * Selects (version, committed) of the highest-version name-table row for one appId; used by
 * the conflict check in updNameEntry.
 */
private final PreparedStatement nam_cfl;
/** Updatable select of (version, committed) from the name table for one version. */
private final PreparedStatement nam_cmt;
/** Updatable select of version from the delete table for one version. */
private final PreparedStatement del_cmt;
/** Lists (appId, version, deleted) of all visible name-table rows, ordered by appId. */
private final PreparedStatement nam_lst_all;
/** Same as {@link #nam_lst_all} but restricted by a LIKE pattern on appId ('!' is the escape). */
private final PreparedStatement nam_lst_pfx;

/** Number of name-entry modifications made through this connection so far. */
private int numMods = 0;
69
70
71
72
73
74
75
76
77
78
79
80
81
82 TransactionalConnection(BlobStore owner, BlobStore bStore, XAConnection xaCon, Connection con,
83 Transaction tx, Map<String, String> hints, long version)
84 throws IOException {
85 super(owner, bStore, xaCon, con, tx, hints);
86 this.version = version;
87
88 try {
89
90
91
92
93
94 String sql = "SELECT storeId, deleted FROM " + TransactionalStore.NAME_TABLE +
95 " WHERE appId = ? AND (version < ? AND committed <> 0 OR " +
96 " version = ?) ORDER BY version DESC";
97 nam_get = con.prepareStatement(sql);
98 nam_get.setMaxRows(1);
99
100
101 sql = "INSERT INTO " + TransactionalStore.NAME_TABLE + " VALUES (?, ?, ?, ?, ?)";
102 nam_ins = con.prepareStatement(sql);
103
104 sql = "SELECT storeId, deleted FROM " + TransactionalStore.NAME_TABLE +
105 " -- DERBY-PROPERTIES index=NAME_MAP_AIIDX \n WHERE appId = ? AND version = ?";
106 nam_upd = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
107
108
109 sql = "INSERT INTO " + TransactionalStore.DEL_TABLE + " VALUES (?, ?, ?)";
110 del_ins = con.prepareStatement(sql);
111
112 sql = "SELECT storeId FROM " + TransactionalStore.DEL_TABLE +
113 " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE appId = ? AND version = ?";
114 del_upd = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
115
116
117 sql = "SELECT version, committed FROM " + TransactionalStore.NAME_TABLE +
118 " WHERE appId = ? ORDER BY version DESC";
119 nam_cfl = con.prepareStatement(sql);
120 nam_cfl.setMaxRows(1);
121
122
123 sql = "SELECT version, committed FROM " + TransactionalStore.NAME_TABLE +
124 " -- DERBY-PROPERTIES index=NAME_MAP_VIDX \n WHERE version = ?";
125 nam_cmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
126
127 sql = "SELECT version FROM " + TransactionalStore.DEL_TABLE +
128 " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE version = ?";
129 del_cmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
130
131
132 sql = "SELECT appId, version, deleted FROM " + TransactionalStore.NAME_TABLE +
133 " WHERE (version < ? AND committed <> 0 OR version = ?) ORDER BY appId";
134 nam_lst_all = con.prepareStatement(sql);
135
136 sql = "SELECT appId, version, deleted FROM " + TransactionalStore.NAME_TABLE +
137 " WHERE (version < ? AND committed <> 0 OR version = ?)" +
138 " AND appId LIKE ? ESCAPE '!' ORDER BY appId";
139 nam_lst_pfx = con.prepareStatement(sql);
140 } catch (SQLException sqle) {
141 throw new IOException("Error querying db", sqle);
142 }
143 }
144
145 @Override
146 public Iterator<URI> listBlobIds(String filterPrefix) throws IOException {
147 ensureOpen();
148
149 if (logger.isDebugEnabled())
150 logger.debug("listing blob-ids with prefix '" + filterPrefix + "' (" + this + ")");
151
152 try {
153 PreparedStatement query;
154 if (filterPrefix != null && filterPrefix.trim().length() > 0) {
155 query = nam_lst_pfx;
156 query.setLong(1, version);
157 query.setLong(2, version);
158 query.setString(3, escLike(filterPrefix.trim()) + '%');
159 } else {
160 query = nam_lst_all;
161 query.setLong(1, version);
162 query.setLong(2, version);
163 }
164
165 ResultSet rs = query.executeQuery();
166 return new RSBlobIdIterator(rs, false) {
167 private final RSBlobIdIterator idIterator = new RSBlobIdIterator(rs, false);
168
169 @Override
170 protected URI getNextId() throws SQLException {
171 while (true) {
172
173 if (!idIterator.hasNext())
174 return null;
175
176
177 long maxVers = -1;
178 boolean isDel = true;
179 URI curId;
180
181 do {
182 curId = idIterator.next();
183 long v = rs.getLong(2);
184 if (v > maxVers) {
185 maxVers = v;
186 isDel = rs.getBoolean(3);
187 }
188 } while (idIterator.hasNext() && idIterator.peek().equals(curId));
189
190
191 if (!isDel)
192 return curId;
193 }
194 }
195 };
196 } catch (SQLException sqle) {
197 throw new IOException("Error querying db", sqle);
198 }
199 }
200
201 @Override
202 protected void validateId(URI blobId) throws UnsupportedIdException {
203 if (blobId.toString().length() > 1000)
204 throw new UnsupportedIdException(blobId, "Blob id must be less than 1000 characters long");
205 }
206
207 @Override
208 protected URI getRealId(URI blobId) throws IOException {
209 try {
210
211
212
213 nam_get.setString(1, blobId.toString());
214 nam_get.setLong(2, version);
215 nam_get.setLong(3, version);
216
217 ResultSet rs = nam_get.executeQuery();
218 try {
219 return !rs.next() ? null : rs.getBoolean(2) ? null : URI.create(rs.getString(1));
220 } finally {
221 rs.close();
222 }
223 } catch (SQLException sqle) {
224 throw new IOException("Error querying db", sqle);
225 }
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 @Override
253 protected void remNameEntry(URI ourId, URI storeId) throws IOException {
254 if (logger.isDebugEnabled())
255 logger.debug("Removing name-entry '" + ourId + "' -> '" + storeId + "', version=" + version);
256
257 updNameEntry(ourId, storeId, true);
258 }
259
260 @Override
261 protected void addNameEntry(URI ourId, URI storeId) throws IOException {
262 if (logger.isDebugEnabled())
263 logger.debug("Adding name-entry '" + ourId + "' -> '" + storeId + "', version=" + version);
264
265 updNameEntry(ourId, storeId, false);
266 }
267
268 private void updNameEntry(URI ourId, URI storeId, boolean delete) throws IOException {
269 try {
270
271 if (numMods == 0 && ((TransactionalStore) owner).singleWriter()) {
272 try {
273 ((TransactionalStore) owner).acquireWriteLock(version);
274 } catch (InterruptedException ie) {
275 throw new IOException("Interrupted waiting for write lock", ie);
276 }
277 }
278
279
280
281
282 try {
283 ((TransactionalStore) owner).acquireUriLock(ourId);
284 } catch (InterruptedException ie) {
285 throw new IOException("Interrupted waiting for uri lock", ie);
286 }
287
288 try {
289 boolean useUpdate = false;
290
291
292 nam_cfl.setString(1, ourId.toString());
293 ResultSet rs = nam_cfl.executeQuery();
294 try {
295 if (rs.next()) {
296 long v = rs.getLong(1);
297 if (v > version || v < version && !rs.getBoolean(2))
298 throw new ConcurrentBlobUpdateException(ourId, "Conflict detected: '" + ourId +
299 "' is already being modified in another transaction");
300
301 if (v == version)
302 useUpdate = true;
303 }
304 } finally {
305 rs.close();
306 }
307
308 numMods++;
309
310
311 if (useUpdate) {
312 if (logger.isTraceEnabled())
313 logger.trace("Updating existing name-entry");
314
315 nam_upd.setString(1, ourId.toString());
316 nam_upd.setLong(2, version);
317 doUpdate(nam_upd, storeId.toString(), delete);
318 } else {
319 if (logger.isTraceEnabled())
320 logger.trace("Inserting new name-entry");
321
322 nam_ins.setString(1, ourId.toString());
323 nam_ins.setString(2, storeId.toString());
324 nam_ins.setLong(3, version);
325 nam_ins.setBoolean(4, delete);
326 nam_ins.setBoolean(5, false);
327 nam_ins.executeUpdate();
328 }
329 } finally {
330 ((TransactionalStore) owner).releaseUriLock(ourId);
331 }
332
333 if (delete) {
334 del_ins.setString(1, ourId.toString());
335 del_ins.setString(2, null);
336 del_ins.setLong(3, version);
337 del_ins.executeUpdate();
338 }
339 } catch (SQLException sqle) {
340 throw new IOException("Error updating db", sqle);
341 }
342 }
343
344 @Override
345 protected void remBlob(URI ourId, URI storeId) throws IOException {
346 try {
347 if (newBlobs.contains(storeId)) {
348 newBlobs.remove(storeId);
349 bStoreCon.getBlob(storeId, null).delete();
350 } else {
351 del_upd.setString(1, ourId.toString());
352 del_upd.setLong(2, version);
353 doUpdate(del_upd, storeId.toString());
354 }
355 } catch (SQLException sqle) {
356 throw new IOException("Error updating delete-blobs table", sqle);
357 }
358 }
359
360 private static String escLike(String str) {
361 return str.replace("!", "!!").replace("_", "!_").replace("%", "!%");
362 }
363
364 @Override
365 public void beforeCompletion() {
366 if (numMods > 0) {
367 try {
368 long writeVers = ((TransactionalStore) owner).txnPrepare(numMods, version);
369
370 if (logger.isTraceEnabled())
371 logger.trace("updating name-table for commit (version=" + version + ", write-version=" +
372 writeVers + ")");
373
374 nam_cmt.setLong(1, version);
375 doUpdate(nam_cmt, writeVers, true);
376
377 if (logger.isTraceEnabled())
378 logger.trace("updating delete-table for commit (version=" + version + ", write-version=" +
379 writeVers + ")");
380
381 del_cmt.setLong(1, version);
382 doUpdate(del_cmt, writeVers);
383 } catch (InterruptedException ie) {
384 throw new RuntimeException("Error waiting for write lock", ie);
385 } catch (SQLException sqle) {
386 throw new RuntimeException("Error updating db", sqle);
387 }
388 }
389
390 super.beforeCompletion();
391 }
392
393 @Override
394 public void afterCompletion(int status) {
395 if (isCompleted)
396 return;
397
398 try {
399 ((TransactionalStore) owner).txnComplete(status == Status.STATUS_COMMITTED, version);
400 closeStatements();
401 } finally {
402 super.afterCompletion(status);
403 }
404
405 ((TransactionalStore) owner).purgeOldVersions(version);
406
407
408
409
410
411
412 Monitor.getMonitor().getTimerFactory().getCancellationTimer().purge();
413 }
414
415 private void closeStatements() {
416 for (Statement stmt : new Statement[] { nam_get, nam_ins, nam_upd, del_ins, del_upd, nam_cfl,
417 nam_cmt, del_cmt, nam_lst_all, nam_lst_pfx }) {
418 try {
419 stmt.close();
420 } catch (SQLException sqle) {
421 logger.warn("Error closing prepared statement", sqle);
422 }
423 }
424 }
425
426 private static void doUpdate(PreparedStatement query, Object... newVals) throws SQLException {
427 ResultSet rs = query.executeQuery();
428 try {
429 while (rs.next()) {
430 int idx = 1;
431 for (Object v : newVals) {
432 if (v instanceof String)
433 rs.updateString(idx++, (String) v);
434 else if (v instanceof Boolean)
435 rs.updateBoolean(idx++, (Boolean) v);
436 else if (v instanceof Long)
437 rs.updateLong(idx++, (Long) v);
438 else
439 throw new Error("Unknown value type " + v.getClass() + " (" + v + ")");
440 }
441 rs.updateRow();
442 }
443 } finally {
444 rs.close();
445 }
446 }
447 }