Source code for richy.transactions.transactions

"""
Module handles all transaction-related calculations.
"""

import io
import logging
from collections import defaultdict
from datetime import date

import numpy as np
import pandas as pd
from django.conf import settings
from django.core.cache import cache
from django.db import connection
from django.db.models import Sum
from django.template.defaultfilters import date as date_format
from graphviz import Digraph

from ..core.math import calc_percentage_change
from ..core.models import ItemWithPrices, Price, User, UserItem
from ..core.templatetags.utils import autofloatformat, coinautofloatformat
from .models import Transaction

LOGGER = logging.getLogger(__name__)


[docs] class Performance: """ Calculates performance stats for transactions. """ def __init__(self, type, user=None, currency=None, profit=False, use_cache=True): """ :param User user: User performance is calculated for. :param str type: Item type - choices are "share", "index", "coin", "etf". :param bool profit: Profit flag. :param bool cache: Flag if cache should be considered. """ self.user = user self.type = type self.currency = currency self.profit = profit self.use_cache = use_cache
[docs] def get_data(self): """ Fetches data from database/cache for shares/indexes/coins/etfs and calculates it's performance (market value or profit) over time. :return: List of tuples (user, Data for Highcharts). :rtype: list """ def fetch(user, currency): cache_key = self._get_cache_key(user, currency) cached = cache.get(cache_key) if self.use_cache and cached: return cached else: data = self._get_asset_data(user, currency) cache.set(cache_key, data, settings.TRANSACTIONS_CACHE_TIMEOUT) LOGGER.debug("Transaction performance cache regenerated.") return data # An user and a currency were given -> we can return data here. if self.user and self.currency: return fetch(self.user, self.currency) # Else... we just recalculate data for (each) user and each of their currencies. criteria = {f"item__{self.type}__isnull": False} self.users = [ ui.user for ui in UserItem.objects.filter(**criteria).distinct("user") ] for user in self.users: if self.currency: currencies = [self.currency] else: currencies = user.get_traded_currencies() for currency in currencies: fetch(user, currency)
def _get_cache_key(self, user, currency): """ Determines cache key based on "profit" and "type" params combination. :param User user: User model instance. :return: Cache key. :rtype: str """ return "{}-transaction-user-{}-{}-{}".format( self.type, user.pk, currency, "profit-performance" if self.profit else "performance-and-assets", ) def _get_asset_data(self, user, currency): """ Calculates prices or profit over time and returns data ready for Highcharts. :param User user: An user who's tranractions are fetched for. :param str currency: Currency name that equals to currency property of a transaction - transactions are filtered based on the currency. :return: Data for Highcharts. :rtype: list """ def round_values(values): """ Rounds each value of the given list of values. Uses map in sake of performance. :param list values: List of to-be-rounded values. :return: List of rounded values. :rtype: list """ if "coin" == self.type: roundfn = coinautofloatformat else: roundfn = autofloatformat return list( map( lambda value: ( roundfn(value, no_str=True) if value is not None else None ), values, ) ) df_balance = self._build_balance_df(user, currency) if df_balance.empty: return [] if self.profit: df_market_value = self._calc_market_value(df_balance) df_balance_value = self._build_balance_df(user, currency, balance=True) df = ( df_market_value + df_balance_value ) # we add up because df_balance_value has negative values df = df.dropna(how="all") else: df = self._calc_market_value(df_balance) # Export dataframe into Highcharts complied data structure. data = [] if df.empty: return data exported = df.to_dict(orient="series") for symbol, series in exported.items(): series = series.replace({np.nan: None}) series_dict = series.to_dict() timestamps = series_dict.keys() timestamps = map( lambda timestamp: int(timestamp.timestamp() * 1000), timestamps ) data.append( { "name": symbol, "data": list( zip(list(timestamps), round_values(series_dict.values())) ), } ) return data def _get_transaction_df(self, base_q=None): """ Fetches transactions from database and calculates current balance. Also converts them into dataframe. pk parent date price symbol amount balance 0 14 NaN 2017-09-06 343.920000 ETH 3.460200 3.46020 1 25 26 2017-12-07 434.000000 ETH 3.455200 3.45520 2 26 14 2017-12-07 434.000000 ETH -3.460200 3.45520 3 15 NaN 2017-12-14 0.547000 XRP 3645.160000 3645.16000 4 20 NaN 2018-01-05 17299.600000 BTC 0.033480 0.03348 5 22 24 2018-01-07 0.158246 TRX 3262.730000 3262.73000 6 23 21 2018-01-07 17110.000000 BTC 0.032439 0.00000 7 24 23 2018-01-07 16405.600000 BTC -0.032439 0.00000 :return: Transaction dataframe. :rtype: pd.DataFrame """ # First check if there are any transactions at all. if not Transaction.objects.by_user(self.user).exists(): return False if base_q: query = base_q else: query = Transaction.objects.by_user(self.user) transactions = query.select_related("item").order_by("date") # Shares, indexes, coins of etfs? match self.type: case "share": transactions = transactions.filter(item__share__isnull=False) case "index": transactions = transactions.filter(item__index__isnull=False) case "coin": transactions = transactions.filter(item__coin__isnull=False) case "etf": transactions = transactions.filter(item__etf__isnull=False) data = [] # Create transaction dataframe. # df = transactions.to_pandas("date", "price", "item__symbol", "amount") # df = df.rename(columns={"item__symbol": "symbol", "amount": "balance"}) for trans in transactions: data.append( { "pk": trans.pk, "parent": trans.parents.values_list("pk", flat=True), "date": trans.date, "closed_on": trans.closed_on, "price": trans.price, "symbol": trans.item.symbol, "amount": trans.amount, "balance": Transaction.objects.by_user(self.user) .filter(item=trans.item, date__lte=trans.date) .aggregate(Sum("amount"))["amount__sum"], } ) df = pd.DataFrame(data) df = df.explode("parent") df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d") df["date"] = df["date"].dt.date # df["closed_on"] = pd.to_datetime(df["closed_on"], format="%Y-%m-%d") # df["closed_on"] = df["closed_on"].dt.date return df def _build_balance_df(self, user, currency, balance=False): """ Walks thru all transactions (in piles) and creates separated balance dataframes. These frames are then merged together and one big balance sum is calculated for each item (symbol). Returns a dataframe with balances looking like: TSLA CEZ.PR NOK QCOM FSLR FB AMD BABA 2021-04-13 NaN NaN NaN 14.0 NaN 13.0 NaN 23.0 2021-04-14 NaN NaN NaN 14.0 NaN 13.0 NaN 23.0 2021-04-15 NaN NaN NaN 14.0 NaN 13.0 NaN 23.0 2021-04-16 NaN NaN NaN 14.0 NaN 13.0 NaN 23.0 2021-04-17 NaN NaN NaN 14.0 NaN 13.0 NaN 23.0 :param User user: An user who's tranractions are fetched for. :param str currency: Currency name that equals to currency property of a transaction - transactions are filtered based on the currency. :param bool balance: Flag if dataframe should calculate rather (value/money) balance than amount balance. :return: Balance dataframe. :rtype: pd.DataFrame """ # Fetch basic stats and then filter by type. basic_stats_df = Transactions(user).get_transaction_basic_stats() if basic_stats_df.empty: return basic_stats_df basic_stats_df = basic_stats_df[ (basic_stats_df["type"] == self.type) & (basic_stats_df["currency"] == currency) ] if basic_stats_df.empty: return basic_stats_df # Fetch groups (piles). deposit_groups = Transactions.get_deposit_groups(basic_stats_df) groups = Transactions.group_deposit_groups(deposit_groups) # key - symbol # item - list of DataFrames - each DF is set of series (transactions) from a group symbol_dfs = defaultdict(list) for roots, group in groups: # Closing date of root transaction is the max date for this group # Falls back to today as long as that means the group is still not closed. date_max = basic_stats_df[basic_stats_df.index.isin(roots)][ "closed_on" ].max() if not isinstance(date_max, date): date_max = date.today() series = defaultdict(list) # Eliminates duplicated transaction caused by explode() function. group_transactions_df = basic_stats_df[basic_stats_df.index.isin(group)] group_transactions_df = group_transactions_df.groupby( group_transactions_df.index ).first() # drop duplicated transactions # save series under symbol for pk, trans in group_transactions_df.iterrows(): series[trans["symbol"]].append( pd.Series( trans["balance_in_time"] if balance else trans["amount"], index=pd.date_range(start=trans["date"], end=date_max), name=pk, ) ) # Concatenate series under the same symbol -> create a DataFrame # and add that to a global symbol_dfs under the symbol key. for symbol, series_ in series.items(): symbol_dfs[symbol].append(pd.concat(series_, axis=1)) # Walk thru all DFs and create "sum" column for each of them. for symbol, dfs in symbol_dfs.items(): for df in dfs: df["sum"] = df.sum(axis=1) # Walk thru all symbols and summarize "sum" columns from all dfs. sums_series = [] for symbol, dfs in symbol_dfs.items(): symbol_df = pd.concat([df["sum"] for df in dfs], axis=1) series = symbol_df.sum(axis=1) series.name = symbol sums_series.append(series) # Concat all series with sums into one DataFrame. df = pd.concat(sums_series, axis=1) # Replace 0 values - happens when a item is completely sold - sum is 0. return df.replace(0, np.nan) def _calc_market_value(self, df_balance): """ Calculates market value for each symbol based on balance dataframe. Also drops rows where each column (item) is np.NaN - typically weekends. Result dataframe looks like: ETH XRP BTC TRX ETC DASH STA VIDYA CRO 2021-03-15 40357.3492 3314.320145 12119.603977 3079.808479 1030.000 NaN 2539.044426 2396.535253 1891.04 2021-03-16 40947.9712 3639.586510 12077.902467 3066.728102 940.936 NaN 2774.132922 2432.952156 1719.99 2021-03-17 40410.3800 3541.182676 11973.540657 3173.442780 986.496 NaN 2706.352819 2443.235376 1930.06 2021-03-18 41486.7004 3638.644663 12665.785723 3345.444919 1010.512 NaN 2802.201673 2138.433221 2000.93 2021-03-19 41271.8460 3618.827597 12648.867442 3202.825450 972.752 NaN 2690.545086 1942.983925 2194.80 2021-03-20 41751.8544 3738.633556 12706.061171 3584.914582 995.296 NaN 2629.657922 2104.348770 2516.43 :param pd.DataFrame balance_df: Dataframe with balanaces. :return: Dataframe with calculated market values. :rtype: pd.DataFrame """ market_value_df = pd.DataFrame() start_date = df_balance.index.min().date() for symbol, series in df_balance.items(): price_df = self._get_price_df(symbol, start_date) if price_df.empty: continue market_value_df[symbol] = price_df["price"] * series # Drop rows with all values being empty (typically weekends). return market_value_df.dropna(how="all") def _get_price_df(self, symbol, start_date): """ Fetches prices of a ``symbol`` starting by ``start_date`` as a DataFrame. price datetime 2016-11-14 4.31 2016-11-15 4.14 2016-11-16 4.12 2016-11-17 4.15 2016-11-18 4.16 :param str symbol: Item symbol we want prices for. :param datetime.date start_date: Start point for prices. :return: DataFrame with "price" column and date as index. :rtype: pd.DataFrame """ df = Price.objects.filter( item__symbol=symbol, datetime__date__gte=start_date ).to_pandas() if df.empty: return df df["datetime"] = df["datetime"].dt.date df = df.set_index(pd.DatetimeIndex(df["datetime"])) df = df.filter(["price"]) return df
[docs] class Transactions: """ Handles transactions data digging. """ CACHE_BASIC_STATS = "transactions-basic-stats-user-{}-closed-{}" def __init__(self, user): self.user = user @staticmethod def update_cache(user=None): def create_basic_stats_cache(user, closed): """ Recreates cache for basic stats (``self.get_transaction_basic_stats()``) for the given flag. :param user core.User: User model instance. :param bool or None: Flag for get_transaction_basic_stats() method. """ cache.delete(Transactions.CACHE_BASIC_STATS.format(user.pk, closed)) Transactions(user).get_transaction_basic_stats(closed) if user: users = [user] else: if user_pks := Transaction.objects.distinct("user").values_list( "user", flat=True ): # Fetch only those users that have at least one transaction. users = User.objects.filter(pk__in=user_pks) else: users = [] # Basic stats. for u in users: LOGGER.debug(f"Updating transaction stats for user with ID {u.pk}.") create_basic_stats_cache(u, closed=None) create_basic_stats_cache(u, closed=True) create_basic_stats_cache(u, closed=False) LOGGER.debug(f"Transaction stats for user with ID {u.pk} has been updated.") def get_transactions(self, closed=None): """ Compiles dataframe of transactions. Output dataframe has following columns: * exchange * amount * price * fee * symbol * currency * is_deposit * is_closing * is_closed * date * closed_on * balance_in_time * type :param bool closed: If False already closed transaction are not included. If None all transaction are included :return: Transaction dataframe with transactions or empty dataframe. :rtype: pd.DataFrame """ def assign_type(row): if row["item__share"] and not np.isnan(row["item__share"]): return "share" if row["item__index"] and not np.isnan(row["item__index"]): return "index" if row["item__coin"] and not np.isnan(row["item__coin"]): return "coin" if row["item__etf"] and not np.isnan(row["item__etf"]): return "etf" # First check if there are any transactions at all. if not Transaction.objects.by_user(self.user).exists(): return pd.DataFrame() # Create dataframe. query = ( Transaction.objects.by_user(self.user) # TODO: vyzkouset zda tenhle related setri nejaky dotazy # .select_related("item__share", "item__index", "item__etf", "item__coin") .select_related("item") .order_by("date", "exchange") ) if closed is not None: # Collect closed transactions but not those which root parent is not closed too. # Transactions in actually_closed are such transactions it's root parent transaction # is closed too. regularly_closed = Transaction.objects.by_user(self.user).filter( is_closed=True ) potentially_closed = [] potentially_closed_root_parents = [] actually_closed = [] # Trace down all root parents of the closed transaction. If the root # parent is open then the transaction is considered also open. # 1. accumulate transaction PKs. for trans in regularly_closed: for parent in trans.parents.values_list("pk", flat=True): for root_parent in self.get_transaction_root_parents( trans.pk, parent ): potentially_closed.append((trans.pk, parent, root_parent)) potentially_closed_root_parents.append(root_parent) # try: # Transaction.objects.get(pk=root_parent, is_closed=True) # actually_closed.extend([trans.pk, parent, root_parent]) # except Transaction.DoesNotExist: # pass # 2. query the database and collect actually closed transactions. for t in Transaction.objects.filter( pk__in=potentially_closed_root_parents, is_closed=True ): for trans_pk, parent, root_parent in potentially_closed: if root_parent == t.pk: actually_closed.extend([trans_pk, parent, root_parent]) # 3. modify dataframe query for unique closed transactions. if closed is True: query = query.filter(pk__in=list(set(actually_closed))) elif closed is False: query = query.exclude(pk__in=list(set(actually_closed))) df = query.to_pandas( "pk", "exchange__title", "amount", "price", "fee", "item__symbol", "currency", "is_deposit", "is_closing", "is_closed", "date", "closed_on", "item__share", "item__index", "item__coin", "item__etf", "parents", ) # No transactions? if df.empty: return df df.set_index("pk", inplace=True) # Rename some columns. df = df.rename( columns={"exchange__title": "exchange", "item__symbol": "symbol"} ) # Calc balance for certain transaction (in the time the transaction was made). df["balance_in_time"] = df.apply( lambda row: row["amount"] * row["price"] * -1 - row["fee"], axis=1 ) # Add "type" column. df["type"] = df.apply(assign_type, axis=1) df = df.drop("item__share", axis=1) df = df.drop("item__index", axis=1) df = df.drop("item__coin", axis=1) df = df.drop("item__etf", axis=1) return df
[docs] def get_transaction_basic_stats(self, closed=None): """ Compiles a dataframe with all (see ``closed`` param) transactions and it's basic stats. DF looks like this: .. code-block:: exchange amount price fee symbol currency is_deposit is_closing is_closed date closed_on parents balance_in_time type is_leaf root_parent root_investment pk 1 Kraken 200.0 0.25 1.0 XRP USD True False False 2018-12-01 None NaN -51.0 coin False 1 51.0 3 Binance 100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 2.0 -25.0 coin False 1 51.0 5 Kraken 100.0 0.25 0.0 ETC USD False False False 2019-12-02 None 4.0 -25.0 coin True 1 51.0 2 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 4 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 8 Binance 100.0 0.25 0.0 XRP USD False False True 2019-12-05 None 7.0 -25.0 coin False 6 50.0 6 Binance 50.0 1.00 0.0 BTC USD True False False 2019-12-05 None NaN -50.0 coin False 6 50.0 7 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-05 None 6.0 25.0 coin False 6 50.0 9 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-06 None 6.0 25.0 coin False 6 50.0 10 Binance 50.0 0.50 0.0 ETH USD False False False 2019-12-06 None 9.0 -25.0 coin False 6 50.0 11 Binance -25.0 0.50 0.0 ETH USD False False False 2019-12-07 None 10.0 12.5 coin False 6 50.0 12 Binance 50.0 0.25 0.0 TRX USD False False False 2019-12-07 None 11.0 -12.5 coin True 6 50.0 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 8.0 50.0 coin True 6 50.0 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 3.0 50.0 coin True 1 51.0 If ``closed`` is false then following columns are added: * current_price * market_value * market_value_perc DF looks like this: .. code-block:: exchange amount price fee symbol currency is_deposit is_closing is_closed date closed_on parents balance_in_time type is_leaf root_parent root_investment current_price market_value market_value_perc pk 1 Kraken 200.0 0.25 1.0 XRP USD True False False 2018-12-01 None NaN -51.0 coin False 1 51.0 0 0.0 0.0 3 Binance 100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 2.0 -25.0 coin False 1 51.0 0 0.0 0.0 2 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 0 -0.0 0.0 4 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 0 -0.0 0.0 8 Binance 100.0 0.25 0.0 XRP USD False False True 2019-12-05 None 7.0 -25.0 coin False 6 50.0 0 0.0 0.0 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 8.0 50.0 coin True 6 50.0 0 -0.0 0.0 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 3.0 50.0 coin True 1 51.0 0 -0.0 0.0 5 Kraken 100.0 0.25 0.0 ETC USD False False False 2019-12-02 None 4.0 -25.0 coin True 1 51.0 0 0.0 0.0 6 Binance 50.0 1.00 0.0 BTC USD True False False 2019-12-05 None NaN -50.0 coin False 6 50.0 0 0.0 0.0 7 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-05 None 6.0 25.0 coin False 6 50.0 0 -0.0 0.0 9 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-06 None 6.0 25.0 coin False 6 50.0 0 -0.0 0.0 10 Binance 50.0 0.50 0.0 ETH USD False False False 2019-12-06 None 9.0 -25.0 coin False 6 50.0 0 0.0 0.0 11 Binance -25.0 0.50 0.0 ETH USD False False False 2019-12-07 None 10.0 12.5 coin False 6 50.0 0 -0.0 0.0 12 Binance 50.0 0.25 0.0 TRX USD False False False 2019-12-07 None 11.0 -12.5 coin True 6 50.0 0 0.0 0.0 Direct consumers: * Open Transactions list page: transactions overview chart (``TransactionOverviewChart``). * Open / Closed Transactions list pages: investment vs revenue column chart and profit/investment pie chart (``TransactionsBasicColumnChart`` / ``TransactionBasicPieChart``). * Transaction detail page: investment vs revenue column chart (``TransactionDetailColumnChart``); item-price chart date range (``FetchItemPricesAjaxView``). * Per-asset overview pages (shares/coins/etfs/indexes): possession-percentage list, via ``BaseUserItemCreateView.get_open_items_possession_stats``. * Coin list page: staking ratio pie (``OpenStakingsRatioPieChart``). Indirect consumers (via internal pipeline): * ``Performance.get_data`` -- asset-allocation and profit charts in the Assets section. * ``Graph.generate`` -- per-transaction parent/child PNG on the transaction detail page. :param bool closed: If False already closed transaction are not included. If None all transaction are included. :return: Dataframe with grouped transaction stats or empty dataframe in case of no transactions. :rtype: DataFrame """ # Check cache. cache_key = self.CACHE_BASIC_STATS.format(self.user.pk, closed) cached = cache.get(cache_key) if cached is not None: return cached df = self.get_transactions(closed) if df.empty: return df # Find leaves. # Leaf is a such transaction which doesn't have any children transactions. df["is_leaf"] = False for idx, row in df.iterrows(): if df[df["parents"] == idx].empty: df.loc[idx, "is_leaf"] = True # Add "root_parent" transaction PK column. tmp = [] for idx, row in df.iterrows(): tmp.append( { "pk": idx, "parents": row["parents"], "root_parent": self.get_transaction_root_parents( idx, row[ "parents" ], # row["parents"] is always single digit (transaction PK) ), } ) df = ( df.reset_index() .merge(pd.DataFrame(tmp), on=["pk", "parents"], how="inner") .set_index("pk") ) df = df.reset_index().explode("root_parent").set_index("pk") # Add root_investment column. root_investment = [] transactions = { t.pk: t.price * t.amount + t.fee for t in Transaction.objects.by_user(self.user).filter( pk__in=df["root_parent"].tolist() ) } for idx, row in df.iterrows(): # root_trans = Transaction.objects.get(pk=row["root_parent"]) # investment = root_trans.price * root_trans.amount + root_trans.fee # root_investment.append(investment) root_investment.append(transactions[row["root_parent"]]) df = pd.concat( [df.reset_index(), pd.Series(root_investment, name="root_investment")], axis=1, ).set_index("pk") # exchange amount price fee symbol currency is_deposit is_closing is_closed date closed_on parents balance_in_time type is_leaf root_parent root_investment # pk # 1 Kraken 200.0 0.25 1.0 XRP USD True False False 2018-12-01 None NaN -51.0 coin False 1 51.0 # 3 Binance 100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 2.0 -25.0 coin False 1 51.0 # 5 Kraken 100.0 0.25 0.0 ETC USD False False False 2019-12-02 None 4.0 -25.0 coin True 1 51.0 # 2 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 # 4 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 # 8 Binance 100.0 0.25 0.0 XRP USD False False True 2019-12-05 None 7.0 -25.0 coin False 6 50.0 # 6 Binance 50.0 1.00 0.0 BTC USD True False False 2019-12-05 None NaN -50.0 coin False 6 50.0 # 7 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-05 None 6.0 25.0 coin False 6 50.0 # 9 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-06 None 6.0 25.0 coin False 6 50.0 # 10 Binance 50.0 0.50 0.0 ETH USD False False False 2019-12-06 None 9.0 -25.0 coin False 6 50.0 # 11 Binance -25.0 0.50 0.0 ETH USD False False False 2019-12-07 None 10.0 12.5 coin False 6 50.0 # 12 Binance 50.0 0.25 0.0 TRX USD False False False 2019-12-07 None 11.0 -12.5 coin True 6 50.0 # 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 8.0 50.0 coin True 6 50.0 # 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 3.0 50.0 coin True 1 51.0 # for _, row in df.iterrows(): item_cache = { f"{i.type}-{i.symbol}": i for i in ItemWithPrices.objects.filter(symbol__in=df["symbol"].unique()) } if closed is False: # Add "current_price" column. tmp = {} for idx, row in df.iterrows(): key = f"{row['type']}-{row['symbol']}" tmp[row["symbol"]] = item_cache[key].last_price or 0 series = pd.Series(tmp) series = series.rename("current_price") series.index.name = "symbol" df = df.join(series, on="symbol", how="inner") # Add "market_value" column. df["market_value"] = df["amount"] * df["current_price"] # Add "market_value_perc" column. market_value_perc = [] for idx, row in df.iterrows(): change = calc_percentage_change( row["market_value"], row["root_investment"] ) market_value_perc.append(100 + change) df = pd.concat( [ df.reset_index(), pd.Series(market_value_perc, name="market_value_perc"), ], axis=1, ) # Reset index to "pk" column. df = df.set_index("pk") # Final df has following structure. # # exchange amount price fee symbol currency is_deposit is_closing is_closed date closed_on parents balance_in_time type is_leaf root_parent root_investment current_price market_value market_value_perc # pk # 1 Kraken 200.0 0.25 1.0 XRP USD True False False 2018-12-01 None NaN -51.0 coin False 1 51.0 0 0.0 0.0 # 3 Binance 100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 2.0 -25.0 coin False 1 51.0 0 0.0 0.0 # 2 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 0 -0.0 0.0 # 4 Kraken -100.0 0.25 0.0 XRP USD False False False 2019-12-02 None 1.0 25.0 coin False 1 51.0 0 -0.0 0.0 # 8 Binance 100.0 0.25 0.0 XRP USD False False True 2019-12-05 None 7.0 -25.0 coin False 6 50.0 0 0.0 0.0 # 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 8.0 50.0 coin True 6 50.0 0 -0.0 0.0 # 13 Binance -200.0 0.25 0.0 XRP USD False True True 2019-12-12 None 3.0 50.0 coin True 1 51.0 0 -0.0 0.0 # 5 Kraken 100.0 0.25 0.0 ETC USD False False False 2019-12-02 None 4.0 -25.0 coin True 1 51.0 0 0.0 0.0 # 6 Binance 50.0 1.00 0.0 BTC USD True False False 2019-12-05 None NaN -50.0 coin False 6 50.0 0 0.0 0.0 # 7 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-05 None 6.0 25.0 coin False 6 50.0 0 -0.0 0.0 # 9 Binance -25.0 1.00 0.0 BTC USD False False True 2019-12-06 None 6.0 25.0 coin False 6 50.0 0 -0.0 0.0 # 10 Binance 50.0 0.50 0.0 ETH USD False False False 2019-12-06 None 9.0 -25.0 coin False 6 50.0 0 0.0 0.0 # 11 Binance -25.0 0.50 0.0 ETH USD False False False 2019-12-07 None 10.0 12.5 coin False 6 50.0 0 -0.0 0.0 # 12 Binance 50.0 0.25 0.0 TRX USD False False False 2019-12-07 None 11.0 -12.5 coin True 6 50.0 0 0.0 0.0 cache.set(cache_key, df) return df
@classmethod def get_transaction_root_parents(cls, pk, force_parent=None): """ Returns all root transactions traced from all parents. :param int pk: PK of the current transaction. :param int force_parent: Force specific parent instead of all possible ones. :return: List of root transactions to the given one. :rtype: list """ def append_to_chains(chains, parent): """ Appends the given parent to chains based on "from_transaction" column value. :param list chains: List of lists where: 1st level list represents parent branches 2nd level list represents all parents in the branch :param tuple parent: Table row - se bellow. """ # Seek to_seek = parent[1] # from_transaction_id column to_append = parent[2] # to_transaction_id column # Seek transaction ID in chains and if found # append a new one for chain in chains: if chain[-1] == to_seek: chain.append(to_append) return # If not found in chains create a new chain. chains.append([to_append]) sql_params = [pk] parent_condition = "" if force_parent: parent_condition = "AND to_transaction_id = %s" sql_params.append(force_parent) # A recursive query that compiles a temp table with all # parents of the given transaction and then fetches parent(s) # of the highest level - which are basically the root parent(s). sql = f""" WITH RECURSIVE transactions_tree AS ( SELECT *, 1 as level FROM transactions_transaction_parents WHERE from_transaction_id = %s {parent_condition} UNION ALL SELECT tp.*, tt.level + 1 as level FROM transactions_tree tt JOIN transactions_transaction_parents tp on tt.to_transaction_id = tp.from_transaction_id ) SELECT * FROM transactions_tree """ # SQL returns table like this: # id | from_transaction_id | to_transaction_id | level # ----+---------------------+-------------------+------- # 38 | 62 | 61 | 1 # 39 | 62 | 47 | 1 # 26 | 47 | 46 | 2 # 24 | 46 | 45 | 3 # 23 | 45 | 44 | 4 # 22 | 44 | 35 | 5 parents = [] with connection.cursor() as cursor: cursor.execute(sql, sql_params) parents = cursor.fetchall() if parents: chains = [] for parent in parents: append_to_chains(chains, parent) # Return "leafes" (last parents) from all chains. return [chain[-1] for chain in chains] # No parent transactions -> this transaction is parent to self. return [pk] # OLD CODE: # trans = Transaction.objects.get(pk=pk) # # if trans.parents.exists(): # parents_list = [] # # # Go only to the forced parent? # # if forced_parent: # # parents = [forced_parent] # # else: # parents = trans.parents.values_list("pk", flat=True) # # for parent in parents: # parents_list.extend(cls.get_transaction_root_parents(parent)) # # return parents_list # # # No parent transactions -> this transaction is parent to self. # return [pk] def get_transaction_parents(self, pk): """ Returns all parent transactions in the transaction chain. :param int pk: PK of the current transaction. :return: List of parent transactions to the given one. :rtype: list """ trans = Transaction.objects.get(pk=pk, user=self.user) if trans.parents.exists(): parents_list = [] parents = list(trans.parents.values_list("pk", flat=True)) for parent in parents: LOGGER.debug(f"Transaction {pk} has parent {parent}.") parents_list.append(parent) parents_list.extend(self.get_transaction_parents(parent)) return parents_list return [] @staticmethod def get_deposit_groups(df): """ Groups all transactions with the same `root_parent`. Returns dict where key is root parent transaction and values (list) are all transaction that leads to this parent. In case of a "merging" transaction, whichhas multiple root parents, it appears under all keys (root parents). Example: defaultdict({1: [ 1, 3, 2, 4, 13, 5 ], 6: [ 8, 13, 6, 7, 9, 10, 11, 12 ]}) :param df pd.DataFrame: DataFrame to be examined. :return :rtype: dict """ groups = defaultdict(list) for idx, row in df.iterrows(): groups[row["root_parent"]].append(idx) return groups @staticmethod def group_deposit_groups(deposit_groups): """ Merges those groups (root transaction, transactions) with common root transactions. Example: Input structure: { 1: [1, 2, ... ], ^ ^ | | +- root transaction | +- children transactions 6: [2, 3, ... ], } Output structure: [ [[1, 6], [1, 2, 3]] ^ ^ | | +- list of root transactions | +- list of children transactions ] Process: 1. builds up a parent tree where every the dict key has its root (thats where merging happens) 2. compile a dict of groups with info about roots and children 3. convert the groups into final structure """ keys = list(deposit_groups.keys()) # Union-Find (Disjoint Set) # Creates a dict where key and value is the same - the key. parent_tree = {k: k for k in keys} def find_root(wanted): """ Finds a parent of the given value in parent tree. """ # Climbs the tree up until key and value are not the same. # If values are equal that means a tree item's parent is the # same value -> there is no other parent - end of the tree. while parent_tree[wanted] != wanted: # In case of a parent is found set the new item (not necessary the # root) parent (compressing the tree). parent_tree[wanted] = parent_tree[parent_tree[wanted]] # Now the wanted is the newly found parent. wanted = parent_tree[wanted] # Top of the tree was reached - that's the tree root. return wanted def union(key_a, key_b): """ Finds roots of both given keys and if the roots are different (which means they are not merged yet) -> merge them. """ root_a, root_b = find_root(key_a), find_root(key_b) if root_a != root_b: parent_tree[root_b] = root_a # Maps value → keys that contain it. # Creates a dict where key is each value or the dict item (expands list so each # list value has it's own record) and value a list of keys where the values occurs. value_to_keys = defaultdict(list) for k, values in deposit_groups.items(): for v in values: value_to_keys[v].append(k) # Union keys (originally values) that share values (originally keys). for key_list in value_to_keys.values(): first_key = key_list[0] # If there are any other (more values in key_list) values # that means these transactions can be merged. # Merge (union) them by setting common root. for other_key in key_list[1:]: union(first_key, other_key) # Collect group # Build up groups dict with the input data based on parent_tree. groups = defaultdict(lambda: {"keys": set(), "values": set()}) for k, values in deposit_groups.items(): root = find_root(k) groups[root]["keys"].add(k) groups[root]["values"].update(values) # Builds the result which is a list of 2 items: # 1st item is the list of root parents # 2nd item is the list of children result = [] for g in groups.values(): roots = list(g["keys"]) children = list(g["values"]) result.append([roots, children]) return result
[docs] @staticmethod def get_pile_stats(df): """ Takes dataframe from ``get_transaction_basic_stats()`` (the given ``df`` param) method and converts it into a pile (see ``group_deposit_groups()``) with stats. The stats are basic stats for charts and overviews. Pile is a list of pairs. Each pair has 2 items: * root transactions list (key) * stats - objects with following keys * open_symbols * investments * incomes * fees Pile looks like this: .. code-block:: python [ ( [6], { 'open_symbols': [{ 'symbol': 'QCOM', 'amount': 14.0, 'market_price': 1271.3400000000001 }], 'investments': [{6: 780.2199999999999}], # key is transaction PK 'incomes': [], 'fees': 7.95 } ) ] It’s used for following charts: * transactions overview chart in Transactions :param pandas.DataFrame df: :return: Pile - for structure see above. :rtype: list """ groups = Transactions.get_deposit_groups(df) pile = Transactions.group_deposit_groups(groups) chain_data = [] if df.empty: return chain_data # Fetch all objects at once. item_objs = { i.symbol: i for i in ItemWithPrices.objects.filter(symbol__in=df["symbol"].unique()) } for root_parents, group in pile: # Calculate open symbols - amount and market price. df_group = df.query( "root_parent in [{}]".format( ",".join([str(transaction) for transaction in group]) ) ) # The first groupby() stands for unique transactions. # When a transaction has multiple parents it's then # presented in the df for each parent. df_group_uniq = df_group.groupby(df_group.index).first() open_symbols = [] for symbol, amount in ( df_group_uniq.groupby("symbol")["amount"].sum().items() ): item = item_objs[symbol] last_price = item.last_price open_symbols.append( { "symbol": symbol, "amount": amount, "market_price": amount * last_price if last_price else 0, } ) # Calculate investments - sum of root parent balances which is always negative # number. investments = [] # for idx, row in df_group.loc[root_parents].iterrows(): for idx, row in ( df_group.loc[df_group.index.intersection(root_parents)] .reindex(root_parents) .iterrows() ): investments.append({idx: row["balance_in_time"] * -1 - row["fee"]}) # Calculate incomes - sum of all closing (is_closing) transactions. incomes = [] for idx, row in df_group_uniq[ df_group_uniq["is_closing"] == True # noqa ].iterrows(): incomes.append({idx: row["balance_in_time"] - row["fee"]}) # Pull everything together. chain_data.append( ( root_parents, { "open_symbols": open_symbols, "investments": investments, "incomes": incomes, "fees": df_group["fee"].sum(), }, ) ) return chain_data
[docs] class Graph: """ Generates graphviz graph for the given transaction. """ def __init__(self, transaction): """ :param Transaction transaction: Transaction model instance. """ self.transaction = transaction self.nodes = 0 # Graph setup. self.dot = Digraph(node_attr={"style": "rounded,filled,bold", "shape": "box"}) self.dot.attr(rankdir="LR")
[docs] def generate(self): """ Generates complete (related to the current ``self.transaction``) transaction graph. """ # Get the pile. df = Transactions(self.transaction.user).get_transaction_basic_stats() groups = Transactions.get_deposit_groups(df) pile = Transactions.group_deposit_groups(groups) # Find current (self.transaction) transaction in the pile # and get all it's root transactions. # Then walk thru them and draw the whole graph. for roots, transactions in pile: if self.transaction.pk in transactions: for root in roots: # Add root transaction. self._add_node(Transaction.objects.get(pk=root)) # Add the rest. self._add_children(Transaction.objects.get(pk=root)) break
[docs] def export(self): """ Exports graph as PNG in binary data. :return: BInary data - PNG image. :rtype: bytes """ return io.BytesIO(self.dot.pipe(format="png"))
def _add_children(self, parent): """ Adds children transaction into the graph. :param Transaction parent: Transaction instance. """ if parent.transaction_set.exists(): for child in parent.transaction_set.select_related( "item", "exchange" ).all(): # Update graph. self._add_node(child) self.dot.edge(str(parent.pk), str(child.pk)) # Call again. self._add_children(child) def _add_node(self, trans, **kwargs): """ Adds transaction node to the graph. Current transaction (``self.transaction``) draws with white background. :param Transaction trans: Transaction instance. """ label_template = """< <B>{trans_pk}</B><BR/> <BR/> <FONT\point-size="11"> {symbol}<BR/> {amount}/{price}<BR/> {date}<BR/> {exchange} </FONT> >""" if trans.item.is_coin: roundfn = coinautofloatformat else: roundfn = autofloatformat html = ( label_template.replace(" ", "") .replace("\\", " ") .replace("\n", "") .format( trans_pk=trans.pk, symbol=trans.item.symbol, amount=trans.amount, price=roundfn(trans.price), date=date_format(trans.date), exchange=trans.exchange.title if trans.exchange else "", ) ) # Current transaction is highlighted with white background. if trans == self.transaction: kwargs["fillcolor"] = "#ffffff" self.dot.node(str(trans.pk), label=html, **kwargs) self.nodes += 1