Source code for richy.core.views

import copy
import itertools
import json
import logging
import time
from collections import defaultdict
from datetime import date, datetime

from braces.views import FormMessagesMixin, JSONResponseMixin, LoginRequiredMixin
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.auth.views import LoginView
from django.contrib.humanize.templatetags.humanize import intcomma
from django.core.cache import cache
from django.core.paginator import Paginator
from django.db.models import F, Window
from django.db.models.functions import Lead
from django.http.response import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.template.defaultfilters import date as date_filter
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from django.utils.translation import ngettext
from django.views.generic import (
    CreateView,
    DetailView,
    FormView,
    ListView,
    RedirectView,
    TemplateView,
    View,
)

from ..core.scraper import Manager

# from ..indexes.tasks import fetch_current_price as fetch_current_price_index
from ..shares.models import Share
from ..transactions.models import Transaction
from ..transactions.transactions import Transactions
from . import scraper
from .charts import (
    AthPeriodsChart,
    DashboardMarketValueRatioPieChart,
    DrawdownChart,
    PerformanceChart,
    TransactionBasicPieChart,
    TransactionsBasicColumnChart,
)
from .forms import SplitterForm, SplitterPreviewForm
from .math import calc_percentage_change
from .models import (
    Asset,
    Item,
    ItemWithPrices,
    Price,
    UserItem,
    date_to_highcharts_timestamp,
)
from .templatetags.utils import autofloatformat, coinautofloatformat

LOGGER = logging.getLogger(__name__)


##############
#   Mixins   #
##############
class TransactionChartDataMixin:
    """
    Supplies series data and chart options for the basic transaction charts.
    """

    def get_transaction_basic_chart_data(self, closed=False):
        """
        Builds chart series and a textual summary for the requesting user:

        * investments vs. revenues column chart series
        * profit pie chart series
        * textual stats derived from the column chart series

        :param bool closed: Passed to the stats query and to both chart builders.
        :return: Dict with keys ``investment_and_revenue``, ``profit`` and
            ``text``; ``None`` when the stats data frame is empty.
        :rtype: dict
        """

        def new_branch():
            # Self-referencing factory: missing keys create nested dicts on demand.
            return defaultdict(new_branch)

        def to_text_stats(stats_data):
            """
            Condenses the column chart series into plain numbers.

            The result has this shape:

            {
                "share": {
                    "CZK":{
                        "Investment": 8150.0,
                        "Profit": 240.0,
                        "Profit %": 4.1
                    },
                    "USD": {
                        "Investment": 3206.8,
                        "Profit": 758.4,
                        "Profit %": 25.7,
                    }
                },
                "coin": {...}
            }
            """

            summary = new_branch()

            for type_, currency_stats in stats_data.items():
                # Coins use their own formatter.
                formatter = coinautofloatformat if type_ == "coin" else autofloatformat

                for currency, stats in currency_stats.items():
                    # Series order: investment first, revenue/market value second.
                    invested = stats[0]["data"][0]
                    current_value = stats[1]["data"][0]

                    summary[type_][currency] = {
                        "Investment": formatter(invested, no_str=True),
                        "Profit": formatter(current_value - invested, no_str=True),
                        "Profit %": formatter(
                            calc_percentage_change(current_value, invested),
                            no_str=True,
                        ),
                    }

            return summary

        df = Transactions(self.request.user).get_transaction_basic_stats(closed=closed)
        pile = Transactions(self.request.user).get_pile_stats(df)

        if df.empty:
            return None

        column_series = TransactionsBasicColumnChart(df, pile).get_series(closed)
        pie_series = TransactionBasicPieChart(df, pile).get_series(closed)
        data = {
            "investment_and_revenue": column_series,
            "profit": pie_series,
            "text": to_text_stats(column_series),
        }

        # The JSON round trip turns the nested defaultdicts into plain dicts.
        return json.loads(json.dumps(data))

    def get_chart_options(self):  # pylint: disable=no-self-use
        """
        Returns options for charts used on transactions overview page.

        :return: Dict with the single key ``inv_and_rev_chart_options``.
        :rtype: dict
        """

        inv_and_rev = {
            "tooltip": {"headerFormat": None},
            "yAxis": {"title": {"text": _("Investment")}},
            "xAxis": {"labels": {"enabled": False}},
        }

        return {"inv_and_rev_chart_options": inv_and_rev}


class PaginatorMixin:
    """
    Adds a helper which paginates an arbitrary list or QuerySet using the page
    number carried in the request GET parameters.
    """

    def get_paginator(self, object_list, per_page=20, get_attr_name="page"):
        """
        Creates paginator object, paginates the given ``object_list`` and also
        returns elided page range.

        :param list object_list: QuerySet or list of object to paginate.
        :param int per_page: Number of items per page.
        :param get_attr_name: Name of GET attribute which carries the page number.
        :return: Returns Page object and an iterable of elided pages (for paging
            in template).
        :rtype: tuple
        """

        paginator = Paginator(object_list, per_page)
        # get_page() tolerates invalid or out-of-range page numbers.
        page_obj = paginator.get_page(self.request.GET.get(get_attr_name, 1))

        # get_elided_page_range() validates its argument and raises InvalidPage
        # for the raw GET value, so pass the number get_page() settled on.
        return page_obj, paginator.get_elided_page_range(page_obj.number)
