"""
Mastercard |reg| IPM clearing file readers and writers
* VBS file readers and writers
* IPM file readers and writers
* IPM parameter extract reader
* Support for 1014 blocked format
Read an IPM file::
from cardutil import mciipm
with open('ipm_in.bin', 'rb') as ipm_in:
reader = mciipm.IpmReader(ipm_in)
for record in reader:
print(record)
Create an IPM file::
from cardutil import mciipm
with open('ipm_out.bin', 'wb') as ipm_out:
writer = mciipm.IpmWriter(ipm_out)
writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
writer.close()
MasterCard file formats
-----------------------
VBS file format
^^^^^^^^^^^^^^^
This format is a basic variable record format.
There are no carriage returns or line feeds in the file.
A file consists of records. Each record is prefixed with a 4 byte binary length.
Say you had a file with the following 2 records:
.. code-block:: text
"This is first record 1234567" <- length 28
"This is second record AAAABBBBB123" <- length 34
Add a 4 byte binary length to the start of each record (x'1C' = 28, x'22' = 34),
with the file finishing with a zero length record, i.e. a record length of x'00000000'.
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 BBB123....
1014 blocked file format
^^^^^^^^^^^^^^^^^^^^^^^^
This is the same as VBS format with 1014 blocking applied.
The VBS data is blocked into lengths of 1012, and an additional
2 x'40' characters are appended to each block.
Finally, the total file length is made a multiple of 1014 with the final
incomplete block being filled with the x'40' character.
Taking the above VBS example
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 BBB123....
Block to 1014 by adding 2 * x'40' characters every 1012 characters in the data.
Finally fill with x'40' characters to next 1014 increment.
In this case, there is only one increment
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 40 40 40 40 40 40 BBB123....@@@@@@
00000050: 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 @@@@@@@@@@@@@@@@
000003E0: 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 @@@@@@@@@@@@@@@@
000003F0: 40 40 40 40 40 40 @@@@@@
"""
import io
import logging
import struct
import typing
from cardutil import iso8583, config, CardutilError, BitArray
from cardutil.vendor import hexdump
LOGGER = logging.getLogger(__name__)
[docs]
class MciIpmDataError(CardutilError):
    """
    Error raised by the readers and helpers in this module when the
    VBS, 1014 blocked, IPM or IPM parameter data being processed is not valid.
    """
