Source code for richy.core.charts

import io
import json
import logging
import math
import time
from collections import defaultdict
from datetime import datetime

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from django.conf import settings
from django.db.models import Sum
from django.template.defaultfilters import date as date_filter
from django.utils.translation import gettext as _
from django.utils.translation import ngettext_lazy
from matplotlib import ticker
from PIL import ImageColor

from ..core.models import Asset
from ..core.templatetags.utils import to_quarter_period
from ..staking.models import Staking
from ..transactions.models import EtfDividendTransaction, ShareDividendTransaction
from ..transactions.transactions import Transactions
from .math import calc_percentage_change
from .models import Price
from .templatetags.utils import autofloatformat, coinautofloatformat

matplotlib.use("Agg")  # noqa


logger = logging.getLogger("richy.celery")


#########################
#   Matplotlib charts   #
#########################
class BaseChart:
    """
    Common class for all chart classes.
    """

    def material(self):
        """
        Sets material theme for Seaborn and Matplotlib - colors + grid.
        """
        sns.set_palette(settings.CHART_COLORS)
        sns.set_style("whitegrid", rc={"grid.color": ".9"})

        # Matplotlib 3.6 renamed the bundled seaborn style sheets to
        # "seaborn-v0_8-*" and later releases dropped the old names, so use
        # whichever variant this installation actually ships.
        for style_name in ("seaborn-v0_8-whitegrid", "seaborn-whitegrid"):
            if style_name in matplotlib.style.available:
                matplotlib.style.use(style_name)
                break

        matplotlib.style.use({"grid.color": ".9"})

    def export(self, format="svg"):
        """
        Exports the currently being rendered chart in the given format.

        :param str format: Format the chart is gonna be exported in.
        :return: Chart as bytes (buffer rewound to the beginning).
        :rtype: io.BytesIO
        """
        # Render the current pyplot figure into an in-memory buffer.
        buffer = io.BytesIO()
        plt.savefig(buffer, format=format, bbox_inches="tight", dpi=80)
        buffer.seek(0)

        logger.debug("Chart exported as bytes.")

        return buffer

    def timestamp_to_date(self, x, _pos=None):
        """
        Converts Unix timestamp to human readable date.
        Used as ticker formatter - FuncFormatter.

        :param int x: Value (unix timestamp) to be formatted.
        :param int _pos: Position (unused, required by the formatter API).
        :return: Formatted date.
        :rtype: str
        """
        return date_filter(datetime.fromtimestamp(x), "DATE_FORMAT")
class Manager(BaseChart):
    """
    Renders Matplotlib/Seaborn charts and returns them as exported bytes.
    """

    def linear_regression(self, q):
        """
        Generates a price scatter chart with a fitted regression line.

        :param q: Object exposing ``to_pandas()``; the resulting dataframe
            is read through its "datetime" and "price" columns.
        :return: Generated chart as bytes or None when there is no data.
        :rtype: io.BytesIO or None
        """

        def add_future(ax):
            """
            Augment the X limit to the near future - half of the current
            timeline, rounded down to whole days.
            """
            days = math.floor(
                (df["timestamp"].max() - df["timestamp"].min()) / (60 * 60 * 24) / 2
            )
            ax.set_xlim(
                [df["timestamp"].min(), df["timestamp"].max() + 60 * 60 * 24 * days]
            )

            return ax

        # Prepare data.
        df = q.to_pandas()

        if not len(df):
            return None

        # Set chart style.
        sns.set(font_scale=0.75)
        self.material()

        # Create new series with Unix timestamp.
        df["timestamp"] = df.apply(
            lambda row: time.mktime(row["datetime"].timetuple()), axis=1
        )

        fig, ax = plt.subplots()

        # Always release the figure, even if rendering fails; pyplot keeps
        # every figure alive until it is explicitly closed.
        try:
            # Set size of the chart.
            fig.set_size_inches(35 / 2.54, 12.5 / 2.54)
            ax = add_future(ax)

            chart = sns.regplot(
                x="timestamp",
                y="price",
                data=df,
                ax=ax,
                line_kws={"color": settings.CHART_COLORS[1]},
            )
            plt.xlabel("")
            plt.ylabel("")

            # Set formatter for X axis - timeline.
            chart.get_xaxis().set_major_formatter(
                ticker.FuncFormatter(self.timestamp_to_date)
            )

            return self.export()
        finally:
            plt.close(fig)

    def performance(self, item):
        """
        Generates performance chart for following time spans:

        * 5 days
        * 1 month
        * 3 months
        * 6 months
        * YTD
        * Year

        :param richy.core.models.Item item: Item the performance chart is gonna be
            generated for.
        :return: Generated chart as bytes.
        :rtype: io.BytesIO
        """

        def get_item_values(item):
            """
            Calculates item values (percentage changes).

            :param richy.core.models.Item item: Item instance.
            :return: Values as NumPy array, in reversed period order.
            :rtype: np.ndarray
            """
            # Get day of current year.
            ytd = datetime.now().timetuple().tm_yday

            # TODO: replace item with the item from the materialized view
            return np.array(
                [
                    item.get_last_days_change(days, no_cache=True, percents=True) or 0
                    for days in (5, 30, 90, 180, ytd, 365)
                ]
            )[::-1]

        def apply_style_and_labels(rects):
            """
            Colors each bar by the sign of its value and attaches two text
            labels next to it - the value (percents) and the period.
            """
            font = {"family": "Roboto", "weight": "regular"}

            for i, rect in enumerate(rects):
                # Set bar color based on the value.
                if 0 > rect.get_width():
                    rect.set_color(settings.COLOR_RED)
                else:
                    rect.set_color(settings.COLOR_GREEN)

                # HACK: the Y position is hard-coded per bar because the
                # data -> axes coordinate calculation doesn't work anymore.
                y = 0.163 * i + 0.05

                # Render percentages.
                ax.text(
                    -0.1,
                    y,
                    "{:.1f}%".format(rect.get_width()),
                    ha="right",
                    va="bottom",
                    size=20,
                    fontdict=font,
                    transform=ax.transAxes,
                )

                # Render period.
                ax.text(
                    -0.7,
                    y,
                    objects[i],
                    va="bottom",
                    size=20,
                    fontdict=font,
                    transform=ax.transAxes,
                )

        # Prepare data.
        fig, ax = plt.subplots()

        try:
            plt.axis("off")

            objects = ("5 days", "1 month", "3 months", "6 months", "YTD", "Year")[::-1]
            y_pos = np.arange(len(objects))
            performance = get_item_values(item)

            # Fit data into chart.
            rects = ax.barh(y_pos, performance, height=0.6)
            apply_style_and_labels(rects)

            return self.export()
        finally:
            plt.close(fig)