class ModelPaginatorMixin:
    """
    Provides paginator for ListViews based on model which template includes
    _paging.pug template.
    """

    # might be overridden
    paginate_by = 20

    def get_context_data(self, **kwargs):
        """
        Adds the ``pages`` context variable: the elided page range computed by
        the ``paginator`` already in the context for the current ``page_obj``.
        """

        context = super().get_context_data(**kwargs)
        current_number = context["page_obj"].number
        context["pages"] = context["paginator"].get_elided_page_range(current_number)

        return context
class SubmenuViewMixin:
    """
    Puts a dynamic submenu, supplied by the subclass, into the template context.
    """

    def get_submenu(self):
        """
        Returns the submenu; must be implemented by the subclass.
        """

        raise NotImplementedError("Please implement get_submenu() method.")

    def get_context_data(self, **kwargs):
        """
        Adds the ``submenu_dynamic`` context variable.
        """

        context = super().get_context_data(**kwargs)
        context["submenu_dynamic"] = self.get_submenu()

        return context


##################
#   Base views   #
##################
class AjaxView(LoginRequiredMixin, JSONResponseMixin, View):
    """
    Base for AJAX endpoints which answer with a ``{"status", "data"}`` JSON body.
    """

    def clean_request_params(self, params):
        """
        Cleans given request params from common mistyping casts.

        Every value equal to the string ``"undefined"`` is replaced by ``None``.
        The given object is not modified, a deep copy is returned.

        :param django.http.QueryDict params: The given request params.
        :return: Sanitized copy of the params.
        """

        params = copy.deepcopy(params)

        # Iterate over a snapshot of the keys because values are reassigned.
        for key in list(params):
            if params[key] == "undefined":
                params[key] = None

        return params

    def send(self, status, data=None):
        """
        Renders the JSON response.

        :param status: Value stored under the ``status`` key.
        :param data: Value stored under the ``data`` key.
        """

        return self.render_json_response({"status": status, "data": data})


class BaseItemDetailView(LoginRequiredMixin, DetailView):
    """
    Base detail page of a user item (price stats, transactions, dividends).
    """

    def dispatch(self, request, *args, **kwargs):
        # Check if there are already any historical data (Price model).
        # Probably being downloaded right now.
        if not self.get_object().item.price_set.exists():
            messages.warning(request, _("Historical data are not ready yet."))

        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        # "item__coin" used to be listed twice; once is enough.
        return self.model.objects.select_related(
            "item", "item__share", "item__etf", "item__coin"
        )

    def get_context_data(self, **kwargs):
        """
        Adds price statistics, the user's transactions of the item, custom
        chart ranges, basic info, last dividend and drawdown chart options.
        """

        context = super().get_context_data(**kwargs)
        item = self.object.item

        # Last price.
        last_price = item.get_last_price(current=True)
        context["last_price"] = last_price
        context["last_price_rounded"] = last_price.round(item) if last_price else None

        # ATH
        context["ath"] = item.get_ath()

        # YOY price change
        context["yoy_change"] = item.get_last_days_change(365, percents=True)

        # 52 weeks low/high price
        context["52_weeks_low_high"] = item.get_52_weeks_low_high()

        # Drawdown
        context["drawdown"] = item.get_drawdown()

        # Transactions.
        context["transactions_open"] = (
            Transaction.objects.by_user(self.request.user)
            .filter(item=item)
            .filter(is_closed=False)
            .order_by("-date", "-pk")
        )
        context["transactions_closed"] = (
            Transaction.objects.by_user(self.request.user)
            .filter(item=item)
            .filter(is_closed=True)
            .order_by("-date", "-pk")
        )

        # Custom ranges.
        context["ranges"] = self.get_ranges()

        # Basic data.
        context["basic_info"] = item.get_basic_info()

        # Headline chip text.
        if self.object.is_archived:
            context["headline_chip_text"] = _("archived")

        if item.is_share():
            context["last_dividend"] = item.share.dividend_set.order_by(
                "ex_date"
            ).last()
        elif item.is_etf():
            context["last_dividend"] = item.etf.dividend_set.order_by(
                "ex_date"
            ).last()

        # Drawdown.
        context["drawdown_options"] = json.dumps(DrawdownChart.get_options())

        return context

    def get_ranges(self):
        """
        Returns custom chart ranges as a JSON string.

        Currently there is a single range ``LT`` which starts at the date of
        the last transaction of the item and ends at the last stored price.

        :return: JSON encoded list of ranges (keys ``title``, ``from``, ``to``).
        :rtype: str
        """

        ranges = []
        last_price = self.object.item.price_set.last()

        # NOTE(review): transaction_set is not limited to the current user,
        # unlike the transactions in get_context_data() - confirm it's intended.
        # A single last() replaces the previous exists() + last() pair.
        trans = self.object.item.transaction_set.last()

        if trans is not None:
            ranges.append(
                {
                    "title": "LT",
                    "from": datetime.combine(
                        trans.date, datetime.min.time()
                    ).timestamp()
                    * 1000,
                    "to": (
                        last_price.datetime.timestamp() * 1000 if last_price else None
                    ),
                }
            )

        return json.dumps(ranges)


