import copy
import itertools
import json
import logging
import time
from collections import defaultdict
from datetime import date, datetime
from braces.views import FormMessagesMixin, JSONResponseMixin, LoginRequiredMixin
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.auth.views import LoginView
from django.contrib.humanize.templatetags.humanize import intcomma
from django.core.cache import cache
from django.core.paginator import Paginator
from django.db.models import F, Window
from django.db.models.functions import Lead
from django.http.response import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.template.defaultfilters import date as date_filter
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from django.utils.translation import ngettext
from django.views.generic import (
CreateView,
DetailView,
FormView,
ListView,
RedirectView,
TemplateView,
View,
)
from ..core.scraper import Manager
# from ..indexes.tasks import fetch_current_price as fetch_current_price_index
from ..shares.models import Share
from ..transactions.models import Transaction
from ..transactions.transactions import Transactions
from . import scraper
from .charts import (
AthPeriodsChart,
DashboardMarketValueRatioPieChart,
DrawdownChart,
PerformanceChart,
TransactionBasicPieChart,
TransactionsBasicColumnChart,
)
from .forms import SplitterForm, SplitterPreviewForm
from .math import calc_percentage_change
from .models import (
Asset,
Item,
ItemWithPrices,
Price,
UserItem,
date_to_highcharts_timestamp,
)
from .templatetags.utils import autofloatformat, coinautofloatformat
LOGGER = logging.getLogger(__name__)
##############
# Mixins #
##############
class TransactionChartDataMixin:
"""
Handles transactions basic chart data methods.
"""
def get_transaction_basic_chart_data(self, closed=False):
"""
Returns statistical and chart data:
* investments vs. revenues charts
* profit charts
* textual stats
:param bool closed: Flag if closed transaction should be considered.
:return: Dict with keys ``investment_and_revenue``, ``profit`` and ``text``.
:rtype: dict
"""
def to_text_stats(stats_data):
"""
Returns basic stats as a text.
Output dict looks like this:
{
"share": {
"CZK":{
"Investment": 8150.0,
"Profit": 240.0,
"Profit %": 4.1
},
"USD": {
"Investment": 3206.8,
"Profit": 758.4,
"Profit %": 25.7,
}
},
"coin": {...}
}
"""
# Nested defaultdict.
tree = lambda: defaultdict(tree) # noqa
data = tree()
for type_, currency_stats in stats_data.items():
for currency, stats in currency_stats.items():
# First is investment.
investment = stats[0]["data"][0]
# Second is revenue/market value.
revenue_or_market_value = stats[1]["data"][0]
if "coin" == type_:
roundfn = coinautofloatformat
else:
roundfn = autofloatformat
data[type_][currency] = {
"Investment": roundfn(investment, no_str=True),
"Profit": roundfn(
revenue_or_market_value - investment, no_str=True
),
"Profit %": roundfn(
calc_percentage_change(revenue_or_market_value, investment),
no_str=True,
),
}
return data
df = Transactions(self.request.user).get_transaction_basic_stats(closed=closed)
pile = Transactions(self.request.user).get_pile_stats(df)
if not df.empty:
data = {
"investment_and_revenue": TransactionsBasicColumnChart(
df, pile
).get_series(closed),
"profit": TransactionBasicPieChart(df, pile).get_series(closed),
}
data["text"] = to_text_stats(data["investment_and_revenue"])
return json.loads(json.dumps(data))
def get_chart_options(self): # pylint: disable=no-self-use
"""
Returns options for charts used on transactions overview page.
"""
return {
"inv_and_rev_chart_options": {
"tooltip": {"headerFormat": None},
"yAxis": {"title": {"text": _("Investment")}},
"xAxis": {"labels": {"enabled": False}},
}
}
[docs]
class PaginatorMixin:
[docs]
def get_paginator(self, object_list, per_page=20, get_attr_name="page"):
"""
Creates paginator object, paginates the given ``object_list`` and
also returns elided page range.
:param list object_list: QuerySet or list of object to paginate.
:param int per_page: Number of items per page.
:param get_attr_name: Name of GET attribute which carries the page number.
:return: Returns Page object and list of elided pages (for paging in template).
:rtype: tuple
"""
paginator = Paginator(object_list, per_page)
page = self.request.GET.get(get_attr_name, 1)
return paginator.get_page(page), paginator.get_elided_page_range(page)
[docs]
class ModelPaginatorMixin:
"""
Provides paginator for ListViews based on model
which template includes _paging.pug template.
"""
# might be overridden
paginate_by = 20
[docs]
def get_context_data(self, **kwargs):
"""
Adds ``pages`` and ``paginator`` context variables.
"""
context = super().get_context_data(**kwargs)
context["pages"] = context["paginator"].get_elided_page_range(
context["page_obj"].number
)
return context
class SubmenuViewMixin:
def get_submenu(self):
raise NotImplementedError("Please implement get_submenu() method.")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["submenu_dynamic"] = self.get_submenu()
return context
##################
# Base views #
##################
class AjaxView(LoginRequiredMixin, JSONResponseMixin, View):
def clean_request_params(self, params):
"""
Cleans given request params from common mistyping casts.
:param django.http.QueryDict params: The given request params.
:return: Sanitized params.
:rtype: dict
"""
params = copy.deepcopy(params)
for param in params.keys():
if "undefined" == params[param]:
params[param] = None
return params
def send(self, status, data=None):
return self.render_json_response({"status": status, "data": data})
class BaseItemDetailView(LoginRequiredMixin, DetailView):
def dispatch(self, request, *args, **kwargs):
# Check if there are already any historical data (Price model).
# Probably being downloaded right now.
if not self.get_object().item.price_set.exists():
messages.warning(request, _("Historical data are not ready yet."))
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.model.objects.select_related(
"item", "item__share", "item__etf", "item__coin", "item__coin"
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Last price.
context["last_price"] = self.object.item.get_last_price(current=True)
context["last_price_rounded"] = (
context["last_price"].round(self.object.item)
if context["last_price"]
else None
)
# ATH
context["ath"] = self.object.item.get_ath()
# YOY price change
context["yoy_change"] = self.object.item.get_last_days_change(
365, percents=True
)
# 52 weeks low/high price
context["52_weeks_low_high"] = self.object.item.get_52_weeks_low_high()
# Drawdown
context["drawdown"] = self.object.item.get_drawdown()
# Transactions.
context["transactions_open"] = (
Transaction.objects.by_user(self.request.user)
.filter(item=self.object.item)
.filter(is_closed=False)
.order_by("-date", "-pk")
)
context["transactions_closed"] = (
Transaction.objects.by_user(self.request.user)
.filter(item=self.object.item)
.filter(is_closed=True)
.order_by("-date", "-pk")
)
# Custom ranges.
context["ranges"] = self.get_ranges()
# Basic data.
context["basic_info"] = self.object.item.get_basic_info()
# Headline chip text.
if self.object.is_archived:
context["headline_chip_text"] = _("archived")
if self.object.item.is_share():
context["last_dividend"] = self.object.item.share.dividend_set.order_by(
"ex_date"
).last()
elif self.object.item.is_etf():
context["last_dividend"] = self.object.item.etf.dividend_set.order_by(
"ex_date"
).last()
# Drawdown.
context["drawdown_options"] = json.dumps(DrawdownChart.get_options())
return context
def get_ranges(self):
ranges = []
last_price = self.object.item.price_set.last()
# Check for last transaction.
if self.object.item.transaction_set.exists():
trans = self.object.item.transaction_set.last()
ranges.append(
{
"title": "LT",
"from": datetime.combine(
trans.date, datetime.min.time()
).timestamp()
* 1000,
"to": (
last_price.datetime.timestamp() * 1000 if last_price else None
),
}
)
return json.dumps(ranges)
class BaseFetchItemAjaxView(AjaxView):
def get_item_prices(self, item, **kwargs):
"""
Fetches item prices from database.
:param Item item: Item model instance.
:return: Price and timestamp pairs in a list.
:rtype: tuple
"""
data = []
if hasattr(item, "coin"):
roundfn = coinautofloatformat
else:
roundfn = autofloatformat
for p in Price.objects.filter(item=item, **kwargs).order_by("datetime"):
data.append([p.get_timestamp(), roundfn(p.price, no_str=True)])
return data
def get_transactions(self, item):
"""
Fetches all related transaction from database.
:param Item item: Item model instance.
:return: Transactions as list of object with keys "x", "title" and "text".
:rtype: list
"""
data = []
offset = datetime.now(timezone.get_current_timezone()).utcoffset().seconds
for t in (
Transaction.objects.by_user(self.request.user)
.filter(item=item)
.order_by("date")
):
data.append(
{
"x": (time.mktime(t.date.timetuple()) + offset) * 1000,
"title": "+" if 0 < t.amount else "-",
"text": "{} {} ({} {})".format(
(
int(t.amount) if t.amount.is_integer() else t.amount
), # don't show floating point if the amount is whole number (i.e. 6.0)
(
ngettext("share", "shares", abs(t.amount))
if isinstance(item, Share)
else ngettext("coin", "coins", abs(t.amount))
),
intcomma(t.price),
_("per share") if isinstance(item, Share) else _("per coin"),
),
}
)
return data
def add_index_data(self, request, data):
"""
Adds index data (in requested by the request) to
data series.
:param object request: Django request object.
:param dict data: Data object to be send to the client.
"""
if "indexes" in request.GET and (indexes := request.GET["indexes"]):
data["indexes"] = []
for index_symbol in indexes.split(","):
item = get_object_or_404(Item, symbol=index_symbol)
data["indexes"].append(self.get_item_prices(item))
class BaseDeleteUserItemRedirectView(LoginRequiredMixin, RedirectView):
def can_be_deleted(self):
"""
Checks if the current item can be deleted - has no related transactions.
:return: True if can be deleted False otherwise
:rtype: bool
"""
user_item = get_object_or_404(UserItem, pk=self.kwargs["pk"])
return (
not Transaction.objects.by_user(self.request.user)
.filter(item=user_item.item)
.exists()
)
def delete_item_if_not_needed(self, item):
"""
Deletes an item if it's not used by any user.
:param int pk: Item ID.
"""
if not item.useritem_set.exists():
pk = item.pk
item.delete()
LOGGER.debug(
f"Item with ID {pk} has been deleted because it's not used by any user."
)
def delete(self):
user_item = get_object_or_404(
UserItem, user=self.request.user, pk=self.kwargs["pk"]
)
item = user_item.item
user_item.delete()
# TODO: delete Item in case no user uses that symbol (no UserItem) + refresh ItemWithPrices view.
self.delete_item_if_not_needed(item)
ItemWithPrices.refresh()
class BasePerformanceDetailView(LoginRequiredMixin, DetailView):
template_name = "core/item_performance.pug"
model = UserItem
def get_context_data(self, **kwargs):
chart = PerformanceChart(self.object.item)
context = super().get_context_data(**kwargs)
series = chart.get_series()
context["annual_performance"] = sorted(
series, key=lambda i: i["year"], reverse=True
)
context["performance_chart_options"] = self.get_chart_options(
chart.get_options(), series
)
context["basic_info"] = self.object.item.get_basic_info()
# LOGGER.debug(context["performance_chart_options"])
return context
def get_chart_options(self, base_options, data):
return base_options | {
"series": [
{
"type": "candlestick",
"name": _("Performance"),
"data": [
(
date_to_highcharts_timestamp(
i["datetime"].replace(month=1, day=1).to_pydatetime()
),
i["open"],
i["high"],
i["low"],
i["close"],
)
for i in data
],
},
{
"name": _("Current price"),
"type": "line",
"color": settings.CHART_COLORS[1],
},
],
}
class UserItemManipulationMixin:
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["user"] = self.request.user
return kwargs
class BaseUserItemCreateView(
LoginRequiredMixin,
FormMessagesMixin,
UserItemManipulationMixin,
CreateView,
):
model = UserItem
def form_valid(self, form):
item, created = form.item_model.objects.get_or_create(
symbol=form.cleaned_data["symbol"]
)
if created:
self.item_created_callback(form, item)
self.fetch_data(item.pk)
form.instance.user = self.request.user
form.instance.item = item
to_return = super().form_valid(form)
ItemWithPrices.refresh()
return to_return
def item_created_callback(self, form, item):
pass
def get_open_items_possession_stats(self, type):
"""
Fetches open items (possessions) and it's current possession percentage.
:param string type: Item type to get stats for.
:return: Sorted dict (by value - reversed) where key
is the item symbol and value is possession percentage
relative to other items.
:rtype: dict
"""
# Get amounts.
stats_df = Transactions(self.request.user).get_transaction_basic_stats(
closed=False
)
if 0 == len(stats_df):
return {}
pile = Transactions.get_pile_stats(stats_df[stats_df["type"] == type])
symbols = defaultdict(int)
for root_parent, stats in pile:
for item in stats["open_symbols"]:
symbols[item["symbol"]] += item["market_price"]
# Convert to percentages.
total = sum(symbols.values())
perc_symbols = {}
for key, value in symbols.items():
if value:
perc_symbols[key] = 100 / (total / value)
else:
perc_symbols[key] = 0
# Return as a new sorted (by value) dict.
return dict(sorted(perc_symbols.items(), key=lambda i: i[1], reverse=True))
def sort_items_by_other_dict_keys(self, items, other_dict):
"""
Sorts the given items list based on the other dict keys
which should be item symbols.
:param list items: List of UserItems objects to be sorted.
:param dict other_dict: The (sorted )dict where key is item symbol.
:return: Sorted items.
:rtype: list
"""
symbol_indexes = list(other_dict.keys())
return sorted(items, key=lambda i: symbol_indexes.index(i.item.symbol))
def split_open_items(self, items):
open_items = self.request.user.get_owned_items()
open_items_list = []
other_items_list = []
# Split shares into open and the rest.
for i in items:
if i.item in open_items:
open_items_list.append(i)
else:
other_items_list.append(i)
return open_items_list, other_items_list
class BaseHistoricalData(LoginRequiredMixin, ModelPaginatorMixin, ListView):
model = Price
paginate_by = 60 # 3 months
template_name = "core/item_historical_data.pug"
def get_queryset(self):
self.object = get_object_or_404(UserItem, pk=self.kwargs["pk"])
user_item = UserItem.objects.get(pk=self.kwargs["pk"])
# Annotate day change
return self.model.objects.annotate(
change=100.0
* (
# TODO: order_by: can be replaced with "-datetime"
F("price") / Window(Lead("price"), order_by=F("datetime").desc()) - 1
)
).filter(item=user_item.item)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["object"] = self.object
context["min_change"] = min(
context["page_obj"].object_list, key=lambda i: i.change or 0
).change
context["max_change"] = max(
context["page_obj"].object_list, key=lambda i: i.change or 0
).change
context["object_list"] = self.regroup_and_calculate_month_changes(
context["object_list"]
)
context["basic_info"] = self.object.item.get_basic_info()
return context
def regroup_and_calculate_month_changes(self, data):
data = self.split_by_months(data)
return self.calculate_month_changes(data)
def split_by_months(self, data):
"""
Splits the given dataset (a list of objects) into batches
by object property by year and month.
Output list looks like:
[
{
"date": date(),
"records": [
{...},
...
],
}
]
:param list data: List of data objects.
:param str date_field: Date(time) property name.
:return: Grouped (split) dataset for each month.
:rtype: list
"""
def fetch(record):
timestamp = getattr(record, "datetime")
return [timestamp.year, timestamp.month]
output = []
for [year, month], data in itertools.groupby(data, fetch):
output.append({"date": date(year, month, 1), "records": list(data)})
return output
def calculate_month_changes(self, data):
"""
Extends the structure coming from self.split_by_months() for
"month_change" key which stores the month price change (1st - last day of the month).
"""
for i, group in enumerate(data):
first_of_month = group["records"][-1]
last_of_month = group["records"][0]
data[i]["month_change"] = calc_percentage_change(
last_of_month.price, first_of_month.price
)
return data
##################
# Core views #
##################
class HealthCheckView(JSONResponseMixin, View):
def get(self, request, *args, **kwargs):
"""Lightweight healthcheck endpoint for Docker."""
return self.render_json_response({"status": "ok"})
class DashboardTemplateView(
LoginRequiredMixin, TransactionChartDataMixin, TemplateView
):
template_name = "core/dashboard.pug"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["open_shares"], context["shares"] = self.get_share_stats()
context["indexes"] = self.get_index_stats()
context["open_etfs"], context["etfs"] = self.get_etf_stats()
context["open_coins"], context["coins"] = self.get_coin_stats()
context["portfolio_stats"] = self.get_portfolio_stats()
context["all_share_tags"] = set(
itertools.chain(
*[
s["share"].get_tags()
for s in itertools.chain(context["open_shares"], context["shares"])
]
)
)
context["all_index_tags"] = set(
itertools.chain(*[i["index"].get_tags() for i in context["indexes"]])
)
context["all_etf_tags"] = set(
itertools.chain(
*[
r["etf"].get_tags()
for r in itertools.chain(context["open_etfs"], context["etfs"])
]
)
)
context["all_coin_tags"] = set(
itertools.chain(
*[
c["coin"].get_tags()
for c in itertools.chain(context["open_coins"], context["coins"])
]
)
)
return context
def get_share_stats(self):
"""
Fetches shares and shares chart from database.
"""
open_shares = []
shares = []
# Items with currently open transactions.
open_share_symbols = [
t.item.symbol
for t in Transaction.objects.by_user(self.request.user)
.prefetch_related("item")
.filter(is_closed=False, item__share__isnull=False)
.distinct("item")
]
# Stats for each share.
for i in (
UserItem.objects.by_user(self.request.user)
.select_related("item")
.extra(
select={
"perf_chart": "SELECT image FROM core_asset WHERE core_useritem.item_id = core_asset.item_id AND core_asset.type = %s"
},
select_params=(Asset.PERFORMANCE_CHART,),
)
.filter(
item__share__isnull=False, show_on_dashboard=True, is_archived=False
)
.order_by("item__symbol")
):
if i.item.symbol in open_share_symbols:
open_shares.append({"share": i, "chart": Asset(image=i.perf_chart)})
else:
shares.append({"share": i, "chart": Asset(image=i.perf_chart)})
return open_shares, shares
def get_index_stats(self):
"""
Fetches indexes and indexes chart from database.
"""
indexes = []
# Stats for each index.
for i in (
UserItem.objects.by_user(self.request.user)
.extra(
select={
"perf_chart": "SELECT image FROM core_asset WHERE core_useritem.item_id = core_asset.item_id AND core_asset.type = %s"
},
select_params=(Asset.PERFORMANCE_CHART,),
)
.filter(
item__index__isnull=False, show_on_dashboard=True, is_archived=False
)
.order_by("item__symbol")
):
indexes.append({"index": i, "chart": Asset(image=i.perf_chart)})
return indexes
def get_etf_stats(self):
"""
Fetches ETFs and ETFs chart from database.
"""
open_etfs = []
etfs = []
# Items with currently open transactions.
open_etf_symbols = [i.symbol for i in self.request.user.get_owned_items()]
# Stats for each etf.
for i in (
UserItem.objects.by_user(self.request.user)
.extra(
select={
"perf_chart": "SELECT image FROM core_asset WHERE core_useritem.item_id = core_asset.item_id AND core_asset.type = %s"
},
select_params=(Asset.PERFORMANCE_CHART,),
)
.filter(item__etf__isnull=False, show_on_dashboard=True, is_archived=False)
.order_by("item__symbol")
):
if i.item.symbol in open_etf_symbols:
open_etfs.append({"etf": i, "chart": Asset(image=i.perf_chart)})
else:
etfs.append({"etf": i, "chart": Asset(image=i.perf_chart)})
return open_etfs, etfs
def get_coin_stats(self):
"""
Fetches coins and coins chart from database.
"""
open_coins = []
coins = []
# Items with currently open transactions.
positives = (
Transaction.objects.by_user(self.request.user)
.positive_balance()
.values_list("item", flat=True)
)
open_coin_symbols = [
t.item.symbol
for t in Transaction.objects.by_user(self.request.user)
.select_related("item")
.filter(is_closed=False, item__in=positives, item__coin__isnull=False)
.distinct("item")
]
# Stats for each coin.
for i in (
UserItem.objects.by_user(self.request.user)
.select_related("item")
.extra(
select={
"perf_chart": "SELECT image FROM core_asset WHERE core_useritem.item_id = core_asset.item_id AND core_asset.type = %s"
},
select_params=(Asset.PERFORMANCE_CHART,),
)
.filter(item__coin__isnull=False, show_on_dashboard=True, is_archived=False)
.order_by("item__symbol")
):
if i.item.symbol in open_coin_symbols:
open_coins.append({"coin": i, "chart": Asset(image=i.perf_chart)})
else:
coins.append({"coin": i, "chart": Asset(image=i.perf_chart)})
return open_coins, coins
def get_portfolio_stats(self):
def get_investment_and_market_value(data):
"""
Fetches portfolio stats for each currency (currently open transactions).
Output dict looks like this:
{
"USD": {
"Investment": 36889.6,
"Market value": 104630.90000000001
}
}
:param dict data: Input data from ``TransactionChartDataMixin.get_transaction_basic_chart_data()`` method.
:return: Dict with investments and market value for each currency.
:rtype: dict
"""
stats = defaultdict(lambda: defaultdict(int))
for item_type, currency_data in data["investment_and_revenue"].items():
for currency, currency_stats in currency_data.items():
for stat in currency_stats:
stats[currency][stat["name"]] += stat["data"][0]
return json.loads(json.dumps(stats))
data = self.get_transaction_basic_chart_data()
investment_and_market_value = {}
market_value_ratio_chart_data = {}
if data:
investment_and_market_value = get_investment_and_market_value(data)
market_value_ratio_chart_data = (
DashboardMarketValueRatioPieChart().get_series(data)
)
return {
"investment_and_market_value": investment_and_market_value,
"market_value_ratio_chart_data": market_value_ratio_chart_data,
}
class SearchRedirectView(LoginRequiredMixin, RedirectView):
def get_redirect_url(self, *args, **kwargs):
q = self.request.GET["query"]
# Try shares.
try:
i = UserItem.objects.by_user(self.request.user).get(
item__symbol=q.upper(), item__share__isnull=False
)
except UserItem.DoesNotExist:
pass
else:
return reverse("shares:share_detail", args=[i.pk])
# Try indexes.
try:
i = UserItem.objects.by_user(self.request.user).get(
item__symbol=q.upper(), item__index__isnull=False
)
except UserItem.DoesNotExist:
pass
else:
return reverse("indexes:index_detail", args=[i.pk])
# Try coins.
try:
i = UserItem.objects.by_user(self.request.user).get(
item__symbol=q.upper(), item__coin__isnull=False
)
except UserItem.DoesNotExist:
pass
else:
return reverse("coins:coin_detail", args=[i.pk])
# Try ETFs.
try:
i = UserItem.objects.by_user(self.request.user).get(
item__symbol=q.upper(), item__etf__isnull=False
)
except UserItem.DoesNotExist:
pass
else:
return reverse("etfs:etf_detail", args=[i.pk])
# Use YAHOO! search as fallback.
return f"https://finance.yahoo.com/quote/{q}?p=V&.tsrc=fin-srch"
class SignInFormView(LoginView):
template_name = "core/sign_in.pug"
class SignOutRedirectView(RedirectView):
def get_redirect_url(self):
logout(self.request)
return reverse(settings.LOGIN_URL)
class ItemCurrentPriceAjaxView(DetailView, AjaxView):
model = Item
slug_field = "symbol"
def get(self, request, *args, **kwargs):
self.object = self.get_object()
# Fetch price changes and cache it.
price = Manager.get_current_price_and_change(self.object)
if price:
self.object.set_current_price_and_change(price)
try:
return self.send(True, price.round(self.object) if price else {})
except Exception:
LOGGER.exception("Couldn't retrieve last price.")
return self.send(False)
class FetchDrawdownAjaxView(AjaxView):
def get(self, request, *args, **kwargs):
user_item = get_object_or_404(UserItem, pk=self.kwargs["pk"])
return self.send(True, DrawdownChart(user_item.item).get_series())
class BaseDrawdownPeriodsAndAthsDetailView(
LoginRequiredMixin, PaginatorMixin, DetailView
):
template_name = "core/item_drawdowns.pug"
model = UserItem
def get_context_data(self, **kwargs):
chart = AthPeriodsChart(self.object.item)
aths = self.object.item.get_aths()
context = super().get_context_data(**kwargs)
context["aths_chart_series"] = chart.get_series(aths)
context["aths_chart_options"] = chart.get_options()
context["drawdowns"], context["pages"] = self.compile_drawdown_table_data(aths)
context["basic_info"] = self.object.item.get_basic_info()
return context
def compile_drawdown_table_data(self, aths):
aths.name = "ath"
aths = aths.reset_index()
aths["datetime_prev"] = aths["datetime"].shift()
aths["days"] = aths.apply(
lambda row: (row["datetime"] - row["datetime_prev"]).days, axis=1
).fillna(0)
rows = aths.sort_index(ascending=False).to_dict(orient="records")
return self.get_paginator(rows)
######################
# Splitter views #
######################
class BaseSplitterFormView(LoginRequiredMixin, FormView):
# preview_template = "shares/splitter/preview.pug"
form_class = SplitterForm
template_name = "core/splitter/form.pug"
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["model"] = self.model
return kwargs
def form_valid(self, form):
self.request.session["splitter"] = {
"item": form.cleaned_data["item"].pk,
"date": form.cleaned_data["date"].isoformat(),
"ratio": form.cleaned_data["ratio"],
}
return super().form_valid(form)
class BaseSplitterPreviewFormView(LoginRequiredMixin, FormMessagesMixin, FormView):
"""
Accessible only to admins.
"""
template_name = "core/splitter/preview.pug"
form_class = SplitterPreviewForm
def dispatch(self, request, *args, **kwargs):
if "splitter" not in request.session:
messages.error(request, _("No splitter data. Please fill the form."))
return HttpResponseRedirect(self.request.path)
self.splitter = request.session["splitter"]
self.item = Item.objects.get(pk=self.splitter["item"])
self.date = datetime.fromisoformat(self.splitter["date"])
self.ratio = self.splitter["ratio"]
self.transactions = self.item.transaction_set.filter(date__lt=self.date)
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form):
for trans in self.transactions:
# Split.
trans.price /= self.ratio
trans.amount *= self.ratio
if trans.note:
trans.note += "\n"
else:
trans.note = ""
trans.node = f"(split {date_filter(timezone.now(), 'DATETIME_FORMAT')}"
trans.save()
# Refetch prices.
self.fetch_historical_data()
return super().form_valid(form)
def get_form_valid_message(self):
message = _("Item %(item)s has been split with ratio %(ratio)s:1") % {
"item": self.item.symbol,
"ratio": int(self.ratio),
}
# Clean up session.
del self.request.session["splitter"]
return message
def get_success_url(self):
item = UserItem.objects.by_user(self.request.user).get(item=self.item)
if self.item.is_share():
return reverse("shares:share_detail", args=[item.pk])
if self.item.is_etf():
return reverse("etfs:etf_detail", args=[item.pk])
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["item"] = self.item
context["date"] = self.date
context["ratio"] = self.ratio
context["transactions"] = self.transactions
return context
#####################
# The eye views #
#####################
class BaseTheEyeView(LoginRequiredMixin, DetailView):
template_name = "core/the_eye.pug"
model = UserItem
def get_context_data(self, **kwargs):
# Is here because of sitetree.
context = super().get_context_data(**kwargs)
context["basic_info"] = self.object.item.get_basic_info()
return context
class TheEyeAjaxView(AjaxView):
def get(self, request, *args, **kwargs):
open = None
close = None
user_item = UserItem.objects.get(pk=kwargs["id"])
cache_key = f"item-{user_item.item.pk}-market-live"
# Try cache.
if data := cache.get(cache_key):
return self.send(True, data)
data = []
df, open_ts, close_ts = scraper.Manager.fetch_intraday_prices(user_item.item)
if df.empty:
return self.send(False)
# Share or index or ETF.
if (
user_item.item.is_share()
or user_item.item.is_index()
or user_item.item.is_etf()
):
df.index = df.index.tz_convert(settings.TIME_ZONE)
for idx, r in df.iterrows():
data.append(
{
"x": date_to_highcharts_timestamp(idx.to_pydatetime()),
"open": autofloatformat(r["Open"], no_str=True),
"high": autofloatformat(r["High"], no_str=True),
"low": autofloatformat(r["Low"], no_str=True),
"close": autofloatformat(r["Close"], no_str=True),
},
)
if open_ts:
open = date_to_highcharts_timestamp(
open_ts.tz_convert(settings.TIME_ZONE)
)
if close_ts:
close = date_to_highcharts_timestamp(
close_ts.tz_convert(settings.TIME_ZONE)
)
data = {
"symbol": user_item.item.symbol.upper(),
"open": open,
"close": close,
"min": {
"value": df["Low"].min(),
"formatted": intcomma(autofloatformat(df["Low"].min())),
},
"max": {
"value": df["High"].max(),
"formatted": intcomma(autofloatformat(df["High"].max())),
},
"prices": data,
}
elif user_item.item.is_coin():
for idx, r in df.iterrows():
data.append(
{
"x": date_to_highcharts_timestamp(idx.to_pydatetime()),
"open": autofloatformat(r["open"], no_str=True),
"low": autofloatformat(r["low"], no_str=True),
"high": autofloatformat(r["high"], no_str=True),
"close": autofloatformat(r["close"], no_str=True),
},
)
data = {
"symbol": user_item.item.symbol.upper(),
"min": {
"value": df["low"].min(),
"formatted": intcomma(autofloatformat(df["low"].min())),
},
"max": {
"value": df["high"].max(),
"formatted": intcomma(autofloatformat(df["high"].max())),
},
"prices": data,
}
# Cache data.
if data:
cache.set(cache_key, data, settings.ITEM_MARKET_LIVE_CACHE_TIMEOUT)
return self.send(True, data)