######################### # Highcharts charts # #########################
class TransactionOverviewChart:
    """
    Highcharts transaction overview chart used in transaction open overview pages.
    """

    def __init__(self, user, df):
        """
        :param user: User passed to ``Transactions`` when piles are computed.
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        """
        self.user = user
        self.df = df

    def get_series(self):
        """
        Creates list of series from dataframe (``self.df``).

        Each series looks like:
            {
                'name': 'Investments',
                'data': [780.2199999999999, 1154.93, 1062.6],
                'stack': 'investments'
            }

        :return: List of series; an empty list when there are no piles.
        :rtype: list
        """
        groups = Transactions(self.user).get_pile_stats(self.df)

        if not groups:
            # Keep the documented return type: an empty list is iterable,
            # falsy and JSON-serialisable, unlike an empty pandas.Series.
            return []

        # Creates a list of dataframes where each has following columns:
        #   * investments
        #   * market_value
        #   * market_value_symbols
        #   * sold
        df_list = [
            pd.DataFrame(
                {
                    "investments": pd.Series(
                        list(stat.values())[0] for stat in stats["investments"]
                    ),
                    "market_value": pd.Series(
                        stat["market_price"] for stat in stats["open_symbols"]
                    ),
                    "market_value_symbols": pd.Series(
                        stat["symbol"] for stat in stats["open_symbols"]
                    ),
                    "sold": pd.Series(
                        list(stat.values())[0] for stat in stats["incomes"]
                    ),
                }
            )
            for _root_transactions, stats in groups
        ]

        # Concat all dataframes side by side - column names repeat per pile.
        df_all = pd.concat(df_list, axis=1)

        series = []
        series.extend(self.df_to_series(df_all, "Investments", "investments"))
        series.extend(
            self.df_to_series(
                df_all, "Market values", "market_value", "market_value_symbols"
            )
        )
        series.extend(self.df_to_series(df_all, "Sold", "sold"))

        return series

    @staticmethod
    def get_options(item_type, series):
        """
        Returns options for Highcharts chart as a JavaScript object literal
        (JSON with a tooltip formatter function appended).

        :param str item_type: Item type the chart options are for - "share", "coin", "etf".
        :param list series: List of data series.
        :return: Options as string.
        :rtype: str
        """

        def get_x_labels(series):
            """
            Fetches all symbols from each market value series and groups
            them by category (position within the series).

            :param list series: List of data series.
            :return: Labels for each category.
            :rtype: list
            """
            # Collect labels from all market value series.
            per_series = [
                [point["name"] for point in ser["data"]]
                for ser in series
                if "market_value" == ser["stack"]
            ]

            # Transpose to categories and drop missing labels (NaN/None).
            return [
                [
                    str(label)
                    for label in group
                    if isinstance(label, str) or not pd.isna(label)
                ]
                for group in zip(*per_series)
            ]

        options = json.dumps(
            {
                "xAxis": {"categories": get_x_labels(series)},
                "plotOptions": {"column": {"stacking": "normal"}},
                "chart": {"zooming": {"type": "x"}},
            }
        )

        if "coin" == item_type:
            precision = str(settings.COINS_MAX_PRECISION)
        else:
            precision = str(settings.MAX_PRECISION)

        # A function can't be expressed in JSON, so the closing brace of the
        # dumped object is cut off and the tooltip is appended as raw JS.
        options = (
            options[:-1]
            + """
            ,tooltip: {
                formatter: function() {
                    return '<strong>' + this.series.name + '</strong><br/>' +
                        (this.point.name ? this.point.name : this.series.name) +
                        ': ' + this.y.toFixed("""
            + precision
            + """) + '<br/>' +
                        'Total: ' + this.point.stackTotal.toFixed("""
            + precision
            + """);
                },
            },
        }
        """
        )

        return options

    def df_to_series(self, df, name, values_col, labels_col=None):
        """
        Converts a group (based on ``values_col`` param) into chart series.
        Also names each series by name and adds ``values_col`` name as stack.

        Dataframe (input) looks like this:

           investments  market_value market_value_symbols  sold  investments  market_value market_value_symbols  sold
        0       780.22       1727.04                 QCOM   NaN      1154.93       1841.77                   FB   NaN
        1          NaN           NaN                  NaN   NaN          NaN           NaN                  NaN   NaN

        :param pd.DataFrame df: Pandas dataframe with sums.
        :param str name: Series name.
        :param str values_col: Column name with values (also used as stack).
        :param str labels_col: Column name with labels.
        :return: list of series
        :rtype: list
        """
        series = []
        labels = []

        if labels_col:
            df_labels = df.loc[:, labels_col]

            # Make sure we work with a dataframe.
            if isinstance(df_labels, pd.Series):
                df_labels = df_labels.to_frame()

            labels = [row.to_list() for __, row in df_labels.iterrows()]

        df_values = df.loc[:, values_col]

        # Make sure we work with a dataframe.
        if isinstance(df_values, pd.Series):
            df_values = df_values.to_frame()

        # Rows with no value at all are skipped; remaining gaps become 0.
        for idx, row in df_values.dropna(how="all").fillna(0).iterrows():
            data = row.to_list()

            if labels:
                # ``idx`` is the row label, which is used as the position
                # in the label rows collected above.
                data = [
                    {"y": value, "name": labels[idx][i]}
                    for i, value in enumerate(data)
                ]

            series.append({"name": name, "data": data, "stack": values_col})

        return series
