Source code for cardutil.iso8583

"""
Parsers for ISO8583 messages.

The iso8583 module provides ISO8583 message parsing functions.
See the `ISO8583 Wikipedia page <https://en.wikipedia.org/wiki/ISO_8583>`_ for more details.
Also supports Mastercard |reg| PDS field structures.

The parsing functions are modelled on the python standard json library.
The functions convert raw ISO8583 messages to python dictionaries.

The dictionary keys represent the elements of an ISO8583 message:

* MTI - Message type indicator
* DE(1-127) - Standard fields
* PDSxxxx - Mastercard PDS fields
* TAGxxxx - ICC tag fields

Import the library::

    from cardutil.iso8583 import dumps, loads

Read an ISO8583 message returning dict::

    >>> import binascii
    >>> binary_bitmap = binascii.unhexlify('c0000000000000000000000000000000')
    >>> message_bytes = b'1144' + binary_bitmap + b'164444555566667777'
    >>> message_dict = loads(message_bytes)
    >>> message_dict
    {'MTI': '1144', 'DE2': '4444555566667777'}

Create an ISO8583 message returning bytes::

    >>> message_dict = {'MTI': '1144', 'DE2': '4444555566667777'}
    >>> message_bytes = dumps(message_dict)
    >>> message_bytes
    b'1144\\xc0\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00164444555566667777'


Add **encoding** parameter if you need different message encoding.
All `standard python encoding codecs
<https://docs.python.org/3/library/codecs.html?highlight=encode#standard-encodings>`_ are available.
Default is **latin_1**.

::

    >>> message_dict = {'MTI': '1144', 'DE2': '4444555566667777'}
    >>> message_bytes = dumps(message_dict, encoding='cp500')
    >>> message_bytes
    b'\\xf1\\xf1\\xf4\\xf4\\xc0\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf1\\xf6\\xf4\\xf4\\xf4\\xf4\\xf5\\xf5\\xf5\\xf5\\xf6\\xf6\\xf6\\xf6\\xf7\\xf7\\xf7\\xf7'
    >>> message_dict = loads(message_bytes, encoding='cp500')
    >>> message_dict
    {'MTI': '1144', 'DE2': '4444555566667777'}

Set **hex_bitmap** to True if you require a hex format bitmap::

    >>> message_bytes = dumps(message_dict, hex_bitmap=True)
    >>> message_bytes
    b'1144c0000000000000000000000000000000164444555566667777'
    >>> message_dict = loads(message_bytes, hex_bitmap=True)
    >>> message_dict
    {'MTI': '1144', 'DE2': '4444555566667777'}

"""
import binascii
import datetime
import decimal
import logging
import re
import struct
import sys

from cardutil import CardutilError
from cardutil.BitArray import BitArray
from cardutil.card import mask
from cardutil.config import config
from cardutil.vendor.hexdump import hexdump

LOGGER = logging.getLogger(__name__)
DEFAULT_ENCODING = 'latin_1'


class Iso8583DataError(CardutilError):
    """Error raised by this module when ISO8583 message data cannot be processed."""
def dumps(obj: dict, encoding=None, iso_config=None, hex_bitmap=False):
    """
    Build an ISO8583 message byte string from a python dictionary.

    :param obj: dict containing message data
    :param encoding: python text encoding scheme. ``DEFAULT_ENCODING`` is used when not provided
    :param iso_config: iso8583 message configuration dict. ``config['bit_config']`` is used
        when not provided
    :param hex_bitmap: bitmap in hex format
    :return: byte string containing ISO8583 message

    With no optional arguments the message is latin_1 encoded and carries a binary bitmap::

        import cardutil.iso8583
        message_dict = {'MTI': '1144', 'DE2': '4444555566667777'}
        cardutil.iso8583.dumps(message_dict)

    """
    message_encoding = encoding or DEFAULT_ENCODING
    message_config = iso_config or config['bit_config']
    return _dict_to_iso8583(obj, message_config, message_encoding, hex_bitmap)
