import io
import json
import logging
import math
import time
from collections import defaultdict
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from django.conf import settings
from django.db.models import Sum
from django.template.defaultfilters import date as date_filter
from django.utils.translation import gettext as _
from django.utils.translation import ngettext_lazy
from matplotlib import ticker
from PIL import ImageColor
from ..core.models import Asset
from ..core.templatetags.utils import to_quarter_period
from ..staking.models import Staking
from ..transactions.models import EtfDividendTransaction, ShareDividendTransaction
from ..transactions.transactions import Transactions
from .math import calc_percentage_change
from .models import Price
from .templatetags.utils import autofloatformat, coinautofloatformat
# Select the non-interactive "Agg" Matplotlib backend for the whole module.
matplotlib.use("Agg") # noqa
# Module-level logger registered under the "richy.celery" name.
logger = logging.getLogger("richy.celery")
#########################
# Matplotlib charts #
#########################
[docs]
class BaseChart:
    """
    Common class for all chart classes.
    """

    def material(self):
        """
        Sets the material theme for Seaborn and Matplotlib - colors + grid.
        """
        sns.set_palette(settings.CHART_COLORS)
        sns.set_style("whitegrid", rc={"grid.color": ".9"})
        # Matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8-*"
        # and newer releases no longer ship the old names, so use whichever
        # variant the installed version actually provides.
        style_name = "seaborn-whitegrid"
        if style_name not in matplotlib.style.available:
            style_name = "seaborn-v0_8-whitegrid"
        matplotlib.style.use(style_name)
        matplotlib.style.use({"grid.color": ".9"})

    def export(self, format="svg"):
        """
        Exports the chart currently being rendered in the given format.

        :param str format: Format the chart is going to be exported in.
        :return: Chart as bytes, rewound to the beginning.
        :rtype: io.BytesIO
        """
        # Render the current pyplot figure into an in-memory buffer.
        buffer = io.BytesIO()
        plt.savefig(buffer, format=format, bbox_inches="tight", dpi=80)
        # Rewind so callers can read the buffer from the start.
        buffer.seek(0)
        logger.debug("Chart exported as bytes.")
        return buffer

    def timestamp_to_date(self, x, _pos=None):
        """
        Converts a Unix timestamp to a human readable date.

        Used as a ticker formatter - FuncFormatter.

        :param int x: Value (Unix timestamp) to be formatted.
        :param int _pos: Tick position supplied by the formatter (unused).
        :return: Formatted date.
        :rtype: str
        """
        return date_filter(datetime.fromtimestamp(x), "DATE_FORMAT")
[docs]
class Manager(BaseChart):
    """
    Matplotlib/Seaborn chart renderer built on top of ``BaseChart``.
    """

    def linear_regression(self, q):
        """
        Renders a price-over-time scatter plot with a linear regression line.

        :param q: Object exposing ``to_pandas()``; the resulting dataframe is
            read through its ``datetime`` and ``price`` columns.
        :return: Chart as bytes, or ``None`` when the dataframe is empty.
        :rtype: io.BytesIO or None
        """

        def add_future(ax):
            """
            Extends the X limit into the future by half of the current timeline.
            """
            seconds_per_day = 60 * 60 * 24
            first = df["timestamp"].min()
            last = df["timestamp"].max()
            days = math.floor((last - first) / seconds_per_day / 2)
            ax.set_xlim([first, last + seconds_per_day * days])
            return ax

        # Prepare data.
        df = q.to_pandas()
        if not len(df):
            return

        # Set chart style.
        sns.set(font_scale=0.75)
        self.material()

        # Create new series with Unix timestamp.
        df["timestamp"] = df.apply(
            lambda row: time.mktime(row["datetime"].timetuple()), axis=1
        )

        # Set size of the chart.
        fig, ax = plt.subplots()
        try:
            fig.set_size_inches(35 / 2.54, 12.5 / 2.54)
            ax = add_future(ax)
            chart = sns.regplot(
                x="timestamp",
                y="price",
                data=df,
                ax=ax,
                line_kws={"color": settings.CHART_COLORS[1]},
            )
            plt.xlabel("")
            plt.ylabel("")

            # Set formatter for X axis - timeline.
            chart.get_xaxis().set_major_formatter(
                ticker.FuncFormatter(self.timestamp_to_date)
            )
            return self.export()
        finally:
            # pyplot keeps every figure alive until it is closed explicitly;
            # release it so figures don't pile up in a long-running process.
            plt.close(fig)