class TransactionsBasicChart:
    """
    Common calculations (investments, revenues, market values) shared by
    the transaction basic charts.
    """

    def __init__(self, df, pile):
        """
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        :param list pile: A pile with closed/open stats.
        """
        self.df = df
        self.pile = pile

    def get_investments(self):
        """
        Calculates investments and open symbols of ``self.pile``.

        Investments pandas.Series looks like:

        ::

            type   currency
            coin   USD         21641.885258
            share  USD          2997.750000

        Open symbols structure:

        .. code-block:: python

            {
                'currency': 'USD',
                'market_price': 2030.1399999999999,
                'symbol': 'QCOM',
                'type': 'share'
            }

        :return: Tuple with investments (pandas.Series) and open symbols (list).
        :rtype: tuple
        """
        # Walk thru pile and collect each open symbol stats.
        open_symbols = []
        investments = []

        for _root_parent, stats in self.pile:
            for open_symbol in stats["open_symbols"]:
                # Type and currency are taken from the first transaction
                # row of that symbol in self.df.
                symbol_row = self.df[self.df["symbol"] == open_symbol["symbol"]].iloc[0]

                open_symbols.append(
                    {
                        "symbol": open_symbol["symbol"],
                        "market_price": open_symbol["market_price"],
                        "type": symbol_row["type"],
                        "currency": symbol_row["currency"],
                    }
                )

            for investment in stats["investments"]:
                pk, amount = list(investment.items())[0]
                investments.append(
                    {
                        # TODO: is this column in future DF needed?
                        "investment_pk": pk,
                        "investment": amount,
                        # Copied from the most recently collected open symbol.
                        "type": open_symbols[-1]["type"],
                        "currency": open_symbols[-1]["currency"],
                    }
                )

        # Create dataframe from collected stats.
        # Dataframe looks like:
        #                 investment_pk   investment
        # type  currency
        # share USD                 6.0   780.220000
        #       USD                 9.0  1154.930000
        # coin  USD                14.0  1190.031984
        df_investments = pd.DataFrame.from_dict(investments)
        df_investments = df_investments.set_index(["type", "currency"])

        # Sum investments for each type-currency combo. The column is
        # selected before summing so "investment_pk" isn't aggregated too.
        return (
            df_investments.groupby(level=[0, 1])["investment"].sum(),
            open_symbols,
        )

    def get_revenues(self):
        """
        Calculates revenues as pandas.Series.

        Structure looks like:

        ::

            type   currency
            coin   USD         5175.807242
            share  CZK         8440.000000
                   USD         9150.210000

        :return: Revenues series with multi-index.
        :rtype: pandas.Series
        """
        # In case of closed transactions we can simply sum up closing transactions.
        df = self.df.set_index(["type", "currency"])

        return (
            df[df["is_closing"] == True]  # noqa
            .groupby(level=[0, 1])["balance_in_time"]
            .sum()
        )

    def get_market_values(self, open_symbols):
        """
        Calculates market values as pandas.Series.

        Structure looks like:

        ::

            type   currency
            coin   USD         21086.637623
            share  USD          5844.470000

        :param list open_symbols: Open symbols as returned by ``get_investments()``.
        :return: Market values series with multi-index.
        :rtype: pandas.Series
        """
        # Create dataframe from collected stats.
        # Dataframe looks like:
        #
        #                symbol  market_price
        # type  currency
        # share USD        QCOM   1727.040000
        #       USD          FB   1841.770000
        # coin  USD         ETH   5770.068180
        df_market_values = pd.DataFrame.from_dict(open_symbols)
        df_market_values = df_market_values.set_index(["type", "currency"])

        # Select the numeric column first - summing the whole frame would
        # also "sum" the string "symbol" column.
        return df_market_values.groupby(level=[0, 1])["market_price"].sum()
class TransactionsBasicColumnChart(TransactionsBasicChart):
    """
    Highcharts transaction basic charts for each type and currency -
    investment vs. market value (or revenue) column chart.
    """

    def get_series(self, closed=False):
        """
        Builds a dict (type) of dicts (currency) of two-item series lists:
        the investment and either the revenue or the market value.

        Output dict looks like this:
            {
                "share": {
                    "CZK": [
                        {"name": "Investment", "data": [8150.0]},
                        {"name": "Revenue", "data": [8490.0]}
                    ],
                    "USD": [...]
                },
                "coin": {...}
            }

        :param bool closed: If True the second series is "Revenue",
            otherwise it's "Market value".
        :return: Dict with struct described above.
        :rtype: dict
        """

        def nested():
            # Dict which creates nested dicts of the same kind on demand.
            return defaultdict(nested)

        investments, open_symbols = self.get_investments()

        if closed:
            revenues = self.get_revenues()
        else:
            market_values = self.get_market_values(open_symbols)

        result = nested()

        # Investments share the index structure with revenues and market
        # values, so their (type, currency) keys are used to look up the
        # counterpart value.
        for (type_, currency), investment in investments.items():
            roundfn = coinautofloatformat if "coin" == type_ else autofloatformat

            if closed:
                counterpart_name = _("Revenue")
                counterpart_value = revenues[type_][currency]
            else:
                counterpart_name = _("Market value")
                counterpart_value = market_values[type_][currency]

            investment_series = {
                "name": _("Investment"),
                "data": [roundfn(investment, no_str=True)],
            }
            counterpart_series = {
                "name": counterpart_name,
                "data": [roundfn(counterpart_value, no_str=True)],
            }
            result[type_][currency] = [investment_series, counterpart_series]

        return result
