Tatsuya Sumiya

Tatsuya Sumiya

Data Scientist

長時間処理を実行するための Streamlit アプリ設計

長時間処理を実行するための Streamlit アプリ設計

はじめに

データサイエンティストの住谷です。

エムシーデジタルでは、数理最適化を活用したプロジェクトに取り組んでおり、これまでもテックブログで最適化のアルゴリズム開発に関する記事をいくつか公開してきました。 これらのアルゴリズムをユーザーに使ってもらうためには、最適化を実行するためのアプリ開発も必要になります。 一方で、プロジェクト初期の段階ではアルゴリズムの開発に比重を置きたいことが多く、アプリ自体は手早く作れることが望ましいです。

そのような場面で役立つのが、Python のみでアプリ開発できる Streamlit というフレームワークです。 ただし、後述のように Streamlit は長時間かかる処理と相性が悪く、最適化アルゴリズムを実行する上での課題となります。 この記事では、その課題を解決するためのアプローチを紹介します。

Streamlit の紹介

Streamlit は、Python を用いて簡単にインタラクティブな Web アプリを作成できるフレームワークです。 UI を構築するためのコードがとてもシンプルで、以下のような短いコードでアプリを作成できます。

このコードを streamlit run app.py のように実行することで、“a” と “b” の値を入力して “実行” ボタンを押すと足し算の結果が表示されるアプリを Web ブラウザ上に立ち上げることができます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import streamlit as st


def solve(a: float, b: float) -> float:
    return a + b


if __name__ == "__main__":
    st.title("足し算アプリ")
    a = st.number_input("a", min_value=0.0, max_value=100.0, value=50.0)
    b = st.number_input("b", min_value=0.0, max_value=100.0, value=50.0)

    if st.button("実行"):
        answer = solve(a, b)
        st.success(f"結果: {answer}")

足し算アプリの画面

長時間処理を実行する上での課題

Streamlit は、ユーザーがページを操作するたびにコード全体を再実行する仕様になっています。 この仕様の利点として、アプリの状態は常に最新に保たれるため、状態管理やリフレッシュ処理を意識せずに開発することができます。 その一方で、長時間かかる処理を実行するアプリを作る際は、結果の確認などが困難になるという課題があります。

例えば、以下のように solve 関数の実行に時間を要する場合、実行中にフォームの編集やページのリロードをすると、結果が表示される前に最初から再実行されてしまいます。実行中はページを触らないようにしたとしても、処理が長時間の場合は途中でセッションが切れて結果が確認できないということもあり得ます。

1
2
3
def solve(a: float, b: float) -> float:
    time.sleep(60)  # 時間のかかる処理だという想定
    return a + b

補足として、重いファイルの読み込みなど少し時間のかかる同じ処理が何度も再実行されないようにしたいということであれば、st.cache_data というデコレータで関数にキャッシュ機能を付与することで簡単に解決できます(関数の結果が保存され、引数が同じであれば再実行されなくなります)。ただし、処理時間が非常に長く結果を一度表示するだけでも難しいケースなど、キャッシュ化のみでは完全な解決にならない場合もあります。以降では、このようなケースでも有効なアプローチを紹介します。

解決策: Streamlit 外でジョブを実行・管理する

基本方針

アプリ全体は少し複雑になりますが、次のような構成にすることで Streamlit でも長時間処理を扱うアプリを作成することができます。

  • 長時間処理はジョブとして別プロセスで実行する
  • ジョブの実行状況はデータベースで管理する
    • データベースはサーバーを必要としない SQLite を使用
  • アプリを機能ごとに次の 3 ページに分割する
    • ジョブ実行ページ:処理の入力を設定し、ジョブを実行
    • ジョブ一覧ページ:データベースを元にジョブの実行状況を一覧表示
    • 結果ページ:各ジョブの結果を表示

もちろんアプリの規模によっては、データベースやジョブを実行するバックエンドサービスを別に立てることが望ましい場合もありますが、小規模なアプリであればこのような構成にすることでセットアップや開発の負担を軽減できます。

実装例

上記の方針を実現するコードの例を以下に記載します。

フォルダ構成

