/*
 * Copyright (C) 2011, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.NB;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.CountingOutputStream;
import org.eclipse.jgit.util.sha1.SHA1;

/**
 * Inserts objects into the DFS.
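 * <p>
 * A minimal usage sketch, assuming a {@code DfsObjDatabase} named {@code db}
 * is already at hand (variable names here are illustrative only):
 *
 * <pre>
 * try (ObjectInserter ins = db.newInserter()) {
 * 	ObjectId id = ins.insert(Constants.OBJ_BLOB, data);
 * 	ins.flush(); // commit the pack so readers can see the new object
 * }
 * </pre>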
 */
public class DfsInserter extends ObjectInserter {
	/** Always produce version 2 indexes, to get CRC data. */
	private static final int INDEX_VERSION = 2;

	final DfsObjDatabase db;
	int compression = Deflater.BEST_COMPRESSION;

	List<PackedObjectInfo> objectList;
	ObjectIdOwnerMap<PackedObjectInfo> objectMap;

	DfsBlockCache cache;
	DfsStreamKey packKey;
	DfsPackDescription packDsc;
	PackStream packOut;
	private boolean rollback;
	private boolean checkExisting = true;

	/**
	 * Initialize a new inserter.
	 *
	 * @param db
	 *            database the inserter writes to.
	 */
	protected DfsInserter(DfsObjDatabase db) {
		this.db = db;
	}

	/**
	 * Set whether the inserter checks for objects that already exist.
	 *
	 * @param check
	 *            if {@code false}, will write out possibly-duplicate objects
	 *            without first checking whether they exist in the repo;
	 *            default is {@code true}.
	 */
	public void checkExisting(boolean check) {
		checkExisting = check;
	}

	void setCompressionLevel(int compression) {
		this.compression = compression;
	}

	/** {@inheritDoc} */
	@Override
	public DfsPackParser newPackParser(InputStream in) throws IOException {
		return new DfsPackParser(db, this, in);
	}

	/** {@inheritDoc} */
	@Override
	public ObjectReader newReader() {
		return new Reader();
	}

	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, byte[] data, int off, int len)
			throws IOException {
		ObjectId id = idFor(type, data, off, len);
		if (objectMap != null && objectMap.contains(id))
			return id;
		// Ignore unreachable (garbage) objects here.
		if (checkExisting && db.has(id, true))
			return id;

		long offset = beginObject(type, len);
		packOut.compress.write(data, off, len);
		packOut.compress.finish();
		return endObject(id, offset);
	}

	/** {@inheritDoc} */
	@Override
	public ObjectId insert(int type, long len, InputStream in)
			throws IOException {
		byte[] buf = insertBuffer(len);
		if (len <= buf.length) {
			IO.readFully(in, buf, 0, (int) len);
			return insert(type, buf, 0, (int) len);
		}

		long offset = beginObject(type, len);
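		// The id of a streamed object is computed on the fly by hashing
		// the canonical loose-object header ("type SP length NUL") and
		// then the raw content, while the deflated bytes go into the pack.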
		SHA1 md = digest();
		md.update(Constants.encodedTypeString(type));
		md.update((byte) ' ');
		md.update(Constants.encodeASCII(len));
		md.update((byte) 0);

		while (0 < len) {
			int n = in.read(buf, 0, (int) Math.min(buf.length, len));
			if (n <= 0)
				throw new EOFException();
			md.update(buf, 0, n);
			packOut.compress.write(buf, 0, n);
			len -= n;
		}
		packOut.compress.finish();
		return endObject(md.toObjectId(), offset);
	}

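	// Buffer selection for streamed inserts: return a byte[] big enough to
	// hold the whole object when it is small enough (so the byte[] insert
	// path and its duplicate check can be used); otherwise return the
	// scratch buffer and let the caller stream the content.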
	private byte[] insertBuffer(long len) {
		byte[] buf = buffer();
		if (len <= buf.length)
			return buf;
		if (len < db.getReaderOptions().getStreamFileThreshold()) {
			try {
				return new byte[(int) len];
			} catch (OutOfMemoryError noMem) {
				return buf;
			}
		}
		return buf;
	}

	/** {@inheritDoc} */
	@Override
	public void flush() throws IOException {
		if (packDsc == null)
			return;

		if (packOut == null)
			throw new IOException();

		byte[] packHash = packOut.writePackFooter();
		packDsc.addFileExt(PACK);
		packDsc.setFileSize(PACK, packOut.getCount());
		packOut.close();
		packOut = null;

		sortObjectsById();

		PackIndex index = writePackIndex(packDsc, packHash, objectList);
		db.commitPack(Collections.singletonList(packDsc), null);
		rollback = false;

		DfsPackFile p = new DfsPackFile(cache, packDsc);
		if (index != null)
			p.setPackIndex(index);
		db.addPack(p);
		clear();
	}

	/** {@inheritDoc} */
	@Override
	public void close() {
		if (packOut != null) {
			try {
				packOut.close();
			} catch (IOException err) {
				// Ignore a close failure, the pack should be removed.
			} finally {
				packOut = null;
			}
		}
		if (rollback && packDsc != null) {
			try {
				db.rollbackPack(Collections.singletonList(packDsc));
			} finally {
				packDsc = null;
				rollback = false;
			}
		}
		clear();
	}

	private void clear() {
		objectList = null;
		objectMap = null;
		packKey = null;
		packDsc = null;
	}

	private long beginObject(int type, long len) throws IOException {
		if (packOut == null)
			beginPack();
		long offset = packOut.getCount();
		packOut.beginObject(type, len);
		return offset;
	}

	private ObjectId endObject(ObjectId id, long offset) {
		PackedObjectInfo obj = new PackedObjectInfo(id);
		obj.setOffset(offset);
		obj.setCRC((int) packOut.crc32.getValue());
		objectList.add(obj);
		objectMap.addIfAbsent(obj);
		return id;
	}

	private void beginPack() throws IOException {
		objectList = new BlockList<>();
		objectMap = new ObjectIdOwnerMap<>();
		cache = DfsBlockCache.getInstance();

		rollback = true;
		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
		DfsOutputStream dfsOut = db.writeFile(packDsc, PACK);
		packDsc.setBlockSize(PACK, dfsOut.blockSize());
		packOut = new PackStream(dfsOut);
		packKey = packDsc.getStreamKey(PACK);

		// Write the header as though it were a single object pack.
		byte[] buf = packOut.hdrBuf;
		System.arraycopy(Constants.PACK_SIGNATURE, 0, buf, 0, 4);
		NB.encodeInt32(buf, 4, 2); // Always use pack version 2.
		NB.encodeInt32(buf, 8, 1); // Always assume 1 object.
		packOut.write(buf, 0, 12);
	}

	private void sortObjectsById() {
		Collections.sort(objectList);
	}

	@Nullable
	private TemporaryBuffer.Heap maybeGetTemporaryBuffer(
			List<PackedObjectInfo> list) {
		if (list.size() <= 58000) {
			return new TemporaryBuffer.Heap(2 << 20);
		}
		return null;
	}

	PackIndex writePackIndex(DfsPackDescription pack, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		pack.setIndexVersion(INDEX_VERSION);
		pack.setObjectCount(list.size());

		// If there are fewer than 58,000 objects, the entire index fits in
		// under 2 MiB. Callers will probably need the index immediately, so
		// buffer the index in process and load from the buffer.
		PackIndex packIndex = null;
		try (TemporaryBuffer.Heap buf = maybeGetTemporaryBuffer(list);
				DfsOutputStream os = db.writeFile(pack, INDEX);
				CountingOutputStream cnt = new CountingOutputStream(os)) {
			if (buf != null) {
				index(buf, packHash, list);
				packIndex = PackIndex.read(buf.openInputStream());
				buf.writeTo(cnt, null);
			} else {
				index(cnt, packHash, list);
			}
			pack.addFileExt(INDEX);
			pack.setBlockSize(INDEX, os.blockSize());
			pack.setFileSize(INDEX, cnt.getCount());
		}
		return packIndex;
	}

	private static void index(OutputStream out, byte[] packHash,
			List<PackedObjectInfo> list) throws IOException {
		PackIndexWriter.createVersion(out, INDEX_VERSION).write(list, packHash);
	}
	private class PackStream extends OutputStream {
		private final DfsOutputStream out;
		private final MessageDigest md;
		final byte[] hdrBuf;
		private final Deflater deflater;
		private final int blockSize;

		private long currPos; // Position of currBuf[0] in the output stream.
		private int currPtr; // Number of bytes in currBuf.
		private byte[] currBuf;

		final CRC32 crc32;
		final DeflaterOutputStream compress;

		PackStream(DfsOutputStream out) {
			this.out = out;

			hdrBuf = new byte[32];
			md = Constants.newMessageDigest();
			crc32 = new CRC32();
			deflater = new Deflater(compression);
			compress = new DeflaterOutputStream(this, deflater, 8192);

			int size = out.blockSize();
			if (size <= 0)
				size = cache.getBlockSize();
			else if (size < cache.getBlockSize())
				size = (cache.getBlockSize() / size) * size;
			blockSize = size;
			currBuf = new byte[blockSize];
		}

		long getCount() {
			return currPos + currPtr;
		}

		void beginObject(int objectType, long length) throws IOException {
			crc32.reset();
			deflater.reset();
			write(hdrBuf, 0, encodeTypeSize(objectType, length));
		}

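		// Pack object header: bits 6-4 of the first byte hold the type and
		// bits 3-0 the low bits of the inflated size; the remaining size
		// bits follow as a little-endian base-128 varint, with the high bit
		// of each byte acting as a continuation flag.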
		private int encodeTypeSize(int type, long rawLength) {
			long nextLength = rawLength >>> 4;
			hdrBuf[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F));
			rawLength = nextLength;
			int n = 1;
			while (rawLength > 0) {
				nextLength >>>= 7;
				hdrBuf[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F));
				rawLength = nextLength;
			}
			return n;
		}

		@Override
		public void write(int b) throws IOException {
			hdrBuf[0] = (byte) b;
			write(hdrBuf, 0, 1);
		}

		@Override
		public void write(byte[] data, int off, int len) throws IOException {
			crc32.update(data, off, len);
			md.update(data, off, len);
			writeNoHash(data, off, len);
		}

		private void writeNoHash(byte[] data, int off, int len)
				throws IOException {
			while (0 < len) {
				int n = Math.min(len, currBuf.length - currPtr);
				if (n == 0) {
					flushBlock();
					currBuf = new byte[blockSize];
					continue;
				}

				System.arraycopy(data, off, currBuf, currPtr, n);
				off += n;
				len -= n;
				currPtr += n;
			}
		}

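		// Write the completed (or final, partial) block to the DFS stream
		// and also place it in the block cache, so it can be read back
		// while the pack is still being written.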
		private void flushBlock() throws IOException {
			out.write(currBuf, 0, currPtr);

			byte[] buf;
			if (currPtr == currBuf.length)
				buf = currBuf;
			else
				buf = copyOf(currBuf, 0, currPtr);
			cache.put(new DfsBlock(packKey, currPos, buf));

			currPos += currPtr;
			currPtr = 0;
			currBuf = null;
		}

		private byte[] copyOf(byte[] src, int ptr, int cnt) {
			byte[] dst = new byte[cnt];
			System.arraycopy(src, ptr, dst, 0, cnt);
			return dst;
		}

		byte[] writePackFooter() throws IOException {
			byte[] packHash = md.digest();
			writeNoHash(packHash, 0, packHash.length);
			if (currPtr != 0)
				flushBlock();
			return packHash;
		}

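		// Read back bytes already written: flushed blocks are served from
		// the block cache (reloaded from the stream if evicted), and the
		// unflushed tail directly from currBuf.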
		int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
			int r = 0;
			while (pos < currPos && r < cnt) {
				DfsBlock b = getOrLoadBlock(pos);
				int n = b.copy(pos, dst, ptr + r, cnt - r);
				pos += n;
				r += n;
			}
			if (currPos <= pos && r < cnt) {
				int s = (int) (pos - currPos);
				int n = Math.min(currPtr - s, cnt - r);
				System.arraycopy(currBuf, s, dst, ptr + r, n);
				r += n;
			}
			return r;
		}

		byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
				DataFormatException {
			byte[] dstbuf;
			try {
				dstbuf = new byte[len];
			} catch (OutOfMemoryError noMemory) {
				return null; // Caller will switch to large object streaming.
			}

			Inflater inf = ctx.inflater();
			pos += setInput(pos, inf);
			for (int dstoff = 0;;) {
				int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
				dstoff += n;
				if (inf.finished())
					return dstbuf;
				if (inf.needsInput())
					pos += setInput(pos, inf);
				else if (n == 0)
					throw new DataFormatException();
			}
		}

		private int setInput(long pos, Inflater inf)
				throws IOException, DataFormatException {
			if (pos < currPos)
				return getOrLoadBlock(pos).setInput(pos, inf);
			if (pos < currPos + currPtr) {
				int s = (int) (pos - currPos);
				int n = currPtr - s;
				inf.setInput(currBuf, s, n);
				return n;
			}
			throw new EOFException(JGitText.get().unexpectedEofInPack);
		}

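		// Look up a flushed block in the cache, rereading it from the DFS
		// output stream and re-caching it if it has been evicted.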
		private DfsBlock getOrLoadBlock(long pos) throws IOException {
			long s = toBlockStart(pos);
			DfsBlock b = cache.get(packKey, s);
			if (b != null)
				return b;

			byte[] d = new byte[blockSize];
			for (int p = 0; p < blockSize;) {
				int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
				if (n <= 0)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				p += n;
			}
			b = new DfsBlock(packKey, s, d);
			cache.put(b);
			return b;
		}

		private long toBlockStart(long pos) {
			return (pos / blockSize) * blockSize;
		}

		@Override
		public void close() throws IOException {
			deflater.end();
			out.close();
		}
	}

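	/**
	 * Reader that sees both the objects already in the repository and the
	 * objects buffered in the pack this inserter is still writing.
	 */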
	private class Reader extends ObjectReader {
		private final DfsReader ctx = db.newReader();

		@Override
		public ObjectReader newReader() {
			return db.newReader();
		}

		@Override
		public Collection<ObjectId> resolve(AbbreviatedObjectId id)
				throws IOException {
			Collection<ObjectId> stored = ctx.resolve(id);
			if (objectList == null)
				return stored;

			Set<ObjectId> r = new HashSet<>(stored.size() + 2);
			r.addAll(stored);
			for (PackedObjectInfo obj : objectList) {
				if (id.prefixCompare(obj) == 0)
					r.add(obj.copy());
			}
			return r;
		}

		@Override
		public ObjectLoader open(AnyObjectId objectId, int typeHint)
				throws IOException {
			if (objectMap == null)
				return ctx.open(objectId, typeHint);

			PackedObjectInfo obj = objectMap.get(objectId);
			if (obj == null)
				return ctx.open(objectId, typeHint);

			byte[] buf = buffer();
			int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
			if (cnt <= 0)
				throw new EOFException(JGitText.get().unexpectedEofInPack);

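			// Decode the pack object header written by encodeTypeSize():
			// the type from the first byte, the size as a base-128 varint.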
			int c = buf[0] & 0xff;
			int type = (c >> 4) & 7;
			if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
				throw new IOException(MessageFormat.format(
						JGitText.get().cannotReadBackDelta, Integer.toString(type)));
			if (typeHint != OBJ_ANY && type != typeHint) {
				throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
			}

			long sz = c & 0x0f;
			int ptr = 1;
			int shift = 4;
			while ((c & 0x80) != 0) {
				if (ptr >= cnt)
					throw new EOFException(JGitText.get().unexpectedEofInPack);
				c = buf[ptr++] & 0xff;
				sz += ((long) (c & 0x7f)) << shift;
				shift += 7;
			}

			long zpos = obj.getOffset() + ptr;
			if (sz < ctx.getStreamFileThreshold()) {
				byte[] data = inflate(obj, zpos, (int) sz);
				if (data != null)
					return new ObjectLoader.SmallObject(type, data);
			}
			return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
		}

		private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
				throws IOException, CorruptObjectException {
			try {
				return packOut.inflate(ctx, zpos, sz);
			} catch (DataFormatException dfe) {
				throw new CorruptObjectException(
						MessageFormat.format(
								JGitText.get().objectAtHasBadZlibStream,
								Long.valueOf(obj.getOffset()),
								packDsc.getFileName(PackExt.PACK)),
						dfe);
			}
		}

		@Override
		public boolean has(AnyObjectId objectId) throws IOException {
			return (objectMap != null && objectMap.contains(objectId))
					|| ctx.has(objectId);
		}

		@Override
		public Set<ObjectId> getShallowCommits() throws IOException {
			return ctx.getShallowCommits();
		}

		@Override
		public ObjectInserter getCreatedFromInserter() {
			return DfsInserter.this;
		}

		@Override
		public void close() {
			ctx.close();
		}
	}

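	/**
	 * Loader for a large object in the in-progress pack; streams the
	 * deflated bytes back through the block cache instead of holding the
	 * whole object in memory.
	 */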
	private class StreamLoader extends ObjectLoader {
		private final ObjectId id;
		private final int type;
		private final long size;

		private final DfsStreamKey srcPack;
		private final long pos;

		StreamLoader(ObjectId id, int type, long sz,
				DfsStreamKey key, long pos) {
			this.id = id;
			this.type = type;
			this.size = sz;
			this.srcPack = key;
			this.pos = pos;
		}

		@Override
		public ObjectStream openStream() throws IOException {
			@SuppressWarnings("resource") // Explicitly closed below
			final DfsReader ctx = db.newReader();
			if (srcPack != packKey) {
				try {
					// Post DfsInserter.flush() use the normal code path.
					// The newly created pack is registered in the cache.
					return ctx.open(id, type).openStream();
				} finally {
					ctx.close();
				}
			}

			int bufsz = 8192;
			final Inflater inf = ctx.inflater();
			return new ObjectStream.Filter(type,
					size, new BufferedInputStream(new InflaterInputStream(
							new ReadBackStream(pos), inf, bufsz), bufsz)) {
				@Override
				public void close() throws IOException {
					ctx.close();
					super.close();
				}
			};
		}

		@Override
		public int getType() {
			return type;
		}

		@Override
		public long getSize() {
			return size;
		}

		@Override
		public boolean isLarge() {
			return true;
		}

		@Override
		public byte[] getCachedBytes() throws LargeObjectException {
			throw new LargeObjectException.ExceedsLimit(
					db.getReaderOptions().getStreamFileThreshold(), size);
		}
	}

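	/** Reads raw pack bytes back out of {@link PackStream} for streaming. */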
	private final class ReadBackStream extends InputStream {
		private long pos;

		ReadBackStream(long offset) {
			pos = offset;
		}

		@Override
		public int read() throws IOException {
			byte[] b = new byte[1];
			int n = read(b);
			return n == 1 ? b[0] & 0xff : -1;
		}

		@Override
		public int read(byte[] buf, int ptr, int len) throws IOException {
			int n = packOut.read(pos, buf, ptr, len);
			if (n > 0) {
				pos += n;
			}
			return n;
		}
	}
}