View Javadoc

1   /* $HeadURL::                                                                            $
2    * $Id$
3    *
4    * Copyright (c) 2009-2010 DuraSpace
5    * http://duraspace.org
6    *
7    * In collaboration with Topaz Inc.
8    * http://www.topazproject.org
9    *
10   * Licensed under the Apache License, Version 2.0 (the "License");
11   * you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software
17   * distributed under the License is distributed on an "AS IS" BASIS,
18   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19   * See the License for the specific language governing permissions and
20   * limitations under the License.
21   */
22  
23  package org.akubraproject.txn;
24  
25  import java.io.InputStream;
26  import java.io.OutputStream;
27  import java.io.IOException;
28  import java.net.URI;
29  import java.util.ArrayList;
30  import java.util.List;
31  import java.util.Map;
32  
33  import javax.transaction.Status;
34  import javax.transaction.Synchronization;
35  import javax.transaction.Transaction;
36  
37  import com.google.common.collect.MapMaker;
38  
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import org.akubraproject.Blob;
43  import org.akubraproject.BlobStore;
44  import org.akubraproject.BlobStoreConnection;
45  import org.akubraproject.DuplicateBlobException;
46  import org.akubraproject.MissingBlobException;
47  import org.akubraproject.UnsupportedIdException;
48  import org.akubraproject.impl.AbstractBlob;
49  import org.akubraproject.impl.AbstractBlobStoreConnection;
50  
51  /**
52   * A basic superclass for transactional store connections. This implements the common blob-handling
53   * parts of a transactional connection, leaving subclasses to implement the transactional
54   * management of the id-mappings.
55   *
56   * <p>Subclasses must implement {@link #getRealId getRealId}, {@link #remNameEntry remNameEntry},
57   * {@link #addNameEntry addNameEntry}, {@link BlobStoreConnection#listBlobIds listBlobIds}, and
58   * override {@link #close close}; in addition they may want to override {@link
59   * #beforeCompletion beforeCompletion} and/or {@link #afterCompletion afterCompletion} for pre-
60   * and post-commit/rollback processing.
61   *
62   * <p>The subclass is expected to implement id mapping, mapping upper-level blob-id's to underlying
63   * blob-id's; these mappings are managed via the <code>remNameEntry</code> and
64   * <code>addNameEntry</code> methods, and <code>getRealId</code> is used to query the mapping.
65   *
66   * @author Ronald Tschalär
67   */
68  public abstract class AbstractTransactionalConnection extends AbstractBlobStoreConnection
69      implements Synchronization {
70    private static final Logger logger = LoggerFactory.getLogger(AbstractTransactionalConnection.class);
71  
72    /** the underlying blob-store that actually stores the blobs */
73    protected final BlobStoreConnection  bStoreCon;
74    /** the transaction this connection belongs to */
75    protected final Transaction          tx;
76    /** Whether or not the current transaction has been completed yet */
77    protected       boolean              isCompleted = false;
78    /** the list of underlying id's of added blobs */
79    protected final List<URI>            newBlobs = new ArrayList<URI>();
80    /** the list of underlying id's of deleted blobs */
81    protected final List<URI>            delBlobs = new ArrayList<URI>();
82    /** a cache of blobs */
83    protected final Map<URI, Blob>       blobCache =
84                              new MapMaker().weakValues().concurrencyLevel(1).<URI, Blob>makeMap();
85  
86    /**
87     * Create a new transactional connection.
88     *
89     * @param owner   the blob-store we belong to
90     * @param bStore  the underlying blob-store to use
91     * @param tx      the transaction we belong to
92     * @param hints   A set of hints for the <code>openConnection</code> on the wrapped store
93     * @throws IOException if an error occurs initializing this connection
94     */
95    protected AbstractTransactionalConnection(BlobStore owner, BlobStore bStore, Transaction tx,
96                                              Map<String, String> hints)
97        throws IOException {
98      super(owner);
99      this.bStoreCon = bStore.openConnection(null, hints);
100     this.tx        = tx;
101 
102     try {
103       tx.registerSynchronization(this);
104     } catch (Exception e) {
105       throw new IOException("Error registering txn synchronization", e);
106     }
107 
108     if (logger.isDebugEnabled())
109       logger.debug("opened connection " + this);
110   }
111 
112   @Override
113   public Blob getBlob(URI blobId, Map<String, String> hints) throws IOException {
114     ensureOpen();
115 
116     if (blobId != null)
117       validateId(blobId);
118     else
119       blobId = (URI) createBlob(null, hints)[0];
120 
121     Blob b = blobCache.get(blobId);
122     if (b == null)
123       blobCache.put(blobId, b = new TxnBlob(blobId, hints));
124 
125     return b;
126   }
127 
128   @Override
129   public void sync() throws IOException {
130     ensureOpen();
131 
132     bStoreCon.sync();
133   }
134 
135   private Object[] createBlob(URI blobId, Map<String, String> hints) throws IOException {
136     if (blobId == null)
137       throw new UnsupportedOperationException("id-generation is not currently supported");
138 
139     if (logger.isDebugEnabled())
140       logger.debug("creating blob '" + blobId + "' (" + this + ")");
141 
142     Blob res = bStoreCon.getBlob(blobId , hints);
143     if (res.exists()) {
144       if (logger.isDebugEnabled())
145         logger.debug("duplicate id - retrying with generated id");
146       res = bStoreCon.getBlob(null, hints);
147     }
148 
149     boolean added = false;
150     try {
151       addNameEntry(blobId, res.getId());
152       addBlob(blobId, res.getId());
153       added = true;
154     } finally {
155       if (!added) {
156         try {
157           res.delete();
158         } catch (Throwable t) {
159           logger.warn("Error removing created blob during exception handling: lower-blob-id = '" +
160                       res.getId() + "'", t);
161         }
162       }
163     }
164 
165     if (logger.isDebugEnabled())
166       logger.debug("created blob '" + blobId + "' with underlying id '" + res.getId() + "' (" +
167                    this + ")");
168 
169     return new Object[] { blobId, res };
170   }
171 
172   private void renameBlob(URI oldBlobId, URI newBlobId, URI storeId)
173       throws DuplicateBlobException, IOException, MissingBlobException {
174     if (logger.isDebugEnabled())
175       logger.debug("renaming blob '" + oldBlobId + "' to '" + newBlobId + "' (" + this + ")");
176 
177     if (getRealId(newBlobId) != null)
178       throw new DuplicateBlobException(newBlobId);
179 
180     remNameEntry(oldBlobId, storeId);
181     addNameEntry(newBlobId, storeId);
182   }
183 
184   private void removeBlob(URI blobId, URI storeId) throws IOException {
185     if (logger.isDebugEnabled())
186       logger.debug("removing blob '" + blobId + "' (" + this + ")");
187 
188     if (storeId == null)
189       return;
190 
191     remNameEntry(blobId, storeId);
192     remBlob(blobId, storeId);
193 
194     if (logger.isDebugEnabled())
195       logger.debug("removed blob '" + blobId + "' with underlying id '" + storeId +
196                    "' (" + this + ")");
197   }
198 
199   /**
200    * Check whether we can store this id.
201    *
202    * @param blobId  the upper level blob-id
203    * @throws UnsupportedIdException if the id cannot be stored
204    */
205   protected void validateId(URI blobId) throws UnsupportedIdException {
206   }
207 
  /**
   * Look up the underlying store's blob-id for the given upper-level blob-id.
   *
   * <p>Within this class the result is used to initialize a newly created transactional blob and
   * to detect an already-mapped target id when a blob is renamed.
   *
   * @param blobId  the upper level blob-id
   * @return the underlying blob-id that <var>blobId</var> maps to, or null if no such mapping
   *         exists (i.e. <var>blobId</var> is not a known upper-level blob-id)
   * @throws IOException if an error occurred looking up the id
   */
  protected abstract URI getRealId(URI blobId) throws IOException;
217 
  /**
   * Remove an id mapping.
   *
   * <p>Within this class it is invoked when a blob is removed and, for the old id, when a blob is
   * renamed.
   *
   * @param ourId   the upper-level blob-id
   * @param storeId the underlying store's blob-id
   * @throws IOException if an error occurred removing the mapping or the mapping does not exist
   */
  protected abstract void remNameEntry(URI ourId, URI storeId) throws IOException;
226 
  /**
   * Add an id mapping.
   *
   * <p>Within this class it is invoked when a new underlying blob is created and, for the new id,
   * when a blob is renamed.
   *
   * @param ourId   the upper-level blob-id to map
   * @param storeId the underlying store's blob-id to map <var>ourId</var> to
   * @throws IOException if an error occurred adding the mapping or the mapping already exists
   */
  protected abstract void addNameEntry(URI ourId, URI storeId) throws IOException;
235 
236   /**
237    * Remove a blob from the underlying store. This implementation just updates the {@link #newBlobs}
238    * and {@link #delBlobs} lists; actual blob deletion is deferred till commit.
239    *
240    * @param ourId   the upper-level blob-id
241    * @param storeId the underlying store's blob-id
242    * @throws IOException if an error occurred removing the blob or the blob does not exist
243    */
244   protected void remBlob(URI ourId, URI storeId) throws IOException {
245     if (newBlobs.contains(storeId)) {
246       newBlobs.remove(storeId);
247       bStoreCon.getBlob(storeId, null).delete();
248     } else {
249       delBlobs.add(storeId);
250     }
251   }
252 
253   /**
254    * Add a blob to the underlying store. This implementation just updates the {@link #newBlobs}
255    * list; actual blob writing is done via the blob itself..
256    *
257    * @param ourId   the upper-level blob-id
258    * @param storeId the underlying store's blob-id
259    * @throws IOException if an error occurred removing the blob or the blob does not exist
260    */
261   protected void addBlob(URI ourId, URI storeId) throws IOException {
262     newBlobs.add(storeId);
263   }
264 
265   /**
266    * Invoked before the transaction is completed, i.e. before a rollback or commit is started.
267    * Whether or not this is called on a rollback may vary.
268    *
269    * @see Synchronization#beforeCompletion
270    */
271   public void beforeCompletion() {
272     try {
273       bStoreCon.sync();
274     } catch (UnsupportedOperationException uoe) {
275       logger.warn("Sync'ing underlying connection '" + bStoreCon + "' not supported", uoe);
276     } catch (IOException ioe) {
277       throw new RuntimeException("Error sync'ing underlying connection " + bStoreCon, ioe);
278     }
279   }
280 
281   /**
282    * Invoked after the transaction has completed, i.e. after a rollback or commit has finished.
283    * This is always callled.
284    *
285    * <p>Subclasses that override this must make sure to invoke <code>super.afterCompletion</code>
286    * so that the cleanup code in this implementation is run. This implementation cleans up deleted
287    * or added blobs (depending on the outcome of the transaction).
288    *
289    * @see Synchronization#afterCompletion
290    */
291   public void afterCompletion(int status) {
292     if (isCompleted)    // I've seen BTM call us twice here after a timeout and rollback.
293       return;
294     isCompleted = true;
295 
296     try {
297       if (status == Status.STATUS_COMMITTED) {
298         for (URI blobId : delBlobs) {
299           try {
300             bStoreCon.getBlob(blobId, null).delete();
301           } catch (IOException ioe) {
302             logger.error("Error deleting removed blob after commit: blobId = '" + blobId + "'",
303                          ioe);
304           }
305         }
306       } else {
307         for (URI blobId : newBlobs) {
308           try {
309             bStoreCon.getBlob(blobId, null).delete();
310           } catch (IOException ioe) {
311             logger.error("Error deleting added blob after rollback: blobId = '" + blobId + "'", ioe);
312           }
313         }
314       }
315     } finally {
316       bStoreCon.close();
317     }
318   }
319 
320   /**
321    * A transactional blob implementation. This blob caches underlying infos such as the
322    * store-id and the store-blob, and hence only works properly in conjunction with the
323    * blob-cache which guarantees only one instance of this class per blob-id at any given
324    * time.
325    */
326   protected class TxnBlob extends AbstractBlob {
327     private final Map<String, String> hints;
328     private       boolean needToCopy;
329     private       URI     storeId;
330     private       Blob    storeBlob = null;
331 
332     public TxnBlob(URI blobId, Map<String, String> hints) throws IOException {
333       super(AbstractTransactionalConnection.this, blobId);
334       this.hints = hints;
335 
336       storeId    = getRealId(blobId);
337       needToCopy = true;
338     }
339 
340     @Override
341     public URI getCanonicalId() {
342       return getId();
343     }
344 
345     @Override
346     public boolean exists() throws IOException {
347       check(false, false);
348       return (storeId != null);
349     }
350 
351     @Override
352     public void delete() throws IOException {
353       check(false, false);
354       removeBlob(getId(), storeId);
355       storeBlob = null;
356       storeId   = null;
357     }
358 
359     @Override
360     public Blob moveTo(URI blobId, Map<String, String> hints) throws IOException {
361       check(true, false);
362       TxnBlob dest = (TxnBlob) getConnection().getBlob(blobId, hints);
363 
364       renameBlob(getId(), blobId, storeId);
365 
366       dest.storeBlob = storeBlob;
367       dest.storeId = storeId;
368       storeBlob = null;
369       storeId   = null;
370       return dest;
371     }
372 
373     @Override
374     public long getSize() throws IOException {
375       getStoreBlob();
376       return storeBlob.getSize();
377     }
378 
379     @Override
380     public InputStream openInputStream() throws IOException {
381       getStoreBlob();
382       return storeBlob.openInputStream();
383     }
384 
385     @Override
386     public OutputStream openOutputStream(long estimatedSize, boolean overwrite)
387         throws IOException, DuplicateBlobException {
388       check(false, !overwrite);
389 
390       if (needToCopy || storeId == null) {
391         if (storeId != null)
392           removeBlob(getId(), storeId);
393 
394         storeBlob  = (Blob) createBlob(getId(), hints)[1];
395         storeId    = storeBlob.getId();
396         needToCopy = false;
397       } else {
398         getStoreBlob();
399       }
400 
401       return storeBlob.openOutputStream(estimatedSize, true);
402     }
403 
404     private void getStoreBlob() throws IOException, MissingBlobException {
405       check(true, false);
406       if (storeBlob == null)
407         storeBlob = bStoreCon.getBlob(storeId, hints);
408     }
409 
410     private void check(boolean mustExist, boolean mustNotExist)
411         throws IllegalStateException, MissingBlobException, DuplicateBlobException {
412       ensureOpen();
413       if (mustExist && storeId == null)
414         throw new MissingBlobException(getId());
415       if (mustNotExist && storeId != null)
416         throw new DuplicateBlobException(getId());
417     }
418   }
419 }