def loads(b: bytes, encoding=None, iso_config=None, hex_bitmap=False):
    """
    Parse an ISO8583 message byte string into a python dictionary.

    :param b: bytes containing message
    :param encoding: python text encoding scheme. ``DEFAULT_ENCODING`` is used when not provided
    :param iso_config: iso8583 message configuration dictionary. ``config['bit_config']`` is used
        when not provided
    :param hex_bitmap: bitmap in hex format
    :return: dict containing message data

    ::

        import cardutil.iso8583
        message_bytes = b'1144... iso message ...'
        cardutil.iso8583.loads(message_bytes)

    """
    message_encoding = DEFAULT_ENCODING if not encoding else encoding
    message_config = config['bit_config'] if not iso_config else iso_config
    return _iso8583_to_dict(b, message_config, message_encoding, hex_bitmap)
def _iso8583_to_dict(message, bit_config, encoding=DEFAULT_ENCODING, hex_bitmap=False):
    """
    Convert ISO8583 style message to dictionary

    :param message: The message in ISO8583 based format

        * Message Type indicator - 4 bytes
        * Binary bitmap - 16 bytes (Reads DE1 and DE2)

        OR if hex_bitmap = True

        * Message Type indicator - 4 bytes
        * Hex bitmap - 32 bytes (Reads DE1 and DE2)

        followed by

        * Message data - Remainder of record

    :param bit_config: dictionary of bit mapping configuration
    :param encoding: data encoding
    :param hex_bitmap: True when the bitmap is provided as 32 hex characters
    :return: dictionary of message elements

        * key = 'MTI' message type indicator
        * key = 'DEx' data elements
        * key = 'PDSxxxx' private data fields
        * key = 'TAGxxxx' icc fields

    :raises Iso8583DataError: when the header cannot be unpacked, the hex bitmap is not
        valid hex, a bit has no configuration, a field cannot be processed or the
        message data length does not match what the bitmap indicates
    """
    # hexdump output is only needed for debug logging, so don't build it otherwise
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug("Processing message: len=%s contents:\n%s", len(message), hexdump(message, result="return"))

    # split raw message into components MessageType(4B), Bitmap(16B),
    # Message(l=*)
    try:
        if hex_bitmap:
            message_length = len(message)-36
            message_type_indicator, bitmap, message_data = struct.unpack(
                "4s32s" + str(message_length) + "s", message)
            binary_bitmap = binascii.unhexlify(bitmap)
        else:
            message_length = len(message)-20
            message_type_indicator, binary_bitmap, message_data = struct.unpack(
                "4s16s" + str(message_length) + "s", message)
    except (struct.error, binascii.Error) as ex:
        # binascii.Error covers a hex bitmap that contains non-hex characters
        raise Iso8583DataError('Failed unpacking bitmap values', binary_context_data=message, original_exception=ex)

    return_values = dict()

    # add the message type
    try:
        return_values["MTI"] = message_type_indicator.decode(encoding)
    except UnicodeError as ex:
        raise Iso8583DataError('Failed decoding MTI field', binary_context_data=message, original_exception=ex)

    message_pointer = 0
    bitmap_list = _get_bitmap_list(binary_bitmap)

    for bit in range(2, 128):
        if bitmap_list[bit]:
            LOGGER.debug("processing bit %s", bit)

            # Check that config is available for this bit
            if not bit_config.get(str(bit)):
                raise Iso8583DataError(
                    f'No bit config available for bit {bit}',
                    binary_context_data=message
                )

            return_message, message_increment = _iso8583_to_field(
                bit,
                bit_config[str(bit)],
                message_data[message_pointer:],
                encoding)

            # Increment the message pointer and process next field
            message_pointer += message_increment
            return_values.update(return_message)

    # check that all of message has been consumed, otherwise raise exception
    if message_pointer != len(message_data):
        raise Iso8583DataError(
            f'Message data not correct length. '
            f'Bitmap indicates len={message_pointer}, message is len={len(message_data)}',
            binary_context_data=message
        )

    return return_values