[docs]
class Block1014(object):
    """
    1014 blocker for writable file objects.

    Wraps a binary file object and writes 1014 blocked data to it: every
    1012 bytes of data are followed by 2 pad characters, and ``finalise``
    fills the last partial block with pad characters.

    Attributes that are not defined on the wrapper are proxied to the
    wrapped file object.
    """
    PAD_CHAR = b'\x40'

    def __init__(self, file_obj):
        """
        Create a new blocker

        :param file_obj: the binary file object that receives the blocked data
        """
        self.file_obj = file_obj
        # number of data bytes that still fit into the block being written
        self.remaining_chars = 1012

    def __getattr__(self, name: str):
        """
        Attribute proxy for wrapped file object

        Only invoked when normal attribute lookup fails.

        :param name: the attribute being looked up
        :return: the attribute of the wrapped file object, or None if it has no such attribute
        """
        if name == 'file_obj':
            # The wrapper is not initialised (e.g. instance created by copy or pickle).
            # Fail normally instead of recursing through this proxy forever.
            raise AttributeError(name)
        return getattr(self.file_obj, name, None)

    def write(self, bytes_to_write: bytes) -> None:
        """
        Write requested bytes to the output file object.

        Two pad characters are inserted each time 1012 data bytes have been written.

        :param bytes_to_write: the unblocked data
        :return: None
        """
        # lazy %-style arguments so the payload is only formatted when debug logging is on
        LOGGER.debug('bytes_to_write=%s', bytes_to_write)

        # not enough bytes to complete a block, just write and subtract from remaining
        if len(bytes_to_write) < self.remaining_chars:
            LOGGER.debug('len->%s<%s', len(bytes_to_write), self.remaining_chars)
            LOGGER.debug('write %s', bytes_to_write)
            self.file_obj.write(bytes_to_write)
            self.remaining_chars -= len(bytes_to_write)
            return

        # complete the block that is in progress
        offset = self.remaining_chars
        LOGGER.debug('write first: %s', bytes_to_write[:offset])
        self.file_obj.write(bytes_to_write[:offset])
        self.file_obj.write(self.PAD_CHAR * 2)

        # now write complete blocks
        while len(bytes_to_write) - offset > 1012:
            block = bytes_to_write[offset:offset + 1012]
            LOGGER.debug('write while: %s', block)
            self.file_obj.write(block)
            self.file_obj.write(self.PAD_CHAR * 2)
            offset += 1012

        # write whatever is left; its padding is written by the next write or by finalise
        tail = bytes_to_write[offset:]
        LOGGER.debug('write last: %s', tail)
        self.file_obj.write(tail)
        self.remaining_chars = 1012 - len(tail)
        LOGGER.debug('remaining_chars=%s', self.remaining_chars)

    def seek(self, pos: int) -> None:
        """
        Finalise then seek file object to requested position

        .. note:: Method only partially implemented. Only use to seek start of file (zero)

        :param pos: position passed to the wrapped file object's ``seek``
        :return: None
        """
        self.finalise()
        self.file_obj.seek(pos)

    def close(self) -> None:
        """
        Finalise then close the file object

        :return: None
        """
        self.finalise()
        self.file_obj.close()

    def finalise(self) -> None:
        """
        Complete the blocking operation by padding out the block in progress.

        Called by ``close`` and ``seek`` methods to ensure completion.
        Does nothing when no block is in progress, so calling it again
        (for example ``seek(0)`` followed by ``close()``) does not write a
        block of pad characters over the start of the file, and data that
        ends exactly on a block boundary does not get an extra pad-only block.

        :return: None
        """
        if self.remaining_chars == 1012:
            return
        self.file_obj.write(self.PAD_CHAR * (self.remaining_chars + 2))
        self.remaining_chars = 1012
[docs]
class Unblock1014(object):
    """
    Unblocks 1014 blocked file objects.

    Wrap around a 1014 blocked file object. Return file like object providing only unblocked data.
    Attributes that are not defined on the wrapper are proxied to the wrapped file object.
    """
    def __init__(self, file_obj: typing.BinaryIO):
        """
        Create a new unblocker

        :param file_obj: the 1014 blocked binary file object to read from
        """
        self.file_obj = file_obj
        # unblocked data that has been read from the file but not yet returned
        self.buffer = b''

    def __getattr__(self, name: str) -> typing.Any:
        """
        Attribute proxy for wrapped file object

        Only invoked when normal attribute lookup fails.

        :param name: the attribute being looked up
        :return: the attribute of the wrapped file object, or None if it has no such attribute
        """
        if name == 'file_obj':
            # The wrapper is not initialised (e.g. instance created by copy or pickle).
            # Fail normally instead of recursing through this proxy forever.
            raise AttributeError(name)
        return getattr(self.file_obj, name, None)

    def read(self, bytes_to_read: int = 0):
        """
        Read requested bytes from the file object. Returned data will be unblocked

        :param bytes_to_read: number of unblocked bytes wanted. Zero (the default),
            a negative value or None reads everything that remains.
        :return: the unblocked bytes; shorter than requested when the file is exhausted
        """
        read_all = bytes_to_read is None or bytes_to_read <= 0

        # collect the pieces and join once instead of repeatedly growing a bytes object
        pieces = [self.buffer]
        buffered = len(self.buffer)
        while read_all or buffered < bytes_to_read:
            block = self.file_obj.read(1014)
            if not block:  # eof
                break
            data = block[:1012]  # drop the 2 pad characters that end each block
            pieces.append(data)
            buffered += len(data)
        available = b''.join(pieces)

        if read_all:
            # hand back everything; previously this path returned an empty byte string
            self.buffer = b''
            return available
        self.buffer = available[bytes_to_read:]
        return available[:bytes_to_read]
