Tasks
=====

The application uses Celery for asynchronous and periodic work. This page
describes how Celery is configured, explains the queue / routing model, and
gives a complete inventory of every scheduled task.

Per-module task references (auto-generated from docstrings) live with each
app: see :doc:`/modules/shares/tasks`, :doc:`/modules/coins/tasks`,
:doc:`/modules/etfs/tasks`, :doc:`/modules/indexes/tasks`,
:doc:`/modules/news/tasks`, :doc:`/modules/transactions/tasks`,
:doc:`/modules/core/tasks`.

Configuration
-------------

* **Broker:** Redis -- ``redis://redis:6379/1`` (configurable via the
  ``REDIS_URL`` env var).
* **Result backend:** ``django-db`` -- task results are persisted in the
  application database via ``django_celery_results``.
* **Beat scheduler:** ``django_celery_beat.schedulers:DatabaseScheduler`` --
  the schedule lives in the database; on startup it is seeded from the
  ``celery_beat`` fixture (``app/richy/core/fixtures/celery_beat.json``).
* **Serializer:** JSON for both task arguments and results.

Beat
----

Beat is started with:

.. code-block:: bash

   $ celery -A richy beat -l error -S django

Notes:

* ``-l error`` -- the log level is ``error``, so only failures are emitted.
* No ``--pidfile`` is supplied. When beat hangs, a stale PID file would
  otherwise block a fresh process from starting; omitting it side-steps that
  failure mode.
* ``-S django`` -- use the ``DatabaseScheduler``, so the schedule is managed
  in the Django admin and survives restarts.

Workers
-------

Three worker services are deployed, one per queue. Production configuration
(from ``compose.yaml.skel``):

.. code-block:: bash

   $ celery -A richy worker -c 2 -Q celery -O fair -l error --max-tasks-per-child=3
   $ celery -A richy worker -c 1 -Q slow -O fair -l error --max-tasks-per-child=3
   $ celery -A richy worker -c 2 -Q fast -O fair -l error --max-tasks-per-child=3

Notes:

* ``-c N`` -- concurrency. The ``slow`` worker runs with a concurrency of
  one, so long scrapes cannot trample each other.
* ``-O fair`` -- distribute tasks fairly to children rather than prefetching
  ahead, which suits long-running tasks.
* ``--max-tasks-per-child=3`` -- recycle each worker process after three
  tasks. This bounds memory growth from libraries that leak (notably the
  scraping stack) and ensures a fresh interpreter for each batch.

Queues
------

The application uses three queues:

* ``celery`` -- default queue for general tasks
* ``slow`` -- long-running scraping / network-bound tasks
* ``fast`` -- short tasks expected to complete in seconds

Routing
~~~~~~~

A task can be routed to different queues depending on whether it is invoked
directly or fired by beat. The current routing table:

+----------------------------------------------+---------+------------------------+
| task                                         | direct  | periodic (via beat)    |
+==============================================+=========+========================+
| richy.core.tasks.generate_performance_charts | celery  | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.coins.tasks.fetch_historical_data      | celery  | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.coins.tasks.fetch_basic_info           | fast    | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.coins.tasks.fetch_current_price        | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_historical_data     | celery  | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_financial_data      | celery  | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_basic_info          | fast    | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_current_price       | fast    | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_dividends           | fast    | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_ratings             | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.shares.tasks.fetch_price_ratings       | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.etfs.tasks.fetch_historical_data       | celery  | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.etfs.tasks.fetch_basic_info            | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.etfs.tasks.fetch_current_price         | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.etfs.tasks.fetch_holdings              | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.etfs.tasks.fetch_dividends             | fast    | fast                   |
+----------------------------------------------+---------+------------------------+
| richy.indexes.tasks.fetch_historical_data    | celery  | celery                 |
+----------------------------------------------+---------+------------------------+
| richy.news.tasks.download_share_news         | slow    | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.news.tasks.download_coin_news          | slow    | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.news.tasks.download_index_news         | slow    | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.news.tasks.download_etfs_news          | slow    | slow                   |
+----------------------------------------------+---------+------------------------+
| richy.transactions.tasks.generate_graphs     | fast    | --                     |
+----------------------------------------------+---------+------------------------+

.. note::

   The "direct" column is the queue set on the ``@app.task`` decorator --
   i.e. the queue used when the task is invoked programmatically
   (``task.delay()`` / ``apply_async()``). The "periodic" column is the
   *effective* queue when beat fires the task: the value from
   ``celery_beat.json`` if set, otherwise the decorator's queue.

   The fixture lives at ``app/richy/core/fixtures/celery_beat.json``. Once
   it is loaded, the schedule is editable through the Django admin, and beat
   picks up changes without a restart.

   ``richy.transactions.tasks.generate_graphs`` is not scheduled by beat; it
   is invoked directly from application code.

Periodic schedule
-----------------

Every enabled periodic task, sourced from the beat fixture and ordered from
most to least frequent (the one stale fixture entry is described in the note
after the table):

.. list-table::
   :header-rows: 1
   :widths: 36 22 10 32

   * - Task
     - Schedule
     - Queue
     - Purpose
   * - ``richy.shares.tasks.fetch_current_price``
     - every 30 minutes
     - celery
     - Cached current share market price
   * - ``richy.etfs.tasks.fetch_current_price``
     - every 30 minutes
     - fast
     - Cached ETF market price
   * - ``richy.coins.tasks.fetch_current_price``
     - every 30 minutes
     - fast
     - Cached coin market price
   * - ``richy.news.tasks.download_share_news``
     - every 2 hours
     - slow
     - Share news feed
   * - ``richy.news.tasks.download_coin_news``
     - every 2 hours
     - slow
     - Coin news feed
   * - ``richy.news.tasks.download_index_news``
     - every 2 hours
     - slow
     - Index news feed
   * - ``richy.news.tasks.download_etfs_news``
     - every 2 hours
     - slow
     - ETF news feed
   * - ``richy.coins.tasks.fetch_historical_data``
     - every 3 hours
     - slow
     - Coin price history
   * - ``richy.core.tasks.generate_performance_charts``
     - every 3 hours
     - celery
     - Generate per-item performance charts
   * - ``celery.backend_cleanup``
     - daily, 04:00 UTC
     - celery
     - Built-in result-backend cleanup
   * - ``richy.shares.tasks.fetch_historical_data``
     - daily, 05:00 UTC
     - slow
     - Daily share price history fetch
   * - ``richy.etfs.tasks.fetch_historical_data``
     - daily, 06:00 UTC
     - celery
     - ETF price history
   * - ``richy.indexes.tasks.fetch_historical_data``
     - daily, 06:00 UTC
     - celery
     - Index price history
   * - ``richy.shares.tasks.fetch_financial_data``
     - every 1 day
     - slow
     - Earnings / revenue / EPS
   * - ``richy.shares.tasks.fetch_basic_info``
     - every 1 day
     - celery
     - Basic share info into ``ItemData``
   * - ``richy.shares.tasks.fetch_dividends``
     - every 1 day
     - celery
     - Share dividend records
   * - ``richy.shares.tasks.fetch_ratings``
     - every 1 day
     - fast
     - Analyst ratings
   * - ``richy.shares.tasks.fetch_price_ratings``
     - every 1 day
     - fast
     - Analyst price targets
   * - ``richy.coins.tasks.fetch_basic_info``
     - every 1 day
     - celery
     - Basic coin info
   * - ``richy.etfs.tasks.fetch_basic_info``
     - every 1 day
     - fast
     - Basic ETF info
   * - ``richy.etfs.tasks.fetch_holdings``
     - every 1 day
     - fast
     - ETF holdings composition
   * - ``richy.etfs.tasks.fetch_dividends``
     - every 1 day
     - fast
     - ETF dividend records

.. note::

   The fixture also contains an enabled entry for
   ``richy.trends.tasks.fetch_trends_chart_data`` (daily, 06:00 UTC) pointing
   to a Django app (``trends``) that no longer exists. Beat will still
   attempt to dispatch it, and the worker will fail because the task cannot
   be imported. This fixture entry should be removed -- a code change,
   tracked separately from these docs.
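
The two rules described on this page (the effective queue is the fixture
value if set, otherwise the decorator's queue; an enabled fixture entry may
point at a task that no longer exists) can be checked mechanically. The
following is a minimal sketch, not the project's own tooling. It assumes the
fixture follows the usual Django fixture layout for the
``django_celery_beat`` ``PeriodicTask`` model (``task``, ``enabled`` and
``queue`` fields). The sample entries, the ``DECORATOR_QUEUES`` mapping and
both helper functions are hypothetical names introduced for illustration.

.. code-block:: python

   import json

   # Hypothetical excerpt shaped like a Django fixture for the
   # django_celery_beat PeriodicTask model. Entry names and queue values
   # are illustrative, not copied from celery_beat.json.
   FIXTURE = json.loads("""
   [
     {"model": "django_celery_beat.periodictask",
      "fields": {"name": "share history",
                 "task": "richy.shares.tasks.fetch_historical_data",
                 "enabled": true, "queue": "slow"}},
     {"model": "django_celery_beat.periodictask",
      "fields": {"name": "coin price",
                 "task": "richy.coins.tasks.fetch_current_price",
                 "enabled": true, "queue": null}},
     {"model": "django_celery_beat.periodictask",
      "fields": {"name": "trends",
                 "task": "richy.trends.tasks.fetch_trends_chart_data",
                 "enabled": true, "queue": null}}
   ]
   """)

   # Decorator queues, taken from the "direct" column of the routing table.
   DECORATOR_QUEUES = {
       "richy.shares.tasks.fetch_historical_data": "celery",
       "richy.coins.tasks.fetch_current_price": "fast",
   }


   def effective_queue(fields, decorator_queues, default="celery"):
       """Return the fixture queue if set, else the decorator's queue."""
       return fields.get("queue") or decorator_queues.get(fields["task"], default)


   def stale_entries(fixture, known_tasks):
       """List enabled periodic tasks whose task name is not known."""
       stale = []
       for entry in fixture:
           if entry["model"] != "django_celery_beat.periodictask":
               continue
           fields = entry["fields"]
           task = fields["task"]
           # Celery's built-in tasks (e.g. celery.backend_cleanup) are not
           # application tasks, so they are never reported as stale.
           if task.startswith("celery."):
               continue
           if fields.get("enabled") and task not in known_tasks:
               stale.append(task)
       return stale


   queues = {
       entry["fields"]["task"]: effective_queue(entry["fields"], DECORATOR_QUEUES)
       for entry in FIXTURE
       if entry["fields"]["task"] in DECORATOR_QUEUES
   }
   stale = stale_entries(FIXTURE, DECORATOR_QUEUES)

   print(queues["richy.shares.tasks.fetch_historical_data"])  # slow
   print(queues["richy.coins.tasks.fetch_current_price"])     # fast
   print(stale)  # ['richy.trends.tasks.fetch_trends_chart_data']

In a real check, the fixture would be read from
``app/richy/core/fixtures/celery_beat.json`` with ``json.load``, and the set
of known task names could plausibly be taken from the Celery application's
task registry (``app.tasks``) rather than from a hand-written mapping.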