class TransactionBasicPieChart(TransactionsBasicChart):
    """
    Highcharts transaction basic charts for each type and currency -
    profits pie chart.
    """

    def get_series(self, closed=False, no_negatives=True):
        """
        Converts the pile into dict (type) of dicts (currency) of lists
        (symbols and profit).

        Output dict looks like this:
            {
                "share": {
                    "USD": [
                        {
                            "name": "Profit",
                            "data": [
                                {"name": "FB", "y": 2233.43},
                                {"name": "QCOM", "y": 788.17}
                            ]
                        }
                    ]
                },
                "coin": {
                    "USD": [
                        {
                            "name": "Profit",
                            "data": [
                                {"name": "ETH, TRX", "y": 321.3},
                                {"name": "BTC", "y": 1254.1}
                            ]
                        }
                    ]
                }
            }

        :param bool closed: If True profit is revenue - investment,
            otherwise it's market value - investment.
        :param bool no_negatives: If True filters out 0 or negative values (every loss).
        :return: Dict with struct described above.
        :rtype: dict
        """
        value_key = "revenue" if closed else "market_value"

        # Sums pile into investment vs revenue or market value, keyed by the
        # sorted, comma-joined symbols. Final dict looks like:
        # {
        #     'TSLA': {
        #         'type': 'share',
        #         'currency': 'USD',
        #         'investment': 754.24,
        #         'revenue': 890.04
        #     },
        #     ...
        # }
        data = {}

        for _root_parent, stats in self.pile:
            investment_sum = sum(
                list(inv.values())[0] for inv in stats["investments"]
            )

            # Type and currency come from the first open symbol of the pile.
            first_row = self.df[
                self.df["symbol"] == stats["open_symbols"][0]["symbol"]
            ].iloc[0]

            if closed:
                symbols, value = self._sum_incomes(stats["incomes"])
            else:
                symbols, value = self._sum_open_symbols(stats["open_symbols"])

            key = ", ".join(sorted(symbols))

            if key in data:
                data[key]["investment"] += investment_sum
                data[key][value_key] += value
            else:
                data[key] = {
                    "type": first_row["type"],
                    "currency": first_row["currency"],
                    "investment": investment_sum,
                    value_key: value,
                }

        # Nested defaultdict.
        tree = lambda: defaultdict(tree)  # noqa
        to_return = tree()

        for symbol, stats in data.items():
            roundfn = (
                coinautofloatformat if "coin" == stats["type"] else autofloatformat
            )
            profit = roundfn(stats[value_key] - stats["investment"], no_str=True)

            # In case we don't want negative values (loss) in final data
            # we simply skip such records.
            if no_negatives and 0 >= profit:
                continue

            # First access creates an (empty) nested dict, any later access
            # finds the list stored below.
            bucket = to_return[stats["type"]][stats["currency"]]

            if isinstance(bucket, list):
                bucket[0]["data"].append({"name": symbol, "y": profit})
            else:
                to_return[stats["type"]][stats["currency"]] = [
                    {
                        "name": _("Profit"),
                        "data": [{"name": symbol, "y": profit}],
                    }
                ]

        return to_return

    def _sum_incomes(self, incomes):
        """
        Sums incomes and digs out a symbol for each of them based on the
        transaction PK (index of ``self.df``).

        :param list incomes: List of single-item ``{pk: amount}`` dicts.
        :return: Tuple of symbols (list) and revenue.
        :rtype: tuple
        """
        symbols = []
        revenue = 0

        for income in incomes:
            revenue += list(income.values())[0]
            symbols.append(
                self.df[self.df.index == list(income.keys())[0]].iloc[0]["symbol"]
            )

        return symbols, revenue

    @staticmethod
    def _sum_open_symbols(open_symbols):
        """
        Sums market prices of open symbols.

        :param list open_symbols: List of dicts with "symbol" and "market_price".
        :return: Tuple of symbols (list) and market value.
        :rtype: tuple
        """
        symbols = []
        market_value = 0

        for open_symbol in open_symbols:
            market_value += open_symbol["market_price"]
            symbols.append(open_symbol["symbol"])

        return symbols, market_value