[docs]
class VbsReader(object):
    """
    Iterator over the records of a VBS formatted file object.

    Each iteration returns the data of the next record as bytes.
    ::

        from cardutil.mciipm import VbsReader
        with open('vbs_file.bin', 'rb') as vbs_file:
            vbs_reader = VbsReader(vbs_file)
            for vbs_record in vbs_reader:
                print(vbs_record)

    Attributes that are not defined on the reader are proxied to the file object being read.
    """
    record_number = 1   # number of the record that the next read will fetch
    last_record = None  # length prefix plus data of the most recent record returned

    def __init__(self, vbs_file: typing.BinaryIO, blocked: bool = False):
        """
        Create a VbsReader object

        :param vbs_file: File object with VBS formatted data
        :param blocked: set to True when the file object is 1014 blocked
        """
        self.vbs_data = Unblock1014(vbs_file) if blocked else vbs_file

    def __getattr__(self, name) -> typing.Any:
        """
        Attribute proxy for wrapped file object

        :return: the attribute of the wrapped file object, or None if it has no such attribute
        """
        own_attributes = self.__dict__
        if name in own_attributes:
            return own_attributes[name]
        return getattr(self.vbs_data, name, None)

    def __iter__(self):
        return self

    def __next__(self) -> bytes:
        """
        Read the next record from the variable blocked data

        :return: the record data, without its 4 byte length prefix
        :raises StopIteration: on the zero length end record, or when a length prefix cannot be read
        :raises MciIpmDataError: if the record length exceeds the configured maximum
            or the record data is incomplete
        """
        length_prefix = self.vbs_data.read(4)

        if len(length_prefix) != 4:
            # this can happen if the VBS does not have a zero length record at end.
            # You can recreate using VbsWriter and not calling close method.
            # The reader will just accept we are at end if this happens.
            LOGGER.warning(f'Unable to read next record length - requested 4 bytes,'
                           f' got {len(length_prefix)} -- assuming end of data')
            raise StopIteration

        (record_length,) = struct.unpack(">I", length_prefix)
        LOGGER.debug("record_length=%s", record_length)

        # an excessively large length indicates bad input
        max_length = config.config.get("MAX_VBS_RECORD_LENGTH", 6000)
        if not 0 <= record_length <= max_length:
            raise MciIpmDataError(f"Exceeded configured maximum VBS record length ({max_length})"
                                  f" - value read was {record_length}",
                                  record_number=self.record_number,
                                  binary_context_data=length_prefix)

        # a zero length record marks the end of the data
        if record_length == 0:
            raise StopIteration

        data = self.vbs_data.read(record_length)
        if len(data) != record_length:
            raise MciIpmDataError(f"Unable to read complete record - record length: {record_length}, "
                                  f"data read: {len(data)}",
                                  record_number=self.record_number,
                                  binary_context_data=length_prefix + data)

        self.last_record = length_prefix + data  # keep the raw record, prefix included
        self.record_number += 1
        return data
