Celery

オンラインドキュメント

https://pypi.org/project/celery/

https://docs.celeryproject.org/en/stable/index.html

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html

インストール

pip install celery
1

使い方

Fedora35にPython3.8をソースコンパイルでインストールした環境で、DjangoでのCeleryの利用について説明する。

パッケージのインストール

pip3.8 install django==4.0.3
ln -s /usr/local/python38/bin/django-admin /usr/local/bin/django-admin
pip3.8 install celery==5.2.3
ln -s /usr/local/python38/bin/celery /usr/local/bin/celery
pip3.8 install redis
1
2
3
4
5

redisのインストール、開始

dnf install redis
systemctl start redis
1
2

Djangoプロジェクトの作成

コンソールで django-admin startproject djSite を実行し、プロジェクト djSite を作成する。

Djangoアプリケーションの作成

ディレクトリ djSite でコンソールで python3.8 manage.py startapp task11 を実行する。

task11 スクリーンショット

settings.pyの編集

INSTALLED_APPS に 'task11.apps.Task11Config', を追加する。

言語、タイムゾーンを以下の内容で設定する。

LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Tokyo'
USE_I18N = True
USE_TZ = False
1
2
3
4

以下の行を追加する。

import os
 
# Celery
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1')
1
2
3
4
5

celery.pyの作成

settings.pyと同じ階層にcelery.pyを以下の内容で作成する。

import os
from celery import Celery
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djSite.settings')
app = Celery('djSite')
 
app.config_from_object('django.conf:settings', namespace='CELERY')
 
app.autodiscover_tasks()
1
2
3
4
5
6
7
8
9

tasks.pyの作成

settings.pyと同じ階層にtasks.pyを以下の内容で作成する。

from celery import shared_task
import time
 
@shared_task
def add(x1, x2):
    from djSite import celery
    from task11.models import Celery_Task
     
    ins = celery.app.control.inspect()
    active_info = ins.active()
    key0 = list(active_info.keys())[0]
    task_id = active_info[key0][0]['id']
     
    db_object = Celery_Task.objects.get(task_id=task_id)
    db_object.started_datetime = datetime.now()
    db_object.task_status = 'STARTED'
    db_object.save()
     
    print('処理中 [{}]'.format(task_id))
    y = x1 + x2
    time.sleep(10)
    print('処理完了')
     
    db_object.finished_datetime = datetime.now()
    db_object.task_status = 'SUCCESS'
    db_object.task_result = str(y)
    db_object.save()
     
    return y
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

__init__.pyの編集

settings.pyと同じ階層の__init__.pyを編集する。

from .celery import app as celery_app
 
__all__ = ('celery_app',)
1
2
3

urls.pyの編集

settings.pyと同じ階層のurls.pyを編集する。

from django.contrib import admin
from django.urls import include, path
 
urlpatterns = [
    path('task11/', include('task11.urls')),
    path('admin/', admin.site.urls),
]
1
2
3
4
5
6
7

task11/urls.pyの作成

from django.urls import path
 
from . import views
 
urlpatterns = [
    path('', views.index, name='task11_index'),
    path('list/', views.list, name='task11_list'),
    path('list/delete/', views.list_delete, name='task11_list_delete'),
    path('result/<str:task_id>/', views.result, name='task11_result'),
]
1
2
3
4
5
6
7
8
9
10

task11/models.pyの編集

from django.db import models
 
# Create your models here.
class Celery_Task(models.Model):
    task_id = models.CharField(max_length=100)
    task_name = models.CharField(max_length=100)
    STATUS_CHOICES = [ ('PENDING','PENDING'), ('STARTED','STARTED'), ('FAILURE','FAILURE'), ('SUCCESS','SUCCESS'), ]
    task_status = models.CharField(max_length=20,choices=STATUS_CHOICES, default='PENDING')
    task_result = models.TextField(null=True)
    recieved_datetime = models.DateTimeField(null=True)
    started_datetime = models.DateTimeField(null=True)
    finished_datetime = models.DateTimeField(null=True)
    
    def __str__(self):
        return self.task_id + ' (' + self.task_status + ')'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

task11/admin.pyの編集

from django.contrib import admin
 
# Register your models here.
from .models import Celery_Task
admin.site.register(Celery_Task)
1
2
3
4
5

task11/views.pyの編集

from django.shortcuts import render
 
# Create your views here.
from djSite.tasks import add
from .models import Celery_Task
 
from datetime import datetime
 
def index(request):
    result = add.delay(1, 2)
    task_id = result.id
    ct = Celery_Task(task_id=task_id, task_name='add', recieved_datetime=datetime.now())
    ct.save()
    
    text1 = '<h1>Django Celery タスクの登録完了</h1>\n'
    text1 += '<div class="result">'
    text1 += '<p>タスクID: {}</p>'.format(task_id)
    text1 += '<p>タスクの状態: {}</p>'.format(result.state)
    text1 += '</div>\n'
    
    text1 += '<h4><p class="left"><a href="list/">タスクのリスト</a></p></h4>\n'
    
    context = { 'title':'Django Celery' , 'render_text': text1, }
    return render(request, 'task11_index.html', context)
 
def to_datetime_str(dt) :
    dt_str = ""
    
    if dt != None :
        dt_str = dt.strftime('%B %d, %Y, %H:%M:%S')
    
    return dt_str
 