class TransactionDetailColumnChart:
    """
    Highcharts transaction detail column chart for a specified transaction.
    """

    def __init__(self, df):
        """
        :param pandas.DataFrame df: Dataframe with transaction basic stats.
        """
        self.df = df

    def get_series(self, trans):
        """
        Calculates deposit, investment and revenue sums based on current
        transactions (``trans`` param) like:

        * deposit: is calculated as sum of all unique root parent transactions
          as amount * price.

        Trans is positive:

        * investment: is current transaction calculated like price * amount
        * revenue: sums all transactions which has the current transaction as parent.
          Revenue can be:

          * none - no children found
          * single - one child transaction found
          * multiple - more than one child transaction found

        Trans is negative:

        * investment: sums all unique parent transactions like amount * price.
          Investment can be:

          * single - one parent transaction found
          * multiple - more than one parent transaction found

        * revenue: is current transaction calculated like price * amount

        Returns data:
            [
                {"data": [1000.0], "name": "Deposit"},
                {"data": [847.8], "name": "Investment"},
                {"data": [871.1], "name": "Revenue"}
            ]

        Staking rewards, dividends and market value series may be appended.

        :param richy.transactions.models.Transaction trans: Current transaction
            model instance.
        :return: A list of series dicts.
        :rtype: list
        """
        deposit = 0
        investment = 0

        # Deposit.
        root_parents = self.df.loc[trans.pk]["root_parent"]

        # In case of multiple deposits we can say deposit is shared.
        # Fetches all unique root parents.
        if isinstance(root_parents, pd.Series):
            root_parents = root_parents.unique()
        else:
            root_parents = [root_parents]

        # Sum up all deposits.
        df_deposits = self.df.loc[root_parents].groupby("pk").first()

        for __, row in df_deposits.iterrows():
            deposit += abs(row["amount"] * row["price"])

        # Rounding function. ``is_coin`` is called as a method elsewhere in
        # this module; a bound method is always truthy, so resolve it to
        # its value whether it is a method or a plain attribute/property.
        is_coin = trans.item.is_coin
        if callable(is_coin):
            is_coin = is_coin()

        roundfn = coinautofloatformat if is_coin else autofloatformat

        deposit_data = {
            "name": _("Deposit (shared)") if 1 < len(root_parents) else _("Deposit"),
            "data": [roundfn(deposit, no_str=True)],
        }

        # Positive transaction (buy).
        if trans.is_positive:
            # Fetching all transactions which has current transaction
            # as parent - grouping by PK and fetching first records of each group
            # gives us unique transaction records.
            df_children = self.df[self.df.parents == trans.pk].groupby("pk").first()
            visible = True

            # Revenue can be: none (not sold yet), single sell, multiple sells.
            if df_children.empty:
                revenue = 0
                revenue_title = _("Revenue (none)")
                visible = False
            else:
                revenue = abs((df_children.amount * df_children.price).sum())
                revenue_title = (
                    _("Revenue") if 1 == len(df_children) else _("Revenue (multiple)")
                )

            to_return = [
                {
                    "name": _("Investment"),
                    "data": [roundfn(trans.amount * trans.price, no_str=True)],
                },
                {
                    "name": revenue_title,
                    "data": [roundfn(revenue, no_str=True)],
                    "visible": visible,
                },
            ]

            # In case current positive transaction has no children (has no following
            # transactions) and is open we care about current market value.
            if not trans.is_closed and df_children.empty:
                to_return.append(
                    {"name": _("Market value"), "data": [trans.get_market_value()]}
                )

            # In case current transaction is deposit we don't need
            # to explicitly return deposit - we already return investment
            # which is the same in this case.
            if not trans.is_deposit:
                to_return.insert(0, deposit_data)

            # Add stakings sum (if any).
            self.get_stakings_sum(to_return, trans)

            # Add dividends sum (if any).
            self.get_dividends_data(
                to_return, trans, self.df.loc[[trans.pk], "type"].iloc[0]
            )

            return to_return

        # Negative transaction (sell).
        parents = self.df.loc[trans.pk]["parents"]

        # In case of multiple unique parents we can say investment is shared.
        if isinstance(parents, pd.Series):
            parents = parents.unique()
        else:
            parents = [parents]

        # Sum up all unique investments.
        df_investments = self.df.loc[parents].groupby("pk").first()

        for __, row in df_investments.iterrows():
            investment += row["amount"] * row["price"]

        to_return = [
            deposit_data,
            {
                "name": (
                    _("Investment (shared)") if 1 < len(parents) else _("Investment")
                ),
                "data": [roundfn(investment, no_str=True)],
            },
            {
                "name": _("Revenue"),
                "data": [roundfn(abs(trans.amount * trans.price), no_str=True)],
            },
        ]

        self.get_dividends_data(
            to_return, trans, self.df.loc[[trans.pk], "type"].iloc[0]
        )

        return to_return

    def get_stakings_sum(self, to_return, transaction):
        """
        Sums up all staking rewards "received" in regard of the given transaction
        and appends them (valued by the item's last price) as a series.
        Nothing is appended when the sum is falsy.

        :param list to_return: Series list this method appends to.
        :param Transaction transaction: Transaction instance to work with.
        """
        if staking_sum := transaction.staking_set.aggregate(Sum("reward_amount"))[
            "reward_amount__sum"
        ]:
            last_price = transaction.item.get_last_price()

            to_return.append(
                {
                    "name": _("Staking rewards"),
                    "data": [staking_sum * last_price.price if last_price else 0],
                }
            )

    def get_dividends_data(self, to_return, transaction, type):
        """
        Sums all dividends "received" in regard of the given transaction and
        appends them as a series. Nothing is appended for types other than
        "share"/"etf" or when the sum is falsy.

        :param list to_return: Series list this method appends to.
        :param Transaction transaction: Transaction instance to work with.
        :param str type: "share", "etf" or "coin" type of the transaction underlying item.
        """
        if "share" == type:
            model = ShareDividendTransaction
        elif "etf" == type:
            model = EtfDividendTransaction
        else:
            return

        # Deposit transaction
        if transaction.is_positive:
            query = model.objects.filter(transactions=transaction.pk).aggregate(
                Sum("amount")
            )
        # Closing transaction
        else:
            # Get list of preceding deposit transactions (transactions that
            # can "collect" dividends) sharing a root parent with this one.
            deposit_transactions = self.df[
                (
                    self.df["root_parent"].isin(
                        np.atleast_1d(self.df.loc[[transaction.pk]].root_parent)
                    )
                )
                & (self.df["is_deposit"] == True)  # noqa
            ]

            query = (
                model.objects.distinct()
                .filter(transactions__in=deposit_transactions.index.to_list())
                .aggregate(Sum("amount"))
            )

        if dividends_sum := query["amount__sum"]:
            to_return.append({"name": _("Dividends"), "data": [dividends_sum]})
class DashboardMarketValueRatioPieChart:
    """
    Highcharts market value ratio pie chart shown on a dashboard.
    """

    def get_series(self, data):
        """
        Compiles pie chart series out of the "Market value" entries found in
        ``data["investment_and_revenue"]`` (as produced by
        ``TransactionChartDataMixin.get_transaction_basic_chart_data()``).

        Output dict looks like:
            {
                "USD": [
                    {
                        "data": [
                            {"name": "Coin", "y": 85873.6},
                            {"name": "Share", "y": 11078.1}
                        ],
                        "name": "Net worth"
                    }
                ]
            }

        :param dict data: Input data from
            ``TransactionChartDataMixin.get_transaction_basic_chart_data()`` method.
        :return: Dict with chart data for pie chart under each currency key.
        :rtype: dict
        """
        market_value_label = _("Market value")
        per_currency = defaultdict(lambda: [{"name": _("Net worth"), "data": []}])

        for item_type, currencies in data["investment_and_revenue"].items():
            for currency, entries in currencies.items():
                for entry in entries:
                    if entry["name"] != market_value_label:
                        continue

                    point = {"name": item_type.capitalize(), "y": entry["data"][0]}
                    per_currency[currency][0]["data"].append(point)

        # JSON round-trip turns the defaultdict into plain dicts and lists.
        serialized = json.dumps(per_currency)

        return json.loads(serialized)