#########################
# Highcharts charts #
#########################
[docs]
class TransactionOverviewChart:
    """
    Highcharts transaction overview chart used in transaction open overview pages.
    """

    def __init__(self, user, df):
        """
        :param user: User handed over to ``Transactions`` when series are built.
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        """
        self.df = df
        self.user = user

    def get_series(self):
        """
        Builds the chart series from the pile stats of ``self.df``.

        Every series is a dict such as::

            {
                'name': 'Investments',
                'data': [780.22, 1154.93, 1062.6],
                'stack': 'investments'
            }

        Series of the "market_value" stack carry ``{"y": ..., "name": ...}``
        points instead of bare numbers.

        :return: List of series. When the pile is empty an empty
            ``pandas.Series`` is returned instead.
        :rtype: list
        """
        pile = Transactions(self.user).get_pile_stats(self.df)
        if not pile:
            return pd.Series()

        # One dataframe per pile item with the columns "investments",
        # "market_value", "market_value_symbols" and "sold".
        frames = []
        for _root_transactions, stats in pile:
            columns = {
                "investments": pd.Series(
                    list(stat.values())[0] for stat in stats["investments"]
                ),
                "market_value": pd.Series(
                    stat["market_price"] for stat in stats["open_symbols"]
                ),
                "market_value_symbols": pd.Series(
                    stat["symbol"] for stat in stats["open_symbols"]
                ),
                "sold": pd.Series(
                    list(stat.values())[0] for stat in stats["incomes"]
                ),
            }
            frames.append(pd.DataFrame(columns))

        # Put all frames side by side - column names repeat per pile item.
        combined = pd.concat(frames, axis=1)

        return [
            *self.df_to_series(combined, "Investments", "investments"),
            *self.df_to_series(
                combined, "Market values", "market_value", "market_value_symbols"
            ),
            *self.df_to_series(combined, "Sold", "sold"),
        ]

    @staticmethod
    def get_options(item_type, series):
        """
        Returns options for the Highcharts chart as a string.

        The result starts as JSON and has a ``tooltip`` entry with
        a JavaScript formatter function appended to it.

        :param str item_type: Item type the chart options are for - "share", "coin", "etf".
        :param list series: List of data series.
        :return: Options string.
        :rtype: str
        """

        def get_x_labels(series):
            """
            Collects point names of all "market_value" series and groups
            them per category (point position).

            :param list series: List of data series.
            :return: List of label lists - one per category.
            :rtype: list
            """
            per_series = [
                [point["name"] for point in ser["data"]]
                for ser in series
                if "market_value" == ser["stack"]
            ]

            # Transpose to categories and drop NaN placeholders.
            return [
                [
                    str(label)
                    for label in category
                    if isinstance(label, str) or not np.isnan(label)
                ]
                for category in zip(*per_series)
            ]

        options = json.dumps(
            {
                "xAxis": {"categories": get_x_labels(series)},
                "plotOptions": {"column": {"stacking": "normal"}},
                "chart": {"zooming": {"type": "x"}},
            }
        )

        precision = str(
            settings.COINS_MAX_PRECISION
            if "coin" == item_type
            else settings.MAX_PRECISION
        )

        # The closing brace of the JSON object is cut off and replaced with
        # the tooltip formatter followed by a new closing brace.
        tooltip = (
            "\n,tooltip: {\n"
            "formatter: function() {\n"
            "return '<strong>' + this.series.name + '</strong><br/>' +\n"
            "(this.point.name ? this.point.name : this.series.name) + ': ' + this.y.toFixed("
            + precision
            + ") + '<br/>' +\n"
            "'Total: ' + this.point.stackTotal.toFixed("
            + precision
            + ");\n},\n},\n}\n"
        )
        return options[:-1] + tooltip

    def df_to_series(self, df, name, values_col, labels_col=None):
        """
        Converts all ``values_col`` columns of the dataframe into chart series -
        one series per dataframe row that has at least one value.

        Input dataframe looks like this (column names repeat)::

              investments  market_value market_value_symbols  sold  investments ...
            0      780.22       1727.04                 QCOM   NaN      1154.93 ...
            1         NaN           NaN                  NaN   NaN          NaN ...

        :param pd.DataFrame df: Pandas dataframe with sums.
        :param str name: Series name.
        :param str values_col: Column name with values (also used as stack).
        :param str labels_col: Column name with labels.
        :return: list of series
        :rtype: list
        """
        label_rows = []
        if labels_col:
            label_frame = df.loc[:, labels_col]
            # A single matching column comes back as a series.
            if isinstance(label_frame, pd.Series):
                label_frame = label_frame.to_frame()
            label_rows = [row.to_list() for __, row in label_frame.iterrows()]

        value_frame = df.loc[:, values_col]
        # A single matching column comes back as a series.
        if isinstance(value_frame, pd.Series):
            value_frame = value_frame.to_frame()

        result = []
        for idx, row in value_frame.dropna(how="all").fillna(0).iterrows():
            points = row.to_list()
            if label_rows:
                # Attach the label found at the same row and position.
                points = [
                    {"y": value, "name": label_rows[idx][pos]}
                    for pos, value in enumerate(points)
                ]
            result.append({"name": name, "data": points, "stack": values_col})
        return result
