1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.akubraproject.txn;
24
25 import java.io.IOException;
26 import java.net.URI;
27 import java.sql.Connection;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.util.Map;
31
32 import javax.sql.XAConnection;
33 import javax.transaction.Transaction;
34
35 import com.google.common.collect.AbstractIterator;
36 import com.google.common.collect.PeekingIterator;
37
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import org.akubraproject.BlobStore;
42 import org.akubraproject.BlobStoreConnection;
43
44
45
46
47
48
49
50
51 public abstract class SQLTransactionalConnection extends AbstractTransactionalConnection {
52 private static final Logger logger = LoggerFactory.getLogger(SQLTransactionalConnection.class);
53
54
55 protected final XAConnection xaCon;
56
57 protected final Connection con;
58
59
60
61
62
63
64
65
66
67
68
69
70 protected SQLTransactionalConnection(BlobStore owner, BlobStore bStore, XAConnection xaCon,
71 Connection con, Transaction tx, Map<String, String> hints)
72 throws IOException {
73 super(owner, bStore, tx, hints);
74 this.con = con;
75 this.xaCon = xaCon;
76 }
77
78 @Override
79 public void afterCompletion(int status) {
80 if (isCompleted)
81 return;
82
83 try {
84 xaCon.close();
85 } catch (SQLException sqle) {
86 logger.error("Error closing db connection", sqle);
87 } finally {
88 super.afterCompletion(status);
89 }
90 }
91
92
93
94
95
96
97
98 protected static class RSBlobIdIterator extends AbstractIterator<URI>
99 implements PeekingIterator<URI> {
100 protected final ResultSet rs;
101 private final boolean closeStmt;
102
103
104
105
106
107
108
109
110
111
112
113 public RSBlobIdIterator(ResultSet rs, boolean closeStmt) throws SQLException {
114 this.rs = rs;
115 this.closeStmt = closeStmt;
116 }
117
118 @Override
119 protected URI computeNext() {
120 try {
121 URI id = getNextId();
122 if (id != null)
123 return id;
124
125 close();
126 return endOfData();
127 } catch (SQLException sqle) {
128 throw new RuntimeException("error reading db results", sqle);
129 }
130 }
131
132
133
134
135
136
137 protected URI getNextId() throws SQLException {
138 if (!rs.next())
139 return null;
140
141 return URI.create(rs.getString(1));
142 }
143
144
145
146
147 protected void close() {
148 try {
149 if (closeStmt)
150 rs.getStatement().close();
151 else
152 rs.close();
153 } catch (SQLException sqle) {
154 logger.error("Error closing statement or result-set", sqle);
155 }
156 }
157 }
158 }