Migrating Users from One Auth0 Tenant to Another in Python: Handling Large Payloads

Igor Mardari
7code
Published in
6 min readOct 14, 2023

User migration between Auth0 tenants can be a complex task, especially when dealing with a large number of users.

Auth0 provides robust tools and APIs to help you accomplish this task seamlessly. In this article, we will walk you through the process of migrating users from one Auth0 tenant to another while also addressing the challenge of handling large user data payloads.

Main issue

{“statusCode”:413,”error”:”Request Entity Too Large”,”message”:”Payload content length greater than maximum allowed: 512000"}

If you need to migrate < 500kb, which is no more than ~1k users, then you can probably go faster with the User Import / Export Extension.

But since I had more than this number of users, I had to proceed with a custom implementation.

Prerequisites

Before we begin, ensure that you have the following:

  • Auth0 Tenants: Access to both the source (the tenant you want to migrate from) and the target (the tenant you want to migrate to) Auth0 tenants.
  • API Token: Obtain the necessary API token to authenticate your requests to the Auth0 Management API. Here’s an image where you can find it, easily
  • connection_id : This can be Username-Password-Authentication or Google Social. The identifier of the connection can be found here, as shown in the image:

Step 1: Exporting Users from the Source Tenant

The first step in user migration is to export user data from the source Auth0 tenant. We will use the Auth0 Management API for this purpose.

We’ll use the POST /api/v2/jobs/users-exports endpoint. More info about it, can be found here: https://auth0.com/docs/manage-users/user-migration/bulk-user-exports#json-compatible-format

Here’s a Python script snippet that demonstrates how to do it:

import gzip
import json
import time
import requests

export_management_token = "YOUR_MANAGEMENT_TOKEN" # Replace with your token
domain = "YOUR_SOURCE_TENANT_DOMAIN" # Replace with your source tenant's domain
connection_id = "YOUR_CONNECTION_ID" # Replace with your connection ID

export_url = f'https://{domain}/api/v2/jobs/users-exports'
export_payload = {
'connection_id': connection_id, # Replace with your connection ID
'format': 'json',
'fields': [
{"name": "user_id"},
{"name": "given_name"},
{"name": "family_name"},
{"name": "nickname"},
{"name": "name"},
{"name": "email"},
{"name": "email_verified"},
{"name": "picture"},
{"name": "identities[0].connection"},
{"name": "created_at"},
{"name": "updated_at"},
{"name": "password_hash"},
{"name": "username"},
{"name": "custom_password_hash"}
]
}

headers = {
'Authorization': f'Bearer {export_management_token}',
'Content-Type': 'application/json'
}

export_response = requests.post(export_url, data=json.dumps(export_payload), headers=headers)
export_data = export_response.json()

# Print the decoded data (optional)
print(export_data)



job_id = export_data.get('id')
connection_name = export_data.get('connection')

# Print the initial response
print(export_data)

# Check the job status until completion or error
while True:
time.sleep(1)

# Check job status
url = f"https://{domain}/api/v2/jobs/{job_id}"
response = requests.get(url, headers=headers)
status_data = response.json()

print(status_data) # Print the status

# If the "location" field is in the response or if the job has failed, stop checking
if "location" in status_data or status_data.get('status') in ['completed', 'failed']:
break


# If the "location" field exists, download the file
if "location" in status_data:
download_url = status_data['location']
response = requests.get(download_url, stream=True)

# Get the filename from the Content-Disposition header or use a default name
compressed_filename = response.headers.get('content-disposition', '').split('filename=')[-1].strip('"')
if not compressed_filename:
compressed_filename = f'./scripts/users-{connection_name}.gz' # Default name if none found

with open(compressed_filename, 'wb') as compressed_file:
for chunk in response.iter_content(chunk_size=8192):
compressed_file.write(chunk)

print(f"File downloaded and saved as {compressed_filename}")

# Decompress the file
decompressed_ndjson_filename = compressed_filename.replace('.gz', '.ndjson')
with gzip.open(compressed_filename, 'rt') as gz_file:
with open(decompressed_ndjson_filename, 'w') as out_file:
out_file.write(gz_file.read())

print(f"File decompressed and saved as {decompressed_ndjson_filename}")

# Convert NDJSON to JSON
with open(decompressed_ndjson_filename, 'r') as ndjson_file:
ndjson_data = ndjson_file.readlines()

json_data = [json.loads(item) for item in ndjson_data]

json_filename = decompressed_ndjson_filename.replace('.ndjson', '.json')
with open(json_filename, 'w') as json_file:
json.dump(json_data, json_file, indent=4)

print(f"Converted NDJSON to JSON and saved as {json_filename}")

This script starts a job of exporting the users.

Then, we check at every seconds the job status, to see if there is a location field available. If yes, means our job is complete and we can download the file.

One of the challenges is that we don’t get th .json file directly. We get a .gz (compressed) format, after uncompressing, it turns out that Auth0 exports NDJSON and not JSON:

Auth0’s export files use the ndjson format due to the large size of the export files, while the import functionality expects a JSON file.

So there 2steps we do after we download the file:

  1. Uncompress the .gz file.
  2. Convert NDJSON to JSON.

At this step we’re done wit the export. We can proceed to the import.

Step 2: Splitting User Data into Smaller Files and importing them

One of the challenges when migrating a large number of users is handling the payload size for import. Auth0 has a maximum payload size limit for user imports. To overcome this limitation, we need to split the user data into smaller files.

The split happens at every 1000 items, which avoid the limit, but this can be changed in the `items_per_file` variable. We ensure that we have manageable-sized files for import.

In the end, we’ll import the user data into the target Auth0 tenant. We will send multiple requests, each containing a chunk of user data. Here’s a script snippet that demonstrates this:

We’ll use POST /api/v2/jobs/users-imports endpoint. More info about the endpoint here.

To put all together, here’s how you can do it:

import time
import requests

token = "YOUR_TARGET_TENANT_API_TOKEN" # Replace with your target tenant's API token
domain = "YOUR_TARGET_TENANT_DOMAIN" # Replace with your target tenant's domain
connection_id = "YOUR_CONNECTION_ID" # Replace with your connection ID

payload = {}

headers = {
'Content-Type': 'multipart/form-data',
'Accept': 'application/json',
'Authorization': f'Bearer {token}'
}

MAX_PAYLOAD_SIZE = 500000 # slightly less than 512000 to give room for headers and other overheads

url = f"https://{domain}/api/v2/jobs/users-imports"
headers = {
'Accept': 'application/json',
'Authorization': f'Bearer {token}'
}

# Load the original JSON data
with open('./scripts/users-Username-Password-Authentication.json', 'r') as file:
data = json.load(file)

# Define the number of items per file
items_per_file = 1000

# Split the data into chunks of 1000 items each
chunks = [data[i:i + items_per_file] for i in range(0, len(data), items_per_file)]

# Enumerate over the chunks and send them
for i, chunk in enumerate(chunks):
temp_filename = f"./scripts/{connection_id}chunk_{i + 1}.json"

with open(temp_filename, 'w') as temp_file:
json.dump(chunk, temp_file)

files = {
'users': (temp_filename, open(temp_filename, 'rb'), 'application/json')
}

data = {
'connection_id': connection_id,
}

# await time.sleep(5)
response = requests.post(url, headers=headers, data=data, files=files)
print(f'Response for chunk {i + 1}: {response.text}')

time.sleep(15)

# Optionally, you can remove the temporary file after sending
# os.remove(temp_filename)

In this script, we iterate through the user data chunks and send them for import to the target tenant. We include a delay (15s) between requests to ensure we don’t exceed rate limits.

Other resources that might help:

  • There is an extension User Import / Export Extension. which turned out being not very helpful in my case cause I was getting different format errors and size limit errors.

Conclusion

Migrating users from one Auth0 tenant to anot her can be a complex task, especially when dealing with large amounts of data and different formats. By following the steps outlined in this article, you can export, split, and import user data while efficiently handling large payloads. This process ensures a smooth and reliable user migration between Auth0 tenants.

--

--