[docs]
class TransactionsBasicChart:
    """
    Shared calculations (investments, revenues, market values) for
    the transaction basic charts.
    """

    def __init__(self, df, pile):
        """
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        :param list pile: A pile with closed/open stats.
        """
        self.pile = pile
        self.df = df

    def get_investments(self):
        """
        Calculates investments and open symbols from ``self.pile``.

        Investments pandas.Series looks like::

            type   currency
            coin   USD         21641.885258
            share  USD          2997.750000

        Open symbol structure:

        .. code-block:: python

            {
                'symbol': 'QCOM',
                'market_price': 2030.1399999999999,
                'type': 'share',
                'currency': 'USD'
            }

        :return: Tuple with investments (pandas.Series) and open symbols (list).
        :rtype: tuple
        """
        open_symbols = []
        investments = []

        for _root_parent, stats in self.pile:
            for open_symbol in stats["open_symbols"]:
                symbol = open_symbol["symbol"]
                # Type and currency come from the first dataframe row
                # of the symbol.
                first_row = self.df[self.df["symbol"] == symbol].iloc[0]
                open_symbols.append(
                    {
                        "symbol": symbol,
                        "market_price": open_symbol["market_price"],
                        "type": first_row["type"],
                        "currency": first_row["currency"],
                    }
                )

            for investment in stats["investments"]:
                pk, amount = list(investment.items())[0]
                # Type and currency are copied from the most recently
                # collected open symbol.
                latest = open_symbols[-1]
                investments.append(
                    {
                        "investment_pk": pk,
                        "investment": amount,
                        "type": latest["type"],
                        "currency": latest["currency"],
                    }
                )

        # Dataframe indexed by type and currency:
        #
        #                  investment_pk   investment
        # type  currency
        # share USD                  6.0   780.220000
        #       USD                  9.0  1154.930000
        # coin  USD                 14.0  1190.031984
        frame = pd.DataFrame.from_dict(investments).set_index(["type", "currency"])

        # Sum investments for each type-currency combo.
        summed = frame.groupby(level=[0, 1]).sum()
        return (summed["investment"], open_symbols)

    def get_revenues(self):
        """
        Calculates revenues as pandas.Series by summing ``balance_in_time``
        of closing transactions.

        Structure looks like::

            type   currency
            coin   USD         5175.807242
            share  CZK         8440.000000
                   USD         9150.210000

        :return: Revenues series with multi-index.
        :rtype: pandas.Series
        """
        indexed = self.df.set_index(["type", "currency"])
        closing = indexed[indexed["is_closing"] == True]  # noqa
        return closing.groupby(level=[0, 1])["balance_in_time"].sum()

    def get_market_values(self, open_symbols):
        """
        Calculates market values as pandas.Series.

        Structure looks like::

            type   currency
            coin   USD         21086.637623
            share  USD          5844.470000

        :param list open_symbols: Open symbols as returned by ``get_investments()``.
        :return: Market values series with multi-index.
        :rtype: pandas.Series
        """
        # Dataframe indexed by type and currency:
        #
        #                  symbol  market_price
        # type  currency
        # share USD          QCOM   1727.040000
        #       USD            FB   1841.770000
        # coin  USD           ETH   5770.068180
        frame = pd.DataFrame.from_dict(open_symbols).set_index(["type", "currency"])
        summed = frame.groupby(level=[0, 1]).sum()
        return summed["market_price"]
[docs]
class TransactionsBasicColumnChart(TransactionsBasicChart):
    """
    Highcharts transaction basic charts for each type and currency - investment
    vs. market value column chart.
    """

    def get_series(self, closed=False):
        """
        Builds a dict (type) of dicts (currency) of two-item lists -
        investment and revenue (``closed=True``) or market value.

        Output dict looks like this::

            {
                "share": {
                    "CZK": [
                        {"name": "Investment", "data": [8150.0]},
                        {"name": "Revenue", "data": [8490.0]}
                    ],
                    "USD": [...]
                },
                "coin": {...}
            }

        :param bool closed: If True investments are paired with revenues,
            otherwise with market values.
        :return: Dict with struct described above.
        :rtype: dict
        """
        investments, open_symbols = self.get_investments()

        # The series paired with investments; it is looked up with the very
        # same type/currency keys investments are indexed by.
        if closed:
            counterpart = self.get_revenues()
        else:
            counterpart = self.get_market_values(open_symbols)

        def tree():
            # Nested defaultdict.
            return defaultdict(tree)

        data = tree()

        for (type_, currency), investment in investments.items():
            roundfn = coinautofloatformat if "coin" == type_ else autofloatformat

            data[type_][currency] = [
                {
                    "name": _("Investment"),
                    "data": [roundfn(investment, no_str=True)],
                },
                {
                    "name": _("Revenue") if closed else _("Market value"),
                    "data": [roundfn(counterpart[type_][currency], no_str=True)],
                },
            ]

        return data