def _dict_to_iso8583(message, bit_config, encoding=DEFAULT_ENCODING, hex_bitmap=False):
    """
    Convert dictionary to ISO8583 message

    :param message: dictionary of message elements

        * key = 'MTI' message type indicator
        * key = 'DE(1-127)' data elements
        * key = 'PDSxxxx' private data fields
        * key = 'TAGxxxx' icc fields

    :param bit_config: dictionary of bit mapping configuration
    :param encoding: string indicating encoding of data
    :param hex_bitmap: True to output the bitmap as 32 hex characters instead of 16 bytes
    :return: The message in ISO8583 based format

        * Message Type indicator - 4 bytes
        * Binary bitmap - 16 bytes (Reads DE1), or 32 hex characters if hex_bitmap = True
        * Message data - Remainder of record

    """
    # work on a shallow copy so the DE values generated from the PDS fields
    # are not written back into the caller's dictionary
    message = dict(message)

    output_parts = []
    bitmap_values = [False] * 128
    bitmap_values[0] = True  # set bit 1 on for presence of bitmap

    # get the pds fields from config
    # (sorted descending so that pop() hands them out lowest DE first)
    de_pds_fields = sorted(
        [int(key) for key in bit_config if bit_config[key].get('field_processor') == 'PDS'], reverse=True)
    LOGGER.debug(de_pds_fields)

    for de_field_value in _pds_to_de(message):
        de_field_key = de_pds_fields.pop()
        LOGGER.debug(f'de{de_field_key}={de_field_value}')
        message[f'DE{de_field_key}'] = de_field_value

    for bit in range(2, 128):
        field_value = message.get('DE' + str(bit))
        if field_value or field_value == 0:  # 0 evals to false, allow zero values
            LOGGER.debug(f'processing bit {bit}')
            bitmap_values[bit - 1] = True
            LOGGER.debug(field_value)
            output_parts.append(_field_to_iso8583(bit_config[str(bit)], field_value, encoding=encoding))
    output_data = b''.join(output_parts)

    bitarray = BitArray()
    bitarray.fromlist(bitmap_values)
    binary_bitmap = bitarray.tobytes()

    if hex_bitmap:
        bitmap = binascii.hexlify(binary_bitmap)
    else:
        bitmap = binary_bitmap

    mti = message['MTI'].encode(encoding) if message.get('MTI') else b''
    output_string = mti + bitmap + output_data
    return output_string


def _field_to_iso8583(bit_config, field_value, encoding=DEFAULT_ENCODING):
    """
    Convert a single field value to its ISO8583 byte representation

    :param bit_config: message bit configuration
    :param field_value: the value to be written
    :param encoding: byte encoding
    :return: bytes containing the length prefix (LLVAR and LLLVAR fields only)
        followed by the field data
    """
    output = b''
    LOGGER.debug(f'bit_config={bit_config}, field_value={field_value}, encoding={encoding}')
    field_value = _pytype_to_string(field_value, bit_config)
    field_length = bit_config.get('field_length')
    length_size = _get_field_length(bit_config)  # size of length for llvar and lllvar fields

    if length_size > 0:
        # variable length field: the length prefix is the length of the value itself
        field_length = len(field_value)
        output += format(field_length, '0' + str(length_size)).encode(encoding)
    if isinstance(field_value, bytes):
        # bytes values are written as is, no padding or encoding
        output += field_value[:field_length]
    else:
        # text values are truncated or space padded on the right to field_length
        output += format(field_value[:field_length], '<' + str(field_length)).encode(encoding)
    return output