def list(request):
    text1 = '<h1>Django Celery タスクのリスト</h1>\n'
    text1 += '<table>\n<th>タスクID</th><th>タスク状態</th><th>受付日時</th><th>開始日時</th><th>終了日時</th><tr>\n'
    
    result_object_list = Celery_Task.objects.all().filter(task_name='add')
    for result_object in result_object_list :
        dt1 = to_datetime_str(result_object.recieved_datetime)
        dt2 = to_datetime_str(result_object.started_datetime)
        dt3 = to_datetime_str(result_object.finished_datetime)
        
        if(result_object.task_status!="SUCCESS") :
            text1 += '<td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><tr>\n'.format(result_object.task_id, result_object.task_status, dt1, dt2, dt3)
        else :
            text1 += '<td><a href="../result/{}/">{}</a></td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><tr>\n'.format(result_object.task_id, result_object.task_id, result_object.task_status, dt1, dt2, dt3)
    
    text1 += '</table>\n'
    
    text1 += '<h4><p class="left"><a href="delete">終了タスクの履歴の削除</a></p></h4>\n'
    
    context = { 'title':'Django Celery' , 'render_text': text1, }
    return render(request, 'task11_page.html', context)
 
def list_delete(request):
    text1 = '<h1>Django Celery 終了タスクの履歴の削除</h1>\n'
    
    db_object = Celery_Task.objects.all().filter(task_status='SUCCESS')
    db_object.delete()
    
    db_object = Celery_Task.objects.all().filter(task_status='FAILURE')
    db_object.delete()
    
    text1 += '<script>\n  setTimeout(function(){ document.location.href="../"; }, 500);\n</script>\n'
    
    context = { 'title':'Django Celery' , 'render_text': text1, }
    return render(request, 'task11_page.html', context)
 
def result(request, task_id):
    text1 = '<h1>Django Celery タスクの処理結果</h1>\n'
    
    result_object = Celery_Task.objects.get(task_id=task_id)
    task_result = result_object.task_result
    
    text1 += "<h3>処理結果</h3>\n"
    text1 += '<div class="result"><p>{}</p></div>\n'.format(task_result)
    
    context = { 'title':'Django Celery' , 'render_text': text1, }
    return render(request, 'task11_page.html', context)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

task11/templates/task11_layout.htmlの作成

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="UTF-8">
    <title>{{title}}</title>
    <style>
      body  { background-color: black; color: white; margin: 30px; }
      h1  { color: gold; border-bottom: double 3px; font-weight: normal; }
      h2  { color: navajowhite; border-bottom: solid 2px; border-left: solid 20px; padding: 3px; font-weight: normal; }
      h3  { border-bottom: solid 1px white; padding: 3px; font-weight: normal; }
      h4  { border-top: solid 1px gold; padding: 3px; margin-top: 30px; font-weight: normal; }
      h4 p  { padding: 0px; margin: 0px; }
      .left  { text-align: left; }
      .center  { text-align: center; }
      .right  { text-align: right; }
      .result  { margin-top: 10px; margin-bottom: 10px; margin-left: 60px; margin-right: 60px; }
      table  { color: white; padding: 0px; margin: 0px; border-collapse: collapse; width: 100%; border: none; }
      th  { text-align: left; }
      th,td  { border-bottom: solid 1px white; padding-top: 3px; padding-left: 10px; padding-right: 10px; padding-bottom: 3px; }
      a  {  color: deepskyblue; text-decoration: underline dotted lightskyblue; }
    </style>
  </head>
  <body>
    {% block body %}{% endblock %}
  </body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

task11/templates/task11_index.htmlの作成

{% extends "task11_layout.html" %}
{% block body %}
{{render_text|safe}}
<footer><h4><p class="right">copyright 2022 eagle eight</p></h4></footer>
{% endblock %}
1
2
3
4
5

task11/templates/task11_page.htmlの作成

{% extends "task11_layout.html" %}
{% block body %}
{{render_text|safe}}
<footer><h4><p class="right">copyright 2022 eagle eight</p></h4></footer>
{% endblock %}
1
2
3
4
5

データベーステーブルの作成

コンソールで python3.8 manage.py makemigrations を実行する。

コンソールで python3.8 manage.py migrate を実行する。

ワーカープロセスの起動

manage.pyと同じ階層でコンソールで celery -A djSite worker -c 1 -l info を実行する。

※ 著者:注 2022/03/08

ワーカー数を設定するオプション -c を指定しない場合、ワーカー数は 4 で動作する。この時、tasks.py の add関数 は例外を出すことなく動作するが、Celery_Taskテーブルの更新が正常に動作できていない。オプション -c 1 を付けてワーカー数を 1 に設定すると、Celery_Taskテーブルの更新は正常に動作できる。

Django開発用サーバの起動

manage.pyと同じ階層でコンソールで python3.8 manage.py runserver を実行する。

動作確認

http://127.0.0.1:8000/task11/ へアクセスして、Django Celery タスクの登録完了 のページが表示されることを確認する。

Django Celery タスクの登録完了 のページで タスクのリストをクリックして、Django Celery タスクのリスト のページが表示されることを確認する。

Django Celery タスクのリスト のページで 終了タスクのタスクID をクリックして、Django Celery タスクの処理結果 のページが表示されることを確認する。

Django Celery タスクのリスト のページで 終了タスクの履歴の削除 をクリックして、終了タスクがリストから削除されていることを確認する。