class BaseFetchItemAjaxView(AjaxView):
    """
    Base for AJAX views which deliver price and transaction series of an item.
    """

    def get_item_prices(self, item, **kwargs):
        """
        Fetches item prices from database.

        :param Item item: Item model instance.
        :param kwargs: Additional ``Price`` queryset filters.
        :return: List of ``[timestamp, price]`` pairs ordered by datetime.
        :rtype: list
        """

        roundfn = coinautofloatformat if hasattr(item, "coin") else autofloatformat

        return [
            [p.get_timestamp(), roundfn(p.price, no_str=True)]
            for p in Price.objects.filter(item=item, **kwargs).order_by("datetime")
        ]

    def get_transactions(self, item):
        """
        Fetches all related transaction from database.

        :param Item item: Item model instance.
        :return: Transactions as list of object with keys "x", "title" and "text".
        :rtype: list
        """

        # total_seconds() keeps the sign; ``.seconds`` is never negative and so
        # gave a wrong offset for time zones behind UTC.
        offset = (
            datetime.now(timezone.get_current_timezone()).utcoffset().total_seconds()
        )
        is_share = isinstance(item, Share)
        per_unit = _("per share") if is_share else _("per coin")
        data = []

        for t in (
            Transaction.objects.by_user(self.request.user)
            .filter(item=item)
            .order_by("date")
        ):
            # don't show floating point if the amount is whole number (i.e. 6.0)
            amount = int(t.amount) if t.amount.is_integer() else t.amount

            if is_share:
                unit = ngettext("share", "shares", abs(t.amount))
            else:
                unit = ngettext("coin", "coins", abs(t.amount))

            data.append(
                {
                    "x": (time.mktime(t.date.timetuple()) + offset) * 1000,
                    "title": "+" if 0 < t.amount else "-",
                    "text": "{} {} ({} {})".format(
                        amount, unit, intcomma(t.price), per_unit
                    ),
                }
            )

        return data

    def add_index_data(self, request, data):
        """
        Adds index data (if requested by the request) to data series.

        The ``indexes`` GET parameter is a comma separated list of symbols;
        an unknown symbol results in HTTP 404.

        :param object request: Django request object.
        :param dict data: Data object to be send to the client (modified in place).
        """

        if "indexes" in request.GET and (indexes := request.GET["indexes"]):
            data["indexes"] = []

            for index_symbol in indexes.split(","):
                item = get_object_or_404(Item, symbol=index_symbol)
                data["indexes"].append(self.get_item_prices(item))


class BaseDeleteUserItemRedirectView(LoginRequiredMixin, RedirectView):
    """
    Base for views which delete a user item and redirect afterwards.
    """

    def can_be_deleted(self):
        """
        Checks if the current item can be deleted - has no related transactions.

        :return: True if can be deleted False otherwise
        :rtype: bool
        """

        user_item = get_object_or_404(UserItem, pk=self.kwargs["pk"])

        return (
            not Transaction.objects.by_user(self.request.user)
            .filter(item=user_item.item)
            .exists()
        )

    def delete_item_if_not_needed(self, item):
        """
        Deletes an item if it's not used by any user.

        :param Item item: Item model instance.
        """

        if not item.useritem_set.exists():
            # Remember the ID, the instance loses it on delete().
            pk = item.pk
            item.delete()
            LOGGER.debug(
                "Item with ID %s has been deleted because it's not used by any user.",
                pk,
            )

    def delete(self):
        """
        Deletes the current user's ``UserItem`` given by the ``pk`` URL kwarg,
        then the underlying item if nobody uses it, and refreshes
        ``ItemWithPrices``.
        """

        user_item = get_object_or_404(
            UserItem, user=self.request.user, pk=self.kwargs["pk"]
        )
        item = user_item.item
        user_item.delete()

        self.delete_item_if_not_needed(item)
        ItemWithPrices.refresh()