def _iso8583_to_field(bit, bit_config, message_data, encoding=DEFAULT_ENCODING):
    """
    Processes a message bit element

    :param bit: DE bit
    :param bit_config: message bit configuration
    :param message_data: the data to be processed
    :param encoding: byte encoding
    :returns:
        dictionary: field values
        message incrementer: position of next message
    :raises Iso8583DataError: when the field length or field value cannot be
        decoded, converted or broken down into sub fields
    """
    field_length = bit_config['field_length']

    length_size = _get_field_length(bit_config)

    if length_size > 0:
        field_length_string = message_data[:length_size]
        try:
            field_length_string = field_length_string.decode(encoding)
        except UnicodeDecodeError as ex:
            raise Iso8583DataError(f'Unable to decode DE{bit} field length',
                                   binary_context_data=message_data, original_exception=ex)
        try:
            field_length = int(field_length_string)
            # int() accepts a sign; a negative length would move the message pointer backwards
            if field_length < 0:
                raise ValueError(f'negative field length {field_length}')
        except ValueError as ex:
            raise Iso8583DataError(f'Invalid field length DE{bit}',
                                   binary_context_data=message_data, original_exception=ex)

    field_data = message_data[length_size:length_size + field_length]
    LOGGER.debug(f'field_data={field_data}')

    field_processor = bit_config.get('field_processor')

    # do ascii conversion except for ICC field
    if field_processor != 'ICC':
        try:
            field_data = field_data.decode(encoding)
        except UnicodeDecodeError as ex:
            raise Iso8583DataError(f'Unable to decode DE{bit} field value',
                                   binary_context_data=message_data, original_exception=ex)

    # if field is PAN type, mask the card value
    if field_processor == 'PAN':
        field_data = mask(field_data)

    # if field is PAN-PREFIX type, keep only the prefix of the card value
    if field_processor == 'PAN-PREFIX':
        field_data = _pan_prefix(field_data)

    # do field conversion to native python type
    # (decimal.InvalidOperation is an ArithmeticError, not a ValueError)
    try:
        field_data = _string_to_pytype(field_data, bit_config)
    except (ValueError, decimal.InvalidOperation) as ex:
        raise Iso8583DataError(f'Unable to convert DE{bit} field to python type',
                               binary_context_data=message_data, original_exception=ex)

    return_values = dict()

    # add value to return dictionary
    return_values["DE" + str(bit)] = field_data

    # if a PDS field, break it down again and add to results
    if field_processor == 'PDS':
        try:
            return_values.update(_pds_to_dict(field_data))
        except ValueError as ex:
            raise Iso8583DataError(f'Unable to process DE{bit} PDS fields',
                                   binary_context_data=message_data, original_exception=ex)

    # if a DE43 field, break in down again and add to results
    if field_processor == 'DE43':
        processor_config = bit_config.get('field_processor_config')
        return_values.update(_get_de43_fields(field_data, processor_config))

    # if ICC field, break into tags
    if field_processor == 'ICC':
        try:
            return_values.update(_icc_to_dict(field_data))
        except struct.error as ex:
            # raised when the data ends before a tag length byte
            raise Iso8583DataError(f'Unable to process DE{bit} ICC fields',
                                   binary_context_data=message_data, original_exception=ex)

    return return_values, field_length + length_size


def _pan_prefix(field_data):
    """
    Get prefix of PAN

    :param field_data: the PAN value
    :return: the first 9 characters of the PAN
    """
    return field_data[:9]


def _string_to_pytype(field_data, bit_config):
    """
    Field conversion to native python type

    :param field_data: Data to be converted
    :param bit_config: Configuration for bit
    :return: data in required type
    :raises ValueError: when int or datetime conversion fails
    :raises decimal.InvalidOperation: when decimal conversion fails
    """
    field_python_type = bit_config.get('field_python_type')
    field_date_format = bit_config.get('field_date_format', "%y%m%d")

    if field_python_type in ("int", "long"):
        field_data = int(field_data)
    if field_python_type == "decimal":
        field_data = decimal.Decimal(field_data)
    if field_python_type == "datetime":
        field_data = datetime.datetime.strptime(field_data, field_date_format)
    return field_data


