View Javadoc

1   /* $HeadURL::                                                                            $
2    * $Id$
3    *
4    * Copyright (c) 2009-2010 DuraSpace
5    * http://duraspace.org
6    *
7    * In collaboration with Topaz Inc.
8    * http://www.topazproject.org
9    *
10   * Licensed under the Apache License, Version 2.0 (the "License");
11   * you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software
17   * distributed under the License is distributed on an "AS IS" BASIS,
18   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19   * See the License for the specific language governing permissions and
20   * limitations under the License.
21   */
22  
23  package org.akubraproject.txn.derby;
24  
25  import java.io.IOException;
26  import java.net.URI;
27  import java.sql.Connection;
28  import java.sql.PreparedStatement;
29  import java.sql.ResultSet;
30  import java.sql.SQLException;
31  import java.sql.Statement;
32  import java.util.Iterator;
33  import java.util.Map;
34  
35  import javax.sql.XAConnection;
36  import javax.transaction.Status;
37  import javax.transaction.Transaction;
38  
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  import org.apache.derby.iapi.services.monitor.Monitor;
42  
43  import org.akubraproject.BlobStore;
44  import org.akubraproject.UnsupportedIdException;
45  import org.akubraproject.txn.ConcurrentBlobUpdateException;
46  import org.akubraproject.txn.SQLTransactionalConnection;
47  
48  /**
49   * A connection for the transactional store.
50   *
51   * @author Ronald Tschalär
52   */
53  public class TransactionalConnection extends SQLTransactionalConnection {
54    private static final Logger logger = LoggerFactory.getLogger(TransactionalConnection.class);
55  
56    private final long              version;
57    private final PreparedStatement nam_get;
58    private final PreparedStatement nam_ins;
59    private final PreparedStatement nam_upd;
60    private final PreparedStatement del_ins;
61    private final PreparedStatement del_upd;
62    private final PreparedStatement nam_cfl;
63    private final PreparedStatement nam_cmt;
64    private final PreparedStatement del_cmt;
65    private final PreparedStatement nam_lst_all;
66    private final PreparedStatement nam_lst_pfx;
67  
68    private int numMods = 0;
69  
70    /**
71     * Create a new transactional connection.
72     *
73     * @param owner   the blob-store we belong to
74     * @param bStore  the underlying blob-store to use
75     * @param xaCon   the xa connection to use
76     * @param con     the db connection to use
77     * @param tx      the transaction we belong to
78     * @param hints   the hints to pass to <code>openConnection<code> on <var>bStore</var>
79     * @param version the read version to use
80     * @throws IOException if an error occurs initializing this connection
81     */
82    TransactionalConnection(BlobStore owner, BlobStore bStore, XAConnection xaCon, Connection con,
83                            Transaction tx, Map<String, String> hints, long version)
84        throws IOException {
85      super(owner, bStore, xaCon, con, tx, hints);
86      this.version = version;
87  
88      try {
89        /* Note: it's important that these all be constant strings (i.e. always the same on each
90         * invocation) so that jdbc prepared-statement caching can kick in.
91         */
92  
93        // get store-id
94        String sql = "SELECT storeId, deleted FROM " + TransactionalStore.NAME_TABLE +
95                     " WHERE appId = ? AND (version < ? AND committed <> 0 OR " +
96                     " version = ?) ORDER BY version DESC";
97        nam_get = con.prepareStatement(sql);
98        nam_get.setMaxRows(1);
99  
100       // update name-table on blob insert/delete/modify
101       sql = "INSERT INTO " + TransactionalStore.NAME_TABLE + " VALUES (?, ?, ?, ?, ?)";
102       nam_ins = con.prepareStatement(sql);
103 
104       sql = "SELECT storeId, deleted FROM " + TransactionalStore.NAME_TABLE +
105             " -- DERBY-PROPERTIES index=NAME_MAP_AIIDX \n WHERE appId = ? AND version = ?";
106       nam_upd = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
107 
108       // update delete-list on blob delete
109       sql = "INSERT INTO " + TransactionalStore.DEL_TABLE + " VALUES (?, ?, ?)";
110       del_ins = con.prepareStatement(sql);
111 
112       sql = "SELECT storeId FROM " + TransactionalStore.DEL_TABLE +
113             " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE appId = ? AND version = ?";
114       del_upd = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
115 
116       // detect update conflicts
117       sql = "SELECT version, committed FROM " + TransactionalStore.NAME_TABLE +
118             " WHERE appId = ? ORDER BY version DESC";
119       nam_cfl = con.prepareStatement(sql);
120       nam_cfl.setMaxRows(1);
121 
122       // update name-table and delete-list on commit
123       sql = "SELECT version, committed FROM " + TransactionalStore.NAME_TABLE +
124             " -- DERBY-PROPERTIES index=NAME_MAP_VIDX \n WHERE version = ?";
125       nam_cmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
126 
127       sql = "SELECT version FROM " + TransactionalStore.DEL_TABLE +
128             " -- DERBY-PROPERTIES index=DELETED_LIST_VIDX \n WHERE version = ?";
129       del_cmt = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
130 
131       // list blob-ids
132       sql = "SELECT appId, version, deleted FROM " + TransactionalStore.NAME_TABLE +
133             " WHERE (version < ? AND committed <> 0 OR version = ?) ORDER BY appId";
134       nam_lst_all = con.prepareStatement(sql);
135 
136       sql = "SELECT appId, version, deleted FROM " + TransactionalStore.NAME_TABLE +
137             " WHERE (version < ? AND committed <> 0 OR version = ?)" +
138             " AND appId LIKE ? ESCAPE '!' ORDER BY appId";
139       nam_lst_pfx = con.prepareStatement(sql);
140     } catch (SQLException sqle) {
141       throw new IOException("Error querying db", sqle);
142     }
143   }
144 
145   @Override
146   public Iterator<URI> listBlobIds(String filterPrefix) throws IOException {
147     ensureOpen();
148 
149     if (logger.isDebugEnabled())
150       logger.debug("listing blob-ids with prefix '" + filterPrefix + "' (" + this + ")");
151 
152     try {
153       PreparedStatement query;
154       if (filterPrefix != null && filterPrefix.trim().length() > 0) {
155         query = nam_lst_pfx;
156         query.setLong(1, version);
157         query.setLong(2, version);
158         query.setString(3, escLike(filterPrefix.trim()) + '%');
159       } else {
160         query = nam_lst_all;
161         query.setLong(1, version);
162         query.setLong(2, version);
163       }
164 
165       ResultSet rs = query.executeQuery();              // NOPMD
166       return new RSBlobIdIterator(rs, false) {
167         private final RSBlobIdIterator idIterator = new RSBlobIdIterator(rs, false);
168 
169         @Override
170         protected URI getNextId() throws SQLException {
171           while (true) {
172             // see if we've reached the end of the result-set
173             if (!idIterator.hasNext())
174               return null;
175 
176             // get all the rows with the same id; the one with the largest version determines isDel
177             long    maxVers = -1;
178             boolean isDel   = true;
179             URI     curId;
180 
181             do {
182               curId = idIterator.next();
183               long v = rs.getLong(2);
184               if (v > maxVers) {
185                 maxVers = v;
186                 isDel   = rs.getBoolean(3);
187               }
188             } while (idIterator.hasNext() && idIterator.peek().equals(curId));
189 
190             // if this id wasn't deleted then we're golden
191             if (!isDel)
192               return curId;
193           }
194         }
195       };
196     } catch (SQLException sqle) {
197       throw new IOException("Error querying db", sqle);
198     }
199   }
200 
201   @Override
202   protected void validateId(URI blobId) throws UnsupportedIdException {
203     if (blobId.toString().length() > 1000)
204       throw new UnsupportedIdException(blobId, "Blob id must be less than 1000 characters long");
205   }
206 
207   @Override
208   protected URI getRealId(URI blobId) throws IOException {
209     try {
210       //System.out.println(dumpResults(con.createStatement().executeQuery(
211       //    "SELECT * FROM " + TransactionalStore.NAME_TABLE)));
212 
213       nam_get.setString(1, blobId.toString());
214       nam_get.setLong(2, version);
215       nam_get.setLong(3, version);
216 
217       ResultSet rs = nam_get.executeQuery();            // NOPMD
218       try {
219         return !rs.next() ? null : rs.getBoolean(2) ? null : URI.create(rs.getString(1));
220       } finally {
221         rs.close();
222       }
223     } catch (SQLException sqle) {
224       throw new IOException("Error querying db", sqle);
225     }
226   }
227 
228   /* Debug helper
229   static String dumpResults(ResultSet rs) throws SQLException {
230     StringBuilder res = new StringBuilder(500);
231     res.append("table dump (").append(rs.getMetaData().getTableName(1)).append(":\n");
232 
233     int numCols = rs.getMetaData().getColumnCount();
234     res.append("  ");
235     for (int idx = 1; idx <= numCols; idx++)
236       res.append(rs.getMetaData().getColumnLabel(idx)).append(idx < numCols ? ", " : "");
237     res.append("\n");
238 
239     while (rs.next()) {
240       res.append("  ");
241       for (int idx = 1; idx <= numCols; idx++)
242         res.append(rs.getString(idx)).append(idx < numCols ? ", " : "");
243       res.append("\n");
244     }
245 
246     rs.close();
247 
248     return res.toString();
249   }
250   */
251 
252   @Override
253   protected void remNameEntry(URI ourId, URI storeId) throws IOException {
254     if (logger.isDebugEnabled())
255       logger.debug("Removing name-entry '" + ourId + "' -> '" + storeId + "', version=" + version);
256 
257     updNameEntry(ourId, storeId, true);
258   }
259 
260   @Override
261   protected void addNameEntry(URI ourId, URI storeId) throws IOException {
262     if (logger.isDebugEnabled())
263       logger.debug("Adding name-entry '" + ourId + "' -> '" + storeId + "', version=" + version);
264 
265     updNameEntry(ourId, storeId, false);
266   }
267 
268   private void updNameEntry(URI ourId, URI storeId, boolean delete) throws IOException {
269     try {
270       // hack to serialize writers if desired (because of Derby locking issues)
271       if (numMods == 0 && ((TransactionalStore) owner).singleWriter()) {
272         try {
273           ((TransactionalStore) owner).acquireWriteLock(version);
274         } catch (InterruptedException ie) {
275           throw new IOException("Interrupted waiting for write lock", ie);
276         }
277       }
278 
279       /* this lock avoids a race-condition due to the conflict check and the name-table update
280        * being two separate operations.
281        */
282       try {
283         ((TransactionalStore) owner).acquireUriLock(ourId);
284       } catch (InterruptedException ie) {
285         throw new IOException("Interrupted waiting for uri lock", ie);
286       }
287 
288       try {
289         boolean useUpdate = false;
290 
291         // check for conflicts
292         nam_cfl.setString(1, ourId.toString());
293         ResultSet rs = nam_cfl.executeQuery();          // NOPMD
294         try {
295           if (rs.next()) {
296             long v = rs.getLong(1);
297             if (v > version || v < version && !rs.getBoolean(2))
298               throw new ConcurrentBlobUpdateException(ourId, "Conflict detected: '" + ourId +
299                                     "' is already being modified in another transaction");
300 
301             if (v == version)
302               useUpdate = true;
303           }
304         } finally {
305           rs.close();
306         }
307 
308         numMods++;
309 
310         // add-to/update the name-map and deleted-list
311         if (useUpdate) {
312           if (logger.isTraceEnabled())
313             logger.trace("Updating existing name-entry");
314 
315           nam_upd.setString(1, ourId.toString());
316           nam_upd.setLong(2, version);
317           doUpdate(nam_upd, storeId.toString(), delete);
318         } else {
319           if (logger.isTraceEnabled())
320             logger.trace("Inserting new name-entry");
321 
322           nam_ins.setString(1, ourId.toString());
323           nam_ins.setString(2, storeId.toString());
324           nam_ins.setLong(3, version);
325           nam_ins.setBoolean(4, delete);
326           nam_ins.setBoolean(5, false);
327           nam_ins.executeUpdate();
328         }
329       } finally {
330         ((TransactionalStore) owner).releaseUriLock(ourId);
331       }
332 
333       if (delete) {
334         del_ins.setString(1, ourId.toString());
335         del_ins.setString(2, null);
336         del_ins.setLong(3, version);
337         del_ins.executeUpdate();
338       }
339     } catch (SQLException sqle) {
340       throw new IOException("Error updating db", sqle);
341     }
342   }
343 
344   @Override
345   protected void remBlob(URI ourId, URI storeId) throws IOException {
346     try {
347       if (newBlobs.contains(storeId)) {
348         newBlobs.remove(storeId);
349         bStoreCon.getBlob(storeId, null).delete();
350       } else {
351         del_upd.setString(1, ourId.toString());
352         del_upd.setLong(2, version);
353         doUpdate(del_upd, storeId.toString());
354       }
355     } catch (SQLException sqle) {
356       throw new IOException("Error updating delete-blobs table", sqle);
357     }
358   }
359 
360   private static String escLike(String str) {
361     return str.replace("!", "!!").replace("_", "!_").replace("%", "!%");
362   }
363 
364   @Override
365   public void beforeCompletion() {
366     if (numMods > 0) {
367       try {
368         long writeVers = ((TransactionalStore) owner).txnPrepare(numMods, version);
369 
370         if (logger.isTraceEnabled())
371           logger.trace("updating name-table for commit (version=" + version + ", write-version=" +
372                        writeVers + ")");
373 
374         nam_cmt.setLong(1, version);
375         doUpdate(nam_cmt, writeVers, true);
376 
377         if (logger.isTraceEnabled())
378           logger.trace("updating delete-table for commit (version=" + version + ", write-version=" +
379                        writeVers + ")");
380 
381         del_cmt.setLong(1, version);
382         doUpdate(del_cmt, writeVers);
383       } catch (InterruptedException ie) {
384         throw new RuntimeException("Error waiting for write lock", ie);
385       } catch (SQLException sqle) {
386         throw new RuntimeException("Error updating db", sqle);
387       }
388     }
389 
390     super.beforeCompletion();
391   }
392 
393   @Override
394   public void afterCompletion(int status) {
395     if (isCompleted)
396       return;
397 
398     try {
399       ((TransactionalStore) owner).txnComplete(status == Status.STATUS_COMMITTED, version);
400       closeStatements();
401     } finally {
402       super.afterCompletion(status);
403     }
404 
405     ((TransactionalStore) owner).purgeOldVersions(version);
406 
407     /* java.util.Timer does not actually remove a cancelled task until purge() is invoked. This
408      * leads to connections not being cleaned up, and eventually an OOM exception. Hence we
409      * invoke purge() here. Derby should be doing this itself, though - see
410      * https://issues.apache.org/jira/browse/DERBY-4137
411      */
412     Monitor.getMonitor().getTimerFactory().getCancellationTimer().purge();
413   }
414 
415   private void closeStatements() {
416     for (Statement stmt : new Statement[] { nam_get, nam_ins, nam_upd, del_ins, del_upd, nam_cfl,
417                                             nam_cmt, del_cmt, nam_lst_all, nam_lst_pfx }) {
418       try {
419         stmt.close();
420       } catch (SQLException sqle) {
421         logger.warn("Error closing prepared statement", sqle);
422       }
423     }
424   }
425 
426   private static void doUpdate(PreparedStatement query, Object... newVals) throws SQLException {
427     ResultSet rs = query.executeQuery();
428     try {
429       while (rs.next()) {
430         int idx = 1;
431         for (Object v : newVals) {
432           if (v instanceof String)
433             rs.updateString(idx++, (String) v);
434           else if (v instanceof Boolean)
435             rs.updateBoolean(idx++, (Boolean) v);
436           else if (v instanceof Long)
437             rs.updateLong(idx++, (Long) v);
438           else
439             throw new Error("Unknown value type " + v.getClass() + " (" + v + ")");
440         }
441         rs.updateRow();
442       }
443     } finally {
444       rs.close();
445     }
446   }
447 }