Bulk API 2.0

Bulk API 2.0 is used to perform ingest (insert, update, upsert, delete) and query operations against Salesforce when working with large amounts of data. aiosalesforce exposes an interface to perform these operations at a high level (similar to sObject CRUD operations) and to manage bulk jobs at a low level if needed.

Tip

Salesforce recommends using the Bulk API when an operation involves more than 2,000 records. If you are working with a small number of records (up to a few thousand), consider using one of the composite resources instead. The right choice ultimately depends on your use case: creating, monitoring, and managing a bulk job requires at least 7 REST API calls.

Ingest

Ingest operations are used to create (insert), update, upsert, or delete records. All bulk ingest operations are performed on a single sObject type.

The response of an ingest operation is an object with the following attributes:

  • jobs - list of JobInfo objects containing metadata about the jobs created to perform the ingest operation. One job is created per 150,000,000 records, so unless you exceed this number you will always have a single job.
  • successful_results - list of dictionaries with successful results. In addition to payload data, each dictionary contains sf__Created and sf__Id keys. See Salesforce documentation.
  • failed_results - list of dictionaries with failed results. In addition to payload data, each dictionary contains sf__Error and sf__Id keys. See Salesforce documentation.
  • unprocessed_records - list of dictionaries with unprocessed records. Does not contain any additional keys (only original payloads). See Salesforce documentation.
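As a rough illustration of how these four attributes might be consumed together, the sketch below builds a small summary of a result. The `summarize` helper and the `fake_result` stand-in are hypothetical and not part of aiosalesforce; only the attribute names and the `sf__Error` key come from the description above.

```python
from types import SimpleNamespace


def summarize(result):
    """Return record counts and error messages for a bulk ingest result."""
    return {
        "jobs": len(result.jobs),
        "successful": len(result.successful_results),
        "failed": len(result.failed_results),
        "unprocessed": len(result.unprocessed_records),
        # Each failed record carries its error message under sf__Error
        "errors": [record["sf__Error"] for record in result.failed_results],
    }


# Hypothetical stand-in for the object returned by a bulk ingest call;
# all values are made up for illustration
fake_result = SimpleNamespace(
    jobs=[object()],
    successful_results=[
        {"sf__Id": "0031R00001K1H2IQAV", "sf__Created": "true", "LastName": "Doe"},
    ],
    failed_results=[
        {"sf__Id": "", "sf__Error": "REQUIRED_FIELD_MISSING", "FirstName": "Jane"},
    ],
    unprocessed_records=[],
)
summary = summarize(fake_result)
```

With a real result you would pass the object returned by one of the bulk operations instead of `fake_result`.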

Warning

The order of records in the response lists does not match the order of records in the request data. However, because each record in the response (successful, failed, or unprocessed) contains the full original payload, you can use the payload to match response records back to the records in the request data.
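One possible way to do this matching is to index the request data by a field that is unique per record. The sketch below assumes such a field exists (here `ExternalId__c`, chosen only for illustration) and that it is echoed back in the response; `match_by_key` is a hypothetical helper, not a library function.

```python
def match_by_key(request_data, results, key):
    """Pair each response row with the request record that has the same key.

    Response values are always strings, so request values are converted
    with str() before comparison.
    """
    by_key = {str(record[key]): record for record in request_data}
    return [(by_key.get(row[key]), row) for row in results]


request_data = [
    {"ExternalId__c": 123, "LastName": "Doe"},
    {"ExternalId__c": 456, "LastName": "Roe"},
]
# Made-up response rows, deliberately in a different order than the request
successful_results = [
    {"sf__Id": "0031R00001K1H2JQAV", "sf__Created": "true",
     "ExternalId__c": "456", "LastName": "Roe"},
    {"sf__Id": "0031R00001K1H2IQAV", "sf__Created": "true",
     "ExternalId__c": "123", "LastName": "Doe"},
]
pairs = match_by_key(request_data, successful_results, "ExternalId__c")
```

If no single field is unique, a tuple of several payload fields could serve as the key instead.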

Warning

All results are returned as lists of dictionaries in which both keys and values are strings, because Bulk API 2.0 uses CSV as its data exchange format. For example, sf__Created is a string such as "true" or "false", not a boolean.
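To show why every value arrives as a string, the sketch below parses a small CSV body with the standard library and then converts `sf__Created` to a boolean. The CSV content is made up for illustration, and this is not how aiosalesforce parses results internally; `to_bool` is a hypothetical helper.

```python
import csv
import io

# Made-up CSV body resembling a successful-results payload
raw = (
    b'"sf__Id","sf__Created","FirstName","LastName"\n'
    b'"0031R00001K1H2IQAV","true","Jon","Doe"\n'
    b'"0031R00001K1H2JQAV","false","Jane","Doe"\n'
)

# csv.DictReader yields dictionaries whose keys and values are all strings
rows = list(csv.DictReader(io.StringIO(raw.decode("utf-8"))))


def to_bool(value):
    """Convert a CSV boolean such as "true" or "false" to a Python bool."""
    return value.strip().lower() == "true"


created_flags = [to_bool(row["sf__Created"]) for row in rows]
```

The same pattern applies to numbers and dates: convert each field explicitly once you know which type it should have.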

Insert

To insert records in bulk, you must provide the object name and the data (a sequence of dictionaries).

data = [
    {"FirstName": "Jon", "LastName": "Doe"},
    {"FirstName": "Jane", "LastName": "Doe"},
]
result = await salesforce.bulk_v2.insert("Contact", data)
record_ids = [record["sf__Id"] for record in result.successful_results]

Update

To update records in bulk, you must provide the object name and the data. The data must include the Id field for each record.

data = [
    {"Id": "0031R00001K1H2IQAV", "FirstName": "Jon", "LastName": "Doe"},
    {"Id": "0031R00001K1H2JQAV", "FirstName": "Jane", "LastName": "Doe"},
]
result = await salesforce.bulk_v2.update("Contact", data)

Delete

To delete records in bulk, you must provide the object name and the data. The data must include the Id field for each record.

data = [
    {"Id": "0031R00001K1H2IQAV"},
    {"Id": "0031R00001K1H2JQAV"},
]
result = await salesforce.bulk_v2.delete("Contact", data)

Upsert

To upsert records in bulk, you must provide the object name, the external ID field name, and the data. The data must include a value for the external ID field in each record.

data = [
    {"ExternalId__c": "123", "FirstName": "Jon", "LastName": "Doe"},
    {"ExternalId__c": "456", "FirstName": "Jane", "LastName": "Doe"},
]
result = await salesforce.bulk_v2.upsert("Contact", data, "ExternalId__c")

Low Level Job Management

You can manage individual jobs at a low level using the Ingest client. You can access it via salesforce.bulk_v2.ingest and it exposes the following methods:

  • create_job - create a new job for specified sObject and operation
  • get_job - get information about an existing job
  • list_jobs - list all jobs (returns an asynchronous iterator)
  • abort_job - abort an existing job
  • delete_job - delete an existing job
  • upload_job_data - upload CSV data (already converted to bytes) and set job state to UploadComplete
  • perform_operation - uses other methods to fully execute a bulk job. This method is equivalent to running regular high-level bulk operations (e.g., upsert) but with additional parameters.

Example of manually executing a bulk job:

import asyncio

# Note: _csv is an underscore-prefixed (private) module
from aiosalesforce.bulk.v2._csv import (
    deserialize_ingest_results,
    serialize_ingest_data,
)

data = [
    {"FirstName": "Jon", "LastName": "Doe"},
    {"FirstName": "Jane", "LastName": "Doe"},
]

# Create the job, then upload the CSV payload (sets state to UploadComplete)
job = await salesforce.bulk_v2.ingest.create_job("Contact", "insert")
job = await salesforce.bulk_v2.ingest.upload_job_data(
    job.id,
    serialize_ingest_data(data),
)

# Poll until the job leaves its non-terminal states
while job.state in {"Open", "UploadComplete", "InProgress"}:
    await asyncio.sleep(5)
    job = await salesforce.bulk_v2.ingest.get_job(job.id)

# Download and parse the successful results
response = await salesforce.request(
    "GET",
    f"{salesforce.bulk_v2.ingest.base_url}/{job.id}/successfulResults",
)
successful_results = deserialize_ingest_results(response.content)
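The polling loop in the example above runs until the job leaves its non-terminal states, with no upper bound on the wait. A minimal sketch of the same loop with a timeout is shown below. `wait_for_job` is a hypothetical helper, not part of aiosalesforce; it only assumes that `get_job` is an async callable that takes a job ID and returns an object with a `state` attribute. The fake `get_job` and the job ID are placeholders used to make the sketch runnable on its own.

```python
import asyncio
import time
from types import SimpleNamespace

# States in which the job has not yet finished
PENDING_STATES = {"Open", "UploadComplete", "InProgress"}


async def wait_for_job(get_job, job_id, interval=5.0, timeout=600.0):
    """Poll get_job(job_id) until the job leaves the pending states."""
    deadline = time.monotonic() + timeout
    job = await get_job(job_id)
    while job.state in PENDING_STATES:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job.state} after {timeout}s")
        await asyncio.sleep(interval)
        job = await get_job(job_id)
    return job


async def _demo():
    # Fake get_job that walks through a fixed sequence of states
    states = iter(["UploadComplete", "InProgress", "JobComplete"])

    async def fake_get_job(job_id):
        return SimpleNamespace(id=job_id, state=next(states))

    return await wait_for_job(fake_get_job, "demo-job-id", interval=0.01, timeout=1.0)


final_job = asyncio.run(_demo())
```

In real code you would presumably pass `salesforce.bulk_v2.ingest.get_job` as `get_job` and check the returned state before downloading results, since a job can also end up failed or aborted.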

Query

Warning

🚧 Work in progress.