def _pytype_to_string(field_data, bit_config):
    """
    convert py type to string for message

    :param field_data: Data to be converted
    :param bit_config: Configuration for bit
    :return: data in required type
    """
    field_python_type = bit_config.get('field_python_type')
    field_date_format = bit_config.get('field_date_format', "%y%m%d")
    return_string = field_data

    if field_python_type in ('int', 'long'):
        # zero fill numbers to the configured field length
        return_string = format(int(field_data), '0' + str(bit_config.get('field_length', 0)) + 'd')
    if field_python_type == "decimal":
        return_string = format(decimal.Decimal(field_data), '0' + str(bit_config.get('field_length', 0)) + 'f')
    if field_python_type == "datetime":
        if not isinstance(field_data, datetime.datetime):
            field_data = _get_date_from_string(field_data)
        return_string = format(field_data, field_date_format)
    return return_string


def _get_date_from_string(field_data: str) -> datetime.datetime:
    """
    Parse string dates to python datetime object

    Use dateutils library if it is installed, otherwise revert to simple parser

    :param field_data: string containing date
    :return: datetime object
    :raises ValueError: when the fallback parser does not recognise the date format
    """
    # only the import is guarded, so errors raised while parsing are not mistaken
    # for a missing library
    try:
        import dateutil.parser as parser
    except ImportError:
        parser = None

    if parser is not None:
        LOGGER.debug('Using dateutil parser')
        return parser.parse(field_data)

    if sys.version_info >= (3, 7):
        LOGGER.debug('Using fromisoformat')
        return datetime.datetime.fromisoformat(field_data)

    # fallback parser -- tries a few different formats until one works
    LOGGER.debug('Using built in date parser')
    date_formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")
    output_date = None
    for date_format in date_formats:
        try:
            output_date = datetime.datetime.strptime(field_data, date_format)
            break
        except ValueError:
            continue

    if not output_date:
        raise ValueError("Unrecognised date string format - {}".format(field_data))

    return output_date


def _get_field_length(bit_config):
    """
    Determine length of iso8583 style field

    :param bit_config: dictionary of bit config data
    :return: size of the length prefix: 2 for LLVAR, 3 for LLLVAR, otherwise 0
    """
    length_size = 0

    if bit_config['field_type'] == "LLVAR":
        length_size = 2
    elif bit_config['field_type'] == "LLLVAR":
        length_size = 3

    return length_size


def _get_bitmap_list(binary_bitmap):
    """
    Get list of bits from binary bitmap

    :param binary_bitmap: the binary bitmap to be returned
    :return: the list containing bit values. Bit 0 contains original binary bitmap
    """
    working_bitmap_list = BitArray(endian='big')
    working_bitmap_list.frombytes(binary_bitmap)

    # Add bit 0 -> original binary bitmap
    bitmap_list = [binary_bitmap]

    # add bits from bitmap
    bitmap_list.extend(working_bitmap_list.tolist())

    return bitmap_list


def _pds_to_de(dict_values):
    """
    takes all the pds fields values in dict (PDSxxxx) and creates list of DE strings

    Each PDS entry is written as a 4 digit tag, a 3 digit length and the value.
    A new DE string is started when adding an entry would take the current one
    over 999 characters.

    :param dict_values: dict containing "PDSxxxx" elements
    :return: list of strings containing pds data, empty list if no fields
    """
    # get the PDS field keys in order
    LOGGER.debug(f'dict_values={dict_values}')
    keys = sorted([key for key in dict_values if key.startswith('PDS')])
    LOGGER.debug(f'keys={keys}')
    output = ''
    outputs = []
    for key in keys:
        tag = int(key[3:])
        LOGGER.debug(f'tag={tag}')
        length = len(dict_values[key])
        add_output = f'{tag:04}{length:03}{dict_values[key]}'
        # only start a new DE when the current one has content, otherwise an
        # empty DE string would be emitted ahead of an oversized entry
        if output and len(output) + len(add_output) > 999:
            outputs.append(output)
            output = ''
        output += add_output
    if output:
        outputs.append(output)
    LOGGER.debug(f'>pds2de: {outputs}')
    return outputs