class BasePerformanceDetailView(LoginRequiredMixin, DetailView):
    """
    Annual performance page of a user item.
    """

    template_name = "core/item_performance.pug"
    model = UserItem

    def get_context_data(self, **kwargs):
        """
        Adds ``annual_performance`` (series sorted by year, newest first),
        ``performance_chart_options`` and ``basic_info``.
        """

        chart = PerformanceChart(self.object.item)
        context = super().get_context_data(**kwargs)
        series = chart.get_series()

        context["annual_performance"] = sorted(
            series, key=lambda i: i["year"], reverse=True
        )
        context["performance_chart_options"] = self.get_chart_options(
            chart.get_options(), series
        )
        context["basic_info"] = self.object.item.get_basic_info()

        return context

    def get_chart_options(self, base_options, data):
        """
        Extends the base chart options for the candlestick and line series.

        :param dict base_options: Options coming from the chart object.
        :param data: Iterable of yearly records with keys ``datetime``,
            ``open``, ``high``, ``low`` and ``close``.
        :return: New options dict with the ``series`` key set.
        :rtype: dict
        """

        candles = [
            (
                # Each candle is anchored to January 1st of its year.
                date_to_highcharts_timestamp(
                    i["datetime"].replace(month=1, day=1).to_pydatetime()
                ),
                i["open"],
                i["high"],
                i["low"],
                i["close"],
            )
            for i in data
        ]

        return base_options | {
            "series": [
                {
                    "type": "candlestick",
                    "name": _("Performance"),
                    "data": candles,
                },
                {
                    "name": _("Current price"),
                    "type": "line",
                    "color": settings.CHART_COLORS[1],
                },
            ],
        }


class UserItemManipulationMixin:
    """
    Passes the current user to the form as the ``user`` keyword argument.
    """

    def get_form_kwargs(self):
        kwargs = super().get_form_kwargs()
        kwargs["user"] = self.request.user

        return kwargs


class BaseUserItemCreateView(
    LoginRequiredMixin,
    FormMessagesMixin,
    UserItemManipulationMixin,
    CreateView,
):
    """
    Base for views which add an item (by its symbol) to the current user.
    """

    model = UserItem

    def form_valid(self, form):
        """
        Gets or creates the item for the submitted symbol and binds it,
        together with the current user, to the new ``UserItem``.

        ``fetch_data()`` is not defined in this class; it is expected to be
        provided by the subclass.
        """

        item, created = form.item_model.objects.get_or_create(
            symbol=form.cleaned_data["symbol"]
        )

        if created:
            self.item_created_callback(form, item)
            self.fetch_data(item.pk)

        form.instance.user = self.request.user
        form.instance.item = item
        to_return = super().form_valid(form)
        ItemWithPrices.refresh()

        return to_return

    def item_created_callback(self, form, item):
        """
        Hook called right after a new item has been created. Does nothing here.
        """

    def get_open_items_possession_stats(self, type):
        """
        Fetches open items (possessions) and their current possession percentage.

        :param string type: Item type to get stats for.
        :return: Sorted dict (by value - reversed) where key is the item symbol
            and value is possession percentage relative to other items.
        :rtype: dict
        """

        # Get amounts.
        stats_df = Transactions(self.request.user).get_transaction_basic_stats(
            closed=False
        )

        if 0 == len(stats_df):
            return {}

        pile = Transactions.get_pile_stats(stats_df[stats_df["type"] == type])
        symbols = defaultdict(int)

        for _root_parent, stats in pile:
            for item in stats["open_symbols"]:
                symbols[item["symbol"]] += item["market_price"]

        # Convert to percentages.
        total = sum(symbols.values())
        perc_symbols = {
            key: (100 / (total / value) if value else 0)
            for key, value in symbols.items()
        }

        # Return as a new sorted (by value) dict.
        return dict(sorted(perc_symbols.items(), key=lambda i: i[1], reverse=True))

    def sort_items_by_other_dict_keys(self, items, other_dict):
        """
        Sorts the given items list based on the other dict keys which should be
        item symbols.

        :param list items: List of UserItems objects to be sorted.
        :param dict other_dict: The (sorted) dict where key is item symbol.
        :return: Sorted items.
        :rtype: list
        :raises ValueError: If an item symbol is not a key of ``other_dict``.
        """

        symbol_indexes = list(other_dict.keys())

        return sorted(items, key=lambda i: symbol_indexes.index(i.item.symbol))

    def split_open_items(self, items):
        """
        Splits the given user items into those whose item is among the user's
        owned items and the rest.

        :param items: Iterable of UserItem objects.
        :return: Tuple ``(open_items, other_items)`` of lists.
        :rtype: tuple
        """

        open_items = self.request.user.get_owned_items()
        open_items_list = []
        other_items_list = []

        for i in items:
            if i.item in open_items:
                open_items_list.append(i)
            else:
                other_items_list.append(i)

        return open_items_list, other_items_list