[docs]
class TransactionBasicPieChart(TransactionsBasicChart):
    """
    Highcharts transaction basic charts for each type and currency - profits
    pie chart.
    """

    def get_series(self, closed=False, no_negatives=True):
        """
        Builds a dict (type) of dicts (currency) of single-item lists holding
        a "Profit" series with one point per symbol group.

        Output dict looks like this::

            {
                "coin": {
                    "USD": [
                        {
                            "name": "Profit",
                            "data": [
                                {"name": "ETH, TRX", "y": 321.3},
                                {"name": "BTC", "y": 1254.1}
                            ]
                        }
                    ]
                },
                "share": {...}
            }

        :param bool closed: If True profit is revenue minus investment,
            otherwise market value minus investment.
        :param bool no_negatives: If True filters out 0 or negative values (every loss).
        :return: Dict with struct described above.
        :rtype: dict
        """
        # Name of the value compared against the investment.
        value_key = "revenue" if closed else "market_value"

        # Sums the pile per symbol group. Final dict looks like:
        # {
        #     'TSLA': {
        #         'type': 'share',
        #         'currency': 'USD',
        #         'investment': 754.24,
        #         'revenue': 890.04  # or 'market_value'
        #     },
        #     ...
        # }
        totals = {}
        for _root_parent, stats in self.pile:
            invested = sum([list(inv.values())[0] for inv in stats["investments"]])

            # Type and currency are taken from the first open symbol.
            head = self.df[
                self.df["symbol"] == stats["open_symbols"][0]["symbol"]
            ].iloc[0]
            type_ = head["type"]
            currency = head["currency"]

            symbols = []
            value = 0
            if closed:
                for income in stats["incomes"]:
                    value += list(income.values())[0]
                    # Digs out a symbol based on transaction PK from self.df.
                    income_pk = list(income.keys())[0]
                    symbols.append(
                        self.df[self.df.index == income_pk].iloc[0]["symbol"]
                    )
            else:
                for open_symbol in stats["open_symbols"]:
                    value += open_symbol["market_price"]
                    symbols.append(open_symbol["symbol"])

            key = ", ".join(sorted(symbols))
            if key in totals:
                totals[key]["investment"] += invested
                totals[key][value_key] += value
            else:
                totals[key] = {
                    "type": type_,
                    "currency": currency,
                    "investment": invested,
                    value_key: value,
                }

        def tree():
            # Nested defaultdict.
            return defaultdict(tree)

        to_return = tree()

        for symbol, stats in totals.items():
            roundfn = (
                coinautofloatformat if "coin" == stats["type"] else autofloatformat
            )
            profit = roundfn(stats[value_key] - stats["investment"], no_str=True)

            # Losses and zero profits are skipped when not wanted.
            if no_negatives and 0 >= profit:
                continue

            point = {"name": symbol, "y": profit}
            existing = to_return[stats["type"]][stats["currency"]]
            if isinstance(existing, list):
                existing[0]["data"].append(point)
            else:
                # First point of this type-currency combo.
                to_return[stats["type"]][stats["currency"]] = [
                    {"name": _("Profit"), "data": [point]}
                ]

        return to_return
