# Kaitai Struct runtime library for Python (vendored copy).
#
# Original upload-listing metadata, preserved from the source this file was
# copied from:
#   uploaded by "shengyudong", commit 322ac74336, 2025-12-25 11:16:59 +08:00
#   1022 lines, 34 KiB, Python
import itertools
import sys
import struct
from io import open, BytesIO, SEEK_CUR, SEEK_END # noqa
import warnings
# True when running under Python 2; the runtime supports both Python 2 and 3
# and branches on this flag where the APIs differ.
PY2 = sys.version_info[0] == 2
# Kaitai Struct runtime version, in the format defined by PEP 440.
# Used by our setup.cfg to set the version number in
# packaging/distribution metadata.
# Also used in Python code generated by older ksc versions (0.7 through 0.9)
# to check that the imported runtime is compatible with the generated code.
# Since ksc 0.10, the compatibility check instead uses the API_VERSION constant,
# so that the version string does not need to be parsed at runtime
# (see https://github.com/kaitai-io/kaitai_struct/issues/804).
__version__ = '0.11'
# Kaitai Struct runtime API version, as a tuple of ints.
# Used in generated Python code (since ksc 0.10) to check that the imported
# runtime is compatible with the generated code.
API_VERSION = (0, 11)
# pylint: disable=invalid-name,missing-docstring,too-many-public-methods
# pylint: disable=useless-object-inheritance,super-with-arguments,consider-using-f-string
class KaitaiStruct(object):
    """Base class for all structures produced by the Kaitai Struct compiler.

    Keeps the `KaitaiStream` the structure reads from in `_io` and provides
    convenience constructors plus context-manager support that closes the
    underlying stream.
    """

    def __init__(self, io):
        # `io` is a KaitaiStream (see from_file/from_bytes/from_io).
        self._io = io

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def close(self):
        """Close the underlying stream."""
        self._io.close()

    @classmethod
    def from_file(cls, filename):
        """Open `filename` in binary mode and parse a structure from it."""
        handle = open(filename, 'rb')
        try:
            return cls(KaitaiStream(handle))
        except Exception:
            # The caller never received an object, so nobody else can close
            # the file descriptor - do it here, then re-raise.
            handle.close()
            raise

    @classmethod
    def from_bytes(cls, buf):
        """Parse a structure from the in-memory `bytes` object `buf`."""
        return cls(KaitaiStream(BytesIO(buf)))

    @classmethod
    def from_io(cls, io):
        """Parse a structure from an already-open raw stream `io`."""
        return cls(KaitaiStream(io))
class ReadWriteKaitaiStruct(KaitaiStruct):
    """Base class for generated structures that support serialization
    (writing) in addition to parsing."""

    def __init__(self, io):
        super(ReadWriteKaitaiStruct, self).__init__(io)
        # A "dirty" object must pass _check() before it may be written;
        # __setattr__ flips this back to True on every field modification.
        self._dirty = True

    def _fetch_instances(self):
        # Implemented by generated subclasses.
        raise NotImplementedError()

    def _write(self, io=None):
        """Serialize this object, then flush all registered child streams."""
        self._write__seq(io)
        self._fetch_instances()
        self._io.write_back_child_streams()

    def _write__seq(self, io):
        if io is not None:
            self._io = io
        if self._dirty:
            raise ConsistencyNotCheckedError("consistency not checked: _check() has not been called since the last modification of the object")

    def __setattr__(self, key, value):
        setter = super(ReadWriteKaitaiStruct, self).__setattr__
        # Assigning any user-visible field (public names, _unnamed* fields,
        # or the _parent/_root references) marks the object dirty again.
        tracked = (not key.startswith('_')
                   or key.startswith('_unnamed')
                   or key in ('_parent', '_root'))
        if tracked:
            setter('_dirty', True)
        setter(key, value)
