A Blog About D4T4 & M47H

A simple bulk API client for Python 3

15 March ’17

I recently put together my first Python package! My company (AbleTo) recently migrated our CRM to Salesforce. Though it does not have the most elegant interface, many tools (e.g. Marketo) provide off-the-shelf integrations with Salesforce, which was very appealing to us. To migrate all of our customer data (several million records), I used Salesforce's bulk API.

The big difference between working with the REST API and working with the bulk API is that the bulk API is asynchronous. You create a job, add one or more batches to that job, close the job, and wait for it to complete. Once it is complete, you download the results (in pieces). So a single 'operation' requires several HTTP calls to the bulk API and a rather annoying process of stitching together the results.

This library provides a few methods to simplify this workflow. Inputs and outputs are Pandas dataframes. To avoid reinventing the wheel, I leveraged the SalesforceLogin class in the simple_salesforce package for authentication.
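To make the asynchronous workflow concrete, here is a minimal sketch of the create job, add batches, close job, wait, download results cycle. It does not talk to Salesforce at all: FakeBulkService and run_bulk_operation are hypothetical names I made up for illustration, and the fake service simply pretends a job finishes after a couple of status checks.

```python
import time


class FakeBulkService:
    """Hypothetical in-memory stand-in for an asynchronous bulk API."""

    def __init__(self, polls_until_done=2):
        self._jobs = {}
        self._polls_until_done = polls_until_done

    def create_job(self, operation):
        job_id = f"job-{len(self._jobs) + 1}"
        self._jobs[job_id] = {
            "operation": operation,
            "batches": [],
            "closed": False,
            "polls": 0,
        }
        return job_id

    def add_batch(self, job_id, records):
        job = self._jobs[job_id]
        if job["closed"]:
            raise RuntimeError("cannot add a batch to a closed job")
        job["batches"].append(list(records))
        return len(job["batches"]) - 1  # index of the new batch

    def close_job(self, job_id):
        self._jobs[job_id]["closed"] = True

    def is_complete(self, job_id):
        # Pretend the job needs a few status checks before it finishes.
        job = self._jobs[job_id]
        job["polls"] += 1
        return job["closed"] and job["polls"] > self._polls_until_done

    def get_batch_results(self, job_id, batch_index):
        batch = self._jobs[job_id]["batches"][batch_index]
        return [{"record": record, "success": True} for record in batch]


def run_bulk_operation(service, operation, batches, poll_interval=0.0):
    """Run one 'operation': several calls, then stitch the results together."""
    job_id = service.create_job(operation)
    batch_ids = [service.add_batch(job_id, batch) for batch in batches]
    service.close_job(job_id)

    # Poll until the job reports completion.
    while not service.is_complete(job_id):
        time.sleep(poll_interval)

    # Results come back one batch at a time, so collect them into one list.
    results = []
    for batch_id in batch_ids:
        results.extend(service.get_batch_results(job_id, batch_id))
    return results


results = run_bulk_operation(FakeBulkService(), "update", [[1, 2], [3]])
```

Even in this toy version a single operation takes five kinds of calls, which is the overhead the library is meant to hide.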

Here are the key methods for performing bulk API queries:

  • query: creates a query job and submits a SOQL query as a batch to that job
  • get_all_query_results: downloads results from a query job into a Pandas dataframe
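Since query results come back in pieces, downloading them means stitching the pieces into one table. Here is a rough sketch of that idea using only the standard library. It assumes each piece is a CSV string with its own header row; stitch_csv_pieces and the sample data are hypothetical, and the library itself returns a Pandas dataframe rather than a list of dicts.

```python
import csv
import io


def stitch_csv_pieces(pieces):
    """Combine several CSV strings (each with its own header) into one list of rows."""
    records = []
    for piece in pieces:
        # DictReader consumes the header of each piece, so headers are not duplicated.
        records.extend(csv.DictReader(io.StringIO(piece)))
    return records


# Made-up result pieces, shaped like the Lead query used later in this post.
pieces = [
    "Id,Company\n00Q001,Acme\n00Q002,Blockbuster Video\n",
    "Id,Company\n00Q003,Initech\n",
]
records = stitch_csv_pieces(pieces)
```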

Here are the key methods for performing bulk API jobs:

  • create_job: creates a job (operation types include 'insert', 'upsert', 'update', 'delete', and 'hardDelete')
  • bulk_csv_operation: breaks a Pandas dataframe into chunks and adds each chunk as a batch to a job
  • get_bulk_csv_operation_results: downloads results from a bulk CSV job into a Pandas dataframe
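The chunking step in bulk_csv_operation can be pictured roughly like this. This is only a sketch of the general idea, not the library's actual code: chunk_rows and rows_to_csv are hypothetical helpers, plain lists of dicts stand in for a dataframe to keep it dependency-free, and the chunk size of 2 is chosen just to keep the example small.

```python
import csv
import io


def chunk_rows(rows, chunk_size):
    """Yield consecutive slices of rows, each at most chunk_size long."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def rows_to_csv(rows, fieldnames):
    """Serialize one chunk to a CSV string with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Five made-up records split into batches of two.
rows = [{"Id": f"00Q{i:03d}", "DoNotCall": 1} for i in range(5)]
batches = [rows_to_csv(chunk, ["Id", "DoNotCall"]) for chunk in chunk_rows(rows, 2)]
```

Each string in batches would then be submitted as one batch of the job, with the last batch holding whatever rows are left over.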

Here is an example of how to use the library:

from sfdc_bulk import SalesforceBulkAPI

# set up our SF object
bulk = SalesforceBulkAPI(**sfdc_credentials)

# pull down some records into a pandas df
query_job = bulk.query('SELECT Id, Company FROM Lead LIMIT 10000')
some_records = bulk.get_all_query_results(query_job)

# make an update (copy the filtered rows so pandas doesn't warn about writing to a slice)
update = some_records[some_records.Company == "Blockbuster Video"].copy()
update['DoNotCall'] = 1

# push to SFDC
update_job = bulk.create_update_job(object='Lead', contentType='CSV')
bulk.bulk_csv_operation(update_job, update)
update_results = bulk.get_bulk_csv_operation_results(update_job)

The package is available on PyPI, so it can be installed easily with pip. Enjoy!