[docs]
class TransactionDetailColumnChart:
    """
    Highcharts transaction detail column chart for a specified transaction.
    """

    def __init__(self, df):
        """
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        """
        self.df = df

    def get_series(self, trans):
        """
        Calculates deposit, investment and revenue sums based on the current
        transaction (``trans`` param):

        * deposit: sum of all unique root parent transactions as amount * price.

        Trans is positive:

        * investment: current transaction as price * amount
        * revenue: sum of all transactions having the current transaction as
          parent - none, single or multiple.
        * market value: added when the transaction has no children and is
          not closed.

        Trans is negative:

        * investment: sum of all unique parent transactions as
          amount * price - single or multiple (shared).
        * revenue: current transaction as price * amount

        Returned data::

            [
                {"name": "Deposit", "data": [1000.0]},
                {"name": "Investment", "data": [847.8]},
                {"name": "Revenue", "data": [871.1]}
            ]

        Staking rewards (positive transactions only) and dividends series
        are appended when there are any.

        :param richy.transactions.models.Transaction trans: Current transaction model instance.
        :return: A list of series dicts.
        :rtype: list
        """
        deposit = 0

        # Deposit.
        root_parents = self.df.loc[trans.pk]["root_parent"]

        # In case of multiple deposits we can say deposit is shared.
        # ``.loc`` yields a series when the PK occurs more than once.
        if isinstance(root_parents, pd.Series):
            root_parents = root_parents.unique()
        else:
            root_parents = [root_parents]

        # Sum up all deposits.
        df_deposits = self.df.loc[root_parents].groupby("pk").first()
        for __, row in df_deposits.iterrows():
            deposit += abs(row["amount"] * row["price"])

        # Rounding function.
        # NOTE(review): ``is_coin`` is read as an attribute here while
        # ``AthPeriodsChart.get_options`` calls ``is_coin()`` - confirm
        # which form matches the item model.
        if trans.item.is_coin:
            roundfn = coinautofloatformat
        else:
            roundfn = autofloatformat

        deposit_data = {
            "name": _("Deposit (shared)") if 1 < len(root_parents) else _("Deposit"),
            "data": [roundfn(deposit, no_str=True)],
        }

        # Positive transaction (buy).
        if trans.is_positive:
            # Fetching all transactions which have the current transaction
            # as parent - grouping by PK and fetching first records of each
            # group gives us unique transaction records.
            df_children = self.df[self.df.parents == trans.pk].groupby("pk").first()

            visible = True
            if df_children.empty:
                # Not sold yet. The label is translated like every other
                # series name in this chart.
                revenue = 0
                revenue_title = _("Revenue (none)")
                visible = False
            else:
                revenue = abs((df_children.amount * df_children.price).sum())
                if 1 == len(df_children):
                    revenue_title = _("Revenue")
                else:
                    revenue_title = _("Revenue (multiple)")

            to_return = [
                {
                    "name": _("Investment"),
                    "data": [roundfn(trans.amount * trans.price, no_str=True)],
                },
                {
                    "name": revenue_title,
                    "data": [roundfn(revenue, no_str=True)],
                    "visible": visible,
                },
            ]

            # In case current positive transaction has no children (has no
            # following transactions) and is open we care about current
            # market value.
            if not trans.is_closed and df_children.empty:
                to_return.append(
                    {"name": _("Market value"), "data": [trans.get_market_value()]}
                )

            # In case current transaction is deposit we don't need
            # to explicitly return deposit - we already return investment
            # which is the same in this case.
            if not trans.is_deposit:
                to_return.insert(0, deposit_data)

            # Add stakings sum (if any).
            self.get_stakings_sum(to_return, trans)

            # Add dividends sum (if any).
            self.get_dividends_data(
                to_return, trans, self.df.loc[[trans.pk], "type"].iloc[0]
            )

            return to_return

        # Negative transaction (sell).
        investment = 0
        parents = self.df.loc[trans.pk]["parents"]

        # In case of multiple unique parents we can say investment is shared.
        if isinstance(parents, pd.Series):
            parents = parents.unique()
        else:
            parents = [parents]

        # Sum up all unique investments.
        df_investments = self.df.loc[parents].groupby("pk").first()
        for __, row in df_investments.iterrows():
            investment += row["amount"] * row["price"]

        to_return = [
            deposit_data,
            {
                "name": (
                    _("Investment (shared)") if 1 < len(parents) else _("Investment")
                ),
                "data": [roundfn(investment, no_str=True)],
            },
            {
                "name": _("Revenue"),
                "data": [roundfn(abs(trans.amount * trans.price), no_str=True)],
            },
        ]

        # Add dividends sum (if any).
        self.get_dividends_data(
            to_return, trans, self.df.loc[[trans.pk], "type"].iloc[0]
        )

        return to_return

    def get_stakings_sum(self, to_return, transaction):
        """
        Sums up all staking rewards "received" in regard of the given
        transaction and appends them as a series valued with the item's
        last price (0 when there is no last price).

        :param list to_return: Series list this method appends to.
        :param Transaction transaction: Transaction instance to work with.
        """
        if staking_sum := transaction.staking_set.aggregate(Sum("reward_amount"))[
            "reward_amount__sum"
        ]:
            last_price = transaction.item.get_last_price()
            to_return.append(
                {
                    "name": _("Staking rewards"),
                    "data": [staking_sum * last_price.price if last_price else 0],
                }
            )

    def get_dividends_data(self, to_return, transaction, type):
        """
        Sums all dividends "received" in regard of the given transaction
        and appends them as a series. Does nothing for types other than
        "share" and "etf".

        :param list to_return: Series list this method appends to.
        :param Transaction transaction: Transaction instance to work with.
        :param str type: "share", "etf" or "coin" type of the transaction underlying item.
        """
        if "share" == type:
            model = ShareDividendTransaction
        elif "etf" == type:
            model = EtfDividendTransaction
        else:
            return

        # Positive transaction - dividends linked directly to it.
        if transaction.is_positive:
            query = model.objects.filter(transactions=transaction.pk).aggregate(
                Sum("amount")
            )
        # Closing transaction
        else:
            # Get list of preceding deposit transactions (transactions that
            # can "collect" dividends) sharing a root parent with
            # the given transaction.
            deposit_transactions = self.df[
                (
                    self.df["root_parent"].isin(
                        np.atleast_1d(self.df.loc[[transaction.pk]].root_parent)
                    )
                )
                & (self.df["is_deposit"] == True)  # noqa
            ]
            query = (
                model.objects.distinct()
                .filter(transactions__in=deposit_transactions.index.to_list())
                .aggregate(Sum("amount"))
            )

        if dividends_sum := query["amount__sum"]:
            to_return.append({"name": _("Dividends"), "data": [dividends_sum]})