[docs]
class IpmReader(VbsReader):
    """
    IPM reader can be used to iterate through an IPM file.
    Each record is returned as the result of :py:func:`cardutil.iso8583.loads`.

    The file object must be in VBS format.
    ::

        from cardutil.mciipm import IpmReader
        with open('vbs_in.bin', 'rb') as vbs_in:
            reader = IpmReader(vbs_in)
            for record in reader:
                print(record)

    If the file is in 1014 block format, then set the ``blocked`` parameter to True.
    ::

        from cardutil.mciipm import IpmReader
        with open('blocked_in.bin', 'rb') as blocked_in:
            reader = IpmReader(blocked_in, blocked=True)
            for record in reader:
                print(record)
    """
    def __init__(self, ipm_file: typing.BinaryIO,
                 encoding: str = None, iso_config: dict = None, **kwargs):
        """
        Create a new IpmReader

        :param ipm_file: the file object to read
        :param encoding: the file encoding, passed to ``iso8583.loads``
        :param iso_config: the ISO8583 config, passed to ``iso8583.loads``
        :param kwargs: any options to be passed to the VbsReader constructor (e.g. ``blocked``)
        """
        self.encoding = encoding
        self.iso_config = iso_config
        super().__init__(ipm_file, **kwargs)

    def __next__(self) -> dict:
        """
        Read the next VBS record and parse it as an ISO8583 message

        :return: the value returned by ``iso8583.loads`` for the record
        :raises StopIteration: at the end of the VBS data
        :raises MciIpmDataError: if the VBS data is invalid or the record cannot be parsed
        """
        vbs_record = super().__next__()
        LOGGER.debug('%s: %s', len(vbs_record), vbs_record)
        try:
            return iso8583.loads(vbs_record, encoding=self.encoding, iso_config=self.iso_config)
        except CardutilError as ex:
            raise MciIpmDataError(
                'Error while processing ISO8583 record',
                binary_context_data=self.last_record,
                # The VBS layer has already advanced its counter past the record that
                # was just read, so step back to report the record that failed to parse.
                record_number=self.record_number - 1,
                original_exception=ex
            ) from ex