class BaseHistoricalData(LoginRequiredMixin, ModelPaginatorMixin, ListView):
    """
    Paginated list of historical prices of a user item, grouped by months.
    """

    model = Price
    paginate_by = 60  # 3 months
    template_name = "core/item_historical_data.pug"

    def get_queryset(self):
        """
        Returns prices of the item annotated with ``change`` - the percentage
        change against the following row in descending datetime order.
        """

        # The user item used to be fetched twice; one lookup is enough.
        self.object = get_object_or_404(UserItem, pk=self.kwargs["pk"])

        # Annotate day change
        return self.model.objects.annotate(
            change=100.0
            * (
                # TODO: order_by: can be replaced with "-datetime"
                F("price") / Window(Lead("price"), order_by=F("datetime").desc())
                - 1
            )
        ).filter(item=self.object.item)

    def get_context_data(self, **kwargs):
        """
        Adds ``object``, ``min_change``/``max_change`` of the current page,
        the month-grouped ``object_list`` and ``basic_info``.
        """

        context = super().get_context_data(**kwargs)
        context["object"] = self.object

        def change_or_zero(record):
            # A missing change is compared as zero.
            return record.change or 0

        # default=None avoids ValueError on an empty page.
        page_records = list(context["page_obj"].object_list)
        lowest = min(page_records, key=change_or_zero, default=None)
        highest = max(page_records, key=change_or_zero, default=None)
        context["min_change"] = lowest.change if lowest is not None else None
        context["max_change"] = highest.change if highest is not None else None

        context["object_list"] = self.regroup_and_calculate_month_changes(
            context["object_list"]
        )
        context["basic_info"] = self.object.item.get_basic_info()

        return context

    def regroup_and_calculate_month_changes(self, data):
        """
        Groups the records by months and adds the month change to each group.
        """

        return self.calculate_month_changes(self.split_by_months(data))

    def split_by_months(self, data):
        """
        Splits the given dataset (a list of objects) into batches by year and
        month of the ``datetime`` attribute of each object.

        Only consecutive records of the same month are grouped together.

        Output list looks like:

        [
            {
                "date": date(),
                "records": [
                    {...},
                    ...
                ],
            }
        ]

        :param list data: List of data objects.
        :return: Grouped (split) dataset for each month.
        :rtype: list
        """

        def year_and_month(record):
            return record.datetime.year, record.datetime.month

        return [
            {"date": date(year, month, 1), "records": list(records)}
            for (year, month), records in itertools.groupby(data, year_and_month)
        ]

    def calculate_month_changes(self, data):
        """
        Extends the structure coming from self.split_by_months() for
        "month_change" key which stores the price change between the last and
        the first record of each group.
        """

        for group in data:
            first_of_month = group["records"][-1]
            last_of_month = group["records"][0]
            group["month_change"] = calc_percentage_change(
                last_of_month.price, first_of_month.price
            )

        return data


##################
#   Core views   #
##################
class HealthCheckView(JSONResponseMixin, View):
    def get(self, request, *args, **kwargs):
        """Lightweight healthcheck endpoint for Docker."""

        return self.render_json_response({"status": "ok"})


