1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.healthmarketscience.rmiio;
18
19 import java.io.Closeable;
20 import java.io.IOException;
21 import java.io.ObjectOutputStream;
22 import java.util.Iterator;
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class SerialRemoteIteratorServer<DataType>
59 extends EncodingRemoteIteratorServer<DataType>
60 {
61
62
63
64 public static final int DEFAULT_RESET_NUM_OBJECTS = 1000;
65
66
67 private ObjectOutputStream _objOStream;
68
69 private final IOIterator<DataType> _localIterator;
70
71 private int _numObjectsWrittenSinceLastReset;
72
73
74 private final int _resetNumObjects;
75
76 public SerialRemoteIteratorServer(Iterator<DataType> localIterator)
77 throws IOException
78 {
79 this(true, localIterator);
80 }
81
82 public SerialRemoteIteratorServer(boolean useCompression,
83 Iterator<DataType> localIterator)
84 throws IOException
85 {
86 this(useCompression, false, localIterator);
87 }
88
89 public SerialRemoteIteratorServer(boolean useCompression,
90 boolean noDelay,
91 Iterator<DataType> localIterator)
92 throws IOException
93 {
94 this(useCompression, noDelay, RemoteInputStreamServer.DUMMY_MONITOR,
95 localIterator);
96 }
97
98 public SerialRemoteIteratorServer(
99 boolean useCompression,
100 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
101 Iterator<DataType> localIterator)
102 throws IOException
103 {
104 this(useCompression, false, monitor, localIterator);
105 }
106
107 public SerialRemoteIteratorServer(
108 boolean useCompression,
109 boolean noDelay,
110 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
111 Iterator<DataType> localIterator)
112 throws IOException
113 {
114 this(useCompression, noDelay, monitor,
115 RemoteInputStreamServer.DEFAULT_CHUNK_SIZE, localIterator);
116 }
117
118 public SerialRemoteIteratorServer(
119 boolean useCompression,
120 boolean noDelay,
121 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
122 int chunkSize,
123 Iterator<DataType> localIterator)
124 throws IOException
125 {
126 this(useCompression, noDelay, monitor, chunkSize,
127 RmiioUtil.adapt(localIterator));
128 }
129
130 public SerialRemoteIteratorServer(IOIterator<DataType> localIterator)
131 throws IOException
132 {
133 this(true, localIterator);
134 }
135
136 public SerialRemoteIteratorServer(boolean useCompression,
137 IOIterator<DataType> localIterator)
138 throws IOException
139 {
140 this(useCompression, false, localIterator);
141 }
142
143 public SerialRemoteIteratorServer(boolean useCompression,
144 boolean noDelay,
145 IOIterator<DataType> localIterator)
146 throws IOException
147 {
148 this(useCompression, noDelay, RemoteInputStreamServer.DUMMY_MONITOR,
149 localIterator);
150 }
151
152 public SerialRemoteIteratorServer(
153 boolean useCompression,
154 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
155 IOIterator<DataType> localIterator)
156 throws IOException
157 {
158 this(useCompression, false, monitor, localIterator);
159 }
160
161 public SerialRemoteIteratorServer(
162 boolean useCompression,
163 boolean noDelay,
164 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
165 IOIterator<DataType> localIterator)
166 throws IOException
167 {
168 this(useCompression, noDelay, monitor,
169 RemoteInputStreamServer.DEFAULT_CHUNK_SIZE,
170 localIterator);
171 }
172
173 public SerialRemoteIteratorServer(
174 boolean useCompression,
175 boolean noDelay,
176 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
177 int chunkSize,
178 IOIterator<DataType> localIterator)
179 throws IOException
180 {
181 this(useCompression, noDelay, monitor, chunkSize, localIterator,
182 DEFAULT_RESET_NUM_OBJECTS);
183 }
184
185 public SerialRemoteIteratorServer(
186 boolean useCompression,
187 boolean noDelay,
188 RemoteStreamMonitor<RemoteInputStreamServer> monitor,
189 int chunkSize,
190 IOIterator<DataType> localIterator,
191 int resetNumObjects)
192 throws IOException
193 {
194 super(useCompression, noDelay, monitor, chunkSize);
195 if(localIterator == null) {
196 throw new IllegalArgumentException("Iterator cannot be null");
197 }
198 _localIterator = localIterator;
199 _resetNumObjects = resetNumObjects;
200 }
201
202 @Override
203 protected boolean writeNextObject()
204 throws IOException
205 {
206
207
208 if(_objOStream == null) {
209 _objOStream = new ObjectOutputStream(_localOStream);
210 }
211
212 if(_localIterator.hasNext()) {
213
214 serializeObject(_objOStream, _localIterator.next());
215 return true;
216 }
217
218
219 return false;
220 }
221
222 @Override
223 protected void closeIterator()
224 throws IOException
225 {
226 if(_objOStream != null) {
227
228 _objOStream.close();
229 }
230
231 super.closeIterator();
232 }
233
234 @Override
235 protected void closeImpl(boolean readSuccess)
236 throws IOException
237 {
238
239
240 if(_localIterator instanceof Closeable) {
241 RmiioUtil.closeQuietly((Closeable)_localIterator);
242 }
243 super.closeImpl(readSuccess);
244 }
245
246
247
248
249
250
251
252
253
254
255
256 protected void serializeObject(ObjectOutputStream ostream, Object obj)
257 throws IOException
258 {
259 ostream.writeUnshared(obj);
260 _numObjectsWrittenSinceLastReset++;
261 if(_numObjectsWrittenSinceLastReset >= _resetNumObjects) {
262 ostream.reset();
263 _numObjectsWrittenSinceLastReset = 0;
264 }
265 }
266
267 }