"""
Mastercard |reg| IPM clearing file readers and writers
* VBS file readers and writers
* IPM file readers and writers
* IPM parameter extract reader
* Support for 1014 blocked format
Read an IPM file::
from cardutil import mciipm
with open('ipm_in.bin', 'rb') as ipm_in:
reader = mciipm.IpmReader(ipm_in)
for record in reader:
print(record)
Create an IPM file::
from cardutil import mciipm
with open('ipm_out.bin', 'wb') as ipm_out:
writer = mciipm.IpmWriter(ipm_out)
writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
writer.close()
MasterCard file formats
-----------------------
VBS file format
^^^^^^^^^^^^^^^
This format is a basic variable record format.
There are no carriage returns or line feeds in the file.
A file consists of records. Each record is prefixed with a 4 byte binary length.
Say you had a file with the following 2 records:
.. code-block:: text
"This is first record 1234567" <- length 28
"This is second record AAAABBBBB123" <- length 34
Add 4 byte binary length to the start of each record. (x'1C' = 28, x'22' = 34)
with the file finishing with a zero length record length
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 BBB123....
1014 blocked file format
^^^^^^^^^^^^^^^^^^^^^^^^
This is the same as VBS format with 1014 blocking applied.
The VBS data is blocked into lengths of 1012, and an additional
2 x'40' characters are appended at each block.
Finally, the total file length is made a multiple of 1014 with the final
incomplete record being filled with the x'40' character
Taking the above VBS example
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 BBB123....
Block to 1014 by adding 2 * x'40' characters every 1012 characters in the data.
Finally fill with x'40' characters to next 1014 increment.
In this case, there is only one increment
.. code-block:: hexdump
00000000: 00 00 00 1C 54 68 69 73 20 69 73 20 66 69 72 73 ....This is firs
00000010: 74 20 72 65 63 6F 72 64 20 31 32 33 34 35 36 37 t record 1234567
00000020: 00 00 00 22 54 68 69 73 20 69 73 20 73 65 63 6F ..."This is seco
00000030: 6E 64 20 72 65 63 6F 72 64 20 41 41 41 41 42 42 nd record AAAABB
00000040: 42 42 42 31 32 33 00 00 00 00 40 40 40 40 40 40 BBB123....@@@@@@
00000050: 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 @@@@@@@@@@@@@@@@
000003E0: 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 40 @@@@@@@@@@@@@@@@
000003F0: 40 40 40 40 40 40 @@@@@@
"""
import io
import logging
import struct
import typing
from cardutil import iso8583, config
LOGGER = logging.getLogger(__name__)
[docs]class Block1014(object):
"""
1014 Blocker for file objects.
Wrap around a file object. Return 1014 blocked data
"""
PAD_CHAR = b'\x40'
def __init__(self, file_obj):
self.file_obj = file_obj
self.remaining_chars = 1012
def __getattr__(self, name: str):
"""
Attribute proxy for wrapped file object
"""
try:
return self.__dict__[name]
except KeyError:
if hasattr(self.file_obj, name):
return getattr(self.file_obj, name)
return None
[docs] def write(self, bytes_to_write: bytes) -> None:
"""
Write requested bytes to the output file object.
"""
# not enough bytes to complete a block, just write and subtract from remaining
LOGGER.debug(f'bytes_to_write={bytes_to_write}')
if len(bytes_to_write) < self.remaining_chars:
LOGGER.debug(f'len->{len(bytes_to_write)}<{self.remaining_chars}')
LOGGER.debug(f'write {bytes_to_write}')
self.file_obj.write(bytes_to_write)
self.remaining_chars -= len(bytes_to_write)
return
# complete the first record
LOGGER.debug(f'write first: {bytes_to_write[:self.remaining_chars]}')
self.file_obj.write(bytes_to_write[:self.remaining_chars])
self.file_obj.write(self.PAD_CHAR * 2)
bytes_to_write = bytes_to_write[self.remaining_chars:]
# now write complete blocks
while len(bytes_to_write) > 1012:
LOGGER.debug(f'write while: {bytes_to_write[:1012]}')
self.file_obj.write(bytes_to_write[:1012])
self.file_obj.write(self.PAD_CHAR * 2)
bytes_to_write = bytes_to_write[1012:]
# write whatever is left
LOGGER.debug(f'write last: {bytes_to_write}')
self.file_obj.write(bytes_to_write)
self.remaining_chars = 1012-len(bytes_to_write)
LOGGER.debug(f'remaining_chars={self.remaining_chars}')
[docs] def seek(self, pos: int) -> None:
"""
Finalise then seek file object to requested position
.. note:: Method only partially implemented. Only use to seek start of file (zero)
"""
self.finalise()
self.file_obj.seek(pos)
[docs] def close(self) -> None:
"""
Finalise then close the file object
"""
self.finalise()
self.file_obj.close()
[docs] def finalise(self) -> None:
"""
Complete the blocking operation by creating final 1014 block.
Called by ``close`` and ``seek`` methods to ensure completion.
"""
self.file_obj.write(self.PAD_CHAR * (self.remaining_chars + 2))
self.remaining_chars = 1012
[docs]class Unblock1014(object):
"""
Unblocks 1014 blocked file objects.
Wrap around a 1014 blocked file object. Return file like object providing only unblocked data
"""
def __init__(self, file_obj: typing.BinaryIO):
self.file_obj = file_obj
self.buffer = b''
def __getattr__(self, name: str) -> any:
"""
Attribute proxy for wrapped file object
"""
try:
return self.__dict__[name]
except KeyError:
if hasattr(self.file_obj, name):
return getattr(self.file_obj, name)
return None
[docs] def read(self, bytes_to_read: int = 0):
"""
Read requested bytes from the file object. Returned data will be unblocked
"""
read_all = True if not bytes_to_read else False
while read_all or len(self.buffer) <= bytes_to_read:
block = self.file_obj.read(1014)
if not block: # eof
break
self.buffer += block[:1012]
output = self.buffer[:bytes_to_read]
self.buffer = self.buffer[bytes_to_read:]
return output
[docs]class VbsReader(object):
"""
The VbsReader class can be used to iterate through a VBS formatted file
object record by record.
::
from cardutil.mciipm import VbsReader
with open('vbs_file.bin', 'rb') as vbs_file:
vbs_reader = VbsReader(vbs_file)
for vbs_record in vbs_reader:
print(vbs_record)
"""
def __init__(self, vbs_file: typing.BinaryIO, blocked: bool = False):
"""
Create a VbsReader object
:param vbs_file: File object with VBS formatted data
"""
self.vbs_data = vbs_file
if blocked:
self.vbs_data = Unblock1014(vbs_file)
def __getattr__(self, name) -> any:
"""
Attribute proxy for wrapped file object
"""
try:
return self.__dict__[name]
except KeyError:
if hasattr(self.vbs_data, name):
return getattr(self.vbs_data, name)
return None
def __iter__(self):
return self
def __next__(self) -> bytes:
"""
Unpacks a variable blocked object into records
"""
record_length_raw = self.vbs_data.read(4)
if len(record_length_raw) != 4:
# this can happen if the VBS does not have a zero length record at end.
# You can recreate using VbsWriter and not calling close method.
# The reader will just accept we are at end if this happens.
LOGGER.error(f'requested 4 bytes, got {len(record_length_raw)} -- exiting')
raise StopIteration
record_length = struct.unpack(">i", record_length_raw)[0]
LOGGER.debug("record_length=%s", record_length)
# exit if last record (length=0)
if record_length == 0:
raise StopIteration
record = self.vbs_data.read(record_length)
return record
[docs]class IpmReader(VbsReader):
"""
IPM reader can be used to iterate through an IPM file
The file object must be in VBS format.
::
from cardutil.mciipm import IpmReader
with open('vbs_in.bin', 'rb') as vbs_in:
reader = IpmReader(vbs_in)
for record in reader:
print(record)
If the file required 1014 block format, then set the ``blocked`` parameter to True.
::
from cardutil.mciipm import IpmReader
with open('blocked_in.bin', 'rb') as blocked_in:
reader = IpmReader(blocked_in, blocked=True)
for record in reader:
print(record)
"""
def __init__(self, ipm_file: typing.BinaryIO,
encoding: str = None, iso_config: dict = None, **kwargs):
"""
Create a new IpmReader
:param ipm_file: the file object to read
:param encoding: the file encoding
:param config: config dict with key bit_config
"""
self.encoding = encoding
self.iso_config = iso_config
super(IpmReader, self).__init__(ipm_file, **kwargs)
def __next__(self) -> dict:
vbs_record = super(IpmReader, self).__next__()
LOGGER.debug(f'{len(vbs_record)}: {vbs_record}')
return iso8583.loads(vbs_record, encoding=self.encoding, iso_config=self.iso_config)
[docs]class IpmParamReader(VbsReader):
"""
IPM Param reader can be used to iterate through an IPM parameter extract file.
The record is returned as a dictionary containing the parameter keys.
::
from cardutil.mciipm import IpmParamReader
with open('param.bin', 'rb') as param_in:
reader = IpmParamReader(param_in, table_id='IP0040T1')
for record in reader:
print(record)
If the parameter file is 1014 block format, then set the ``blocked`` parameter to True.
::
from cardutil.mciipm import IpmParamReader
with open('blocked_param.bin', 'rb') as param_in:
reader = IpmParamReader(param_in, table_id='IP0040T1', blocked=True)
for record in reader:
print(record)
"""
# layout for IP0000T1
_IP0000T1_KEY = slice(11, 19)
_IP0000T1_TABLE_ID = slice(19, 27)
_IP0000T1_TABLE_SUB_ID = slice(243, 246)
# table type for all except IP0000T1 - get this 3 letter code from self.table_keys
_TABLE_SUB_ID = slice(8, 11)
def __init__(self, param_file: typing.BinaryIO, table_id: str, encoding: str = None, param_config: dict = None,
**kwargs):
"""
Create a new IpmParamReader
:param param_file: the file object to read
:param table_id: the IPM parameter table to read
:param encoding: the parameter file encoding
:param param_config: config dict with key bit_config
"""
self.encoding = encoding if encoding else 'latin_1'
self.param_config = param_config if param_config else config.config.get('mci_parameter_tables')
self.table_id = table_id
self.table_index = dict()
super(IpmParamReader, self).__init__(param_file, **kwargs)
# check if config available for table id
if not self.param_config.get(table_id):
raise ValueError(f'Parameter config not available for table {table_id}')
# load the table index
trailer_record_found = False
while True:
try:
vbs_record = super(IpmParamReader, self).__next__()
except StopIteration:
break
record = vbs_record.decode(self.encoding)
if record[self._IP0000T1_KEY] == 'IP0000T1':
self.table_index[record[self._IP0000T1_TABLE_SUB_ID]] = record[self._IP0000T1_TABLE_ID]
if record.startswith('TRAILER RECORD IP0000T1'):
trailer_record_found = True
break
print(self.table_index)
if not trailer_record_found:
raise ValueError('parameter file missing IP0000T1 trailer record')
def __next__(self) -> dict:
while True:
record = super(IpmParamReader, self).__next__()
record_table_id = self.table_index.get(record[self._TABLE_SUB_ID].decode(self.encoding))
if record_table_id == self.table_id:
record_dict = dict()
for field in self.param_config[record_table_id]:
record_dict[field] = self._get_param_field(record, field)
return record_dict
def _get_param_field(self, record, field):
table_id = self.table_index.get(record[self._TABLE_SUB_ID].decode(self.encoding))
return record[
self.param_config[
table_id][field]["start"]:self.param_config[table_id][field]["end"]].decode(self.encoding)
[docs]class VbsWriter(object):
"""
Writes VBS formatted files.
The writer can be used as follows::
>>> with io.BytesIO() as vbs_out:
... writer = VbsWriter(vbs_out)
... writer.write(b'This is the record')
... writer.close()
The `close` method must be issued to finalise the file by adding the zero length record which
indicated the end of the file.
The message is a byte string containing the data.
Alternatively, you can use as a context manager which will take care of the writer closure.
>>> with io.BytesIO() as vbs_out:
... with VbsWriter(vbs_out, blocked=True) as writer:
... writer.write(b'This is the record')
"""
def __init__(self, out_file: typing.BinaryIO, blocked: bool = False):
self.out_file = out_file
if blocked:
self.out_file = Block1014(out_file)
def __getattr__(self, name) -> any:
"""
Attribute proxy for wrapped file object
"""
try:
return self.__dict__[name]
except KeyError:
if hasattr(self.out_file, name):
return getattr(self.out_file, name)
return None
[docs] def write(self, record: bytes) -> None:
"""
Add a new record to the VBS output file
:param record: byte string containing data
:return: None
"""
# get the length of the record
record_length = len(record)
# convert length to binary
record_length_raw = struct.pack(">i", record_length)
# add length to output data
self.out_file.write(record_length_raw)
# add data to output
self.out_file.write(record)
[docs] def write_many(self, iterable: typing.Iterable[bytes]) -> None:
"""
Convenience method to write multiple records from an iterable
:param iterable: iterable providing records as bytes
:return: None
"""
for record in iterable:
self.write(record)
[docs] def close(self) -> None:
"""
Finalise the VBS file output by adding the zero length file record.
:return: None
"""
# add zero length to end of record
self.out_file.write(struct.pack(">i", 0))
self.out_file.seek(0)
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
[docs]class IpmWriter(VbsWriter):
"""
IPM writer can be used to write records to a Mastercard IPM file
::
>>> with io.BytesIO() as ipm_out:
... writer = IpmWriter(ipm_out)
... writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
... writer.close()
If the required file is 1014 block format, then set the ``blocked`` parameter to True.
::
>>> with io.BytesIO() as ipm_out:
... writer = IpmWriter(ipm_out, blocked=True)
... writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
... writer.close()
You can provide the specific file encoding if required.
All standard python encoding schemes are supported. Mainframe systems likely use ``cp500``
::
>>> with io.BytesIO() as ipm_out:
... writer = IpmWriter(ipm_out, encoding='cp500')
... writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
... writer.close()
Alternatively use as a context manager to ensure closure at end of processing
::
>>> with io.BytesIO() as ipm_out:
... with IpmWriter(ipm_out, encoding='cp500') as writer:
... writer.write({'MTI': '1111', 'DE2': '9999111122221111'})
"""
def __init__(self, file_obj: typing.BinaryIO, encoding: str = None, iso_config: dict = None, **kwargs):
"""
Create a new IpmWriter
:param file_obj: the file object to write to
:param encoding: the file encoding
"""
self.encoding = encoding
self.iso_config = iso_config
super(IpmWriter, self).__init__(file_obj, **kwargs)
[docs] def write(self, obj: dict) -> None:
"""
Writes new record to IPM file
:param obj: dictionary object containing ISO8583 elements.
See :py:mod:`cardutil.iso8583` for expected dict object keys.
:return: None
"""
record = iso8583.dumps(obj, encoding=self.encoding, iso_config=self.iso_config)
super(IpmWriter, self).write(record)
[docs] def write_many(self, iterable: typing.Iterable[dict]) -> None:
"""
Convenience method to write multiple records from an iterable
:param iterable: iterable providing records as dict
:return: None
"""
for record in iterable:
self.write(record)
[docs]def unblock_1014(input_data: typing.BinaryIO, output_data: typing.BinaryIO):
"""
Unblocks a 1014 byte blocked file object
:param input_data: 1014 blocked IPM file object
:param output_data: unblocked file object
"""
pad_char = b'\x40'
while True:
record = input_data.read(1014)
if not record:
break
if len(record) != 1014:
raise ValueError('Invalid file size')
if record[-2:] != pad_char * 2:
raise ValueError('Invalid 1014 block line ending')
output_data.write(record[0:1012])
output_data.seek(0)
input_data.seek(0)
[docs]def block_1014(input_data: typing.BinaryIO, output_data: typing.BinaryIO):
"""
Creates a 1014 byte blocked file object
:param input_data: file object to be 1014 byte blocked
:param output_data: 1014 byte blocked file object
"""
pad_char = b'\x40'
while True:
record = input_data.read(1012)
# end of data
if not record:
break
# incomplete 1012 block
if len(record) != 1012:
record += (1012 - len(record)) * pad_char
output_data.write(record + (pad_char * 2))
output_data.seek(0)
input_data.seek(0)
[docs]def vbs_list_to_bytes(byte_list: iter, **kwargs) -> bytes:
"""
Convenience function for creating VBS byte strings (optionally blocked) from list of byte strings
:param byte_list: a list containing byte string records
:param kwargs: any options to be passed to VbsWriter constructor. See :py:mod:`cardutil.mciipm.VbsWriter`
:return: single byte string containing VBS data.
"""
file_out = io.BytesIO()
vbs_out = VbsWriter(file_out, **kwargs)
for rec in byte_list:
vbs_out.write(rec)
vbs_out.close()
return file_out.read()
[docs]def vbs_bytes_to_list(vbs_bytes: bytes, **kwargs) -> list:
"""
Convenience function for unpacking VBS byte strings to byte string list
:param vbs_bytes: single byte string containing VBS data
:param kwargs: any options to be passed to VbsReader constructor. See :py:mod:`cardutil.mciipm.VbsReader`
:return: a list containing byte string records
"""
file_in = io.BytesIO(vbs_bytes)
return [record for record in VbsReader(file_in, **kwargs)]
if __name__ == '__main__':
import doctest
doctest.testmod()