class DashboardTemplateView(
    LoginRequiredMixin, TransactionChartDataMixin, TemplateView
):
    """
    Dashboard with shares, indexes, ETFs, coins and portfolio statistics.
    """

    template_name = "core/dashboard.pug"

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)

        context["open_shares"], context["shares"] = self.get_share_stats()
        context["indexes"] = self.get_index_stats()
        context["open_etfs"], context["etfs"] = self.get_etf_stats()
        context["open_coins"], context["coins"] = self.get_coin_stats()
        context["portfolio_stats"] = self.get_portfolio_stats()

        context["all_share_tags"] = self._collect_tags(
            itertools.chain(context["open_shares"], context["shares"]), "share"
        )
        context["all_index_tags"] = self._collect_tags(context["indexes"], "index")
        context["all_etf_tags"] = self._collect_tags(
            itertools.chain(context["open_etfs"], context["etfs"]), "etf"
        )
        context["all_coin_tags"] = self._collect_tags(
            itertools.chain(context["open_coins"], context["coins"]), "coin"
        )

        return context

    @staticmethod
    def _collect_tags(rows, key):
        """Returns the set of tags of the objects stored under ``key`` in rows."""

        return set(
            itertools.chain.from_iterable(row[key].get_tags() for row in rows)
        )

    def _get_dashboard_user_items(self, **filters):
        """
        Returns the user's dashboard items (not archived, ordered by symbol)
        with the performance chart image annotated as ``perf_chart``.
        """

        return (
            UserItem.objects.by_user(self.request.user)
            # The item is read by callers; join it to avoid a query per row.
            .select_related("item")
            .extra(
                select={
                    "perf_chart": "SELECT image FROM core_asset WHERE core_useritem.item_id = core_asset.item_id AND core_asset.type = %s"
                },
                select_params=(Asset.PERFORMANCE_CHART,),
            )
            .filter(show_on_dashboard=True, is_archived=False, **filters)
            .order_by("item__symbol")
        )

    def _split_dashboard_items(self, key, open_symbols, **filters):
        """Splits dashboard rows into (open, other) lists by item symbol."""

        open_rows = []
        other_rows = []

        for i in self._get_dashboard_user_items(**filters):
            row = {key: i, "chart": Asset(image=i.perf_chart)}

            if i.item.symbol in open_symbols:
                open_rows.append(row)
            else:
                other_rows.append(row)

        return open_rows, other_rows

    def get_share_stats(self):
        """
        Fetches shares and shares chart from database.

        :return: Tuple ``(open_shares, shares)`` of lists of dicts with keys
            ``share`` and ``chart``.
        :rtype: tuple
        """

        # Items with currently open transactions.
        open_share_symbols = {
            t.item.symbol
            for t in Transaction.objects.by_user(self.request.user)
            .prefetch_related("item")
            .filter(is_closed=False, item__share__isnull=False)
            .distinct("item")
        }

        return self._split_dashboard_items(
            "share", open_share_symbols, item__share__isnull=False
        )

    def get_index_stats(self):
        """
        Fetches indexes and indexes chart from database.

        :return: List of dicts with keys ``index`` and ``chart``.
        :rtype: list
        """

        return [
            {"index": i, "chart": Asset(image=i.perf_chart)}
            for i in self._get_dashboard_user_items(item__index__isnull=False)
        ]

    def get_etf_stats(self):
        """
        Fetches ETFs and ETFs chart from database.

        :return: Tuple ``(open_etfs, etfs)`` of lists of dicts with keys
            ``etf`` and ``chart``.
        :rtype: tuple
        """

        # Items with currently open transactions.
        open_etf_symbols = {i.symbol for i in self.request.user.get_owned_items()}

        return self._split_dashboard_items(
            "etf", open_etf_symbols, item__etf__isnull=False
        )

    def get_coin_stats(self):
        """
        Fetches coins and coins chart from database.

        :return: Tuple ``(open_coins, coins)`` of lists of dicts with keys
            ``coin`` and ``chart``.
        :rtype: tuple
        """

        # Items with currently open transactions.
        positives = (
            Transaction.objects.by_user(self.request.user)
            .positive_balance()
            .values_list("item", flat=True)
        )
        open_coin_symbols = {
            t.item.symbol
            for t in Transaction.objects.by_user(self.request.user)
            .select_related("item")
            .filter(is_closed=False, item__in=positives, item__coin__isnull=False)
            .distinct("item")
        }

        return self._split_dashboard_items(
            "coin", open_coin_symbols, item__coin__isnull=False
        )

    def get_portfolio_stats(self):
        """
        Returns portfolio statistics for the dashboard.

        :return: Dict with keys ``investment_and_market_value`` and
            ``market_value_ratio_chart_data``; both are empty dicts when there
            are no chart data.
        :rtype: dict
        """

        def get_investment_and_market_value(data):
            """
            Fetches portfolio stats for each currency (currently open
            transactions).

            Output dict looks like this:

            {
                "USD": {
                    "Investment": 36889.6,
                    "Market value": 104630.90000000001
                }
            }

            :param dict data: Input data from
                ``TransactionChartDataMixin.get_transaction_basic_chart_data()``
                method.
            :return: Dict with investments and market value for each currency.
            :rtype: dict
            """

            stats = defaultdict(lambda: defaultdict(int))

            for currency_data in data["investment_and_revenue"].values():
                for currency, currency_stats in currency_data.items():
                    for stat in currency_stats:
                        stats[currency][stat["name"]] += stat["data"][0]

            # The JSON round trip converts the defaultdicts to plain dicts.
            return json.loads(json.dumps(stats))

        data = self.get_transaction_basic_chart_data()
        investment_and_market_value = {}
        market_value_ratio_chart_data = {}

        if data:
            investment_and_market_value = get_investment_and_market_value(data)
            market_value_ratio_chart_data = (
                DashboardMarketValueRatioPieChart().get_series(data)
            )

        return {
            "investment_and_market_value": investment_and_market_value,
            "market_value_ratio_chart_data": market_value_ratio_chart_data,
        }


class SearchRedirectView(LoginRequiredMixin, RedirectView):
    """
    Redirects to the detail page of the user's item matching the searched
    symbol, or to the Yahoo! Finance quote page when the user has no such item.
    """

    # (lookup, URL name) pairs in the order they are tried.
    search_targets = (
        ("item__share__isnull", "shares:share_detail"),
        ("item__index__isnull", "indexes:index_detail"),
        ("item__coin__isnull", "coins:coin_detail"),
        ("item__etf__isnull", "etfs:etf_detail"),
    )

    def get_redirect_url(self, *args, **kwargs):
        from urllib.parse import quote

        # A missing parameter is treated as an empty query instead of raising.
        q = self.request.GET.get("query", "")

        for lookup, url_name in self.search_targets:
            try:
                i = UserItem.objects.by_user(self.request.user).get(
                    item__symbol=q.upper(), **{lookup: False}
                )
            except UserItem.DoesNotExist:
                continue

            return reverse(url_name, args=[i.pk])

        # Use YAHOO! search as fallback. The query comes from the user so it
        # is percent-encoded before being put into the URL path.
        return f"https://finance.yahoo.com/quote/{quote(q, safe='')}?p=V&.tsrc=fin-srch"