class OpenStakingsRatioPieChart:
    """
    Staking pie chart used on coin list page.
    """

    def __init__(self, user):
        self.user = user

    def get_series(self):
        """
        Compiles staked vs. non-staked market value pie chart points.

        :return: Two points ("Non-staked" and "Staked") or an empty list
            when there is nothing to show.
        :rtype: list
        """
        df = self.get_df()

        if df.empty:
            return []

        staked = df["staked_value"].sum()

        return [
            {"name": _("Non-staked"), "y": df["market_value"].sum() - staked},
            {"name": _("Staked"), "y": staked},
        ]

    def get_df(self):
        """
        Produces a DataFrame with all needed values to calculate staking portions.

        Returned DataFrame looks like this:
                                 amount  market_value  current_price  staked_value
            exchange   symbol
            Binance    TRX     6.022273e+04   3557.370512   5.907025e-02     1772.1075
            Crypto.com CRO     1.000000e+04    716.104600   7.161046e-02        0.0000
                       MANA    1.000000e+02     57.078650   5.707865e-01        0.0000
            Kraken     BTC     2.160700e-01   5229.079820   2.420086e+04        0.0000

        :return: DataFrame described above or an empty DataFrame when there
            are no open coin positions or no open stakings.
        :rtype: pandas.DataFrame
        """
        # Fetch open transactions df.
        df = Transactions(self.user).get_transaction_basic_stats(closed=False)

        if df.empty:
            return pd.DataFrame()

        df = df[df["type"] == "coin"]  # coins only
        df = df[df["is_closed"] == False]  # noqa - still open transactions

        if df.empty:
            return pd.DataFrame()

        # Unique transactions (solves multiple-parents case).
        df = df.groupby(df.index).first()

        # One row per exchange-symbol combination. Amounts and market values
        # add up, whereas the current price is a per-coin value and must not
        # be summed over the transactions of a group.
        df = df.groupby(["exchange", "symbol"]).agg(
            {"amount": "sum", "market_value": "sum", "current_price": "first"}
        )
        df = df[df["amount"] > 0]  # remove zero balances
        df = df[["amount", "market_value", "current_price"]]

        open_stakings = Staking.objects.by_user(self.user).select_related("item").open()

        # In case no transactions have left we are done here.
        if df.empty or not open_stakings:
            return pd.DataFrame()

        # New column "staked_value" which is a portion (or a whole)
        # of "market_value" column based on currently staked coins.
        # Float typed from the start because float values are written to it.
        df = df.assign(staked_value=0.0)

        # Several stakings may share one exchange-symbol row, so sum them
        # up first instead of letting the last one overwrite the others.
        staked_values = defaultdict(int)

        for st in open_stakings:
            loc = (st.exchange.title, st.item.symbol)

            if loc not in df.index:
                # Staking without a matching open position - there is no
                # price to value it with.
                logger.warning(
                    "Open staking without matching open position: %s", loc
                )
                continue

            staked_values[loc] += st.amount * df.loc[loc]["current_price"]

        for loc, staked_value in staked_values.items():
            df.at[loc, "staked_value"] = staked_value

        return df
class PerformanceChart:
    """
    Highcharts candle chart of item performance year by year.
    """

    def __init__(self, item):
        self.item = item

    def get_series(self):
        """
        Compiles data for (price) performance chart of ``self.item`` in
        length of "all time" - one record per year.

        :return: List of records with "datetime", "open", "high", "low",
            "close", "median", "perc_change" and "year" keys; an empty list
            when the item has no prices.
        :rtype: list
        """
        df_price = Price.objects.filter(item=self.item).to_pandas()

        if df_price.empty:
            return []

        # Only the price column is aggregated, so select it up front.
        price = df_price.set_index("datetime")["price"]

        # Yearly candles:
        #                                open      high       low     close
        # datetime
        # 1986-12-31 00:00:00+00:00  0.097222  0.177083  0.090278  0.167535
        # 1987-12-31 00:00:00+00:00  0.165799  0.548611  0.165799  0.376736
        df = price.resample("YE", label="right").ohlc()

        # Yearly median price:
        # datetime
        # 1986-12-31 00:00:00+00:00    0.107856
        # 1987-12-31 00:00:00+00:00    0.344618
        median = price.resample("YE", label="right").median().rename("median")

        # Yearly percentage change calculated from close and open:
        # datetime
        # 1986-12-31 00:00:00+00:00     72.322108
        # 1987-12-31 00:00:00+00:00    127.224531
        perc_change = df.apply(
            lambda row: calc_percentage_change(row["close"], row["open"]), axis=1
        ).rename("perc_change")

        # Final dataframe:
        #        open      high       low     close    median  perc_change  year
        # 0  0.097222  0.177083  0.090278  0.167535  0.107856    72.322108  1986
        # 1  0.165799  0.548611  0.165799  0.376736  0.344618   127.224531  1987
        df = pd.concat([df, median, perc_change], axis=1)
        df["year"] = df.index.year
        df = df.sort_index()
        df = df.reset_index()

        return df.to_dict(orient="records")

    def get_options(self):
        """
        Compiles Highcharts options; the item's last price (0 when there is
        none) is rendered as a horizontal plot line.

        :return: Highcharts option dict.
        :rtype: dict
        """
        last_price = self.item.get_last_price(current=True)

        return {
            "chart": {"zooming": {"type": "x"}, "height": 600},
            "tooltip": {
                "valueDecimals": (
                    settings.COINS_MAX_PRECISION
                    if self.item.is_coin()
                    else settings.MAX_PRECISION
                )
            },
            "rangeSelector": {
                "selected": 5,
                "inputPosition": {"align": "center"},
                "buttons": [
                    {"type": "year", "count": 5, "text": "5y", "title": "5y"},
                    {"type": "year", "count": 10, "text": "10y", "title": "10y"},
                    {"type": "year", "count": 15, "text": "15y", "title": "15y"},
                    {"type": "all", "text": "All", "title": "All"},
                ],
            },
            "legend": {"enabled": True},
            "navigator": {"enabled": False},
            "title": False,
            "plotOptions": {
                "candlestick": {
                    "color": settings.COLOR_RED,
                    "upColor": settings.COLOR_GREEN,
                },
            },
            "yAxis": {
                "title": False,
                "plotLines": [
                    {
                        "value": last_price.price if last_price else 0,
                        "color": settings.CHART_COLORS[1],
                        "width": 2,
                        "zIndex": 4,
                    }
                ],
            },
            "xAxis": {
                "minPadding": 0.05,
                "maxPadding": 0.05,
            },
        }