def _pds_to_dict(field_data):
    """
    Get MasterCard pds fields from iso field

    :param field_data: the ISO8583 field containing pds fields
    :return: dictionary of pds key values.
        Key in the form PDSxxxx where x is zero filled number of pds
    :raises ValueError: when a pds length is not a number or is negative
    """
    field_pointer = 0
    return_values = {}

    while field_pointer < len(field_data):
        # get the pds tag id
        pds_field_tag = field_data[field_pointer:field_pointer+4]
        LOGGER.debug("pds_field_tag=[%s]", pds_field_tag)

        # get the pds length
        pds_field_length = int(field_data[field_pointer+4:field_pointer+7])
        LOGGER.debug("pds_field_length=[%i]", pds_field_length)
        # int() accepts a sign; a negative length would stall or rewind the
        # pointer and the loop would never end
        if pds_field_length < 0:
            raise ValueError(f'Invalid PDS field length {pds_field_length}')

        # get the pds data
        pds_field_data = field_data[field_pointer+7:field_pointer+7+pds_field_length]
        LOGGER.debug("pds_field_data=[%s]", str(pds_field_data))
        return_values["PDS" + pds_field_tag] = pds_field_data

        # increment the fieldPointer
        field_pointer += 7+pds_field_length

    return return_values


def _icc_to_dict(field_data):
    """
    Get de55 fields from message

    :param field_data: the field containing de55
    :return: dictionary of de55 key values
        key is tag+tagid
    :raises struct.error: when the data ends before a tag length byte
    """
    # first byte values which indicate that the tag id is 2 bytes long
    TWO_BYTE_TAG_PREFIXES = [b'\x9f', b'\x5f']

    field_pointer = 0
    return_values = {"ICC_DATA": binascii.b2a_hex(field_data).decode()}

    while field_pointer < len(field_data):
        # get the tag id (one byte)
        field_tag = field_data[field_pointer:field_pointer+1]

        # set to 2 bytes if 2 byte tag
        if field_tag in TWO_BYTE_TAG_PREFIXES:
            field_tag = field_data[field_pointer:field_pointer+2]
            field_pointer += 2
        else:
            field_pointer += 1

        field_tag_display = binascii.b2a_hex(field_tag)
        LOGGER.debug("field_tag_display=%s", field_tag_display)

        # stop processing de55 if low values tag found
        if field_tag_display == b'00':
            break

        # the tag length is a single unsigned byte
        field_length_raw = field_data[field_pointer:field_pointer+1]
        field_length = struct.unpack(">B", field_length_raw)[0]

        LOGGER.debug("%s", format(field_tag_display))
        LOGGER.debug(field_length)

        # get the tag data
        de_field_data = field_data[field_pointer+1:field_pointer+field_length+1]
        de_field_data_display = binascii.b2a_hex(de_field_data).decode()
        LOGGER.debug("%s", de_field_data_display)
        return_values["TAG" + field_tag_display.upper().decode()] = de_field_data_display

        # increment the fieldPointer
        field_pointer += 1+field_length

    return return_values


def _get_de43_fields(de43_field, processor_config=None):
    """
    get DE43 field breakdown

    :param de43_field: data of DE43
    :param processor_config: regular expression with named groups used to split the field
    :return: dictionary of the named groups matched in DE43, empty dictionary if
        no config was provided or the field does not match
    """
    LOGGER.debug("de43_field=%s", de43_field)

    # No field config provided, just exit
    if not processor_config:
        return dict()

    # perform regex field matching
    de43_regex = processor_config
    field_match = re.match(de43_regex, de43_field)
    if not field_match:
        return dict()

    # get the dict
    field_dict = field_match.groupdict()

    # remove trailing whitespace from the postcode
    if field_dict.get('DE43_POSTCODE'):
        field_dict['DE43_POSTCODE'] = field_dict['DE43_POSTCODE'].rstrip()

    return field_dict


if __name__ == '__main__':
    import doctest
    doctest.testmod()