example_project/
├── app/
│   ├── db.py                     # データベース定義と CRUD 操作
│   ├── job.py                    # ジョブの実行ロジック
│   ├── main.py                   # Streamlit アプリのエントリーポイント
│   ├── pages/
│   │   ├── job_execution.py      # ジョブ実行ページ
│   │   ├── job_list.py           # ジョブ一覧ページ
│   │   └── result.py             # 結果ページ
│   └── problem.py                # アプリで解く問題(長時間かかる処理の想定)
└── outputs/                      # データの出力先フォルダ

各コード

  • problem.py

アプリで解く問題を定義しています。結果保存の際などに便利であるため、問題の入出力の型も定義しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time

from pydantic import BaseModel


class ProblemInputModel(BaseModel):
    a: float
    b: float


class ProblemOutputModel(BaseModel):
    answer: float


def solve_problem(input_data: ProblemInputModel) -> ProblemOutputModel:
    answer = input_data.a + input_data.b
    time.sleep(60)  # 時間のかかる処理だという想定
    return ProblemOutputModel(answer=answer)
  • db.py

ジョブ管理のためのテーブル定義と、その CRUD 操作を実装しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from enum import StrEnum

from sqlalchemy import Enum, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
from ulid import ULID


class Base(DeclarativeBase):
    pass


class JobStatus(StrEnum):
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


