We can't find the internet
Attempting to reconnect
Something went wrong!
Hang in there while we get back on track
Some of the tasks for my website are recurring and need to be executed on a daily or hourly basis. This can be done by using a crontab, but each of them requires a different configuration and I prefer to keep the cron definitions part of the code.
Therefor, I settled on a different approach using apscheduler
.
The way I tend to start it is by starting to define the jobs in a jobs
module under each application package:
blog/jobs.py
from django.core import management
def daily_db_backup():
print('---> Making datababase backup')
management.call_command('dbbackup')
print('---> Finished datababase backup')
def hourly_project_refresh():
# refresh the project list
Each application can easily have it's own jobs
module.
Then, under the main application, I create a command which can be used to run them:
core/management/commands/run_cronjobs
from django.core.management.base import BaseCommand
from apscheduler.schedulers.blocking import BlockingScheduler
import blog.jobs as blog_jobs
class Command(BaseCommand):
help = 'Runs the cronjobs'
def handle(self, *args, **options):
self._log_info("Configuring jobs")
scheduler = BlockingScheduler()
scheduler.add_job(blog_jobs.hourly_project_refresh, 'cron', minute='0', hour='*', day='*', week='*', month='*')
scheduler.add_job(blog_jobs.daily_db_backup, 'cron', minute='0', hour='1', day='*', week='*', month='*')
self._log_info("Running scheduler")
try:
scheduler.start()
except KeyboardInterrupt:
return
def _log_info(self, *msg):
full_msg = ' '.join(msg)
self.stdout.write(self.style.SUCCESS(full_msg))
You can then run the following command to have the cronjobs processed (in my case, I'm running them with supervisord):
$ ./manage.py run_cronjobs
If this post was enjoyable or useful for you, please share it! If you have comments, questions, or feedback, you can email my personal email. To get new posts, subscribe use the RSS feed.