class DrawdownChart:
    """
    Drawdown chart for item detail page.
    """

    def __init__(self, item):
        self.item = item

    def get_series(self):
        """
        Compiles a single series of ``[milliseconds, percents]`` points out
        of the "drawdown" column of ``self.item.get_drawdowns()``; values
        are multiplied by 100 and have their sign flipped.

        :return: List with one series dict.
        :rtype: list
        """
        drawdowns = self.item.get_drawdowns()["drawdown"]
        points = [
            [moment.timestamp() * 1000, value * 100 * -1]
            for moment, value in drawdowns.items()
        ]

        return [{"data": points}]

    @staticmethod
    def get_options():
        """
        Compiles Highcharts options - reversed Y axis and red area whose
        fill fades out to fully transparent.

        :return: Highcharts option dict.
        :rtype: dict
        """
        # settings.COLOR_RED with 0 opacity
        transparent_red = f"rgba{ImageColor.getrgb(settings.COLOR_RED + '00')}"
        fill_color = {
            "linearGradient": {"x1": 0, "y1": 1, "x2": 0, "y2": 0},
            "stops": [[0, settings.COLOR_RED], [1, transparent_red]],
        }

        return {
            "yAxis": {"reversed": True},
            "colors": [settings.COLOR_RED],
            "plotOptions": {"area": {"fillColor": fill_color}},
            "rangeSelector": {"selected": 1, "inputPosition": {"align": "center"}},
            "tooltip": {"valueSuffix": "%"},
        }
class AthPeriodsChart:
    """
    Highcharts ATH periods chart.
    """

    def __init__(self, item):
        self.item = item

    def get_series(self, aths):
        """
        Compiles a single step series of ``[milliseconds, value]`` points.

        :param aths: Mapping-like object (``items()``) of datetime-like
            keys (``timestamp()``) to values.
        :return: List with one series dict.
        :rtype: list
        """
        points = [[moment.timestamp() * 1000, value] for moment, value in aths.items()]

        return [{"name": "ATHs", "data": points, "step": True}]

    def get_options(self):
        """
        Compiles Highcharts options; tooltip precision depends on whether
        the item is a coin.

        :return: Highcharts option dict.
        :rtype: dict
        """
        if self.item.is_coin():
            decimals = settings.COINS_MAX_PRECISION
        else:
            decimals = settings.MAX_PRECISION

        return {
            "tooltip": {"valueDecimals": decimals},
            "xAxis": {"ordinal": 0},
            "rangeSelector": {
                "inputPosition": {"align": "center"},
            },
        }
class ShareAnalystRatingsChart:
    """
    Highcharts analyst ratings chart for shares.
    """

    def __init__(self, user_item):
        self.user_item = user_item

    def get_options(self):
        """
        Compiles Highcharts chart options (which are later passed to the
        highcharts vue component) for each ratings record in the database.

        The first three records are titled "3/2/1 month(s) ago", every
        following one "Now".

        :return: List of ``(title, options)`` tuples, one pie chart per
            ratings record; an empty list when the ratings asset can't be
            loaded or carries no data.
        :rtype: list
        """
        try:
            ratings = self.user_item.item.asset_set.get(type=Asset.RATINGS_DATA)
        except Exception:
            # Best effort: any lookup failure simply means nothing to draw.
            return []

        # Test the payload, not the fetched record: the record object itself
        # is normally always truthy, which would make this guard dead code
        # and let an empty/None payload reach the loop below.
        if not ratings.data:
            logger.error(
                "Ratings data do exist in database but are falsy",
                extra={"ratings": ratings},
            )
            return []

        data_set = []

        for i, rating in enumerate(ratings.data):
            # Compile chart title.
            months_ago = 3 - i
            if months_ago > 0:
                title = ngettext_lazy(
                    "%(months)d month ago", "%(months)d months ago", months_ago
                ) % {"months": months_ago}
            else:
                title = _("Now")

            # One pie slice per rating state.
            slices = [
                {"name": rating_state, "y": int(state["value"])}
                for rating_state, state in rating.items()
            ]

            # Chart options data.
            data_set.append(
                (
                    title,
                    {
                        "title": False,
                        "chart": {"type": "pie"},
                        "colors": settings.CHART_COLORS,
                        "plotOptions": {
                            "pie": {
                                "dataLabels": {
                                    "enabled": False,
                                },
                                "showInLegend": True,
                            }
                        },
                        "tooltip": {"pointFormat": "<b>{point.percentage:.1f} %</b>"},
                        "series": [
                            {
                                "name": _("Ratings"),
                                "data": slices,
                            }
                        ],
                    },
                )
            )

        return data_set
class ShareEpsChart:
    """
    Highcharts EPS chart for shares.
    """

    def get_options(self, rows):
        """
        Compiles Highcharts chart options (later passed to the highcharts vue
        component) from EPS rows ordered by their first column.

        :param list rows: EPS table rows; column 0 is used as the sort key and
            category source, columns 1 and 2 as the estimated and actual
            values.
        :return: Highcharts options.
        :rtype: dict
        """
        # Order by the first column (the date, per the EPS table layout).
        ordered = sorted(rows, key=lambda row: row[0])

        x_axis = {"categories": [to_quarter_period(row[0]) for row in ordered]}
        estimated = {
            "name": _("Estimated EPS"),
            "type": "column",
            "data": [float(row[1]) for row in ordered],
        }
        actual = {
            "name": _("Actual EPS"),
            "type": "column",
            "data": [float(row[2]) for row in ordered],
        }

        return {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "xAxis": [x_axis],
            "series": [estimated, actual],
        }
