1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.akubraproject.txn;
24
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.IOException;
28 import java.net.URI;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32
33 import javax.transaction.Status;
34 import javax.transaction.Synchronization;
35 import javax.transaction.Transaction;
36
37 import com.google.common.collect.MapMaker;
38
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import org.akubraproject.Blob;
43 import org.akubraproject.BlobStore;
44 import org.akubraproject.BlobStoreConnection;
45 import org.akubraproject.DuplicateBlobException;
46 import org.akubraproject.MissingBlobException;
47 import org.akubraproject.UnsupportedIdException;
48 import org.akubraproject.impl.AbstractBlob;
49 import org.akubraproject.impl.AbstractBlobStoreConnection;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AbstractTransactionalConnection extends AbstractBlobStoreConnection
69 implements Synchronization {
70 private static final Logger logger = LoggerFactory.getLogger(AbstractTransactionalConnection.class);
71
72
73 protected final BlobStoreConnection bStoreCon;
74
75 protected final Transaction tx;
76
77 protected boolean isCompleted = false;
78
79 protected final List<URI> newBlobs = new ArrayList<URI>();
80
81 protected final List<URI> delBlobs = new ArrayList<URI>();
82
83 protected final Map<URI, Blob> blobCache =
84 new MapMaker().weakValues().concurrencyLevel(1).<URI, Blob>makeMap();
85
86
87
88
89
90
91
92
93
94
95 protected AbstractTransactionalConnection(BlobStore owner, BlobStore bStore, Transaction tx,
96 Map<String, String> hints)
97 throws IOException {
98 super(owner);
99 this.bStoreCon = bStore.openConnection(null, hints);
100 this.tx = tx;
101
102 try {
103 tx.registerSynchronization(this);
104 } catch (Exception e) {
105 throw new IOException("Error registering txn synchronization", e);
106 }
107
108 if (logger.isDebugEnabled())
109 logger.debug("opened connection " + this);
110 }
111
112 @Override
113 public Blob getBlob(URI blobId, Map<String, String> hints) throws IOException {
114 ensureOpen();
115
116 if (blobId != null)
117 validateId(blobId);
118 else
119 blobId = (URI) createBlob(null, hints)[0];
120
121 Blob b = blobCache.get(blobId);
122 if (b == null)
123 blobCache.put(blobId, b = new TxnBlob(blobId, hints));
124
125 return b;
126 }
127
128 @Override
129 public void sync() throws IOException {
130 ensureOpen();
131
132 bStoreCon.sync();
133 }
134
135 private Object[] createBlob(URI blobId, Map<String, String> hints) throws IOException {
136 if (blobId == null)
137 throw new UnsupportedOperationException("id-generation is not currently supported");
138
139 if (logger.isDebugEnabled())
140 logger.debug("creating blob '" + blobId + "' (" + this + ")");
141
142 Blob res = bStoreCon.getBlob(blobId , hints);
143 if (res.exists()) {
144 if (logger.isDebugEnabled())
145 logger.debug("duplicate id - retrying with generated id");
146 res = bStoreCon.getBlob(null, hints);
147 }
148
149 boolean added = false;
150 try {
151 addNameEntry(blobId, res.getId());
152 addBlob(blobId, res.getId());
153 added = true;
154 } finally {
155 if (!added) {
156 try {
157 res.delete();
158 } catch (Throwable t) {
159 logger.warn("Error removing created blob during exception handling: lower-blob-id = '" +
160 res.getId() + "'", t);
161 }
162 }
163 }
164
165 if (logger.isDebugEnabled())
166 logger.debug("created blob '" + blobId + "' with underlying id '" + res.getId() + "' (" +
167 this + ")");
168
169 return new Object[] { blobId, res };
170 }
171
172 private void renameBlob(URI oldBlobId, URI newBlobId, URI storeId)
173 throws DuplicateBlobException, IOException, MissingBlobException {
174 if (logger.isDebugEnabled())
175 logger.debug("renaming blob '" + oldBlobId + "' to '" + newBlobId + "' (" + this + ")");
176
177 if (getRealId(newBlobId) != null)
178 throw new DuplicateBlobException(newBlobId);
179
180 remNameEntry(oldBlobId, storeId);
181 addNameEntry(newBlobId, storeId);
182 }
183
184 private void removeBlob(URI blobId, URI storeId) throws IOException {
185 if (logger.isDebugEnabled())
186 logger.debug("removing blob '" + blobId + "' (" + this + ")");
187
188 if (storeId == null)
189 return;
190
191 remNameEntry(blobId, storeId);
192 remBlob(blobId, storeId);
193
194 if (logger.isDebugEnabled())
195 logger.debug("removed blob '" + blobId + "' with underlying id '" + storeId +
196 "' (" + this + ")");
197 }
198
199
200
201
202
203
204
205 protected void validateId(URI blobId) throws UnsupportedIdException {
206 }
207
208
209
210
211
212
213
214
215
216 protected abstract URI getRealId(URI blobId) throws IOException;
217
218
219
220
221
222
223
224
225 protected abstract void remNameEntry(URI ourId, URI storeId) throws IOException;
226
227
228
229
230
231
232
233
234 protected abstract void addNameEntry(URI ourId, URI storeId) throws IOException;
235
236
237
238
239
240
241
242
243
244 protected void remBlob(URI ourId, URI storeId) throws IOException {
245 if (newBlobs.contains(storeId)) {
246 newBlobs.remove(storeId);
247 bStoreCon.getBlob(storeId, null).delete();
248 } else {
249 delBlobs.add(storeId);
250 }
251 }
252
253
254
255
256
257
258
259
260
261 protected void addBlob(URI ourId, URI storeId) throws IOException {
262 newBlobs.add(storeId);
263 }
264
265
266
267
268
269
270
271 public void beforeCompletion() {
272 try {
273 bStoreCon.sync();
274 } catch (UnsupportedOperationException uoe) {
275 logger.warn("Sync'ing underlying connection '" + bStoreCon + "' not supported", uoe);
276 } catch (IOException ioe) {
277 throw new RuntimeException("Error sync'ing underlying connection " + bStoreCon, ioe);
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291 public void afterCompletion(int status) {
292 if (isCompleted)
293 return;
294 isCompleted = true;
295
296 try {
297 if (status == Status.STATUS_COMMITTED) {
298 for (URI blobId : delBlobs) {
299 try {
300 bStoreCon.getBlob(blobId, null).delete();
301 } catch (IOException ioe) {
302 logger.error("Error deleting removed blob after commit: blobId = '" + blobId + "'",
303 ioe);
304 }
305 }
306 } else {
307 for (URI blobId : newBlobs) {
308 try {
309 bStoreCon.getBlob(blobId, null).delete();
310 } catch (IOException ioe) {
311 logger.error("Error deleting added blob after rollback: blobId = '" + blobId + "'", ioe);
312 }
313 }
314 }
315 } finally {
316 bStoreCon.close();
317 }
318 }
319
320
321
322
323
324
325
326 protected class TxnBlob extends AbstractBlob {
327 private final Map<String, String> hints;
328 private boolean needToCopy;
329 private URI storeId;
330 private Blob storeBlob = null;
331
332 public TxnBlob(URI blobId, Map<String, String> hints) throws IOException {
333 super(AbstractTransactionalConnection.this, blobId);
334 this.hints = hints;
335
336 storeId = getRealId(blobId);
337 needToCopy = true;
338 }
339
340 @Override
341 public URI getCanonicalId() {
342 return getId();
343 }
344
345 @Override
346 public boolean exists() throws IOException {
347 check(false, false);
348 return (storeId != null);
349 }
350
351 @Override
352 public void delete() throws IOException {
353 check(false, false);
354 removeBlob(getId(), storeId);
355 storeBlob = null;
356 storeId = null;
357 }
358
359 @Override
360 public Blob moveTo(URI blobId, Map<String, String> hints) throws IOException {
361 check(true, false);
362 TxnBlob dest = (TxnBlob) getConnection().getBlob(blobId, hints);
363
364 renameBlob(getId(), blobId, storeId);
365
366 dest.storeBlob = storeBlob;
367 dest.storeId = storeId;
368 storeBlob = null;
369 storeId = null;
370 return dest;
371 }
372
373 @Override
374 public long getSize() throws IOException {
375 getStoreBlob();
376 return storeBlob.getSize();
377 }
378
379 @Override
380 public InputStream openInputStream() throws IOException {
381 getStoreBlob();
382 return storeBlob.openInputStream();
383 }
384
385 @Override
386 public OutputStream openOutputStream(long estimatedSize, boolean overwrite)
387 throws IOException, DuplicateBlobException {
388 check(false, !overwrite);
389
390 if (needToCopy || storeId == null) {
391 if (storeId != null)
392 removeBlob(getId(), storeId);
393
394 storeBlob = (Blob) createBlob(getId(), hints)[1];
395 storeId = storeBlob.getId();
396 needToCopy = false;
397 } else {
398 getStoreBlob();
399 }
400
401 return storeBlob.openOutputStream(estimatedSize, true);
402 }
403
404 private void getStoreBlob() throws IOException, MissingBlobException {
405 check(true, false);
406 if (storeBlob == null)
407 storeBlob = bStoreCon.getBlob(storeId, hints);
408 }
409
410 private void check(boolean mustExist, boolean mustNotExist)
411 throws IllegalStateException, MissingBlobException, DuplicateBlobException {
412 ensureOpen();
413 if (mustExist && storeId == null)
414 throw new MissingBlobException(getId());
415 if (mustNotExist && storeId != null)
416 throw new DuplicateBlobException(getId());
417 }
418 }
419 }