[docs]
class IpmParamReader(VbsReader):
    """
    IPM Param reader can be used to iterate through the records of one table
    in an IPM parameter extract file.

    Each matching record is returned as a dictionary holding ``table_id``,
    ``effective_timestamp`` and ``active_inactive_code`` plus one entry for
    each field configured for the table.
    ::

        from cardutil.mciipm import IpmParamReader
        with open('param.bin', 'rb') as param_in:
            reader = IpmParamReader(param_in, table_id='IP0040T1')
            for record in reader:
                print(record)

    If the parameter file is 1014 block format, then set the ``blocked`` parameter to True.
    ::

        from cardutil.mciipm import IpmParamReader
        with open('blocked_param.bin', 'rb') as param_in:
            reader = IpmParamReader(param_in, table_id='IP0040T1', blocked=True)
            for record in reader:
                print(record)
    """
    # layout for IP0000T1
    _IP0000T1_KEY = slice(11, 19)
    _IP0000T1_TABLE_ID = slice(19, 27)
    _IP0000T1_TABLE_SUB_ID = slice(243, 246)

    # compressed table type for all except IP0000T1 - get this 3 letter code from self.table_keys
    _C_EFF_TIMESTAMP = slice(0, 7)
    _C_ACTIVE_INACTIVE_CODE = slice(7, 8)
    _C_TABLE_SUB_ID = slice(8, 11)

    # expanded table common fields
    _X_EFF_TIMESTAMP = slice(0, 10)
    _X_ACTIVE_INACTIVE_CODE = slice(10, 11)
    _X_TABLE_ID = slice(11, 19)

    def __init__(
        self,
        param_file: typing.BinaryIO,
        table_id: str,
        encoding: str = None,
        param_config: dict = None,
        expanded: bool = False,
        **kwargs,
    ):
        """
        Create a new IpmParamReader

        The IP0000T1 records at the start of the file are consumed here to build
        the table index, so iteration starts after the IP0000T1 trailer record.

        :param param_file: the file object to read
        :param table_id: the IPM parameter table to read
        :param encoding: the parameter file encoding (``latin_1`` when not provided)
        :param param_config: field layout config keyed by table id
            (the ``mci_parameter_tables`` config entry when not provided)
        :param expanded: set to True to use the expanded record layout instead of the compressed one
        :param kwargs: any options to be passed to the VbsReader constructor (e.g. ``blocked``)
        :raises MciIpmDataError: if there is no config for the table or the IP0000T1 trailer is missing
        """
        self.encoding = encoding or 'latin_1'
        self.param_config = param_config or config.config.get('mci_parameter_tables')
        self.table_id = table_id
        self.table_index = {}
        self.expanded = expanded
        super().__init__(param_file, **kwargs)

        # check if config available for table id
        if not self.param_config.get(table_id):
            raise MciIpmDataError(f'Parameter config not available for table {table_id}')

        trailer_seen = self._load_table_index()
        LOGGER.debug(f'IP0000T1 records: {self.table_index}')
        if not trailer_seen:
            raise MciIpmDataError('parameter file missing IP0000T1 trailer record')

    def _load_table_index(self) -> bool:
        """
        Fill ``table_index`` (table sub id -> table id) from the IP0000T1 records

        :return: True if the IP0000T1 trailer record was reached, False if the data ended first
        """
        while True:
            try:
                raw_record = super().__next__()
            except StopIteration:
                return False
            text = raw_record.decode(self.encoding)
            if text[self._IP0000T1_KEY] == 'IP0000T1':
                self.table_index[text[self._IP0000T1_TABLE_SUB_ID]] = text[self._IP0000T1_TABLE_ID]
            if text.startswith('TRAILER RECORD IP0000T1'):
                return True

    def _common_fields(self, record):
        """
        Get the table id, effective timestamp and active/inactive code of a record

        :param record: the raw record bytes
        :return: tuple of (table id, effective timestamp, active inactive code)
        """
        if self.expanded:
            table_id = record[self._X_TABLE_ID].decode(self.encoding)
            timestamp_slice = self._X_EFF_TIMESTAMP
            code_slice = self._X_ACTIVE_INACTIVE_CODE
        else:
            # compressed records only carry the sub id; map it through the index
            table_id = self.table_index.get(record[self._C_TABLE_SUB_ID].decode(self.encoding))
            timestamp_slice = self._C_EFF_TIMESTAMP
            code_slice = self._C_ACTIVE_INACTIVE_CODE
        effective_timestamp = record[timestamp_slice].decode(self.encoding)
        active_inactive_code = record[code_slice].decode(self.encoding)
        return table_id, effective_timestamp, active_inactive_code

    def __next__(self) -> dict:
        """
        Return the next record that belongs to the requested table

        Records of other tables are skipped.

        :return: dictionary of the common fields and the configured table fields
        :raises StopIteration: at the end of the VBS data
        """
        while True:
            record = super().__next__()
            table_id, effective_timestamp, active_inactive_code = self._common_fields(record)
            LOGGER.debug(f"record_table_id={table_id!r}, record={record!r}")
            if table_id != self.table_id:
                continue

            record_dict = {
                "table_id": table_id,
                "effective_timestamp": effective_timestamp,
                "active_inactive_code": active_inactive_code,
            }
            record_dict.update(
                {field: self._get_param_field(record, field) for field in self.param_config[table_id]}
            )
            return record_dict

    def _get_param_field(self, record, field):
        """
        Extract one configured field from a record

        :param record: the raw record bytes
        :param field: the field name, as keyed in the config for the record's table
        :return: the decoded field value
        """
        if self.expanded:
            table_id = record[self._X_TABLE_ID].decode(self.encoding)
            field_offset = 0
        else:
            table_id = self.table_index.get(record[self._C_TABLE_SUB_ID].decode(self.encoding))
            field_offset = -8  # all fields should be offset by this value
        first = self.param_config[table_id][field]["start"] + field_offset
        last = self.param_config[table_id][field]["end"] + field_offset
        return record[first:last].decode(self.encoding)