class SharePriceRatingsChart:
    """
    Highcharts price ratings chart for shares.
    """

    def __init__(self, user_item):
        self.user_item = user_item

    def get_options(self, rows):
        """
        Compiles Highcharts options (later passed to the highcharts vue
        component) for price ratings.

        Rows are ordered by their first column and rows with a falsy column 4
        are dropped. The item's current price is drawn as a horizontal plot
        line on the y axis.

        :param list rows: Table price ratings rows.
        :return: Highcharts options.
        :rtype: dict
        """
        # Order by date (first column), then keep only rows with a price.
        ordered = sorted(rows, key=lambda row: row[0])
        priced = [row for row in ordered if row[4]]

        last_price = self.user_item.item.get_last_price(current=True)
        current_price = last_price.price if last_price else 0

        def as_point(row):
            # Tooltip text: optional "<old> -> " prefix followed by column 6.
            prefix = f"<b>{row[5]}</b> -> " if row[5] else ""
            return {
                "name": row[2],
                "y": row[6],
                "price_formatted": prefix + f"<b>{row[6]}</b>",
            }

        return {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "tooltip": {
                "pointFormat": "<span>{series.name}:</span> {point.price_formatted}"
            },
            "xAxis": {
                "categories": [date_filter(row[0]) for row in priced],
            },
            "yAxis": {
                "plotLines": [
                    {
                        "value": current_price,
                        "color": settings.CHART_COLORS[1],
                        "width": 2,
                        "zIndex": 4,
                    }
                ],
            },
            "series": [
                {
                    "name": _("Price"),
                    "type": "column",
                    "data": [as_point(row) for row in priced],
                },
                # Data-less series: only contributes a "Current price" entry.
                {
                    "name": _("Current price"),
                    "type": "line",
                    "color": settings.CHART_COLORS[1],
                },
            ],
        }
class ShareRevenueEarningsEpsChart:
    """
    Highcharts Revenue and Earnings (and EPS) chart for shares.
    """

    def __init__(self, user_item):
        self.user_item = user_item

    def get_data(self):
        """
        Loads the revenues, earnings and EPS assets of the item and joins
        them into a single pandas dataframe.

        Each asset becomes one series ("revenues", "earnings", "eps"). A
        missing asset, or one that can't be converted, is replaced by an
        empty series, so the frame always has the three columns. The frame's
        index is passed through ``pd.to_datetime``.

        :return: ``(df, eps_rows)`` - the joined dataframe and columns 1-3 of
            the EPS rows ordered descending by the first of those columns
            (an empty list when EPS data is unavailable).
        :rtype: tuple
        """
        # TODO: whole method could be cached and invalidated by fetch_financial_data celery task.

        # Revenues.
        # NOTE(review): unlike earnings and EPS, the first row is not skipped
        # here - confirm the revenues payload has no header row.
        try:
            revenues_asset = self.user_item.item.asset_set.get(
                type=Asset.REVENUES_DATA
            )
        except Exception:
            revenues_series = pd.Series(name="revenues")
        else:
            try:
                revenues_table = np.array(revenues_asset.data)
                revenues_series = pd.Series(
                    revenues_table[:, 1].astype(float),
                    index=revenues_table[:, 0],
                    name="revenues",
                ).round()
            except Exception:
                logger.exception(
                    "Couldn't convert rvenues data to pandas series",
                    extra={"revenues data": revenues_asset},
                )
                revenues_series = pd.Series(name="revenues")

        # Earnings (first row skipped).
        try:
            earnings_asset = self.user_item.item.asset_set.get(
                type=Asset.EARNINGS_DATA
            )
        except Exception:
            earnings_series = pd.Series(name="earnings")
        else:
            try:
                earnings_table = np.array(earnings_asset.data[1:])
                earnings_series = pd.Series(
                    earnings_table[:, 1].astype(float),
                    index=earnings_table[:, 0],
                    name="earnings",
                ).round()
            except Exception:
                logger.exception(
                    "Couldn't convert earnings data to pandas series",
                    extra={"eargnings data": earnings_asset},
                )
                earnings_series = pd.Series(name="earnings")

        # EPS (first row skipped).
        try:
            eps_asset = self.user_item.item.asset_set.get(type=Asset.EPS_DATA)
        except Exception:
            eps_series = pd.Series(name="eps")
            eps_rows = []
        else:
            try:
                eps_table = np.array(eps_asset.data[1:])
                selected = eps_table[:, [1, 2, 3]]
                # Reverse order by the first selected column (date).
                descending = selected[:, 0].argsort()[::-1]
                eps_rows = selected[descending]
                eps_series = pd.Series(
                    eps_table[:, 3].astype(float),
                    index=eps_table[:, 1],
                    name="eps",
                )
            except Exception:
                logger.exception(
                    "Couldn't convert EPS data to pandas series",
                    extra={"eps data": eps_asset},
                )
                eps_series = pd.Series(name="eps")
                eps_rows = []

        # Merge together.
        df = pd.concat([revenues_series, earnings_series, eps_series], axis=1)
        df.index = pd.to_datetime(df.index)

        return (df, eps_rows)

    @staticmethod
    def get_options(df):
        """
        Compiles Highcharts chart options which are later passed to the
        highcharts vue component.

        Revenue and earnings share the first y axis; EPS is bound to the
        second (opposite) axis and starts hidden.

        :param pd.DataFrame df: Dataframe with "revenues", "earnings" and
            "eps" columns as returned by ``self.get_data()``.
        :return: Highcharts options as a dict.
        :rtype: dict
        """
        return {
            "title": False,
            "chart": {"zooming": {"type": "x"}},
            "colors": settings.CHART_COLORS,
            "xAxis": [
                {
                    "categories": [to_quarter_period(d) for d in df.index],
                },
            ],
            "yAxis": [
                {},
                {
                    "opposite": True,
                },
            ],
            "series": [
                {
                    "name": _("Revenue"),
                    "type": "column",
                    "data": df["revenues"].tolist(),
                },
                {
                    "name": _("Earnings"),
                    "type": "column",
                    "data": df["earnings"].tolist(),
                },
                {
                    "name": _("EPS"),
                    "type": "column",
                    "yAxis": 1,
                    "data": df["eps"].tolist(),
                    "visible": False,
                },
            ],
        }