Christos TZOTZIOY Georgiou wrote:
2. wrap your file-like object in a custom object that implements a
pushback method and whose read method returns data from the push-back
buffer first. If you read data that you shouldn't have, push it back and
give your custom object to the StreamReader.
Thanks for the suggestion.
Instead of a pushback method, I added a peek method. Below is what I
came up with.
-- Jason
class PeekableFile:
    """Wrap a file-like object and add a non-consuming ``peek`` method.

    Data fetched from the wrapped object by ``peek`` is kept in
    ``self.buffer`` and handed out again by the next ``read`` calls, so
    peeking never loses data.  Only the ``read`` method of the wrapped
    object is used.
    """

    def __init__(self, source):
        """Store *source*, an object providing ``read([size])``."""
        self.source = source
        # Data already pulled from ``source`` but not yet returned by
        # read().  None (or an empty string) means nothing is pending.
        self.buffer = None

    def peek(self, size):
        """Return up to *size* units of upcoming data without consuming them.

        A negative or ``None`` *size* peeks at everything that remains.
        Fewer than *size* units are returned when the source runs out.
        """
        if size is None or size < 0:
            # Slicing with a negative size would silently drop the tail of
            # the data, so "peek everything" is handled explicitly.
            rest = self.source.read()
            if self.buffer:
                self.buffer += rest
            else:
                self.buffer = rest
            return self.buffer
        if self.buffer:
            missing = size - len(self.buffer)
            if missing > 0:
                # Top the buffer up with only what is not pending already.
                self.buffer += self.source.read(missing)
        else:
            self.buffer = self.source.read(size)
        return self.buffer[:size]

    def read(self, size=-1):
        """Read and consume up to *size* units, serving peeked data first.

        A negative or ``None`` *size* reads everything up to end of file.
        With nothing pending, *size* is passed to the source unchanged.
        """
        if not self.buffer:
            return self.source.read(size)
        pending = self.buffer
        if size is None or size < 0:
            # Clear the buffer only after the source read has succeeded so
            # a failing read does not discard the pending data.
            data = pending + self.source.read()
            self.buffer = None
            return data
        held = len(pending)
        if size < held:
            self.buffer = pending[size:]
            return pending[:size]
        if size == held:
            self.buffer = None
            return pending
        data = pending + self.source.read(size - held)
        self.buffer = None
        return data
def main():
    """Run the PeekableFile unit tests with a text test runner.

    Every test wraps a fresh in-memory file containing 'abc' and checks
    one sequence of peek()/read() calls against the expected data.
    """
    import unittest
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO  # Python 3

    class PeekableFileTests(unittest.TestCase):
        def setUp(self):
            self.pf = PeekableFile(StringIO('abc'))

        def testPeek0(self):
            self.assertEqual(self.pf.peek(0), '')

        def testPeek1(self):
            self.assertEqual(self.pf.peek(1), 'a')

        def testPeek1Read1(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.read(1), 'a')

        def testPeek1Read2(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.read(2), 'ab')

        def testPeek1ReadAll(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.read(), 'abc')

        def testPeek1Read1Read1(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.read(1), 'a')
            self.assertEqual(self.pf.read(1), 'b')

        def testPeek1Read1ReadAll(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.read(1), 'a')
            self.assertEqual(self.pf.read(), 'bc')

        def testPeek1Peek1(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.peek(1), 'a')

        def testPeek1Peek2(self):
            self.assertEqual(self.pf.peek(1), 'a')
            self.assertEqual(self.pf.peek(2), 'ab')

        def testPeek2Peek1(self):
            self.assertEqual(self.pf.peek(2), 'ab')
            self.assertEqual(self.pf.peek(1), 'a')

        def testPeek2Read1Peek1(self):
            self.assertEqual(self.pf.peek(2), 'ab')
            self.assertEqual(self.pf.read(1), 'a')
            self.assertEqual(self.pf.peek(1), 'b')

        def testRead0(self):
            self.assertEqual(self.pf.read(0), '')

        def testRead1(self):
            self.assertEqual(self.pf.read(1), 'a')

        def testReadAll(self):
            self.assertEqual(self.pf.read(), 'abc')

        def testRead1Peek1(self):
            self.assertEqual(self.pf.read(1), 'a')
            self.assertEqual(self.pf.peek(1), 'b')

        def testReadAllPeek1(self):
            self.assertEqual(self.pf.read(), 'abc')
            self.assertEqual(self.pf.peek(1), '')

    # loadTestsFromTestCase is used instead of unittest.makeSuite because
    # it is available on both old and current Python versions.
    suite = unittest.TestLoader().loadTestsFromTestCase(PeekableFileTests)
    unittest.TextTestRunner().run(suite)
# Run the self-tests only when this file is executed as a script.
if __name__ == '__main__':
    main()