[docs]
class VbsWriter(object):
    """
    Writer for VBS formatted files.

    Each record passed to ``write`` is a byte string containing the data.
    The writer can be used as follows::

        >>> with io.BytesIO() as vbs_out:
        ...     writer = VbsWriter(vbs_out)
        ...     writer.write(b'This is the record')
        ...     writer.close()

    The `close` method must be issued to finalise the file by adding the zero length record which
    indicates the end of the file.

    Alternatively, you can use as a context manager which will take care of the writer closure.

        >>> with io.BytesIO() as vbs_out:
        ...     with VbsWriter(vbs_out, blocked=True) as writer:
        ...         writer.write(b'This is the record')

    Attributes that are not defined on the writer are proxied to the output file object.
    """
    def __init__(self, out_file: typing.BinaryIO, blocked: bool = False):
        """
        Create a new VbsWriter

        :param out_file: the file object to write to
        :param blocked: set to True to write the output in 1014 blocked format
        """
        self.out_file = Block1014(out_file) if blocked else out_file

    def __getattr__(self, name) -> typing.Any:
        """
        Attribute proxy for wrapped file object

        :return: the attribute of the wrapped file object, or None if it has no such attribute
        """
        own_attributes = self.__dict__
        if name in own_attributes:
            return own_attributes[name]
        return getattr(self.out_file, name, None)

    def write(self, record: bytes) -> None:
        """
        Add a new record to the VBS output file

        :param record: byte string containing data
        :return: None
        """
        # 4 byte big-endian length prefix goes out first, then the data itself
        length_prefix = struct.pack(">I", len(record))
        self.out_file.write(length_prefix)
        self.out_file.write(record)

    def write_many(self, iterable: typing.Iterable[bytes]) -> None:
        """
        Convenience method to write multiple records from an iterable

        :param iterable: iterable providing records as bytes
        :return: None
        """
        for item in iterable:
            self.write(item)

    def close(self) -> None:
        """
        Finalise the VBS file output by adding the zero length file record.

        The output file object is then positioned back at its start; it is not closed.

        :return: None
        """
        end_of_file_record = struct.pack(">I", 0)
        self.out_file.write(end_of_file_record)
        self.out_file.seek(0)

    def __enter__(self, *args, **kwargs):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()
[docs]
class IpmWriter(VbsWriter):
    """
    IPM writer can be used to write records to a Mastercard IPM file.
    Records are provided as dictionaries and converted with :py:func:`cardutil.iso8583.dumps`.
    ::

        >>> with io.BytesIO() as ipm_out:
        ...     writer = IpmWriter(ipm_out)
        ...     writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
        ...     writer.close()

    If the required file is 1014 block format, then set the ``blocked`` parameter to True.
    ::

        >>> with io.BytesIO() as ipm_out:
        ...     writer = IpmWriter(ipm_out, blocked=True)
        ...     writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
        ...     writer.close()

    You can provide the specific file encoding if required.
    All standard python encoding schemes are supported. Mainframe systems likely use ``cp500``
    ::

        >>> with io.BytesIO() as ipm_out:
        ...     writer = IpmWriter(ipm_out, encoding='cp500')
        ...     writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
        ...     writer.close()

    Alternatively use as a context manager to ensure closure at end of processing
    ::

        >>> with io.BytesIO() as ipm_out:
        ...     with IpmWriter(ipm_out, encoding='cp500') as writer:
        ...         writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
    """
    def __init__(self, file_obj: typing.BinaryIO, encoding: str = None, iso_config: dict = None, **kwargs):
        """
        Create a new IpmWriter

        :param file_obj: the file object to write to
        :param encoding: the file encoding, passed to ``iso8583.dumps``
        :param iso_config: the ISO8583 config, passed to ``iso8583.dumps``
        :param kwargs: any options to be passed to the VbsWriter constructor (e.g. ``blocked``)
        """
        self.encoding = encoding
        self.iso_config = iso_config
        super().__init__(file_obj, **kwargs)

    def write(self, obj: dict) -> None:
        """
        Writes new record to IPM file

        :param obj: dictionary object containing ISO8583 elements.
            See :py:mod:`cardutil.iso8583` for expected dict object keys.
        :return: None
        """
        message = iso8583.dumps(obj, encoding=self.encoding, iso_config=self.iso_config)
        super().write(message)

    def write_many(self, iterable: typing.Iterable[dict]) -> None:
        """
        Convenience method to write multiple records from an iterable

        :param iterable: iterable providing records as dict
        :return: None
        """
        for item in iterable:
            self.write(item)