class Job(Base):
    __tablename__ = "jobs"

    id: Mapped[str] = mapped_column(String, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    status: Mapped[JobStatus] = mapped_column(Enum(JobStatus))


engine = create_engine("sqlite:///example_project/outputs/jobs.db")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def create_job(name: str) -> str:
    session = SessionLocal()
    job_id = str(ULID())
    new_job = Job(id=job_id, name=name, status=JobStatus.RUNNING)
    session.add(new_job)
    session.commit()
    session.close()
    return job_id


def get_job(job_id: str) -> Job | None:
    session = SessionLocal()
    job = session.query(Job).filter(Job.id == job_id).first()
    session.close()
    return job


def get_all_jobs() -> list[Job]:
    session = SessionLocal()
    jobs = session.query(Job).all()
    session.close()
    return jobs


def update_job_status(job_id: str, status: JobStatus) -> None:
    session = SessionLocal()
    job = session.query(Job).filter(Job.id == job_id).first()
    assert job is not None, f"Job with id {job_id} does not exist."
    job.status = status
    session.commit()
    session.close()
  • job.py

データベース操作や solve_problem 関数の実行、入出力の保存といったジョブの実行ロジック全体を実装しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import json
from pathlib import Path

from example_project.app.db import JobStatus, create_job, update_job_status
from example_project.app.problem import ProblemInputModel, ProblemOutputModel, solve_problem


def execute_job(name: str, input_data: ProblemInputModel) -> None:
    job_id = create_job(name)
    save_dir = Path(f"example_project/outputs/{job_id}")
    save_dir.mkdir(parents=True, exist_ok=False)

    try:
        # 入力データを保存
        with open(save_dir / "input.json", "w") as f:
            json.dump(input_data.model_dump(), f, indent=4)

        # 問題を解いて結果を保存
        output_data: ProblemOutputModel = solve_problem(input_data)
        with open(save_dir / "output.json", "w") as f:
            json.dump(output_data.model_dump(), f, indent=4)

        # status を COMPLETED に更新
        update_job_status(job_id, JobStatus.COMPLETED)
    except Exception:
        # エラーが発生した場合は status を FAILED に更新
        update_job_status(job_id, JobStatus.FAILED)
        raise
  • main.py

アプリのエントリーポイントです。st.navigation を使用することで、複数ページからなるアプリを作成できます。以下に記載する 3 つのファイルが、アプリを構成するそれぞれのページの実装になっています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import streamlit as st

if __name__ == "__main__":
    st.navigation(
        [
            st.Page("pages/job_execution.py", title="ジョブ実行ページ"),
            st.Page("pages/job_list.py", title="ジョブ一覧ページ"),
            st.Page("pages/result.py", title="結果ページ"),
        ],
    ).run()
  • job_execution.py

ジョブ実行ページの実装です。“実行” ボタンを押すと別プロセスでジョブが実行されます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process

import streamlit as st

from example_project.app.job import execute_job
from example_project.app.problem import ProblemInputModel

if __name__ == "__main__":
    st.title("ジョブ実行ページ")
    a = st.number_input("a", min_value=0.0, max_value=100.0, value=50.0)
    b = st.number_input("b", min_value=0.0, max_value=100.0, value=50.0)
    name = st.text_input("保存名")

    if st.button("実行"):
        input_data = ProblemInputModel(a=a, b=b)
        process = Process(
            target=execute_job,
            args=(name, input_data),
        )
        process.start()
        st.success("実行しました。")
  • job_list.py

ジョブ一覧ページの実装です。ジョブの実行状況の一覧と、それぞれの結果ページへのリンクを表示します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import pandas as pd
import streamlit as st

from example_project.app.db import get_all_jobs

if __name__ == "__main__":
    st.title("ジョブ一覧ページ")
    jobs = get_all_jobs()
    jobs_df = pd.DataFrame(
        [
            {
                "保存名": job.name,
                "ステータス": job.status,
                "結果": f"/result?id={job.id}",  # 結果ページへのリンク
            }
            for job in jobs
        ]
    )
    if jobs_df.empty:
        st.write("実行されたジョブはありません。")
    else:
        st.dataframe(
            jobs_df,
            column_config={
                "結果": st.column_config.LinkColumn("結果", display_text="結果を見る ↗️")
            },
            hide_index=True,
        )
  • result.py

結果ページの実装です。クエリパラメータからジョブの ID を取得し、それに対応する結果を表示しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from pathlib import Path

import streamlit as st

from example_project.app.db import JobStatus, get_job

if __name__ == "__main__":
    st.title("結果ページ")

    job_id = st.query_params.get("id", None)
    if job_id is None:
        st.error("job_id is required in query parameters")
        st.stop()

    job = get_job(job_id)
    if job is None:
        st.error(f"Job with id {job_id} does not exist.")
        st.stop()

    load_dir = Path(f"example_project/outputs/{job_id}")
    with open(load_dir / "input.json", "r") as f:
        input_data = f.read()

    st.write("### 入力データ")
    st.json(input_data)

    if job.status is JobStatus.RUNNING:
        st.info("実行中です。")
    elif job.status is JobStatus.FAILED:
        st.error("実行に失敗しました。")
    elif job.status is JobStatus.COMPLETED:
        with open(load_dir / "output.json", "r") as f:
            output_data = f.read()
        st.write("### 結果")
        st.json(output_data)

実行結果

上記のコードを streamlit run example_project/app/main.py と実行すると、次のような画面のアプリが動作します。

ジョブ実行ページ

ジョブ実行ページ

ジョブ一覧ページ

ジョブ一覧ページ

結果ページ(実行完了後)

結果ページ

更なる改善

この記事の実装はなるべく簡潔になるようにしたということもあり、使いにくい点や改善点を残しています。 次のような実装を入れることで、アプリの使い勝手を向上できます。

  • 現実装ではジョブが完了してデータベースに反映されても、ページが勝手に更新するわけではないので手動でリロードする必要があります。一定時間ごとにデータを再取得するようにすることで、操作なしでページが更新されるようにできます。
  • 結果ページはクエリパラメータでジョブの ID を指定する前提ですが、サイドバーにあるページリンクからこのページに遷移するとクエリパラメータがないためエラーとなってしまいます。このサイドバーのページリンクは Streamlit が全ページを対象にデフォルトで作成するものです。サイドバーをデフォルトのものではなく自作することで、このようなページ遷移をできないようにすることが望ましいです。
  • created_atupdated_at をデータベースのスキーマに加えて、実行時刻や終了時刻も表示するとジョブ一覧ページが分かりやすくなります。
  • ジョブを実行しているプロセスの ID (PID) もデータベースで管理することで、ジョブ中断機能を実現できます。

まとめ

Streamlit は簡単にアプリを構築できる便利なフレームワークですが、長時間処理を扱う場合にはその仕様が課題となることがあります。本記事では、ジョブ管理の仕組みによりこの課題を解決するアプローチを紹介しました。

エムシーデジタルでは、技術力向上のためのイベントや勉強会なども定期的に実施しています。 もしエムシーデジタルで働くことに興味を持っていただいた方がいらっしゃいましたら、カジュアル面談も受け付けておりますので、お気軽にお声掛けください! 採用情報や面談申込みはこちらから

RSS

Tags

Next

Shohei Saito

Shohei Saito

MCD Data Science Competition 2024 開催の振り返り

はじめに データサイエンティストの齋藤です。2024年12月13日(金)に開催された MCD Data Science Competition 2024 について、運営目線での振り返りをまとめます。 本番のコンペは非公開ですが、同一設定の公開コ

  • #TechBlog
  • #Kaggle
  • #Data Science