class SignInFormView(LoginView):
    template_name = "core/sign_in.pug"


class SignOutRedirectView(RedirectView):
    """
    Logs the user out and redirects to the login page.
    """

    def get_redirect_url(self, *args, **kwargs):
        # Accepts the arguments RedirectView.get() forwards from the URL.
        logout(self.request)

        return reverse(settings.LOGIN_URL)


class ItemCurrentPriceAjaxView(DetailView, AjaxView):
    """
    Returns the current price (and change) of an item looked up by symbol.
    """

    model = Item
    slug_field = "symbol"

    def get(self, request, *args, **kwargs):
        self.object = self.get_object()

        # Fetch price changes and cache it.
        price = Manager.get_current_price_and_change(self.object)

        if price:
            self.object.set_current_price_and_change(price)

        try:
            return self.send(True, price.round(self.object) if price else {})
        except Exception:
            LOGGER.exception("Couldn't retrieve last price.")
            return self.send(False)


class FetchDrawdownAjaxView(AjaxView):
    """
    Returns the drawdown chart series of a user item.
    """

    def get(self, request, *args, **kwargs):
        user_item = get_object_or_404(UserItem, pk=self.kwargs["pk"])

        return self.send(True, DrawdownChart(user_item.item).get_series())


class BaseDrawdownPeriodsAndAthsDetailView(
    LoginRequiredMixin, PaginatorMixin, DetailView
):
    """
    Page with ATH periods chart and a paginated table of ATHs.
    """

    template_name = "core/item_drawdowns.pug"
    model = UserItem

    def get_context_data(self, **kwargs):
        chart = AthPeriodsChart(self.object.item)
        aths = self.object.item.get_aths()
        context = super().get_context_data(**kwargs)

        context["aths_chart_series"] = chart.get_series(aths)
        context["aths_chart_options"] = chart.get_options()
        context["drawdowns"], context["pages"] = self.compile_drawdown_table_data(aths)
        context["basic_info"] = self.object.item.get_basic_info()

        return context

    def compile_drawdown_table_data(self, aths):
        """
        Turns the ATH series into paginated table rows.

        Each row gets ``datetime_prev`` (datetime of the previous row) and
        ``days`` (days between them, 0 for the first row); rows are sorted
        from the last to the first.

        :param aths: Series of ATHs; after ``reset_index()`` it has to provide
            a ``datetime`` column. Its ``name`` is set to ``"ath"`` in place.
        :return: Result of ``get_paginator()`` - page object and page range.
        :rtype: tuple
        """

        aths.name = "ath"
        aths = aths.reset_index()
        aths["datetime_prev"] = aths["datetime"].shift()
        aths["days"] = aths.apply(
            lambda row: (row["datetime"] - row["datetime_prev"]).days, axis=1
        ).fillna(0)
        rows = aths.sort_index(ascending=False).to_dict(orient="records")

        return self.get_paginator(rows)


######################
#   Splitter views   #
######################
class BaseSplitterFormView(LoginRequiredMixin, FormView):
    """
    First step of the split: stores the submitted item, date and ratio in the
    session under the ``splitter`` key.
    """

    # preview_template = "shares/splitter/preview.pug"
    form_class = SplitterForm
    template_name = "core/splitter/form.pug"

    def get_form_kwargs(self):
        kwargs = super().get_form_kwargs()
        kwargs["model"] = self.model

        return kwargs

    def form_valid(self, form):
        self.request.session["splitter"] = {
            "item": form.cleaned_data["item"].pk,
            "date": form.cleaned_data["date"].isoformat(),
            "ratio": form.cleaned_data["ratio"],
        }

        return super().form_valid(form)