[docs]
def unblock_1014(input_data: typing.BinaryIO, output_data: typing.BinaryIO):
    """
    Unblocks a 1014 byte blocked file object

    Both file objects are positioned back at their start when done.

    :param input_data: 1014 blocked IPM file object
    :param output_data: unblocked file object
    :raises MciIpmDataError: if a block is not 1014 bytes long or does not end with 2 x'40' characters
    """
    block_ending = b'\x40' * 2
    while block := input_data.read(1014):
        if len(block) != 1014:
            raise MciIpmDataError('Invalid record size for 1014 blocked')
        if not block.endswith(block_ending):
            raise MciIpmDataError('Invalid line ending for 1014 blocked')
        # keep the 1012 data bytes, drop the block ending
        output_data.write(block[:1012])
    output_data.seek(0)
    input_data.seek(0)
[docs]
def block_1014(input_data: typing.BinaryIO, output_data: typing.BinaryIO):
    """
    Creates a 1014 byte blocked file object

    The input is read 1012 bytes at a time. Each chunk is filled up to 1012
    bytes with x'40' characters when short, and written followed by 2 more
    x'40' characters. Both file objects are positioned back at their start when done.

    :param input_data: file object to be 1014 byte blocked
    :param output_data: 1014 byte blocked file object
    """
    filler = b'\x40'
    while chunk := input_data.read(1012):
        # ljust only adds filler when the chunk is an incomplete 1012 block
        output_data.write(chunk.ljust(1012, filler) + filler * 2)
    output_data.seek(0)
    input_data.seek(0)
[docs]
def vbs_list_to_bytes(byte_list: typing.Iterable[bytes], **kwargs) -> bytes:
    """
    Convenience function for creating VBS byte strings (optionally blocked) from list of byte strings

    :param byte_list: a list containing byte string records
    :param kwargs: any options to be passed to VbsWriter constructor. See :py:mod:`cardutil.mciipm.VbsWriter`
    :return: single byte string containing VBS data.
    """
    buffer = io.BytesIO()
    writer = VbsWriter(buffer, **kwargs)
    writer.write_many(byte_list)
    # close adds the zero length end record and rewinds the buffer, ready for the read below
    writer.close()
    return buffer.read()
[docs]
def vbs_bytes_to_list(vbs_bytes: bytes, **kwargs) -> list:
    """
    Convenience function for unpacking VBS byte strings to byte string list

    :param vbs_bytes: single byte string containing VBS data
    :param kwargs: any options to be passed to VbsReader constructor. See :py:mod:`cardutil.mciipm.VbsReader`
    :return: a list containing byte string records
    """
    reader = VbsReader(io.BytesIO(vbs_bytes), **kwargs)
    return list(reader)
