Source code for richy.core.models

import calendar
import dataclasses
import json
import logging
import re
from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, UserManager
from django.core.cache import cache
from django.core.files.base import File
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connections, models
from django.db.models.functions import Concat
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.translation import gettext as _
from django_extensions.db.models import ModificationDateTimeField
from picklefield.fields import PickledObjectField
from sitetree.models import TreeItemBase

from .math import calc_percentage_change
from .scraper import CurrentPrice
from .storage import OverwriteStorage
from .templatetags.utils import autofloatformat, coinautofloatformat

LOGGER = logging.getLogger(__name__)


[docs] def df_to_highcharts(df): data = [] for c in df.columns: data.append({"name": c, "data": df[c].values}) return data
[docs] def date_to_highcharts_timestamp(value): """ :param datetime.date or datetime.datetime value: Date(time) to be converted. """ return calendar.timegm(value.timetuple()) * 1000
[docs] class RichyJSONEncoder(DjangoJSONEncoder): """ JSON encoder which handles - dataclasses in additional to DjangoJSONEncoder which handles - datetime - date - time - timedelta - Decimal - Promise - UUID """ def default(self, obj): # Dataclasses. if dataclasses.is_dataclass(obj): return obj.__dict__ return super().default(obj)
[docs] class CustomUserManager(UserManager): def _create_user(self, email, password, **extra_fields): """ Create and save a user with the given username, email, and password. """ if not email: raise ValueError("The given email must be set") email = self.normalize_email(email) # Lookup the real model class from the global app registry so this # manager method can be used in migrations. This is fine because # managers are by definition working on the real model. # GlobalUserModel = apps.get_model( # self.model._meta.app_label, self.model._meta.object_name # ) user = self.model(email=email, **extra_fields) user.password = make_password(password) user.save(using=self._db) return user def create_user(self, email=None, password=None, **extra_fields): extra_fields.setdefault("is_staff", False) extra_fields.setdefault("is_superuser", False) return self._create_user(email, password, **extra_fields) def create_superuser(self, email=None, password=None, **extra_fields): extra_fields.setdefault("is_staff", True) extra_fields.setdefault("is_superuser", True) if extra_fields.get("is_staff") is not True: raise ValueError("Superuser must have is_staff=True.") if extra_fields.get("is_superuser") is not True: raise ValueError("Superuser must have is_superuser=True.") return self._create_user(email, password, **extra_fields)
[docs] class User(AbstractBaseUser, PermissionsMixin): """ Custom user model """ first_name = models.CharField(_("first name"), max_length=150, blank=True) last_name = models.CharField(_("last name"), max_length=150, blank=True) email = models.EmailField(_("email address"), unique=True) is_staff = models.BooleanField( _("staff status"), default=False, help_text=_("Designates whether the user can log into this admin site."), ) is_active = models.BooleanField( _("active"), default=True, help_text=_( "Designates whether this user should be treated as active. " "Unselect this instead of deleting accounts." ), ) date_joined = models.DateTimeField(_("date joined"), default=timezone.now) objects = CustomUserManager() EMAIL_FIELD = "email" USERNAME_FIELD = "email" # REQUIRED_FIELDS = ["email"] class Meta: verbose_name = _("user") verbose_name_plural = _("users") def clean(self): super().clean() self.email = self.__class__.objects.normalize_email(self.email)
[docs] def get_full_name(self): """ Return the first_name plus the last_name, with a space in between. """ full_name = "{} {}".format(self.first_name, self.last_name) return full_name.strip()
[docs] def get_short_name(self): """Return the short name for the user.""" return self.first_name
# TODO: cache until an item is added/removed.
[docs] def get_items(self): return Item.objects.filter( pk__in=UserItem.objects.by_user(self).values_list("item", flat=True) )
[docs] def get_owned_items(self): """ Returns currently held (open) items by the user. :return: List of owned Items. :rtype: list """ from ..transactions.models import Transaction return [ t.item for t in Transaction.objects.by_user(self) .prefetch_related("item") .filter(is_closed=False) .distinct("item") ]
[docs] def get_traded_currencies(self, item_type=None): """ Returns a list of currencies where at least one item with that currency has been traded in past. The list of traded currencies can be narrowed down by ``item_type`` param with ``share``, ``etf`` or ``coin`` choices. :param str item_type: Item type as a string to specify transaction item type. :return: Traded currencies. :rtype: list """ q = self.transaction_set.values("currency") match item_type: case "share": q = q.filter(item__share__isnull=False) case "etf": q = q.filter(item__etf__isnull=False) case "coin": q = q.filter(item__coin__isnull=False) return list(q.distinct().values_list("currency", flat=True))
# TODO: # def email_user(self, subject, message, from_email=None, **kwargs): # """Send an email to this user.""" # send_mail(subject, message, from_email, [self.email], **kwargs)
[docs] class Meta(models.Model): LAST_PRICE_UPDATE = 0 TYPE_CHOICES = ((LAST_PRICE_UPDATE, _("Last update of prices")),) type = models.PositiveSmallIntegerField(choices=TYPE_CHOICES) value = PickledObjectField()
######################## # Pandas query set # ########################
[docs] class PandasQuerySet(models.QuerySet): def to_pandas(self, *args): return pd.DataFrame(list(self.values(*args)))
[docs] class PandasManager(models.Manager): def get_queryset(self): return PandasQuerySet(self.model) get_query_set = get_queryset
###################### # User query set # ######################
[docs] class UserRelatedQuerySet(PandasQuerySet): def by_user(self, user): return self.filter(user=user)
[docs] class UserRelatedManager(models.Manager): def get_queryset(self): return UserRelatedQuerySet(self.model) def by_user(self, user): return self.get_queryset().by_user(user)
[docs] class UserRelatedModel(models.Model): user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE) objects = UserRelatedManager() class Meta: abstract = True
[docs] class ItemQuerySet(PandasQuerySet): def by_user(self, user): return self.filter(useritem__user=user)
[docs] class ItemManager(models.Manager): def get_queryset(self): return ItemQuerySet(self.model) def by_user(self, user): return self.get_queryset().by_user(user)
############## # Models # ##############
[docs] class BaseItem(models.Model): class Type(models.TextChoices): SHARE = "share" ETF = "etf" INDEX = "index" COIN = "coin" symbol = models.CharField(max_length=50, unique=True) type = models.CharField(max_length=50, choices=Type) is_discontinued = models.BooleanField(default=False, db_index=True) objects = ItemManager() class Meta: abstract = True @staticmethod def by_symbol(symbol): """ Tries to fetch specific item by the given symbol. """ from ..coins.models import Coin from ..etfs.models import Etf from ..indexes.models import Index from ..shares.models import Share # LOGGER.debug(f"Looking for symbol {symbol}") try: return Coin.objects.get(symbol=symbol) except Coin.DoesNotExist: try: return Share.objects.get(symbol=symbol) except Exception: try: return Etf.objects.get(symbol=symbol) except Exception: return Index.objects.get(symbol=symbol) def __str__(self): return self.symbol def save(self, **kwargs): self.type = self.item_type return super().save(**kwargs) def get_symbol_slug(self): """ Slugifies item symbol. :return: Slugified symbol. :rtype: str """ return slugify(self.symbol) def get_last_time_todays_price(self): """ Returns the date when the item price was as high/low as today (last known) price. :return: Date of the price. :rtype: datetime.date """ try: last = self.price_set.last() return ( self.price_set.exclude(pk=last.pk) .filter(price__lte=last.price) .order_by("-date")[0] .date ) except Exception: return def get_sma(self, period, no_cache=False): """ Calculates Simple Moving Averages (SMA) for the given period and caches the output. If the cached data already exists returns data from the cache. :param int period: Number of days for SMA calculation. :param bool no_cache: If true cache is not used (value is forced to be calculated). :return: A list with timestamp and sma value pairs. :rtype: list """ cache_key = self.CACHE_SMA.format(self.pk, period) cached = cache.get(cache_key) if cached is not None and not no_cache: return cached data = [] years = 5 # Calculate SMA for past X years. df = ( self.price_set.filter( datetime__date__gte=date.today() - timedelta(days=365 * years) ) .order_by("datetime") .to_pandas() ) col_name = "ma{}".format(period) df[col_name] = df["price"].rolling(window=period, min_periods=0).mean() if self.is_coin: roundfn = coinautofloatformat else: roundfn = autofloatformat for _idx, r in df.iterrows(): data.append( [ date_to_highcharts_timestamp(r.datetime.to_pydatetime()), roundfn(r[col_name], no_str=True), ] ) cache.set(cache_key, data, settings.SMA_TIMEOUT) # Cache for 1 day. return data def get_ath(self, no_cache=False): """ Returns all-time-high Price record. Also caches the value. :param bool no_cache: If true cache is not used (value is forced to be calculated). :return: Price model instance of all-time-high. :rtype: Price or None """ cache_key = self.CACHE_ATH.format(self.pk) cached = cache.get(cache_key) if cached is not None and not no_cache: return cached try: ath = self.price_set.order_by("-price")[0] cache.set(cache_key, ath, settings.ATH_TIMEOUT) # Cache for 1 day return ath except Exception: return None def get_drawdown(self, no_cache=False): """ Returns drawdown in percents. Also caches the value. :param bool no_cache: If true cache is not used (value is forced to be calculated). :return: Current drawdown in percents. :rtype: float """ cache_key = self.CACHE_DRAWDOWN.format(self.pk) cached = cache.get(cache_key) if cached is not None and not no_cache: return cached try: ath = self.get_ath() drawdown_value = self.get_last_price().price - ath.price # No drawdown at this time (new ATH). if 0 < drawdown_value: return 0 drawdown = drawdown_value / ath.price * 100 cache.set(cache_key, drawdown, settings.DRAWDOWN_TIMEOUT) # Cache for 1 day return drawdown except Exception: return None def get_52_weeks_low_high(self, no_cache=False): """ Fetches a minimum and maximum known item price for past 52 weeks from the database. Also caches the value for ``settings.LOW_HIGH_52_WEEKS_TIMEOUT`` :return: Min and max price pair. :rtype: tuple """ cache_key = self.CACHE_52_WEEKS_LOW_HIGH.format(self.pk) cached = cache.get(cache_key) if cached is not None and not no_cache: return cached try: year_ago = date.today() - timedelta(days=365) result = self.price_set.filter(datetime__date__gte=year_ago).aggregate( min_price=models.Min("price"), max_price=models.Max("price") ) data = (result["min_price"], result["max_price"]) cache.set(cache_key, data, settings.LOW_HIGH_52_WEEKS_TIMEOUT) return data except Exception: return None, None def is_share(self): return hasattr(self, "share") def is_index(self): return hasattr(self, "index") def is_etf(self): return hasattr(self, "etf") def is_coin(self): return hasattr(self, "coin") def get_drawdowns(self): """ Fetches drawdowns (complete history) as dataframe. Returned dataframe looks like this:: id item_id price peak_value drawdown_value drawdown datetime 2010-03-11 00:00:00+00:00 662564 172 0.609804 0.609804 0.000000 0.000000 2010-03-12 00:00:00+00:00 662565 172 0.594724 0.609804 -0.015079 -0.024728 2010-03-15 00:00:00+00:00 662566 172 0.575951 0.609804 -0.033853 -0.055514 2010-03-16 00:00:00+00:00 662567 172 0.622729 0.622729 0.000000 0.000000 2010-03-17 00:00:00+00:00 662568 172 0.645194 0.645194 0.000000 0.000000 """ def drawdown_value(row): """ Calculates drawdown (in value not in percents) :param pd.Series row: Dataframe row as series. :return: Calculated drawdown or 0 in case of a new ATH. :rtype: float """ if row["price"] > row["peak_value"]: return 0.0 return row["price"] - row["peak_value"] df = self.price_set.all().to_pandas() df = df.set_index("datetime") # Calculate. df = df.sort_index() df["peak_value"] = df["price"].cummax() df["drawdown_value"] = df.apply(drawdown_value, axis=1) df["drawdown"] = df["drawdown_value"] / df["peak_value"] # df = df.sort_index() return df def get_basic_info(self): try: return self.itemdata.data["basic_info"] except (ItemData.DoesNotExist, KeyError): return {} def get_aths(self): """ Returns pandas Series with ATH's where index is the date of ATH and value is value of the ATH. :return: Pandas series with ATHs. :rtype: pd.Series """ df = self.get_drawdowns() return df["peak_value"].drop_duplicates().sort_index() # ItemData access methods. def set_basic_info(self, data): try: record = self.itemdata record.data["basic_info"] = data except ItemData.DoesNotExist: record = ItemData(item=self, data={"basic_info": data}) record.save()
class Item(BaseItem): CACHE_DAY_PERC_CHANGE = "item-{}-days-{}-change-{}" CACHE_SMA = "item-{}-sma-{}" CACHE_ATH = "item-{}-ath" CACHE_DRAWDOWN = "item-{}-drawdown" CACHE_CURRENT_PRICE = "item-{}-current-price" CACHE_52_WEEKS_LOW_HIGH = "item-{}-52-weeks-low-high" # @staticmethod # def get_class_by_type(type): # if "share" == type: # from ..shares.models import Share # # return Share # # elif "etf" == type: # from ..etfs.models import Etf # # return Etf # # elif "index" == type: # from ..indexes.models import Index # # return Index # # elif "coin" == type: # from ..coins.models import Coin # # return Coin
[docs] def get_last_days_change( self, days, no_cache=False, compounded=False, percents=False ): # TODO: projit pouziti tyhle metody a rovnou v tech mistech tahat Item z materializovanyho view, # ale zachovat cache. """ Calculates change in price in percents between last known price (today) and "today - days param" day. In case of compounded change function works with open transactions. Takes the calculated percentage change and every found transaction and calculates it's real gain/loss and it's investment. Based on these 2 values for each transaction a weighted average is calculated. :param int days: Number of days to go in past. :param bool no_cache: If true cache is not used (value is forced to be calculated). :param bool compounded: If true calculates compounded change based on deposit transactions (only for precents=True). :param bool percents: If true the method calculates percentage change. :return: Percentage diff since "past" (today - days param) to today. :rtype: int or None """ # Parameters check. if compounded and not percents: raise AttributeError( "Compounded can be calculated only in percents - pass percents=True." ) # Check cache. cache_key = self.CACHE_DAY_PERC_CHANGE.format( self.pk, "percents" if percents else "value", days ) if compounded: cache_key += "-compounded" cached = cache.get(cache_key) if cached is not None and not no_cache: return cached # Calculate fresh value. days_ago = date.today() - timedelta(days=days) last_record = self.price_set.order_by("datetime").last() result = None if last_record: # Aims for the exact (see param "days") day in past. # If the price doesn't exist (weekend, holidays, ...) # just get the first available before. try: before_last_record_price = self.price_set.exclude( pk=last_record.pk ).get(datetime__date=days_ago) except Price.DoesNotExist: before_last_record_price = ( self.price_set.exclude(pk=last_record.pk) .filter(datetime__date__lt=days_ago) .order_by("-datetime") .first() ) if before_last_record_price: # Calculate change between last available price and "days ago" if percents: result = calc_percentage_change( last_record.price, before_last_record_price.price ) else: result = last_record.price - before_last_record_price.price if compounded: change = last_record.price - before_last_record_price.price # Find all open deposit transactions made til "days ago". deposits = self.transaction_set.filter( is_deposit=True, is_closed=False ) # There might be case where we own a item (crypto case) which wasn't # bought with any deposit transaction - it was exchanged for any other # item. In this case we are not able to track down price for each transaction # because the tree might be complex with buys and sells and there is no way # how to pair sells prices + amounts to buys prices and amounts (weighted average). if not deposits.count(): cache.set(cache_key, 0, settings.PRICE_CHANGE_CACHE_TIMEOUT) return 0 # Calculate percentage change for each transaction based on the # change above (now and "days ago") changes = [] investments = [] for dep in deposits: changes.append( calc_percentage_change( (dep.price + change) * dep.amount, dep.price * dep.amount, ) ) investments.append(dep.price * dep.amount) # Calculate weighted average of all changes. result = np.average(changes, weights=investments) # Cache the final value. cache.set(cache_key, result, settings.PRICE_CHANGE_CACHE_TIMEOUT) return result
def get_last_month_perc_change(self): """ Wrapper of self.get_last_days_change() used in templates. """ return self.get_last_days_change(30, percents=True) def get_last_week_perc_change(self): """ Wrapper of self.get_last_days_change() used in templates. """ return self.get_last_days_change(7, percents=True) def get_last_price(self, *, current=False): # TODO: projit pouziti tyhle metody a rovnou v tech mistech tahat Item z materializovanyho view. """ Tries to return current market price (if current attribute is true) with market state and move in value and percents. Falls back to last known historical price + move in value and percents. Everything is returned as CurrentPrice dataclass. In case of no cached data and historical price None is returned. :param bool current: Look for current market (live open market) value. :return: Dataclass with current market data or last known regular data. :rtype: CurrentPrice or None """ cache_key = self.CACHE_CURRENT_PRICE.format(self.pk) cached = cache.get(cache_key) # If current price exists it sits in cache. if current and cached is not None: return CurrentPrice(**json.loads(cached)) # No cache. try: prices = ItemWithPrices.objects.get(pk=self.pk) except ItemWithPrices.DoesNotExist: return return CurrentPrice( prices.last_price, prices.last_price - prices.price_1_day_ago, prices.price_1_day_ago_change, ) def set_current_price_and_change(self, price): """ Caches the given price object as "item-X-current-price". :param CurrentPrice price: Price object to be cached. """ cache.set( self.CACHE_CURRENT_PRICE.format(self.pk), json.dumps(price.__dict__), settings.ITEM_CURRENT_PRICE_TIMEOUT, ) def get_last_day_perc_change(self): """ Wrapper of self.get_last_days_change() used in templates. """ return self.get_last_days_change(1, percents=True) def get_last_day_compounded_perc_change(self): """ Wrapper of self.get_last_days_change() used in templates. """ return self.get_last_days_change(1, compounded=True, percents=True) def update_cache(self): """ Updates cached values cached by: - self.get_last_days_change(...) - self.get_sma(...) - self.get_ath() - self.get_drawdown() - self.get_52_weeks_low_high() - ItemWithPrices.refresh() """ def create_perc_cache(days, compounded=False): for d in days: self.get_last_days_change( d, no_cache=True, compounded=compounded, percents=True ) LOGGER.debug( "{} days percent price change cache regenerated.".format(d) ) def create_sma_cache(smas): for s in smas: self.get_sma(s, no_cache=True) LOGGER.debug( "{} days percent price change cache regenerated.".format(s) ) # No data -> no need to refresh the cache. if not self.price_set.exists(): return ytd = datetime.now().timetuple().tm_yday days = (1, 5, 7, 30, 90, 180, ytd - 1, ytd + 1, 365) smas = (20, 50, 200) create_perc_cache(days) create_perc_cache([1], compounded=True) create_sma_cache(smas) self.get_last_day_compounded_perc_change() self.get_ath(no_cache=True) self.get_drawdown(no_cache=True) self.get_52_weeks_low_high(no_cache=True) ItemWithPrices.refresh() from ..transactions.transactions import Transactions Transactions.update_cache() from ..core.tasks import generate_performance_charts generate_performance_charts(self.pk)
[docs] class ItemWithPricesQuerySet(models.QuerySet): def with_identifier(self): return self.annotate( identifier=Concat(models.F("type"), models.Value("_"), models.F("symbol")) )
ItemWithPricesManager = models.Manager.from_queryset(ItemWithPricesQuerySet)
[docs] class ItemWithPrices(BaseItem): last_price_id = models.BigIntegerField() last_price = models.FloatField() price_1_day_ago = models.FloatField() price_5_days_ago = models.FloatField() price_7_days_ago = models.FloatField() price_30_days_ago = models.FloatField() price_90_days_ago = models.FloatField() price_180_days_ago = models.FloatField() price_365_days_ago = models.FloatField() price_ytd = models.FloatField() price_1_day_ago_change = models.FloatField() price_5_days_ago_change = models.FloatField() price_7_days_ago_change = models.FloatField() price_30_days_ago_change = models.FloatField() price_90_days_ago_change = models.FloatField() price_180_days_ago_change = models.FloatField() price_365_days_ago_change = models.FloatField() price_ytd_days_ago_change = models.FloatField() objects = ItemWithPricesManager() class Meta: managed = False
[docs] @staticmethod def refresh(): """ Refreshes the materialized view (recalculates values) """ with connections["default"].cursor() as c: c.execute("REFRESH MATERIALIZED VIEW core_itemwithprices")
[docs] @staticmethod def fetch_into_user_items(items): """ Fetches (in a bulk/at once) ItemWithPrices model instance for all the given items and attaches them under "item_with_prices" property. :param list items: List of UserItem instances. """ with_prices = {} # Fetch all ItemWithPrices models at once. for i in ItemWithPrices.objects.with_identifier().filter( identifier__in=[f"{i.item.type}_{i.item.symbol}" for i in items] ): with_prices[i.identifier] = i # Assign them to the given instances as "item_with_prices". for i in items: i.item_with_prices = with_prices[f"{i.item.type}_{i.item.symbol}"]
[docs] class UserItem(UserRelatedModel): item = models.ForeignKey(Item, on_delete=models.CASCADE) indexes = models.ManyToManyField("indexes.Index", blank=True, related_name="+") show_in_overview = models.BooleanField(default=True) show_in_news = models.BooleanField(default=True) show_on_dashboard = models.BooleanField(default=True) note = models.TextField(blank=True, null=True) is_archived = models.BooleanField(default=False) class Meta: unique_together = ["user", "item"] # @cached_property def item_with_prices(self): return ItemWithPrices.objects.get(pk=self.item.pk) def get_tags(self): """ Finds all tags in item note. :return: List of found tags. :rtype: list """ if self.note: return re.findall(r"#(\w+)", self.note) return [] def get_note_without_tags(self): """ Strips all tags from item note. :return: Stripped note. :rtype: str """ if self.note: return re.sub(r"(#\w+)", "", self.note) def get_owned_items_sum(self, user): """ Returns dict with keys "value" and "amount" of current shares price and amount. Answers question how many items (shares, coins, ...) you currently have and what is it's value today. :return: Dict with "amount" and "value" keys. """ from ..transactions.models import Transaction q = ( Transaction.objects.by_user(user) .filter(item=self.item) .aggregate(sum=models.Sum("amount")) ) if q["sum"] and 0 < q["sum"]: return { "amount": q["sum"], "value": self.item.price_set.last().price * q["sum"], } def get_owned_items(self, user): """ Returns dict with keys "amount" and "value" of all open transactions. Answers question how many items you currently own and how much it did cost you at the time of buying. :return: Dict with "amount" and "value" keys. """ from ..transactions.models import Transaction value = 0 amount = 0 transactions = Transaction.objects.by_user(user).filter( item=self.item, is_closed=False ) if len(transactions): for t in transactions: value += t.get_value() amount += t.amount return {"value": value, "amount": amount} def indexes_as_symbols(self): return [i.symbol for i in self.indexes.all()]
class Asset(models.Model): EARNINGS_DATA = 0 REVENUES_DATA = 1 # TODO: substitute # TRENDS_CHART = 2 PERFORMANCE_CHART = 3 # TODO: substitute # ANALYSTS_PRICE_CHART = 4 EPS_DATA = 5 RATINGS_DATA = 6 TRANSACTION_GRAPH = 7 PRICE_RATINGS = 8 TYPE_CHOICES = ( (EARNINGS_DATA, _("Earnings data")), (REVENUES_DATA, _("Revenue data")), (PERFORMANCE_CHART, _("Performance chart")), (EPS_DATA, _("EPS data")), (RATINGS_DATA, _("Ratings data")), (TRANSACTION_GRAPH, _("Transaction graph")), (PRICE_RATINGS, _("Price ratings")), ) item = models.ForeignKey(Item, on_delete=models.CASCADE, blank=True, null=True) transaction = models.ForeignKey( "transactions.Transaction", on_delete=models.CASCADE, blank=True, null=True ) type = models.PositiveSmallIntegerField(choices=TYPE_CHOICES) image = models.ImageField( upload_to="assets", storage=OverwriteStorage(), blank=True, null=True ) data = models.JSONField(blank=True, null=True, encoder=RichyJSONEncoder) updated = ModificationDateTimeField()
[docs] @classmethod def upload(cls, buffer, type, extension="png", item=None, transaction=None): """ Creates/updates item asset and uploads file (param buffer) to the storage. :param buffer: File-like object. :param int type: Item asset type - see Asset.TYPE_CHOICES. :param extension: Extensions string i.e. "png". :param Item item: Item model instance. """ try: asset = cls.objects.get(item=item, transaction=transaction, type=type) except cls.DoesNotExist: asset = cls(item=item, transaction=transaction, type=type) type_name = asset.get_type_display().lower().replace(" ", "-") if item: asset.image.save( "{}_{}.{}".format(asset.item.symbol, type_name, extension), File(buffer) ) LOGGER.debug( "{} for {} has been generated.".format( asset.get_type_display(), item.symbol ) ) elif transaction: asset.image.save( "{}_{}.{}".format(transaction.pk, type_name, extension), File(buffer) ) LOGGER.debug( "{} for transaction {} has been generated.".format( asset.get_type_display(), transaction.pk ) ) else: asset.image.save("{}.{}".format(type_name, extension), File(buffer)) LOGGER.debug("{} has been generated.".format(asset.get_type_display())) asset.save()
[docs] class Price(models.Model): item = models.ForeignKey(Item, blank=True, null=True, on_delete=models.CASCADE) datetime = models.DateTimeField(db_index=True) price = models.FloatField() objects = PandasManager() class Meta: constraints = [ models.UniqueConstraint( fields=["item", "datetime"], name="unique_item_datetime" ) ] def get_timestamp(self): return date_to_highcharts_timestamp(self.datetime)
class ItemData(models.Model): item = models.OneToOneField(Item, on_delete=models.CASCADE) data = models.JSONField(encoder=DjangoJSONEncoder) def __str__(self): return f"{self.item} data"
[docs] class BaseDividend(models.Model): ex_date = models.DateField() payment_date = models.DateField(null=True) record_date = models.DateField(null=True) yield_value = models.FloatField() amount_value = models.FloatField() class Meta: abstract = True def is_in_future(self): return self.payment_date > date.today()