[docs]
class DashboardMarketValueRatioPieChart:
    """
    Highcharts dashboard market value ratio pie chart on a dashboard.
    """

    def get_series(self, data):
        """
        Compiles pie chart series per currency out of the "Market value"
        stats found under the ``investment_and_revenue`` key of ``data``
        (data from ``TransactionChartDataMixin.get_transaction_basic_chart_data()``).

        Output dict looks like::

            {
                "USD": [
                    {
                        "name": "Net worth",
                        "data": [
                            {"name": "Coin", "y": 85873.6},
                            {"name": "Share", "y": 11078.1}
                        ]
                    }
                ]
            }

        :param dict data: Input data from ``TransactionChartDataMixin.get_transaction_basic_chart_data()`` method.
        :return: Dict with chart data for pie chart under each currency key.
        :rtype: dict
        """
        # Every currency starts with a single empty "Net worth" series.
        per_currency = defaultdict(lambda: [{"name": _("Net worth"), "data": []}])

        for item_type, by_currency in data["investment_and_revenue"].items():
            for currency, entries in by_currency.items():
                for entry in entries:
                    if entry["name"] != _("Market value"):
                        continue
                    per_currency[currency][0]["data"].append(
                        {"name": item_type.capitalize(), "y": entry["data"][0]}
                    )

        # JSON round-trip turns the defaultdict into plain dicts and lists.
        return json.loads(json.dumps(per_currency))
[docs]
class OpenStakingsRatioPieChart:
    """
    Staking pie chart used on coin list page.
    """

    def __init__(self, user):
        """
        :param user: User whose transactions and stakings are charted.
        """
        self.user = user

    def get_series(self):
        """
        Compiles the "Non-staked" vs "Staked" pie points.

        :return: Two points, or an empty list when there is nothing to chart.
        :rtype: list
        """
        df = self.get_df()

        if df.empty:
            return []

        staked = df["staked_value"].sum()
        return [
            {"name": _("Non-staked"), "y": df["market_value"].sum() - staked},
            {"name": _("Staked"), "y": staked},
        ]

    def get_df(self):
        """
        Produces a DataFrame with all needed values to calculate staking portions.

        Returned DataFrame looks like this::

                                     amount  market_value  current_price  staked_value
            exchange   symbol
            Crypto.com CRO     1.000000e+04    716.104600   7.161046e-02        0.0000
                       MANA    1.000000e+02     57.078650   5.707865e-01        0.0000
            Kraken     BTC     2.160700e-01   5229.079820   2.420086e+04        0.0000

        An empty DataFrame is returned when there are no open coin
        transactions or no open stakings.

        :return: Dataframe described above.
        :rtype: pandas.DataFrame
        """
        # Fetch open transactions df.
        df = Transactions(self.user).get_transaction_basic_stats(closed=False)

        if df.empty:
            return pd.DataFrame()

        df = self._aggregate_open_coins(df)
        open_stakings = Staking.objects.by_user(self.user).select_related("item").open()

        # In case no transactions have left we are done here.
        if df.empty or not open_stakings:
            return pd.DataFrame()

        return self._add_staked_values(df, open_stakings)

    @staticmethod
    def _aggregate_open_coins(df):
        """
        Reduces the stats dataframe to one row per exchange-symbol combination
        of still open coin transactions with a positive balance.
        """
        df = df[df["type"] == "coin"]  # coins only
        df = df[df["is_closed"] == False]  # noqa - still open transactions
        # Unique transactions (solves multiple-parents case).
        df = df.groupby(df.index).first()
        # Amounts and market values add up per exchange-symbol combination,
        # the current price must not: summing it would multiply the price
        # by the number of transactions in the group.
        df = df.groupby(["exchange", "symbol"]).agg(
            amount=("amount", "sum"),
            market_value=("market_value", "sum"),
            current_price=("current_price", "first"),
        )
        df = df[df["amount"] > 0]  # remove zero balances
        return df[["amount", "market_value", "current_price"]]

    @staticmethod
    def _add_staked_values(df, stakings):
        """
        Adds the "staked_value" column - the value of currently staked coins
        per exchange-symbol row.
        """
        df = df.assign(staked_value=0.0)

        for st in stakings:
            loc = (st.exchange.title, st.item.symbol)
            # A staking without a matching open balance row has no price
            # to be valued with - leave it out instead of failing.
            if loc not in df.index:
                continue
            # Several stakings may target the same row, so accumulate.
            df.loc[loc, "staked_value"] = (
                df.loc[loc, "staked_value"] + st.amount * df.loc[loc, "current_price"]
            )

        return df
[docs]
class DrawdownChart:
    """
    Drawdown chart for item detail page.
    """

    def __init__(self, item):
        """
        :param item: Item providing ``get_drawdowns()``.
        """
        self.item = item

    def get_series(self):
        """
        Compiles a single series of ``[timestamp in ms, drawdown]`` pairs
        where the drawdown is turned into a negated percentage.

        :return: List with one series dict.
        :rtype: list
        """
        drawdowns = self.item.get_drawdowns()["drawdown"]
        points = [
            [index.timestamp() * 1000, val * 100 * -1]
            for index, val in drawdowns.items()
        ]
        return [{"data": points}]

    @staticmethod
    def get_options():
        """
        Returns Highcharts options - reversed Y axis, red area with a gradient
        fading into full transparency and a percent tooltip suffix.

        :return: Highcharts options.
        :rtype: dict
        """
        red = settings.COLOR_RED
        # settings.COLOR_RED with 0 opacity
        transparent_red = "rgba" + str(ImageColor.getrgb(red + "00"))

        gradient = {
            "linearGradient": {"x1": 0, "y1": 1, "x2": 0, "y2": 0},
            "stops": [[0, red], [1, transparent_red]],
        }

        return {
            "yAxis": {"reversed": True},
            "colors": [red],
            "plotOptions": {"area": {"fillColor": gradient}},
            "rangeSelector": {"selected": 1, "inputPosition": {"align": "center"}},
            "tooltip": {"valueSuffix": "%"},
        }