[docs]
def ipm_info(input_data: typing.BinaryIO) -> dict:
    """
    Use this function to inspect an IPM file and provide details

    Up to 2500 bytes are read from the file object for the analysis.

    :param input_data: The file like object of IPM data
    :return: a dictionary containing file information:

    .. code-block:: text

        {
            "isValidIPM": True,
            "reason": "If not valid, describes the reason",
            "isBlocked": True,
            "encoding": "latin1",
        }

    ``reason`` is only present when the file is not valid. ``isBlocked`` and
    ``encoding`` are only present when it is valid.
    """
    info = {"isValidIPM": False}
    sample_data = input_data.read(2500)

    # need the 4 byte length, the 4 byte MTI and the 16 byte bitmap of the first record
    if len(sample_data) < 24:
        info["reason"] = "File does not have sufficient data to be valid"
        return info

    # the first 4 bytes must contain a plausible record length; large lengths indicate file issues
    (record_length,) = struct.unpack(">I", sample_data[:4])
    max_rec_length = config.config.get("MAX_VBS_RECORD_LENGTH", 6000)
    if record_length > max_rec_length:
        info["reason"] = (f"First IPM record length ({record_length}) exceeds the configured maximum record length "
                          f"({max_rec_length}) which usually indicates a file issue")
        return info

    # check the bitmap to make sure it has a valid bit config
    bitmap_ok, reason = bitmap_check(sample_data[8:24])
    if not bitmap_ok:
        info["reason"] = reason
        return info

    info.update(
        isBlocked=block_1014_check(sample_data),
        encoding=encoding_check(sample_data[4:8]),
        isValidIPM=True,
    )
    return info
[docs]
def block_1014_check(sample_data, pad_char=None):
    """
    Check if a sample taken from the start of a file looks like 1014 blocked data

    The sample is considered blocked when the first 1014 byte block ends with
    2 pad characters and, where the sample is long enough to contain it, the
    second block does as well. The sample may be longer than 2 blocks
    (``ipm_info`` provides up to 2500 bytes); only the first 2 block endings
    are inspected.

    :param sample_data: bytes from the start of the file
    :param pad_char: the single byte pad character; ``Block1014.PAD_CHAR`` when not provided
    :return: True if the sample looks 1014 blocked, otherwise False
    """
    # Blocked files should be blocked out to 1014 at a minimum.
    if len(sample_data) < 1014:
        return False

    if pad_char is None:
        pad_char = Block1014.PAD_CHAR
    block_ending = pad_char * 2

    # the first block must end with the pad characters
    if sample_data[1012:1014] != block_ending:
        return False

    # single block file
    if len(sample_data) == 1014:
        return True

    # a blocked file longer than one block has at least two complete blocks
    if len(sample_data) < 2028:
        return False

    # check the end of the second block just to be sure
    return sample_data[2026:2028] == block_ending
[docs]
def bitmap_check(bitmap: bytes) -> typing.Tuple[bool, typing.Optional[str]]:
    """
    Check that every bit set in a bitmap has an entry in the configured ``bit_config``

    The first bit is skipped as it does not have config.

    :param bitmap: the bitmap bytes
    :return: ``(True, None)`` when all set bits are configured, otherwise
        ``(False, reason)`` naming the first data element that is not configured
    """
    LOGGER.debug(hexdump.hexdump(bitmap, result='return'))
    bit_flags = BitArray.BitArray()
    bit_flags.frombytes(bitmap)
    for element_number, is_set in enumerate(bit_flags.tolist(), start=1):
        if element_number == 1 or not is_set:  # bit 1 does not have config
            continue
        if str(element_number) not in config.config['bit_config']:
            return False, f"Bitmap uses DE{element_number} which is not used in IPM"
    return True, None
[docs]
def encoding_check(mti: bytes) -> str:
    """
    This function will check if an MTI in record looks
    like ASCII based encoding or EBCDIC encoding.

    This is a very basic encoding check.

    :param mti: the MTI bytes taken from a record
    :return: ``latin1`` if the bytes are the digits 0-9 when decoded as latin1,
        ``cp037`` if they are the digits 0-9 when decoded as cp037, otherwise ``unknown``
    """
    # isdecimal rather than isnumeric: isnumeric also accepts superscripts and
    # fractions (e.g. x'B2' or x'BC' in latin1), which are not valid MTI digits
    for encoding in ('latin1', 'cp037'):
        if mti.decode(encoding).isdecimal():
            return encoding
    return 'unknown'
# When the module is executed as a script, run the doctest examples found in its docstrings.
if __name__ == '__main__':
    import doctest
    doctest.testmod()