class KaitaiStream(object):
def __init__(self, io):
self._io = io
self.align_to_byte()
self.bits_le = False
self.bits_write_mode = False
self.write_back_handler = None
self.child_streams = []
try:
self._size = self.size()
# IOError is for Python 2 (IOError also exists in Python 3, but it has
# become just an alias for OSError).
#
# Although I haven't actually seen a bare ValueError raised in this case
# in practice, chances are some implementation may be doing it (see
# <https://docs.python.org/3/library/io.html#io.IOBase> for reference:
# "Also, implementations may raise a ValueError (or
# UnsupportedOperation) when operations they do not support are
# called."). And I've seen ValueError raised at least in Python 2 when
# calling read() on an unreadable stream.
except (OSError, IOError, ValueError):
# tell() or seek() failed - we have a non-seekable stream (which is
# fine for reading, but writing will fail, see
# _write_bytes_not_aligned())
pass
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
def close(self):
try:
if self.bits_write_mode:
self.write_align_to_byte()
else:
self.align_to_byte()
finally:
self._io.close()
# region Stream positioning
def is_eof(self):
if not self.bits_write_mode and self.bits_left > 0:
return False
# NB: previously, we first tried if self._io.read(1) did in fact read 1
# byte from the stream (and then seeked 1 byte back if so), but given
# that is_eof() may be called from both read and write contexts, it's
# more universal not to use read() at all. See also
# <https://github.com/kaitai-io/kaitai_struct_python_runtime/issues/75>.
return self._io.tell() >= self.size()
def seek(self, n):
if n < 0:
raise InvalidArgumentError("cannot seek to invalid position %d" % (n,))
if self.bits_write_mode:
self.write_align_to_byte()
else:
self.align_to_byte()
self._io.seek(n)
def pos(self):
return self._io.tell() + (1 if self.bits_write_mode and self.bits_left > 0 else 0)
def size(self):
# Python has no internal File object API function to get
# current file / StringIO size, thus we use the following
# trick.
io = self._io
# Remember our current position
cur_pos = io.tell()
# Seek to the end of the stream and remember the full length
full_size = io.seek(0, SEEK_END)
if full_size is None:
# In Python 2, the seek() method of 'file' objects (created by the
# built-in open() function) has no return value, so we have to call
# tell() ourselves to get the new absolute position - see
# <https://github.com/kaitai-io/kaitai_struct_python_runtime/issues/72>.
#
# In Python 3, seek() methods of all
# <https://docs.python.org/3/library/io.html> streams return the new
# position already, so this won't be needed once we drop support for
# Python 2.
full_size = io.tell()
# Seek back to the current position
io.seek(cur_pos)
return full_size
# endregion
# region Structs for numeric types
# Pre-compiled struct.Struct objects, shared by all streams. Reusing a
# compiled Struct avoids re-parsing the format string on every read/write.
# Naming matches Kaitai type names: s/u/f = signed int / unsigned int /
# float, the digit is the size in bytes, be/le is the byte order.
packer_s1 = struct.Struct('b')
packer_s2be = struct.Struct('>h')
packer_s4be = struct.Struct('>i')
packer_s8be = struct.Struct('>q')
packer_s2le = struct.Struct('<h')
packer_s4le = struct.Struct('<i')
packer_s8le = struct.Struct('<q')
packer_u1 = struct.Struct('B')
packer_u2be = struct.Struct('>H')
packer_u4be = struct.Struct('>I')
packer_u8be = struct.Struct('>Q')
packer_u2le = struct.Struct('<H')
packer_u4le = struct.Struct('<I')
packer_u8le = struct.Struct('<Q')
packer_f4be = struct.Struct('>f')
packer_f8be = struct.Struct('>d')
packer_f4le = struct.Struct('<f')
packer_f8le = struct.Struct('<d')
# endregion
# region Reading
# region Integer numbers
# region Signed
def read_s1(self):
return KaitaiStream.packer_s1.unpack(self.read_bytes(1))[0]
# region Big-endian
def read_s2be(self):
return KaitaiStream.packer_s2be.unpack(self.read_bytes(2))[0]
def read_s4be(self):
return KaitaiStream.packer_s4be.unpack(self.read_bytes(4))[0]
def read_s8be(self):
return KaitaiStream.packer_s8be.unpack(self.read_bytes(8))[0]
# endregion
# region Little-endian
def read_s2le(self):
return KaitaiStream.packer_s2le.unpack(self.read_bytes(2))[0]
def read_s4le(self):
return KaitaiStream.packer_s4le.unpack(self.read_bytes(4))[0]
def read_s8le(self):
return KaitaiStream.packer_s8le.unpack(self.read_bytes(8))[0]
# endregion
# endregion
# region Unsigned
def read_u1(self):
return KaitaiStream.packer_u1.unpack(self.read_bytes(1))[0]
# region Big-endian
def read_u2be(self):
return KaitaiStream.packer_u2be.unpack(self.read_bytes(2))[0]
def read_u4be(self):
return KaitaiStream.packer_u4be.unpack(self.read_bytes(4))[0]
def read_u8be(self):
return KaitaiStream.packer_u8be.unpack(self.read_bytes(8))[0]
# endregion
# region Little-endian
def read_u2le(self):
return KaitaiStream.packer_u2le.unpack(self.read_bytes(2))[0]
def read_u4le(self):
return KaitaiStream.packer_u4le.unpack(self.read_bytes(4))[0]
def read_u8le(self):
return KaitaiStream.packer_u8le.unpack(self.read_bytes(8))[0]
# endregion
# endregion
# endregion
# region Floating point numbers
# region Big-endian
def read_f4be(self):
return KaitaiStream.packer_f4be.unpack(self.read_bytes(4))[0]
def read_f8be(self):
return KaitaiStream.packer_f8be.unpack(self.read_bytes(8))[0]
# endregion
# region Little-endian
def read_f4le(self):
return KaitaiStream.packer_f4le.unpack(self.read_bytes(4))[0]
def read_f8le(self):
return KaitaiStream.packer_f8le.unpack(self.read_bytes(8))[0]
# endregion
# endregion
# region Unaligned bit values
def align_to_byte(self):
    """Discard any buffered bits and realign the stream to a byte boundary.
    Also serves as the initializer/reset of the bit buffer."""
    self.bits_left = 0  # number of bits currently buffered in `self.bits`
    self.bits = 0