[docs]
class AthPeriodsChart:
    """
    Highcharts ATH periods chart.
    """

    def __init__(self, item):
        """
        :param item: Item the chart is rendered for.
        """
        self.item = item

    def get_series(self, aths):
        """
        Compiles a single stepped series of ``[timestamp in ms, value]`` pairs.

        :param aths: Mapping-like object (``items()``) of datetime-like keys
            and values.
        :return: List with one series dict.
        :rtype: list
        """
        points = [[index.timestamp() * 1000, val] for index, val in aths.items()]
        return [{"name": "ATHs", "data": points, "step": True}]

    def get_options(self):
        """
        Returns Highcharts options with tooltip precision based on item type.

        :return: Highcharts options.
        :rtype: dict
        """
        # NOTE(review): ``is_coin`` is called here while
        # ``TransactionDetailColumnChart.get_series`` reads it as a plain
        # attribute - confirm which form matches the item model.
        if self.item.is_coin():
            decimals = settings.COINS_MAX_PRECISION
        else:
            decimals = settings.MAX_PRECISION

        return {
            "tooltip": {"valueDecimals": decimals},
            "xAxis": {"ordinal": 0},
            "rangeSelector": {
                "inputPosition": {"align": "center"},
            },
        }
[docs]
class ShareAnalystRatingsChart:
    """
    Highcharts analyst ratings chart for shares.
    """

    def __init__(self, user_item):
        """
        :param user_item: User item whose underlying item holds the ratings asset.
        """
        self.user_item = user_item

    def get_options(self):
        """
        Compiles Highcharts chart options (later passed to the
        highcharts vue component) for each ratings record in the
        database.

        :return: List of ``(title, options)`` tuples; empty when the ratings
            asset can't be fetched.
        :rtype: list
        """
        try:
            ratings = self.user_item.item.asset_set.get(type=Asset.RATINGS_DATA)
        except Exception:
            return []

        if not ratings:
            logger.error(
                "Ratings data do exist in database but are falsy",
                extra={"ratings": ratings},
            )
            return []

        charts = []
        for position, rating in enumerate(ratings.data):
            # Chart title - first three records are "N months ago" (3, 2, 1),
            # anything after is "Now".
            if 3 > position:
                months = 3 - position
                title = ngettext_lazy(
                    "%(months)d month ago", "%(months)d months ago", months
                ) % {"months": months}
            else:
                title = _("Now")

            # Pie points - one per rating state.
            points = [
                {"name": state, "y": int(rating[state]["value"])}
                for state in rating.keys()
            ]

            options = {
                "title": False,
                "chart": {"type": "pie"},
                "colors": settings.CHART_COLORS,
                "plotOptions": {
                    "pie": {
                        "dataLabels": {
                            "enabled": False,
                        },
                        "showInLegend": True,
                    }
                },
                "tooltip": {"pointFormat": "<b>{point.percentage:.1f} %</b>"},
                "series": [{"name": _("Ratings"), "data": points}],
            }
            charts.append((title, options))

        return charts
[docs]
class ShareEpsChart:
    """
    Highcharts EPS chart for shares.
    """

    def get_options(self, rows):
        """
        Compiles Highcharts chart options (later passed to the
        highcharts vue component) with data sorted by date (first column).

        :param list rows: Table EPS rows without headers and Announcement date column.
        :return: Highcharts option dict.
        :rtype: dict
        """
        # Sort by date.
        ordered = sorted(rows, key=lambda r: r[0])

        categories = [to_quarter_period(r[0]) for r in ordered]
        estimated = {
            "name": _("Estimated EPS"),
            "type": "column",
            "data": [float(r[1]) for r in ordered],
        }
        actual = {
            "name": _("Actual EPS"),
            "type": "column",
            "data": [float(r[2]) for r in ordered],
        }

        return {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "xAxis": [
                {
                    "categories": categories,
                },
            ],
            "series": [estimated, actual],
        }
