Async tasks are an intermediate backend engineering problem. Most public-facing APIs need to return a response in under a second to provide a good user experience, yet many real-world tasks take longer than that or need to run on a repeating schedule. The usual solution is to move the work into a task that runs asynchronously, outside the request/response cycle. In Django, one way to accomplish this is by writing a management command.
Real APIs are typically paginated: only a portion of the matching data is returned per request. Sending every result at once could overwhelm the requesting program and bog down the service returning the data. Since most real-world APIs work this way, learning how to traverse and consume pagination is worth mastering.
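At its core, traversing pagination is just a loop: request a page, hand off its results, and keep requesting until the API reports there are no more pages. Here is a stripped-down sketch of that pattern; the URL and parameters are placeholders, while the 'data' list and 'pagination.page_count' field mirror the response shape the full command below consumes.
import requests

def fetch_all(url, params):
    """Yield every item across all pages of a paginated endpoint."""
    page = 1
    while True:
        response = requests.get(url, params={**params, 'page': page}).json()
        yield from response['data']  # items on the current page
        if page >= response['pagination']['page_count']:
            break  # last page reached
        page += 1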
The following is an example of a Django management command that queries a blog API, walks the pagination to collect every post, and processes and stores the data in the database. Be sure to read the notes at the end of the post.
# app/management/commands/import_articles.py
import logging
import os
import traceback
from datetime import datetime, timedelta

import requests
from django.core.management.base import BaseCommand

from app.models import Photo, Post


class Command(BaseCommand):
    """
    Usage:
    $ python manage.py import_articles
    $ python manage.py import_articles --begin_date 2019-09-09T09:30:00 --end_date 2019-09-10T09:30:00
    """
    help = 'Import articles from 3rd party api.'

    def add_arguments(self, parser):
        parser.add_argument('--begin_date', action='store', default=None, help='beginning of data to import')
        parser.add_argument('--end_date', action='store', default=None, help='end of data to import')

    def handle(self, *args, **options):
        logger = logging.getLogger('app.management.commands')
        API_KEY = os.environ.get("API_KEY", "token")
        base_url = 'https://www.blog.com/v3/data/search'

        # process args if provided, otherwise set default values
        if options['begin_date']:
            begin_date = datetime.fromisoformat(options['begin_date'])
        else:
            begin_date = datetime.now() + timedelta(days=7)
        if options['end_date']:
            end_date = datetime.fromisoformat(options['end_date'])
        else:
            end_date = begin_date + timedelta(days=7)

        # request params to be used
        payload = {
            'token': API_KEY,
            'start_date.range_start': begin_date.isoformat(timespec='seconds'),
            'start_date.range_end': end_date.isoformat(timespec='seconds'),
        }
        session = requests.Session()

        # get_jobs yields one page worth of results at a time
        def get_jobs():
            first_page = session.get(base_url, params=payload).json()
            yield first_page
            num_pages = first_page['pagination']['page_count']
            for page in range(2, num_pages + 1):
                new_payload = dict(payload, page=page)  # copy so the shared params stay untouched
                next_page = session.get(base_url, params=new_payload).json()
                yield next_page

        # issue paginated api requests, accumulating results into result_set
        result_set = []
        for page in get_jobs():
            result_set.extend(page.get('data', []))

        # store data into our database
        for article in result_set:
            post = Post(body=article['body'], is_published=False)
            try:
                post.save()
                if article.get('logo') and article['logo'].get('url'):
                    photo = Photo(post=post, img_url=article['logo']['url'])
                    photo.save()
            except Exception:
                trace_back = traceback.format_exc()
                logger.warning('Failed to import data. %s', trace_back)

        logger.info('Data import job complete.')
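Once the command exists it can run outside the request/response cycle: from cron, from a scheduler, or programmatically through Django's call_command. A quick sketch of the programmatic route, reusing the argument values from the usage docstring above:
# run the import from other Python code (a scheduler, a test, another job)
from django.core.management import call_command

call_command('import_articles')
call_command('import_articles',
             begin_date='2019-09-09T09:30:00',
             end_date='2019-09-10T09:30:00')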
And below is the logging configuration that sends the management command’s logs to stdout in development.
# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
    },
    'loggers': {
        'django': {
            'handlers': ['console'],
            'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
        },
        'app.management.commands': {
            'handlers': ['console'],
            'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
        }
    }
}
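With this configuration in place, any management command can grab the shared logger by name and its messages will reach stdout through the console handler. A minimal sketch; the command name and messages here are made up:
# app/management/commands/some_other_task.py  (hypothetical)
import logging

from django.core.management.base import BaseCommand

logger = logging.getLogger('app.management.commands')

class Command(BaseCommand):
    help = 'Another async job that logs to the same place.'

    def handle(self, *args, **options):
        logger.info('Starting some other task.')
        # ... do the work ...
        logger.info('Some other task complete.')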
Things to note:
- We log to a logger named app.management.commands. If you use __name__, as mentioned in the Django docs, you will have to make a logger for each management command, and that seems silly. Instead we use the same logger for all management commands.
- We wrap each save (model.save()) in a try/except. In the except clause we log the error and continue. If we let errors propagate, the whole job would stop even if only one item has a problem. You should have your logging system send you an alert if the job logs a warning.
- Protip: you can pipe JSON responses to jq to quickly understand the data. For example, running $ curl -s 'https://blog.com/v3/data/search/?q=science&token=abc123' | jq '.data[0] | keys' will query an API and send the JSON response to jq. The jq query here pulls out the first result in the data array and lists the keys on the object. I did several sanity checks on the command line using this combo while developing this Django task. Read the jq site for more things that are possible.
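Along the same lines, and assuming the response shape the command above consumes, a query like this shows how many pages the job will have to walk:
$ curl -s 'https://blog.com/v3/data/search/?q=science&token=abc123' | jq '.pagination.page_count'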
If you need help solving your business problems with software, read how to hire me.