class BaseSplitterPreviewFormView(LoginRequiredMixin, FormMessagesMixin, FormView):
    """
    Second step of the split: previews and applies the split stored in session.

    ``fetch_historical_data()`` is not defined in this class; it is expected
    to be provided by the subclass.

    NOTE(review): the original docstring said "Accessible only to admins" but
    only LoginRequiredMixin is applied here - confirm where it is enforced.
    """

    template_name = "core/splitter/preview.pug"
    form_class = SplitterPreviewForm

    def dispatch(self, request, *args, **kwargs):
        if "splitter" not in request.session:
            messages.error(request, _("No splitter data. Please fill the form."))
            # NOTE(review): this redirects to the very same path, which fails
            # the same check again - presumably the form URL was intended.
            return HttpResponseRedirect(self.request.path)

        self.splitter = request.session["splitter"]
        self.item = Item.objects.get(pk=self.splitter["item"])
        self.date = datetime.fromisoformat(self.splitter["date"])
        self.ratio = self.splitter["ratio"]
        self.transactions = self.item.transaction_set.filter(date__lt=self.date)

        return super().dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        """
        Applies the split to every transaction older than the split date,
        appends a split marker to its note and refetches historical data.
        """

        for trans in self.transactions:
            # Split.
            trans.price /= self.ratio
            trans.amount *= self.ratio

            if trans.note:
                trans.note += "\n"
            else:
                trans.note = ""

            # Used to be assigned to the misspelled ``trans.node`` so the
            # marker never reached the note; the closing bracket was missing.
            trans.note += f"(split {date_filter(timezone.now(), 'DATETIME_FORMAT')})"
            trans.save()

        # Refetch prices.
        self.fetch_historical_data()

        return super().form_valid(form)

    def get_form_valid_message(self):
        message = _("Item %(item)s has been split with ratio %(ratio)s:1") % {
            "item": self.item.symbol,
            "ratio": int(self.ratio),
        }

        # Clean up session.
        del self.request.session["splitter"]

        return message

    def get_success_url(self):
        """
        Returns the detail URL of the split share or ETF (``None`` for any
        other item type).
        """

        item = UserItem.objects.by_user(self.request.user).get(item=self.item)

        if self.item.is_share():
            return reverse("shares:share_detail", args=[item.pk])

        if self.item.is_etf():
            return reverse("etfs:etf_detail", args=[item.pk])

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)

        context["item"] = self.item
        context["date"] = self.date
        context["ratio"] = self.ratio
        context["transactions"] = self.transactions

        return context


#####################
#   The eye views   #
#####################
class BaseTheEyeView(LoginRequiredMixin, DetailView):
    template_name = "core/the_eye.pug"
    model = UserItem

    def get_context_data(self, **kwargs):
        # Is here because of sitetree.
        context = super().get_context_data(**kwargs)
        context["basic_info"] = self.object.item.get_basic_info()

        return context


class TheEyeAjaxView(AjaxView):
    """
    Returns intraday prices of a user item; the payload is cached.
    """

    def get(self, request, *args, **kwargs):
        """
        Sends the intraday payload of the user item given by the ``id`` URL
        kwarg; the status is ``False`` when no intraday prices are available.
        """

        # Unknown ID results in 404 like in the other AJAX views.
        user_item = get_object_or_404(UserItem, pk=kwargs["id"])
        item = user_item.item
        cache_key = f"item-{item.pk}-market-live"

        # Try cache.
        if data := cache.get(cache_key):
            return self.send(True, data)

        data = []
        df, open_ts, close_ts = scraper.Manager.fetch_intraday_prices(item)

        if df.empty:
            return self.send(False)

        # Share or index or ETF.
        if item.is_share() or item.is_index() or item.is_etf():
            df.index = df.index.tz_convert(settings.TIME_ZONE)
            prices = [
                {
                    "x": date_to_highcharts_timestamp(idx.to_pydatetime()),
                    "open": autofloatformat(r["Open"], no_str=True),
                    "high": autofloatformat(r["High"], no_str=True),
                    "low": autofloatformat(r["Low"], no_str=True),
                    "close": autofloatformat(r["Close"], no_str=True),
                }
                for idx, r in df.iterrows()
            ]

            # Named *_time so the builtin open() is not shadowed.
            open_time = None
            close_time = None

            if open_ts:
                open_time = date_to_highcharts_timestamp(
                    open_ts.tz_convert(settings.TIME_ZONE)
                )

            if close_ts:
                close_time = date_to_highcharts_timestamp(
                    close_ts.tz_convert(settings.TIME_ZONE)
                )

            lowest = df["Low"].min()
            highest = df["High"].max()
            data = {
                "symbol": item.symbol.upper(),
                "open": open_time,
                "close": close_time,
                "min": {
                    "value": lowest,
                    "formatted": intcomma(autofloatformat(lowest)),
                },
                "max": {
                    "value": highest,
                    "formatted": intcomma(autofloatformat(highest)),
                },
                "prices": prices,
            }
        elif item.is_coin():
            # Coin data frames use lowercase column names.
            prices = [
                {
                    "x": date_to_highcharts_timestamp(idx.to_pydatetime()),
                    "open": autofloatformat(r["open"], no_str=True),
                    "low": autofloatformat(r["low"], no_str=True),
                    "high": autofloatformat(r["high"], no_str=True),
                    "close": autofloatformat(r["close"], no_str=True),
                }
                for idx, r in df.iterrows()
            ]

            lowest = df["low"].min()
            highest = df["high"].max()
            data = {
                "symbol": item.symbol.upper(),
                "min": {
                    "value": lowest,
                    "formatted": intcomma(autofloatformat(lowest)),
                },
                "max": {
                    "value": highest,
                    "formatted": intcomma(autofloatformat(highest)),
                },
                "prices": prices,
            }

        # Cache data.
        if data:
            cache.set(cache_key, data, settings.ITEM_MARKET_LIVE_CACHE_TIMEOUT)

        return self.send(True, data)