View Javadoc
1   /*
2   Copyright (c) 2007 Health Market Science, Inc.
3   
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7   
8       http://www.apache.org/licenses/LICENSE-2.0
9   
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16  
17  package com.healthmarketscience.rmiio;
18  
19  import java.io.Closeable;
20  import java.io.IOException;
21  import java.io.ObjectOutputStream;
22  import java.util.Iterator;
23  
24  
25  /**
26   * Implementation of RemoteIteratorServer which uses java serialization to
27   * send objects to the RemoteIteratorClient.  Objects are grabbed from the
28   * localIterator as needed and serialized to the local output stream.  Note
29   * that a RemoteIterator is accepted for the local iterator so that the
30   * iterator may throw IOExceptions.
31   * <p>
32   * Note, the objects are written to the ObjectOutputStream using the
33   * {@link java.io.ObjectOutputStream#writeUnshared} method, and the
34   * {@link java.io.ObjectOutputStream#reset} method is called periodically on
35   * the output stream.  These measures are taken because memory can build up in
36   * the ObjectOutputStream over time and a large data set can run the client
37   * and/or server out of memory.  In general, the objects being iterated over
38   * most likely do not have shared references, so nothing will be lost by this
39   * choice.  However, if shared references are desired, the
40   * {@link #serializeObject} method can be overriden by a custom subclass to
41   * change this behavior.
42   * <p>
43   * In the event that a RemoteIterator is being used to return low-latency,
44   * low-bandwidth update data to the client, the noDelay option can be enabled
45   * for the underlying stream which will effectively disable buffering of data
46   * on the server side.  This can be very useful for implementing remote
47   * progress monitors, for example.
48   * <p>
49   * Note, since it is a common idiom for the local iterator to implement
50   * Closeable in order to close local resources, this implementation will
51   * automatically close a Closeable local iterator after the underlying server
52   * is shutdown.
53   * 
54   * @see <a href="{@docRoot}/overview-summary.html#Usage_Notes">Usage Notes</a>
55   *
56   * @author James Ahlborn
57   */
58  public class SerialRemoteIteratorServer<DataType>
59    extends EncodingRemoteIteratorServer<DataType>
60  {
61  
62    /** Default value for the setting indicating how often the
63        ObjectOutputStream should be reset */
64    public static final int DEFAULT_RESET_NUM_OBJECTS = 1000;
65    
66    /** the output stream which does the java serialization work */
67    private ObjectOutputStream _objOStream;
68    /** local iterator from which we are getting Serializable objects */
69    private final IOIterator<DataType> _localIterator;
70    /** keeps track of num objects written so we can do periodic reset */
71    private int _numObjectsWrittenSinceLastReset;
72    /** setting which indicates how often the ObjectOutputStream should be reset
73        (after this many objects are written). */
74    private final int _resetNumObjects;
75  
76    public SerialRemoteIteratorServer(Iterator<DataType> localIterator)
77      throws IOException
78    {
79      this(true, localIterator);
80    }
81  
82    public SerialRemoteIteratorServer(boolean useCompression,
83                                      Iterator<DataType> localIterator)
84      throws IOException
85    {
86      this(useCompression, false, localIterator);
87    }
88  
89    public SerialRemoteIteratorServer(boolean useCompression,
90                                      boolean noDelay,
91                                      Iterator<DataType> localIterator)
92      throws IOException
93    {
94      this(useCompression, noDelay, RemoteInputStreamServer.DUMMY_MONITOR,
95           localIterator);
96    }
97  
98    public SerialRemoteIteratorServer(
99        boolean useCompression,
100       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
101       Iterator<DataType> localIterator)
102     throws IOException
103   {
104     this(useCompression, false, monitor, localIterator);
105   }
106   
107   public SerialRemoteIteratorServer(
108       boolean useCompression,
109       boolean noDelay,
110       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
111       Iterator<DataType> localIterator)
112     throws IOException
113   {
114     this(useCompression, noDelay, monitor,
115          RemoteInputStreamServer.DEFAULT_CHUNK_SIZE, localIterator);
116   }
117   
118   public SerialRemoteIteratorServer(
119       boolean useCompression,
120       boolean noDelay,
121       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
122       int chunkSize,
123       Iterator<DataType> localIterator)
124     throws IOException
125   {
126     this(useCompression, noDelay, monitor, chunkSize,
127          RmiioUtil.adapt(localIterator));
128   }
129   
130   public SerialRemoteIteratorServer(IOIterator<DataType> localIterator)
131     throws IOException
132   {
133     this(true, localIterator);
134   }
135 
136   public SerialRemoteIteratorServer(boolean useCompression,
137                                     IOIterator<DataType> localIterator)
138     throws IOException
139   {
140     this(useCompression, false, localIterator);
141   }
142 
143   public SerialRemoteIteratorServer(boolean useCompression,
144                                     boolean noDelay,
145                                     IOIterator<DataType> localIterator)
146     throws IOException
147   {
148     this(useCompression, noDelay, RemoteInputStreamServer.DUMMY_MONITOR,
149          localIterator);
150   }
151 
152   public SerialRemoteIteratorServer(
153       boolean useCompression,
154       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
155       IOIterator<DataType> localIterator)
156     throws IOException
157   {
158     this(useCompression, false, monitor, localIterator);
159   }
160 
161   public SerialRemoteIteratorServer(
162       boolean useCompression,
163       boolean noDelay,
164       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
165       IOIterator<DataType> localIterator)
166     throws IOException
167   {
168     this(useCompression, noDelay, monitor,
169          RemoteInputStreamServer.DEFAULT_CHUNK_SIZE,
170          localIterator);
171   }
172 
173   public SerialRemoteIteratorServer(
174       boolean useCompression,
175       boolean noDelay,
176       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
177       int chunkSize,
178       IOIterator<DataType> localIterator)
179     throws IOException
180   {
181     this(useCompression, noDelay, monitor, chunkSize, localIterator,
182          DEFAULT_RESET_NUM_OBJECTS);
183   }
184 
185   public SerialRemoteIteratorServer(
186       boolean useCompression,
187       boolean noDelay,
188       RemoteStreamMonitor<RemoteInputStreamServer> monitor,
189       int chunkSize,
190       IOIterator<DataType> localIterator,
191       int resetNumObjects)
192     throws IOException
193   {
194     super(useCompression, noDelay, monitor, chunkSize);
195     if(localIterator == null) {
196       throw new IllegalArgumentException("Iterator cannot be null");
197     }
198     _localIterator = localIterator;
199     _resetNumObjects = resetNumObjects;
200   }
201   
202   @Override
203   protected boolean writeNextObject()
204     throws IOException
205   {
206     // have to create output stream on demand because constructor generates
207     // output!
208     if(_objOStream == null) {
209       _objOStream = new ObjectOutputStream(_localOStream);
210     }
211       
212     if(_localIterator.hasNext()) {
213       // write out next object
214       serializeObject(_objOStream, _localIterator.next());
215       return true;
216     }
217 
218     // no more
219     return false;
220   }
221 
222   @Override
223   protected void closeIterator()
224     throws IOException
225   {
226     if(_objOStream != null) {
227       // close (flush) object stream
228       _objOStream.close();
229     }
230     // close parent
231     super.closeIterator();
232   }
233 
234   @Override
235   protected void closeImpl(boolean readSuccess)
236     throws IOException
237   {
238     // close our local iterator if it is Closeable.  Swallow exceptions
239     // because at this point, they do not matter.
240     if(_localIterator instanceof Closeable) {
241       RmiioUtil.closeQuietly((Closeable)_localIterator);
242     }
243     super.closeImpl(readSuccess);
244   }
245   
246   /**
247    * Writes the given object to the given output stream.  The default
248    * implementation uses {@link java.io.ObjectOutputStream#writeUnshared} as
249    * well as periodically calls {@link java.io.ObjectOutputStream#reset} on
250    * the output stream.  Subclasses may choose to change this behavior by
251    * overriding this method.
252    *
253    * @param ostream the output stream to which the object should be written
254    * @param obj the object to write
255    */
256   protected void serializeObject(ObjectOutputStream ostream, Object obj)
257     throws IOException
258   {
259     ostream.writeUnshared(obj);
260     _numObjectsWrittenSinceLastReset++;
261     if(_numObjectsWrittenSinceLastReset >= _resetNumObjects) {
262       ostream.reset();
263       _numObjectsWrittenSinceLastReset = 0;
264     }
265   }
266   
267 }