オンラインドキュメント
https://pypi.org/project/celery/
https://docs.celeryproject.org/en/stable/index.html
https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
インストール
使い方
Fedora35にPython3.8をソースコンパイルでインストールした環境で、DjangoでのCeleryの利用について説明する。
パッケージのインストール
pip3.8 install django==4.0.3 | ln -s /usr/local/python38/bin/django-admin /usr/local/bin/django-admin |
pip3.8 install celery==5.2.3 |
ln -s /usr/local/python38/bin/celery /usr/local/bin/celery |
pip3.8 install redis |
|
redisのインストール、開始
dnf install redis | systemctl start redis |
|
Djangoプロジェクトの作成
コンソールで django-admin startproject djSite を実行し、プロジェクト djSite を作成する。
Djangoアプリケーションの作成
ディレクトリ djSite でコンソールで python3.8 manage.py startapp task11 を実行する。
task11 スクリーンショット
settings.pyの編集
INSTALLED_APPS に 'task11.apps.Task11Config', を追加する。
言語、タイムゾーンを以下の内容で設定する。
LANGUAGE_CODE = 'en-us' | TIME_ZONE = 'Asia/Tokyo' |
USE_I18N = True |
USE_TZ = False |
|
以下の行を追加する。
import os | |
# Celery |
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' |
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1') |
|
celery.pyの作成
settings.pyと同じ階層にcelery.pyを以下の内容で作成する。
import os | from celery import Celery |
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djSite.settings') |
app = Celery('djSite') |
|
app.config_from_object('django.conf:settings', namespace='CELERY') |
|
app.autodiscover_tasks() |
|
tasks.pyの作成
settings.pyと同じ階層にtasks.pyを以下の内容で作成する。
from celery import shared_task | import time |
|
@shared_task |
def add(x1, x2): |
from djSite import celery |
from task11.models import Celery_Task |
|
ins = celery.app.control.inspect() |
active_info = ins.active() |
key0 = list(active_info.keys())[0] |
task_id = active_info[key0][0]['id'] |
|
db_object = Celery_Task.objects.get(task_id=task_id) |
db_object.started_datetime = datetime.now() |
db_object.task_status = 'STARTED' |
db_object.save() |
|
print('処理中 [{}]'.format(task_id)) |
y = x1 + x2 |
time.sleep(10) |
print('処理完了') |
|
db_object.finished_datetime = datetime.now() |
db_object.task_status = 'SUCCESS' |
db_object.task_result = str(y) |
db_object.save() |
|
return y |
|
1 | 2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
18 |
19 |
20 |
21 |
22 |
23 |
24 |
25 |
26 |
27 |
28 |
29 |
|
__init__.pyの編集
settings.pyと同じ階層の__init__.pyを編集する。
from .celery import app as celery_app | |
__all__ = ('celery_app',) |
|
urls.pyの編集
settings.pyと同じ階層のurls.pyを編集する。
from django.contrib import admin | from django.urls import include, path |
|
urlpatterns = [ |
path('task11/', include('task11.urls')), |
path('admin/', admin.site.urls), |
] |
|
task11/urls.pyの作成
from django.urls import path | |
from . import views |
|
urlpatterns = [ |
path('', views.index, name='task11_index'), |
path('list/', views.list, name='task11_list'), |
path('list/delete/', views.list_delete, name='task11_list_delete'), |
path('result/<str:task_id>/', views.result, name='task11_result'), |
] |
|
task11/models.pyの編集
from django.db import models | |
# Create your models here. |
class Celery_Task(models.Model): |
task_id = models.CharField(max_length=100) |
task_name = models.CharField(max_length=100) |
STATUS_CHOICES = [ ('PENDING','PENDING'), ('STARTED','STARTED'), ('FAILURE','FAILURE'), ('SUCCESS','SUCCESS'), ] |
task_status = models.CharField(max_length=20,choices=STATUS_CHOICES, default='PENDING') |
task_result = models.TextField(null=True) |
recieved_datetime = models.DateTimeField(null=True) |
started_datetime = models.DateTimeField(null=True) |
finished_datetime = models.DateTimeField(null=True) |
|
def __str__(self): |
return self.task_id + ' (' + self.task_status + ')' |
|
task11/admin.pyの編集
from django.contrib import admin | |
# Register your models here. |
from .models import Celery_Task |
admin.site.register(Celery_Task) |
|
task11/views.pyの編集
from django.shortcuts import render | |
# Create your views here. |
from djSite.tasks import add |
from .models import Celery_Task |
|
from datetime import datetime |
|
def index(request): |
result = add.delay(1, 2) |
task_id = result.id |
ct = Celery_Task(task_id=task_id, task_name='add', recieved_datetime=datetime.now()) |
ct.save() |
|
text1 = '<h1>Django Celery タスクの登録完了</h1>\n' |
text1 += '<div class="result">' |
text1 += '<p>タスクID: {}</p>'.format(task_id) |
text1 += '<p>タスクの状態: {}</p>'.format(result.state) |
text1 += '</div>\n' |
|
text1 += '<h4><p class="left"><a href="list/">タスクのリスト</a></p></h4>\n' |
|
context = { 'title':'Django Celery' , 'render_text': text1, } |
return render(request, 'task11_index.html', context) |
|
def to_datetime_str(dt) : |
dt_str = "" |
|
if dt != None : |
dt_str = dt.strftime('%B %d, %Y, %H:%M:%S') |
|
return dt_str |
|
def list(request): |
text1 = '<h1>Django Celery タスクのリスト</h1>\n' |
text1 += '<table>\n<th>タスクID</th><th>タスク状態</th><th>受付日時</th><th>開始日時</th><th>終了日時</th><tr>\n' |
|
result_object_list = Celery_Task.objects.all().filter(task_name='add') |
for result_object in result_object_list : |
dt1 = to_datetime_str(result_object.recieved_datetime) |
dt2 = to_datetime_str(result_object.started_datetime) |
dt3 = to_datetime_str(result_object.finished_datetime) |
|
if(result_object.task_status!="SUCCESS") : |
text1 += '<td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><tr>\n'.format(result_object.task_id, result_object.task_status, dt1, dt2, dt3) |
else : |
text1 += '<td><a href="../result/{}/">{}</a></td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><tr>\n'.format(result_object.task_id, result_object.task_id, result_object.task_status, dt1, dt2, dt3) |
|
text1 += '</table>\n' |
|
text1 += '<h4><p class="left"><a href="delete">終了タスクの履歴の削除</a></p></h4>\n' |
|
context = { 'title':'Django Celery' , 'render_text': text1, } |
return render(request, 'task11_page.html', context) |
|
def list_delete(request): |
text1 = '<h1>Django Celery 終了タスクの履歴の削除</h1>\n' |
|
db_object = Celery_Task.objects.all().filter(task_status='SUCCESS') |
db_object.delete() |
|
db_object = Celery_Task.objects.all().filter(task_status='FAILURE') |
db_object.delete() |
|
text1 += '<script>\n setTimeout(function(){ document.location.href="../"; }, 500);\n</script>\n' |
|
context = { 'title':'Django Celery' , 'render_text': text1, } |
return render(request, 'task11_page.html', context) |
|
def result(request, task_id): |
text1 = '<h1>Django Celery タスクの処理結果</h1>\n' |
|
result_object = Celery_Task.objects.get(task_id=task_id) |
task_result = result_object.task_result |
|
text1 += "<h3>処理結果</h3>\n" |
text1 += '<div class="result"><p>{}</p></div>\n'.format(task_result) |
|
context = { 'title':'Django Celery' , 'render_text': text1, } |
return render(request, 'task11_page.html', context) |
|
1 | 2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
18 |
19 |
20 |
21 |
22 |
23 |
24 |
25 |
26 |
27 |
28 |
29 |
30 |
31 |
32 |
33 |
34 |
35 |
36 |
37 |
38 |
39 |
40 |
41 |
42 |
43 |
44 |
45 |
46 |
47 |
48 |
49 |
50 |
51 |
52 |
53 |
54 |
55 |
56 |
57 |
58 |
59 |
60 |
61 |
62 |
63 |
64 |
65 |
66 |
67 |
68 |
69 |
70 |
71 |
72 |
73 |
74 |
75 |
76 |
77 |
78 |
79 |
80 |
|
task11/templates/task11_layout.htmlの作成
<!DOCTYPE html> | <html lang="ja"> |
<head> |
<meta charset="UTF-8"> |
<title>{{title}}</title> |
<style> |
body { background-color: black; color: white; margin: 30px; } |
h1 { color: gold; border-bottom: double 3px; font-weight: normal; } |
h2 { color: navajowhite; border-bottom: solid 2px; border-left: solid 20px; padding: 3px; font-weight: normal; } |
h3 { border-bottom: solid 1px white; padding: 3px; font-weight: normal; } |
h4 { border-top: solid 1px gold; padding: 3px; margin-top: 30px; font-weight: normal; } |
h4 p { padding: 0px; margin: 0px; } |
.left { text-align: left; } |
.center { text-align: center; } |
.right { text-align: right; } |
.result { margin-top: 10px; margin-bottom: 10px; margin-left: 60px; margin-right: 60px; } |
table { color: white; padding: 0px; margin: 0px; border-collapse: collapse; width: 100%; border: none; } |
th { text-align: left; } |
th,td { border-bottom: solid 1px white; padding-top: 3px; padding-left: 10px; padding-right: 10px; padding-bottom: 3px; } |
a { color: deepskyblue; text-decoration: underline dotted lightskyblue; } |
</style> |
</head> |
<body> |
{% block body %}{% endblock %} |
</body> |
</html> |
|
1 | 2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
18 |
19 |
20 |
21 |
22 |
23 |
24 |
25 |
26 |
|
task11/templates/task11_index.htmlの作成
{% extends "task11_layout.html" %} | {% block body %} |
{{render_text|safe}} |
<footer><h4><p class="right">copyright 2022 eagle eight</p></h4></footer> |
{% endblock %} |
|
task11/templates/task11_page.htmlの作成
{% extends "task11_layout.html" %} | {% block body %} |
{{render_text|safe}} |
<footer><h4><p class="right">copyright 2022 eagle eight</p></h4></footer> |
{% endblock %} |
|
データベーステーブルの作成
コンソールで python3.8 manage.py makemigrations を実行する。
コンソールで python3.8 manage.py migrate を実行する。
ワーカープロセスの起動
manage.pyと同じ階層でコンソールで celery -A djSite worker -c 1 -l info を実行する。
※ 著者:注 2022/03/08
ワーカー数を設定するオプション -c を指定しない場合、ワーカー数は 4 で動作する。この時、tasks.py の add関数 は例外を出すことなく動作するが、Celery_Taskテーブルの更新が正常に動作できていない。オプション -c 1 を付けてワーカー数を 1 に設定すると、Celery_Taskテーブルの更新は正常に動作できる。
Django開発用サーバの起動
manage.pyと同じ階層でコンソールで python3.8 manage.py runserver を実行する。
動作確認
http://127.0.0.1:8000/task11/ へアクセスして、Django Celery タスクの登録完了 のページが表示されることを確認する。
Django Celery タスクの登録完了 のページで タスクのリストをクリックして、Django Celery タスクのリスト のページが表示されることを確認する。
Django Celery タスクのリスト のページで 終了タスクのタスクID をクリックして、Django Celery タスクの処理結果 のページが表示されることを確認する。
Django Celery タスクのリスト のページで 終了タスクの履歴の削除 をクリックして、終了タスクがリストから削除されていることを確認する。