CSV

serialize_ingest_data(data, fieldnames=None, max_size_bytes=100000000, max_records=150000000)

Serialize data into CSV files for ingestion by Salesforce Bulk API 2.0.

None or missing values are ignored by Salesforce. To set a field in Salesforce to NULL, use the string "#N/A". Relationships are represented as nested dictionaries with exactly one key-value pair. E.g. {"Account": {"Name": "Acme"}} or {"Custom_Field__r": {"External_Id__c": "123"}}.
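For illustration, a minimal sketch of records following these conventions (the field names are hypothetical):

```python
records = [
    # "Phone" omitted: the field is ignored by Salesforce (left unchanged)
    {"LastName": "Doe", "Email": "jane.doe@example.com"},
    # "#N/A" explicitly sets Phone to NULL
    {"LastName": "Roe", "Email": "john.roe@example.com", "Phone": "#N/A"},
    # Relationship: a nested dictionary with exactly one key-value pair
    {"LastName": "Poe", "Account": {"Name": "Acme"}},
]
```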

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Iterable[dict[str, Any]]` | Sequence of dictionaries, each representing a record. | *required* |
| `fieldnames` | `Collection[str]` | Field names; determines the order of fields in the CSV file. By default, field names are inferred from the records. This is slow, so if you know the field names in advance it is recommended to provide them (see the sketch after this table). If a record is missing a field, it is written as an empty string. If a record has a field not in `fieldnames`, an error is raised. | `None` |
| `max_size_bytes` | `int` | Maximum size of each CSV file in bytes. The default of 100MB is recommended by Salesforce; it leaves room for base64 encoding increasing the size by up to 50% (100MB × 1.5 = Salesforce's 150MB request limit). | `100000000` |
| `max_records` | `int` | Maximum number of records in each CSV file. The default of 150,000,000 corresponds to the maximum number of records in a 24-hour period. | `150000000` |
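The `fieldnames` inference mentioned above is observable: passing a generator without `fieldnames` buffers all records in memory to infer the header and emits a `UserWarning`. A small runnable sketch:

```python
import warnings

from aiosalesforce.bulk.v2._csv import serialize_ingest_data

gen = ({"Name": f"Acme {i}"} for i in range(3))

# Without fieldnames, the generator is fully materialized so the
# header can be inferred, and a UserWarning is emitted
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    files = list(serialize_ingest_data(gen))

assert any(issubclass(w.category, UserWarning) for w in caught)
print(files[0].decode("utf-8"))  # "Name" header followed by three rows
```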

Yields:

| Type | Description |
| --- | --- |
| `bytes` | CSV file as a byte string. |
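A minimal usage sketch (`upload` is a hypothetical placeholder for creating an ingest job and uploading the file; only `serialize_ingest_data` is from this module):

```python
from aiosalesforce.bulk.v2._csv import serialize_ingest_data


def upload(csv_file: bytes) -> None:
    """Hypothetical stand-in for a Bulk API 2.0 ingest job upload."""
    print(f"uploading {len(csv_file):,} bytes")


# A generator of records; providing fieldnames keeps it streaming
records = (
    {"LastName": f"User{i}", "Email": f"user{i}@example.com"}
    for i in range(500_000)
)

for csv_file in serialize_ingest_data(records, fieldnames=["LastName", "Email"]):
    # Each yielded file is a complete CSV (header included) within
    # the configured size and record limits
    upload(csv_file)
```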

Source code in src/aiosalesforce/bulk/v2/_csv.py

```python
def serialize_ingest_data(
    data: Iterable[dict[str, Any]],
    fieldnames: Collection[str] | None = None,
    max_size_bytes: int = 100_000_000,
    max_records: int = 150_000_000,
) -> Iterable[bytes]:
    """
    Serialize data into CSV files for ingestion by Salesforce Bulk API 2.0.

    None or missing values are ignored by Salesforce.
    To set a field in Salesforce to NULL, use the string "#N/A".
    Relationships are represented as nested dictionaries,
    with exactly one key-value pair. E.g. {"Account": {"Name": "Acme"}}
    or {"Custom_Field__r": {"External_Id__c": "123"}.

    Parameters
    ----------
    data : Iterable[dict[str, Any]]
        Sequence of dictionaries, each representing a record.
    fieldnames : Collection[str], optional
        Field names; determines the order of fields in the CSV file.
        By default field names are inferred from the records. This is slow, so
        if you know the field names in advance, it is recommended to provide them.
        If a record is missing a field, it will be written as an empty string.
        If a record has a field not in `fieldnames`, an error will be raised.
    max_size_bytes : int, optional
        Maximum size of each CSV file in bytes.
        The default of 100MB is recommended by Salesforce.
        This accounts for base64 encoding increasing the size by up to 50%.
    max_records : int, optional
        Maximum number of records in each CSV file. By default 150,000,000.
        This corresponds to the maximum number of records in a 24-hour period.

    Yields
    ------
    bytes
        CSV file as a byte string.

    """
    if fieldnames is None and inspect.isgenerator(data):
        warnings.warn(
            (
                "Passing a generator without providing fieldnames causes the "
                "entire contents of the generator to be stored in memory "
                "to infer fieldnames. This may result in high memory usage."
            ),
            UserWarning,
        )

    data = map(_serialize_dict, data)
    if fieldnames is None:
        data = list(data)
        fieldnames = dict.fromkeys(itertools.chain.from_iterable(data)).keys()

    buffer = CsvBuffer()
    writer = csv.DictWriter(
        buffer,
        fieldnames=fieldnames,
        lineterminator="\n",
    )

    carry_over: bytes | None = None
    for row in data:
        if buffer.size == 0:
            # Starting a new file: write the header and any row carried
            # over from the previous (full) file
            writer.writeheader()
            if carry_over is not None:
                buffer.write(carry_over.decode("utf-8"))
                carry_over = None
        writer.writerow(row)
        # -1 to account for the header
        if buffer.size >= max_size_bytes or (buffer.n_rows - 1) >= max_records:
            if buffer.size > max_size_bytes or (buffer.n_rows - 1) > max_records:
                # The last row pushed the file over the limit; carry it
                # over into the next file
                carry_over = buffer.pop()
            yield buffer.content
            buffer.flush()

    # Flush a trailing carried-over row (it would otherwise be lost when
    # the final row of data overflows the limits), then any buffered rows
    if carry_over is not None:
        writer.writeheader()
        buffer.write(carry_over.decode("utf-8"))
    if buffer.size > 0:
        yield buffer.content
```

deserialize_ingest_results(data)

Deserialize Salesforce Bulk API 2.0 ingest results from CSV.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `bytes` | CSV file as a byte string. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, str]]` | List of records as dictionaries. |

Source code in src/aiosalesforce/bulk/v2/_csv.py

```python
def deserialize_ingest_results(data: bytes) -> list[dict[str, str]]:
    """
    Deserialize Salesforce Bulk API 2.0 ingest results from CSV.

    Parameters
    ----------
    data : bytes
        CSV file as a byte string.

    Returns
    -------
    list[dict[str, str]]
        List of records as dictionaries.

    """
    reader = csv.DictReader(data.decode("utf-8").splitlines())
    return list(reader)
```
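A brief sketch of deserializing an illustrative successful-results file (`sf__Id` and `sf__Created` mirror the columns Salesforce returns for ingest results):

```python
from aiosalesforce.bulk.v2._csv import deserialize_ingest_results

results_csv = b'"sf__Id","sf__Created","LastName"\n"001000000000001AAA","true","Doe"\n'

records = deserialize_ingest_results(results_csv)
# [{'sf__Id': '001000000000001AAA', 'sf__Created': 'true', 'LastName': 'Doe'}]
```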