def read_bits_int_be(self, n):
    """Read `n` bits as an unsigned integer, big-endian bit order (MSB
    first). Leftover bits of the last byte read stay buffered in
    `self.bits` for the next call."""
    self.bits_write_mode = False
    res = 0
    # Bits we must fetch from the stream beyond what is already buffered.
    bits_needed = n - self.bits_left
    # Bits of the last fetched byte that will remain buffered afterwards.
    self.bits_left = -bits_needed % 8
    if bits_needed > 0:
        # 1 bit => 1 byte
        # 8 bits => 1 byte
        # 9 bits => 2 bytes
        bytes_needed = ((bits_needed - 1) // 8) + 1  # `ceil(bits_needed / 8)`
        buf = self._read_bytes_not_aligned(bytes_needed)
        if PY2:
            buf = bytearray(buf)
        # Accumulate the fetched bytes into one big integer, MSB first.
        for byte in buf:
            res = res << 8 | byte
        new_bits = res
        # Result = previously buffered bits (high part) followed by the
        # fresh bits, minus the low-order leftover bits.
        res = res >> self.bits_left | self.bits << bits_needed
        self.bits = new_bits # will be masked at the end of the function
    else:
        # The buffer alone satisfies the request.
        res = self.bits >> -bits_needed # shift unneeded bits out
    mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7
    self.bits &= mask
    return res
def read_bits_int(self, n):
    """Deprecated alias of read_bits_int_be(), kept only for backwards
    compatibility with code generated by ksc older than 0.9; it will be
    removed in the future.
    """
    warnings.warn(
        "read_bits_int() is deprecated since 0.9, use read_bits_int_be() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.read_bits_int_be(n)
def read_bits_int_le(self, n):
    """Read `n` bits as an unsigned integer, little-endian bit order (LSB
    first). Leftover bits of the last byte read stay buffered in
    `self.bits` for the next call."""
    self.bits_write_mode = False
    res = 0
    # Bits we must fetch from the stream beyond what is already buffered.
    bits_needed = n - self.bits_left
    if bits_needed > 0:
        # 1 bit => 1 byte
        # 8 bits => 1 byte
        # 9 bits => 2 bytes
        bytes_needed = ((bits_needed - 1) // 8) + 1 # `ceil(bits_needed / 8)`
        buf = self._read_bytes_not_aligned(bytes_needed)
        if PY2:
            buf = bytearray(buf)
        # Accumulate the fetched bytes into one big integer, LSB first.
        for i, byte in enumerate(buf):
            res |= byte << (i * 8)
        # Bits beyond `bits_needed` are carried over into the buffer.
        new_bits = res >> bits_needed
        # Result = buffered bits (low part) followed by the fresh bits.
        res = res << self.bits_left | self.bits
        self.bits = new_bits
    else:
        # The buffer alone satisfies the request.
        res = self.bits
        self.bits >>= n
    self.bits_left = -bits_needed % 8
    mask = (1 << n) - 1 # no problem with this in Python (arbitrary precision integers)
    res &= mask
    return res
# endregion
# region Byte arrays
def read_bytes(self, n):
self.align_to_byte()
return self._read_bytes_not_aligned(n)
def _read_bytes_not_aligned(self, n):
if n < 0:
raise InvalidArgumentError(
"requested invalid %d amount of bytes" %
(n,)
)
is_satisfiable = True
# When a large number of bytes is requested, try to check first
# that there is indeed enough data left in the stream.
# This avoids reading large amounts of data only to notice afterwards
# that it's not long enough. For smaller amounts of data, it's faster to
# first read the data unconditionally and check the length afterwards.
if (
n >= 8*1024*1024 # = 8 MiB
# in Python 2, there is a common error ['file' object has no
# attribute 'seekable'], so we need to make sure that seekable() exists
and callable(getattr(self._io, 'seekable', None))
and self._io.seekable()
):
num_bytes_available = self.size() - self.pos()
is_satisfiable = (n <= num_bytes_available)
if is_satisfiable:
r = self._io.read(n)
num_bytes_available = len(r)
is_satisfiable = (n <= num_bytes_available)
if not is_satisfiable:
# noinspection PyUnboundLocalVariable
raise EndOfStreamError(
"requested %d bytes, but only %d bytes available" %
(n, num_bytes_available),
n, num_bytes_available
)
# noinspection PyUnboundLocalVariable
return r
def read_bytes_full(self):
self.align_to_byte()
return self._io.read()
def read_bytes_term(self, term, include_term, consume_term, eos_error):
    """Read bytes until the one-byte terminator `term` (an int) is found.

    `include_term` keeps the terminator in the result; `consume_term`
    leaves the stream positioned after it; when the terminator is never
    found, `eos_error` selects between raising NoTerminatorFoundError and
    returning the data read so far.
    """
    self.align_to_byte()
    term_byte = KaitaiStream.byte_from_int(term)
    result = bytearray()
    c = self._io.read(1)
    while c:
        if c == term_byte:
            if include_term:
                result += c
            if not consume_term:
                # Step back so the terminator can be read again.
                self._io.seek(-1, SEEK_CUR)
            return bytes(result)
        result += c
        c = self._io.read(1)
    # Stream exhausted without seeing the terminator.
    if eos_error:
        raise NoTerminatorFoundError(term_byte, 0)
    return bytes(result)
def read_bytes_term_multi(self, term, include_term, consume_term, eos_error):
self.align_to_byte()
unit_size = len(term)
r = bytearray()
while True:
c = self._io.read(unit_size)
if len(c) < unit_size:
if eos_error:
raise NoTerminatorFoundError(term, len(c))
r += c
return bytes(r)
if c == term:
if include_term:
r += c
if not consume_term:
self._io.seek(-unit_size, SEEK_CUR)
return bytes(r)
r += c
def ensure_fixed_contents(self, expected):
"""Deprecated and no longer used as of KSC 0.9. It is only available
for backwards compatibility and will be removed in the future.
KSC 0.9 and later explicitly raises `ValidationNotEqualError` from an
`if` statement instead.
"""
warnings.warn(
"ensure_fixed_contents() is deprecated since 0.9, explicitly raise "
"ValidationNotEqualError from an `if` statement instead",
DeprecationWarning,
stacklevel=2,
)
actual = self._io.read(len(expected))
if actual != expected:
raise Exception(
"unexpected fixed contents: got %r, was waiting for %r" %
(actual, expected)
)
return actual
@staticmethod
def bytes_strip_right(data, pad_byte):
return data.rstrip(KaitaiStream.byte_from_int(pad_byte))
@staticmethod
def bytes_terminate(data, term, include_term):
term_index = KaitaiStream.byte_array_index_of(data, term)
if term_index == -1:
return data[:]
return data[:term_index + (1 if include_term else 0)]
@staticmethod
def bytes_terminate_multi(data, term, include_term):
unit_size = len(term)
search_index = data.find(term)
while True:
if search_index == -1:
return data[:]
mod = search_index % unit_size
if mod == 0:
return data[:search_index + (unit_size if include_term else 0)]
search_index = data.find(term, search_index + (unit_size - mod))
# endregion
# endregion
# region Writing
def _ensure_bytes_left_to_write(self, n, pos):
try:
full_size = self._size
except AttributeError:
raise ValueError("writing to non-seekable streams is not supported")
num_bytes_left = full_size - pos
if n > num_bytes_left:
raise EndOfStreamError(
"requested to write %d bytes, but only %d bytes left in the stream" %
(n, num_bytes_left),
n, num_bytes_left
)
# region Integer numbers
# region Signed
def write_s1(self, v):
self.write_bytes(KaitaiStream.packer_s1.pack(v))
# region Big-endian
def write_s2be(self, v):
self.write_bytes(KaitaiStream.packer_s2be.pack(v))
def write_s4be(self, v):
self.write_bytes(KaitaiStream.packer_s4be.pack(v))
def write_s8be(self, v):
self.write_bytes(KaitaiStream.packer_s8be.pack(v))
# endregion
# region Little-endian
def write_s2le(self, v):
self.write_bytes(KaitaiStream.packer_s2le.pack(v))
def write_s4le(self, v):
self.write_bytes(KaitaiStream.packer_s4le.pack(v))
def write_s8le(self, v):
self.write_bytes(KaitaiStream.packer_s8le.pack(v))
# endregion
# endregion
# region Unsigned
def write_u1(self, v):
self.write_bytes(KaitaiStream.packer_u1.pack(v))
# region Big-endian
def write_u2be(self, v):
self.write_bytes(KaitaiStream.packer_u2be.pack(v))
def write_u4be(self, v):
self.write_bytes(KaitaiStream.packer_u4be.pack(v))
def write_u8be(self, v):
self.write_bytes(KaitaiStream.packer_u8be.pack(v))
# endregion
# region Little-endian
def write_u2le(self, v):
self.write_bytes(KaitaiStream.packer_u2le.pack(v))
def write_u4le(self, v):
self.write_bytes(KaitaiStream.packer_u4le.pack(v))
def write_u8le(self, v):
self.write_bytes(KaitaiStream.packer_u8le.pack(v))
# endregion
# endregion
# endregion
# region Floating point numbers
# region Big-endian
def write_f4be(self, v):
self.write_bytes(KaitaiStream.packer_f4be.pack(v))
def write_f8be(self, v):
self.write_bytes(KaitaiStream.packer_f8be.pack(v))
# endregion
# region Little-endian
def write_f4le(self, v):
self.write_bytes(KaitaiStream.packer_f4le.pack(v))
def write_f8le(self, v):
self.write_bytes(KaitaiStream.packer_f8le.pack(v))
# endregion
# endregion
# region Unaligned bit values
def write_align_to_byte(self):
    """Flush any bits buffered by write_bits_int_*() as one final byte and
    realign the stream to a byte boundary. No-op when no bits are pending."""
    if self.bits_left <= 0:
        return
    b = self.bits
    if not self.bits_le:
        # In big-endian bit order the pending bits occupy the high end of
        # the byte.
        b <<= 8 - self.bits_left
    # Clear `bits`/`bits_left` via align_to_byte() *before* writing, so the
    # buffer is reset even if the write raises. A failed write is likely a
    # permanent condition, and seek() calls this method too - keeping the
    # stale bits around would re-raise the same exception on every
    # subsequent seek with no sane way to recover. Delivering the failure
    # once (it really belongs to the earlier bit-write request anyway) and
    # leaving the stream usable is the more reasonable behavior.
    self.align_to_byte()
    self._write_bytes_not_aligned(KaitaiStream.byte_from_int(b))
def write_bits_int_be(self, n, val):
    """Write the lowest `n` bits of `val` in big-endian bit order (MSB
    first). A trailing partial byte is kept in `self.bits`/`self.bits_left`
    until more bits arrive or write_align_to_byte() flushes it."""
    self.bits_le = False
    self.bits_write_mode = True
    mask = (1 << n) - 1 # no problem with this in Python (arbitrary precision integers)
    val &= mask
    bits_to_write = self.bits_left + n
    bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)`
    # Unlike self._io.tell(), pos() respects the `bits_left` field (it
    # returns the stream position as if it were already aligned on a byte
    # boundary), which ensures that we report the same numbers of bytes here
    # as read_bits_int_*() methods would.
    self._ensure_bytes_left_to_write(bytes_needed - (1 if self.bits_left > 0 else 0), self.pos())
    bytes_to_write = bits_to_write // 8
    self.bits_left = bits_to_write % 8
    if bytes_to_write > 0:
        buf = bytearray(bytes_to_write)
        mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7
        # Low-order bits that don't fill a whole byte yet stay buffered.
        new_bits = val & mask
        # Prepend the previously buffered bits (high part) to the value.
        val = val >> self.bits_left | self.bits << (n - self.bits_left)
        self.bits = new_bits
        # Serialize `val` into the buffer, most significant byte first.
        for i in range(bytes_to_write - 1, -1, -1):
            buf[i] = val & 0xff
            val >>= 8
        self._write_bytes_not_aligned(buf)
    else:
        # Not enough bits for a full byte yet - just accumulate.
        self.bits = self.bits << n | val
def write_bits_int_le(self, n, val):
    """Write the lowest `n` bits of `val` in little-endian bit order (LSB
    first). A trailing partial byte is kept in `self.bits`/`self.bits_left`
    until more bits arrive or write_align_to_byte() flushes it."""
    self.bits_le = True
    self.bits_write_mode = True
    bits_to_write = self.bits_left + n
    bytes_needed = ((bits_to_write - 1) // 8) + 1 # `ceil(bits_to_write / 8)`
    # Unlike self._io.tell(), pos() respects the `bits_left` field (it
    # returns the stream position as if it were already aligned on a byte
    # boundary), which ensures that we report the same numbers of bytes here
    # as read_bits_int_*() methods would.
    self._ensure_bytes_left_to_write(bytes_needed - (1 if self.bits_left > 0 else 0), self.pos())
    bytes_to_write = bits_to_write // 8
    old_bits_left = self.bits_left
    self.bits_left = bits_to_write % 8
    if bytes_to_write > 0:
        buf = bytearray(bytes_to_write)
        # High-order bits that don't fill a whole byte yet stay buffered.
        new_bits = val >> (n - self.bits_left) # no problem with this in Python (arbitrary precision integers)
        # Append the value after the previously buffered bits (low part).
        val = val << old_bits_left | self.bits
        self.bits = new_bits
        # Serialize `val` into the buffer, least significant byte first.
        for i in range(bytes_to_write):
            buf[i] = val & 0xff
            val >>= 8
        self._write_bytes_not_aligned(buf)
    else:
        # Not enough bits for a full byte yet - just accumulate.
        self.bits |= val << old_bits_left
    mask = (1 << self.bits_left) - 1 # `bits_left` is in range 0..7
    self.bits &= mask
# endregion
# region Byte arrays
def write_bytes(self, buf):
self.write_align_to_byte()
self._write_bytes_not_aligned(buf)
def _write_bytes_not_aligned(self, buf):
n = len(buf)
self._ensure_bytes_left_to_write(n, self._io.tell())
self._io.write(buf)
def write_bytes_limit(self, buf, size, term, pad_byte):
n = len(buf)
# Strictly speaking, this assertion is redundant because it is already
# done in the corresponding _check() method in the generated code, but
# it seems to make sense to include it here anyway so that this method
# itself does something reasonable for every set of arguments.
#
# However, it should never be `false` when operated correctly (and in
# this case, assigning inconsistent values to fields of a KS-generated
# object is considered correct operation if the user application calls
# the corresponding _check(), which we know would raise an error and
# thus the code should not reach _write() and this method at all). So
# it's by design that this throws AssertionError, not any specific
# error, because it's not intended to be caught in user applications,
# but avoided by calling all _check() methods correctly.
assert n <= size, "writing %d bytes, but %d bytes were given" % (size, n)
self.write_bytes(buf)
if n < size:
self.write_u1(term)
self.write_bytes(KaitaiStream.byte_from_int(pad_byte) * (size - n - 1))
# endregion
# endregion
# region Byte array processing
@staticmethod
def process_xor_one(data, key):
    """Return `data` with every byte XORed with the integer `key`."""
    if PY2:
        xored = bytearray(b ^ key for b in bytearray(data))
        return bytes(xored)
    return bytes(b ^ key for b in data)

@staticmethod
def process_xor_many(data, key):
    """Return `data` XORed with the byte string `key`, cycling `key` when
    it is shorter than `data`."""
    if PY2:
        pairs = zip(bytearray(data), itertools.cycle(bytearray(key)))
        return bytes(bytearray(a ^ b for a, b in pairs))
    return bytes(a ^ b for a, b in zip(data, itertools.cycle(key)))
@staticmethod
def process_rotate_left(data, amount, group_size):
if group_size != 1:
raise NotImplementedError(
"unable to rotate group of %d bytes yet" %
(group_size,)
)
anti_amount = -amount % (group_size * 8)
r = bytearray(data)
for i, byte in enumerate(r):
r[i] = (byte << amount) & 0xff | (byte >> anti_amount)
return bytes(r)
# endregion
# region Misc runtime operations
@staticmethod
def int_from_byte(v):
    """Convert one element of a byte string to an int (no-op on Python 3,
    ord() on Python 2 where indexing bytes yields a 1-char str)."""
    if PY2:
        return ord(v)
    return v

@staticmethod
def byte_from_int(i):
    """Convert an int in range 0..255 to a one-byte string."""
    if PY2:
        return chr(i)
    return bytes((i,))

@staticmethod
def byte_array_index(data, i):
    """Return byte `i` of `data` as an int."""
    return KaitaiStream.int_from_byte(data[i])

@staticmethod
def byte_array_min(b):
    """Return the smallest byte value in `b` as an int."""
    return KaitaiStream.int_from_byte(min(b))

@staticmethod
def byte_array_max(b):
    """Return the largest byte value in `b` as an int."""
    return KaitaiStream.int_from_byte(max(b))

@staticmethod
def byte_array_index_of(data, b):
    """Return the index of the first occurrence of byte value `b` in
    `data`, or -1 if it does not occur."""
    return data.find(KaitaiStream.byte_from_int(b))
@staticmethod
def resolve_enum(enum_obj, value):
    """Resolve `value` through the enum class `enum_obj`; if the value is
    not a member of the enum, return the raw value unchanged. Works around
    Python enums raising ValueError when constructed with an unknown value.
    """
    try:
        return enum_obj(value)
    except ValueError:
        # Unknown value - pass it through as-is.
        return value
# endregion
def to_byte_array(self):
pos = self.pos()
self.seek(0)
r = self.read_bytes_full()
self.seek(pos)
return r
class WriteBackHandler(object):
    """Pairs a position in a parent stream with a callback that will
    serialize data there later (used by write_back_child_streams())."""

    def __init__(self, pos, handler):
        self.pos = pos          # absolute position in the parent stream
        self.handler = handler  # callable taking the parent stream

    def write_back(self, parent):
        """Seek `parent` to the remembered position and run the handler."""
        parent.seek(self.pos)
        self.handler(parent)
def add_child_stream(self, child):
    """Register child stream `child` to be written back into this stream
    later (see write_back_child_streams())."""
    self.child_streams.append(child)

def write_back_child_streams(self, parent=None):
    """Recursively flush all registered child streams into this stream,
    then, when `parent` is given, write this stream back into it via its
    write_back_handler. The current position is preserved."""
    saved_pos = self.pos()
    for child in self.child_streams:
        child.write_back_child_streams(self)
    # list.clear() does not exist on Python 2 (added in 3.3), hence the
    # del-slice idiom.
    del self.child_streams[:]
    self.seek(saved_pos)
    if parent is not None:
        self._write_back(parent)

def _write_back(self, parent):
    # Delegates to the WriteBackHandler installed by the generated code.
    self.write_back_handler.write_back(parent)
class KaitaiStructError(Exception):
    """Common ancestor for all errors originating from correct Kaitai Struct
    usage, i.e. problems caused by user input rather than by incorrect use
    of the API (the latter are ordinary Python exceptions and are meant to
    be fixed in application code, not caught). Catch this type to handle
    all parse errors and serialization errors.

    `src_path`, when not None, is the KSY source path pointing to the
    element where the error occurred.
    """
    def __init__(self, msg, src_path):
        prefix = "" if src_path is None else src_path + ": "
        super(KaitaiStructError, self).__init__(prefix + msg)
        self.src_path = src_path


class InvalidArgumentError(KaitaiStructError, ValueError):
    """An invalid argument value was received (like `ValueError`), raised
    in places where the bad value may stem from user input and therefore
    counts as a parse error or serialization error."""
    def __init__(self, msg):
        super(InvalidArgumentError, self).__init__(msg, None)


class EndOfStreamError(KaitaiStructError, EOFError):
    """Read or write past the end of the stream. `bytes_needed` is the
    amount requested, `bytes_available` how much the stream still had."""
    def __init__(self, msg, bytes_needed, bytes_available):
        super(EndOfStreamError, self).__init__(msg, None)
        self.bytes_needed = bytes_needed
        self.bytes_available = bytes_available


class NoTerminatorFoundError(EndOfStreamError):
    """End of stream was reached before the required terminator was found.
    Specify `eos-error: false` in the KSY specification to treat end of
    stream as a valid end of field instead. `term` holds the terminator
    (a `bytes` object) that was searched for."""
    def __init__(self, term, bytes_available):
        msg = "end of stream reached, but no terminator %r found" % (term,)
        super(NoTerminatorFoundError, self).__init__(msg, len(term), bytes_available)
        self.term = term


class UndecidedEndiannessError(KaitaiStructError):
    """Raised when the default endianness is decided by a switch expression
    and no case matched (using an endianness expression implies that some
    case should always match)."""
    def __init__(self, src_path):
        super(UndecidedEndiannessError, self).__init__("unable to decide on endianness for a type", src_path)


class ValidationFailedError(KaitaiStructError):
    """Common ancestor of all validation failures. `io` is the KaitaiStream
    involved in the error (may be None)."""
    def __init__(self, msg, io, src_path):
        at_pos = "" if io is None else "at pos %d: " % (io.pos(),)
        super(ValidationFailedError, self).__init__(at_pos + "validation failed: " + msg, src_path)
        self.io = io


class ValidationNotEqualError(ValidationFailedError):
    """The actual value was required to equal `expected`, but did not."""
    def __init__(self, expected, actual, io, src_path):
        super(ValidationNotEqualError, self).__init__("not equal, expected %s, but got %s" % (repr(expected), repr(actual)), io, src_path)
        self.expected = expected
        self.actual = actual


class ValidationLessThanError(ValidationFailedError):
    """The actual value was required to be >= `min`, but was smaller."""
    def __init__(self, min_bound, actual, io, src_path):
        super(ValidationLessThanError, self).__init__("not in range, min %s, but got %s" % (repr(min_bound), repr(actual)), io, src_path)
        self.min = min_bound
        self.actual = actual


class ValidationGreaterThanError(ValidationFailedError):
    """The actual value was required to be <= `max`, but was larger."""
    def __init__(self, max_bound, actual, io, src_path):
        super(ValidationGreaterThanError, self).__init__("not in range, max %s, but got %s" % (repr(max_bound), repr(actual)), io, src_path)
        self.max = max_bound
        self.actual = actual


class ValidationNotAnyOfError(ValidationFailedError):
    """The actual value was required to be one of a fixed list, but was not."""
    def __init__(self, actual, io, src_path):
        super(ValidationNotAnyOfError, self).__init__("not any of the list, got %s" % (repr(actual)), io, src_path)
        self.actual = actual


class ValidationNotInEnumError(ValidationFailedError):
    """The actual value was required to be a member of an enum, but was not."""
    def __init__(self, actual, io, src_path):
        super(ValidationNotInEnumError, self).__init__("not in the enum, got %s" % (repr(actual)), io, src_path)
        self.actual = actual


class ValidationExprError(ValidationFailedError):
    """The actual value was required to satisfy an expression, but did not."""
    def __init__(self, actual, io, src_path):
        super(ValidationExprError, self).__init__("not matching the expression, got %s" % (repr(actual)), io, src_path)
        self.actual = actual


class ConsistencyError(Exception):
    """Raised by generated _check() methods when an attribute value is
    inconsistent with the rest of the object being serialized."""
    def __init__(self, attr_id, expected, actual):
        super(ConsistencyError, self).__init__("Check failed: %s, expected: %s, actual: %s" % (attr_id, repr(expected), repr(actual)))
        self.id = attr_id
        self.expected = expected
        self.actual = actual


class ConsistencyNotCheckedError(Exception):
    """Raised when attempting to write an object whose consistency has not
    been checked (_check() not called) since its last modification."""