[docs]
class SharePriceRatingsChart:
    """
    Highcharts Price ratings chart for shares.
    """

    def __init__(self, user_item):
        """
        :param user_item: User item whose underlying item provides the last price.
        """
        self.user_item = user_item

    def get_options(self, rows):
        """
        Compiles Highcharts options (later passed to the
        highcharts vue component) for price ratings.

        :param list rows: Table price ratings rows.
        :return: Highcharts option object.
        :rtype: dict
        """
        # Sort by date and keep only rows with a truthy fifth column.
        ordered = sorted(rows, key=lambda row: row[0])
        ordered = [row for row in ordered if row[4]]

        last_price = self.user_item.item.get_last_price(current=True)
        current_color = settings.CHART_COLORS[1]

        points = []
        for row in ordered:
            # An optional previous value followed by the charted one.
            previous = f"<b>{row[5]}</b> -> " if row[5] else ""
            points.append(
                {
                    "name": row[2],
                    "y": row[6],
                    "price_formatted": previous + f"<b>{row[6]}</b>",
                }
            )

        return {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "tooltip": {
                "pointFormat": "<span>{series.name}:</span> {point.price_formatted}"
            },
            "xAxis": {
                "categories": [date_filter(row[0]) for row in ordered],
            },
            "yAxis": {
                # Horizontal line at the current price (0 when unknown).
                "plotLines": [
                    {
                        "value": last_price.price if last_price else 0,
                        "color": current_color,
                        "width": 2,
                        "zIndex": 4,
                    }
                ],
            },
            "series": [
                {
                    "name": _("Price"),
                    "type": "column",
                    "data": points,
                },
                # Data-less series - presumably only there to put the current
                # price line into the legend.
                {
                    "name": _("Current price"),
                    "type": "line",
                    "color": current_color,
                },
            ],
        }
[docs]
class ShareRevenueEarningsEpsChart:
    """
    Highcharts Revenue and Earnings (and EPS) chart for shares.
    """

    def __init__(self, user_item):
        """
        :param user_item: User item whose underlying item holds the financial assets.
        """
        self.user_item = user_item

    def get_data(self):
        """
        Fetches revenues, earnings and EPS assets from the database and
        merges them into one dataframe with a datetime index.

        Each part falls back to an empty series when its asset can't be
        fetched or converted. EPS rows (columns 1-3 of the EPS data) are
        returned as well, sorted by their first column in descending order.

        :return: Dataframe with "revenues", "earnings" and "eps" columns and EPS rows.
        :rtype: tuple
        """
        assets = self.user_item.item.asset_set

        # Revenues.
        try:
            revenues_asset = assets.get(type=Asset.REVENUES_DATA)
        except Exception:
            revenues_s = pd.Series(name="revenues")
        else:
            try:
                table = np.array(revenues_asset.data)
                revenues_s = pd.Series(
                    table[:, 1].astype(float), index=table[:, 0], name="revenues"
                ).round()
            except Exception:
                logger.exception(
                    "Couldn't convert rvenues data to pandas series",
                    extra={"revenues data": revenues_asset},
                )
                revenues_s = pd.Series(name="revenues")

        # Earnings - the first record is skipped.
        try:
            earnings_asset = assets.get(type=Asset.EARNINGS_DATA)
        except Exception:
            earnings_s = pd.Series(name="earnings")
        else:
            try:
                table = np.array(earnings_asset.data[1:])
                earnings_s = pd.Series(
                    table[:, 1].astype(float), index=table[:, 0], name="earnings"
                ).round()
            except Exception:
                logger.exception(
                    "Couldn't convert earnings data to pandas series",
                    extra={"eargnings data": earnings_asset},
                )
                earnings_s = pd.Series(name="earnings")

        # EPS - the first record is skipped.
        try:
            eps_asset = assets.get(type=Asset.EPS_DATA)
        except Exception:
            eps_s = pd.Series(name="eps")
            eps_rows = []
        else:
            try:
                table = np.array(eps_asset.data[1:])
                eps_rows = table[:, [1, 2, 3]]
                # Sorts by first column (date) - reversely.
                descending = eps_rows[:, 0].argsort()[::-1]
                eps_rows = eps_rows[descending]
                eps_s = pd.Series(
                    table[:, 3].astype(float), index=table[:, 1], name="eps"
                )
            except Exception:
                logger.exception(
                    "Couldn't convert EPS data to pandas series",
                    extra={"eps data": eps_asset},
                )
                eps_s = pd.Series(name="eps")
                eps_rows = []

        # Merge together.
        merged = pd.concat([revenues_s, earnings_s, eps_s], axis=1)
        merged.index = pd.to_datetime(merged.index)

        return (merged, eps_rows)

    @staticmethod
    def get_options(df):
        """
        Compiles Highcharts chart options which are later passed to
        the highcharts vue component.

        :param pd.DataFrame df: Dataframe with earnings, revenues and EPS
            returned by ``self.get_data()``.
        :return: Highcharts options as a dict.
        :rtype: dict
        """
        options = {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "xAxis": [
                {
                    "categories": [to_quarter_period(d) for d in df.index],
                },
            ],
            # Second Y axis sits on the opposite side - used by EPS.
            "yAxis": [
                {},
                {
                    "opposite": True,
                },
            ],
        }

        revenue = {
            "name": _("Revenue"),
            "type": "column",
            "data": df["revenues"].tolist(),
        }
        earnings = {
            "name": _("Earnings"),
            "type": "column",
            "data": df["earnings"].tolist(),
        }
        # EPS is hidden by default.
        eps = {
            "name": _("EPS"),
            "type": "column",
            "yAxis": 1,
            "data": df["eps"].tolist(),
            "visible": False,
        }
        options["series"] = [revenue, earnings, eps]

        return options