import calendar
import dataclasses
import json
import logging
import re
from datetime import date, datetime, timedelta
import numpy as np
import pandas as pd
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, UserManager
from django.core.cache import cache
from django.core.files.base import File
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connections, models
from django.db.models.functions import Concat
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.translation import gettext as _
from django_extensions.db.models import ModificationDateTimeField
from picklefield.fields import PickledObjectField
from sitetree.models import TreeItemBase
from .math import calc_percentage_change
from .scraper import CurrentPrice
from .storage import OverwriteStorage
from .templatetags.utils import autofloatformat, coinautofloatformat
LOGGER = logging.getLogger(__name__)
[docs]
def df_to_highcharts(df):
data = []
for c in df.columns:
data.append({"name": c, "data": df[c].values})
return data
[docs]
def date_to_highcharts_timestamp(value):
"""
:param datetime.date or datetime.datetime value: Date(time) to be converted.
"""
return calendar.timegm(value.timetuple()) * 1000
[docs]
class RichyJSONEncoder(DjangoJSONEncoder):
"""
JSON encoder which handles
- dataclasses
in additional to DjangoJSONEncoder which handles
- datetime
- date
- time
- timedelta
- Decimal
- Promise
- UUID
"""
def default(self, obj):
# Dataclasses.
if dataclasses.is_dataclass(obj):
return obj.__dict__
return super().default(obj)
[docs]
class CustomUserManager(UserManager):
def _create_user(self, email, password, **extra_fields):
"""
Create and save a user with the given username, email, and password.
"""
if not email:
raise ValueError("The given email must be set")
email = self.normalize_email(email)
# Lookup the real model class from the global app registry so this
# manager method can be used in migrations. This is fine because
# managers are by definition working on the real model.
# GlobalUserModel = apps.get_model(
# self.model._meta.app_label, self.model._meta.object_name
# )
user = self.model(email=email, **extra_fields)
user.password = make_password(password)
user.save(using=self._db)
return user
def create_user(self, email=None, password=None, **extra_fields):
extra_fields.setdefault("is_staff", False)
extra_fields.setdefault("is_superuser", False)
return self._create_user(email, password, **extra_fields)
def create_superuser(self, email=None, password=None, **extra_fields):
extra_fields.setdefault("is_staff", True)
extra_fields.setdefault("is_superuser", True)
if extra_fields.get("is_staff") is not True:
raise ValueError("Superuser must have is_staff=True.")
if extra_fields.get("is_superuser") is not True:
raise ValueError("Superuser must have is_superuser=True.")
return self._create_user(email, password, **extra_fields)
[docs]
class User(AbstractBaseUser, PermissionsMixin):
"""
Custom user model
"""
first_name = models.CharField(_("first name"), max_length=150, blank=True)
last_name = models.CharField(_("last name"), max_length=150, blank=True)
email = models.EmailField(_("email address"), unique=True)
is_staff = models.BooleanField(
_("staff status"),
default=False,
help_text=_("Designates whether the user can log into this admin site."),
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Designates whether this user should be treated as active. "
"Unselect this instead of deleting accounts."
),
)
date_joined = models.DateTimeField(_("date joined"), default=timezone.now)
objects = CustomUserManager()
EMAIL_FIELD = "email"
USERNAME_FIELD = "email"
# REQUIRED_FIELDS = ["email"]
class Meta:
verbose_name = _("user")
verbose_name_plural = _("users")
def clean(self):
super().clean()
self.email = self.__class__.objects.normalize_email(self.email)
[docs]
def get_full_name(self):
"""
Return the first_name plus the last_name, with a space in between.
"""
full_name = "{} {}".format(self.first_name, self.last_name)
return full_name.strip()
[docs]
def get_short_name(self):
"""Return the short name for the user."""
return self.first_name
# TODO: cache until an item is added/removed.
[docs]
def get_items(self):
return Item.objects.filter(
pk__in=UserItem.objects.by_user(self).values_list("item", flat=True)
)
[docs]
def get_owned_items(self):
"""
Returns currently held (open) items by the user.
:return: List of owned Items.
:rtype: list
"""
from ..transactions.models import Transaction
return [
t.item
for t in Transaction.objects.by_user(self)
.prefetch_related("item")
.filter(is_closed=False)
.distinct("item")
]
[docs]
def get_traded_currencies(self, item_type=None):
"""
Returns a list of currencies where at least one item with that
currency has been traded in past.
The list of traded currencies can be narrowed down by ``item_type``
param with ``share``, ``etf`` or ``coin`` choices.
:param str item_type: Item type as a string to specify transaction item type.
:return: Traded currencies.
:rtype: list
"""
q = self.transaction_set.values("currency")
match item_type:
case "share":
q = q.filter(item__share__isnull=False)
case "etf":
q = q.filter(item__etf__isnull=False)
case "coin":
q = q.filter(item__coin__isnull=False)
return list(q.distinct().values_list("currency", flat=True))
# TODO:
# def email_user(self, subject, message, from_email=None, **kwargs):
# """Send an email to this user."""
# send_mail(subject, message, from_email, [self.email], **kwargs)
########################
# Pandas query set #
########################
[docs]
class PandasQuerySet(models.QuerySet):
def to_pandas(self, *args):
return pd.DataFrame(list(self.values(*args)))
[docs]
class PandasManager(models.Manager):
def get_queryset(self):
return PandasQuerySet(self.model)
get_query_set = get_queryset
######################
# User query set #
######################
[docs]
class NavigationTreeItem(TreeItemBase):
icon = models.CharField(max_length=50, blank=True, null=True)
[docs]
class ItemQuerySet(PandasQuerySet):
def by_user(self, user):
return self.filter(useritem__user=user)
[docs]
class ItemManager(models.Manager):
def get_queryset(self):
return ItemQuerySet(self.model)
def by_user(self, user):
return self.get_queryset().by_user(user)
##############
# Models #
##############
[docs]
class BaseItem(models.Model):
class Type(models.TextChoices):
SHARE = "share"
ETF = "etf"
INDEX = "index"
COIN = "coin"
symbol = models.CharField(max_length=50, unique=True)
type = models.CharField(max_length=50, choices=Type)
is_discontinued = models.BooleanField(default=False, db_index=True)
objects = ItemManager()
class Meta:
abstract = True
@staticmethod
def by_symbol(symbol):
"""
Tries to fetch specific item by the given symbol.
"""
from ..coins.models import Coin
from ..etfs.models import Etf
from ..indexes.models import Index
from ..shares.models import Share
# LOGGER.debug(f"Looking for symbol {symbol}")
try:
return Coin.objects.get(symbol=symbol)
except Coin.DoesNotExist:
try:
return Share.objects.get(symbol=symbol)
except Exception:
try:
return Etf.objects.get(symbol=symbol)
except Exception:
return Index.objects.get(symbol=symbol)
def __str__(self):
return self.symbol
def save(self, **kwargs):
self.type = self.item_type
return super().save(**kwargs)
def get_symbol_slug(self):
"""
Slugifies item symbol.
:return: Slugified symbol.
:rtype: str
"""
return slugify(self.symbol)
def get_last_time_todays_price(self):
"""
Returns the date when the item price was
as high/low as today (last known) price.
:return: Date of the price.
:rtype: datetime.date
"""
try:
last = self.price_set.last()
return (
self.price_set.exclude(pk=last.pk)
.filter(price__lte=last.price)
.order_by("-date")[0]
.date
)
except Exception:
return
def get_sma(self, period, no_cache=False):
"""
Calculates Simple Moving Averages (SMA) for the given
period and caches the output.
If the cached data already exists returns data from the cache.
:param int period: Number of days for SMA calculation.
:param bool no_cache: If true cache is not used (value is forced to be calculated).
:return: A list with timestamp and sma value pairs.
:rtype: list
"""
cache_key = self.CACHE_SMA.format(self.pk, period)
cached = cache.get(cache_key)
if cached is not None and not no_cache:
return cached
data = []
years = 5 # Calculate SMA for past X years.
df = (
self.price_set.filter(
datetime__date__gte=date.today() - timedelta(days=365 * years)
)
.order_by("datetime")
.to_pandas()
)
col_name = "ma{}".format(period)
df[col_name] = df["price"].rolling(window=period, min_periods=0).mean()
if self.is_coin:
roundfn = coinautofloatformat
else:
roundfn = autofloatformat
for _idx, r in df.iterrows():
data.append(
[
date_to_highcharts_timestamp(r.datetime.to_pydatetime()),
roundfn(r[col_name], no_str=True),
]
)
cache.set(cache_key, data, settings.SMA_TIMEOUT) # Cache for 1 day.
return data
def get_ath(self, no_cache=False):
"""
Returns all-time-high Price record.
Also caches the value.
:param bool no_cache: If true cache is not used (value is forced to be calculated).
:return: Price model instance of all-time-high.
:rtype: Price or None
"""
cache_key = self.CACHE_ATH.format(self.pk)
cached = cache.get(cache_key)
if cached is not None and not no_cache:
return cached
try:
ath = self.price_set.order_by("-price")[0]
cache.set(cache_key, ath, settings.ATH_TIMEOUT) # Cache for 1 day
return ath
except Exception:
return None
def get_drawdown(self, no_cache=False):
"""
Returns drawdown in percents.
Also caches the value.
:param bool no_cache: If true cache is not used (value is forced to be calculated).
:return: Current drawdown in percents.
:rtype: float
"""
cache_key = self.CACHE_DRAWDOWN.format(self.pk)
cached = cache.get(cache_key)
if cached is not None and not no_cache:
return cached
try:
ath = self.get_ath()
drawdown_value = self.get_last_price().price - ath.price
# No drawdown at this time (new ATH).
if 0 < drawdown_value:
return 0
drawdown = drawdown_value / ath.price * 100
cache.set(cache_key, drawdown, settings.DRAWDOWN_TIMEOUT) # Cache for 1 day
return drawdown
except Exception:
return None
def get_52_weeks_low_high(self, no_cache=False):
"""
Fetches a minimum and maximum known item price for
past 52 weeks from the database.
Also caches the value for ``settings.LOW_HIGH_52_WEEKS_TIMEOUT``
:return: Min and max price pair.
:rtype: tuple
"""
cache_key = self.CACHE_52_WEEKS_LOW_HIGH.format(self.pk)
cached = cache.get(cache_key)
if cached is not None and not no_cache:
return cached
try:
year_ago = date.today() - timedelta(days=365)
result = self.price_set.filter(datetime__date__gte=year_ago).aggregate(
min_price=models.Min("price"), max_price=models.Max("price")
)
data = (result["min_price"], result["max_price"])
cache.set(cache_key, data, settings.LOW_HIGH_52_WEEKS_TIMEOUT)
return data
except Exception:
return None, None
def is_share(self):
return hasattr(self, "share")
def is_index(self):
return hasattr(self, "index")
def is_etf(self):
return hasattr(self, "etf")
def is_coin(self):
return hasattr(self, "coin")
def get_drawdowns(self):
"""
Fetches drawdowns (complete history) as dataframe.
Returned dataframe looks like this::
id item_id price peak_value drawdown_value drawdown
datetime
2010-03-11 00:00:00+00:00 662564 172 0.609804 0.609804 0.000000 0.000000
2010-03-12 00:00:00+00:00 662565 172 0.594724 0.609804 -0.015079 -0.024728
2010-03-15 00:00:00+00:00 662566 172 0.575951 0.609804 -0.033853 -0.055514
2010-03-16 00:00:00+00:00 662567 172 0.622729 0.622729 0.000000 0.000000
2010-03-17 00:00:00+00:00 662568 172 0.645194 0.645194 0.000000 0.000000
"""
def drawdown_value(row):
"""
Calculates drawdown (in value not in percents)
:param pd.Series row: Dataframe row as series.
:return: Calculated drawdown or 0 in case of a new ATH.
:rtype: float
"""
if row["price"] > row["peak_value"]:
return 0.0
return row["price"] - row["peak_value"]
df = self.price_set.all().to_pandas()
df = df.set_index("datetime")
# Calculate.
df = df.sort_index()
df["peak_value"] = df["price"].cummax()
df["drawdown_value"] = df.apply(drawdown_value, axis=1)
df["drawdown"] = df["drawdown_value"] / df["peak_value"]
# df = df.sort_index()
return df
def get_basic_info(self):
try:
return self.itemdata.data["basic_info"]
except (ItemData.DoesNotExist, KeyError):
return {}
def get_aths(self):
"""
Returns pandas Series with ATH's where index is the date of
ATH and value is value of the ATH.
:return: Pandas series with ATHs.
:rtype: pd.Series
"""
df = self.get_drawdowns()
return df["peak_value"].drop_duplicates().sort_index()
# ItemData access methods.
def set_basic_info(self, data):
try:
record = self.itemdata
record.data["basic_info"] = data
except ItemData.DoesNotExist:
record = ItemData(item=self, data={"basic_info": data})
record.save()
class Item(BaseItem):
CACHE_DAY_PERC_CHANGE = "item-{}-days-{}-change-{}"
CACHE_SMA = "item-{}-sma-{}"
CACHE_ATH = "item-{}-ath"
CACHE_DRAWDOWN = "item-{}-drawdown"
CACHE_CURRENT_PRICE = "item-{}-current-price"
CACHE_52_WEEKS_LOW_HIGH = "item-{}-52-weeks-low-high"
# @staticmethod
# def get_class_by_type(type):
# if "share" == type:
# from ..shares.models import Share
#
# return Share
#
# elif "etf" == type:
# from ..etfs.models import Etf
#
# return Etf
#
# elif "index" == type:
# from ..indexes.models import Index
#
# return Index
#
# elif "coin" == type:
# from ..coins.models import Coin
#
# return Coin
[docs]
def get_last_days_change(
self, days, no_cache=False, compounded=False, percents=False
):
# TODO: projit pouziti tyhle metody a rovnou v tech mistech tahat Item z materializovanyho view,
# ale zachovat cache.
"""
Calculates change in price in percents between
last known price (today) and "today - days param" day.
In case of compounded change function works with open transactions.
Takes the calculated percentage change and every found transaction and
calculates it's real gain/loss and it's investment. Based on these 2
values for each transaction a weighted average is calculated.
:param int days: Number of days to go in past.
:param bool no_cache: If true cache is not used (value is forced to be calculated).
:param bool compounded: If true calculates compounded change based on deposit transactions (only for precents=True).
:param bool percents: If true the method calculates percentage change.
:return: Percentage diff since "past" (today - days param) to today.
:rtype: int or None
"""
# Parameters check.
if compounded and not percents:
raise AttributeError(
"Compounded can be calculated only in percents - pass percents=True."
)
# Check cache.
cache_key = self.CACHE_DAY_PERC_CHANGE.format(
self.pk, "percents" if percents else "value", days
)
if compounded:
cache_key += "-compounded"
cached = cache.get(cache_key)
if cached is not None and not no_cache:
return cached
# Calculate fresh value.
days_ago = date.today() - timedelta(days=days)
last_record = self.price_set.order_by("datetime").last()
result = None
if last_record:
# Aims for the exact (see param "days") day in past.
# If the price doesn't exist (weekend, holidays, ...)
# just get the first available before.
try:
before_last_record_price = self.price_set.exclude(
pk=last_record.pk
).get(datetime__date=days_ago)
except Price.DoesNotExist:
before_last_record_price = (
self.price_set.exclude(pk=last_record.pk)
.filter(datetime__date__lt=days_ago)
.order_by("-datetime")
.first()
)
if before_last_record_price:
# Calculate change between last available price and "days ago"
if percents:
result = calc_percentage_change(
last_record.price, before_last_record_price.price
)
else:
result = last_record.price - before_last_record_price.price
if compounded:
change = last_record.price - before_last_record_price.price
# Find all open deposit transactions made til "days ago".
deposits = self.transaction_set.filter(
is_deposit=True, is_closed=False
)
# There might be case where we own a item (crypto case) which wasn't
# bought with any deposit transaction - it was exchanged for any other
# item. In this case we are not able to track down price for each transaction
# because the tree might be complex with buys and sells and there is no way
# how to pair sells prices + amounts to buys prices and amounts (weighted average).
if not deposits.count():
cache.set(cache_key, 0, settings.PRICE_CHANGE_CACHE_TIMEOUT)
return 0
# Calculate percentage change for each transaction based on the
# change above (now and "days ago")
changes = []
investments = []
for dep in deposits:
changes.append(
calc_percentage_change(
(dep.price + change) * dep.amount,
dep.price * dep.amount,
)
)
investments.append(dep.price * dep.amount)
# Calculate weighted average of all changes.
result = np.average(changes, weights=investments)
# Cache the final value.
cache.set(cache_key, result, settings.PRICE_CHANGE_CACHE_TIMEOUT)
return result
def get_last_month_perc_change(self):
"""
Wrapper of self.get_last_days_change() used in templates.
"""
return self.get_last_days_change(30, percents=True)
def get_last_week_perc_change(self):
"""
Wrapper of self.get_last_days_change() used in templates.
"""
return self.get_last_days_change(7, percents=True)
def get_last_price(self, *, current=False):
# TODO: projit pouziti tyhle metody a rovnou v tech mistech tahat Item z materializovanyho view.
"""
Tries to return current market price (if current attribute is true)
with market state and move in value and percents. Falls back to last
known historical price + move in value and percents. Everything
is returned as CurrentPrice dataclass.
In case of no cached data and historical price None is returned.
:param bool current: Look for current market (live open market) value.
:return: Dataclass with current market data or last known regular data.
:rtype: CurrentPrice or None
"""
cache_key = self.CACHE_CURRENT_PRICE.format(self.pk)
cached = cache.get(cache_key)
# If current price exists it sits in cache.
if current and cached is not None:
return CurrentPrice(**json.loads(cached))
# No cache.
try:
prices = ItemWithPrices.objects.get(pk=self.pk)
except ItemWithPrices.DoesNotExist:
return
return CurrentPrice(
prices.last_price,
prices.last_price - prices.price_1_day_ago,
prices.price_1_day_ago_change,
)
def set_current_price_and_change(self, price):
"""
Caches the given price object as "item-X-current-price".
:param CurrentPrice price: Price object to be cached.
"""
cache.set(
self.CACHE_CURRENT_PRICE.format(self.pk),
json.dumps(price.__dict__),
settings.ITEM_CURRENT_PRICE_TIMEOUT,
)
def get_last_day_perc_change(self):
"""
Wrapper of self.get_last_days_change() used in templates.
"""
return self.get_last_days_change(1, percents=True)
def get_last_day_compounded_perc_change(self):
"""
Wrapper of self.get_last_days_change() used in templates.
"""
return self.get_last_days_change(1, compounded=True, percents=True)
def update_cache(self):
"""
Updates cached values cached by:
- self.get_last_days_change(...)
- self.get_sma(...)
- self.get_ath()
- self.get_drawdown()
- self.get_52_weeks_low_high()
- ItemWithPrices.refresh()
"""
def create_perc_cache(days, compounded=False):
for d in days:
self.get_last_days_change(
d, no_cache=True, compounded=compounded, percents=True
)
LOGGER.debug(
"{} days percent price change cache regenerated.".format(d)
)
def create_sma_cache(smas):
for s in smas:
self.get_sma(s, no_cache=True)
LOGGER.debug(
"{} days percent price change cache regenerated.".format(s)
)
# No data -> no need to refresh the cache.
if not self.price_set.exists():
return
ytd = datetime.now().timetuple().tm_yday
days = (1, 5, 7, 30, 90, 180, ytd - 1, ytd + 1, 365)
smas = (20, 50, 200)
create_perc_cache(days)
create_perc_cache([1], compounded=True)
create_sma_cache(smas)
self.get_last_day_compounded_perc_change()
self.get_ath(no_cache=True)
self.get_drawdown(no_cache=True)
self.get_52_weeks_low_high(no_cache=True)
ItemWithPrices.refresh()
from ..transactions.transactions import Transactions
Transactions.update_cache()
from ..core.tasks import generate_performance_charts
generate_performance_charts(self.pk)
[docs]
class ItemWithPricesQuerySet(models.QuerySet):
def with_identifier(self):
return self.annotate(
identifier=Concat(models.F("type"), models.Value("_"), models.F("symbol"))
)
ItemWithPricesManager = models.Manager.from_queryset(ItemWithPricesQuerySet)
[docs]
class ItemWithPrices(BaseItem):
last_price_id = models.BigIntegerField()
last_price = models.FloatField()
price_1_day_ago = models.FloatField()
price_5_days_ago = models.FloatField()
price_7_days_ago = models.FloatField()
price_30_days_ago = models.FloatField()
price_90_days_ago = models.FloatField()
price_180_days_ago = models.FloatField()
price_365_days_ago = models.FloatField()
price_ytd = models.FloatField()
price_1_day_ago_change = models.FloatField()
price_5_days_ago_change = models.FloatField()
price_7_days_ago_change = models.FloatField()
price_30_days_ago_change = models.FloatField()
price_90_days_ago_change = models.FloatField()
price_180_days_ago_change = models.FloatField()
price_365_days_ago_change = models.FloatField()
price_ytd_days_ago_change = models.FloatField()
objects = ItemWithPricesManager()
class Meta:
managed = False
[docs]
@staticmethod
def refresh():
"""
Refreshes the materialized view (recalculates values)
"""
with connections["default"].cursor() as c:
c.execute("REFRESH MATERIALIZED VIEW core_itemwithprices")
[docs]
@staticmethod
def fetch_into_user_items(items):
"""
Fetches (in a bulk/at once) ItemWithPrices model instance for all the given items
and attaches them under "item_with_prices" property.
:param list items: List of UserItem instances.
"""
with_prices = {}
# Fetch all ItemWithPrices models at once.
for i in ItemWithPrices.objects.with_identifier().filter(
identifier__in=[f"{i.item.type}_{i.item.symbol}" for i in items]
):
with_prices[i.identifier] = i
# Assign them to the given instances as "item_with_prices".
for i in items:
i.item_with_prices = with_prices[f"{i.item.type}_{i.item.symbol}"]
[docs]
class UserItem(UserRelatedModel):
item = models.ForeignKey(Item, on_delete=models.CASCADE)
indexes = models.ManyToManyField("indexes.Index", blank=True, related_name="+")
show_in_overview = models.BooleanField(default=True)
show_in_news = models.BooleanField(default=True)
show_on_dashboard = models.BooleanField(default=True)
note = models.TextField(blank=True, null=True)
is_archived = models.BooleanField(default=False)
class Meta:
unique_together = ["user", "item"]
# @cached_property
def item_with_prices(self):
return ItemWithPrices.objects.get(pk=self.item.pk)
def get_tags(self):
"""
Finds all tags in item note.
:return: List of found tags.
:rtype: list
"""
if self.note:
return re.findall(r"#(\w+)", self.note)
return []
def get_note_without_tags(self):
"""
Strips all tags from item note.
:return: Stripped note.
:rtype: str
"""
if self.note:
return re.sub(r"(#\w+)", "", self.note)
def get_owned_items_sum(self, user):
"""
Returns dict with keys "value" and "amount" of current
shares price and amount.
Answers question how many items (shares, coins, ...) you currently
have and what is it's value today.
:return: Dict with "amount" and "value" keys.
"""
from ..transactions.models import Transaction
q = (
Transaction.objects.by_user(user)
.filter(item=self.item)
.aggregate(sum=models.Sum("amount"))
)
if q["sum"] and 0 < q["sum"]:
return {
"amount": q["sum"],
"value": self.item.price_set.last().price * q["sum"],
}
def get_owned_items(self, user):
"""
Returns dict with keys "amount" and "value" of all open transactions.
Answers question how many items you currently own and
how much it did cost you at the time of buying.
:return: Dict with "amount" and "value" keys.
"""
from ..transactions.models import Transaction
value = 0
amount = 0
transactions = Transaction.objects.by_user(user).filter(
item=self.item, is_closed=False
)
if len(transactions):
for t in transactions:
value += t.get_value()
amount += t.amount
return {"value": value, "amount": amount}
def indexes_as_symbols(self):
return [i.symbol for i in self.indexes.all()]
class Asset(models.Model):
EARNINGS_DATA = 0
REVENUES_DATA = 1
# TODO: substitute
# TRENDS_CHART = 2
PERFORMANCE_CHART = 3
# TODO: substitute
# ANALYSTS_PRICE_CHART = 4
EPS_DATA = 5
RATINGS_DATA = 6
TRANSACTION_GRAPH = 7
PRICE_RATINGS = 8
TYPE_CHOICES = (
(EARNINGS_DATA, _("Earnings data")),
(REVENUES_DATA, _("Revenue data")),
(PERFORMANCE_CHART, _("Performance chart")),
(EPS_DATA, _("EPS data")),
(RATINGS_DATA, _("Ratings data")),
(TRANSACTION_GRAPH, _("Transaction graph")),
(PRICE_RATINGS, _("Price ratings")),
)
item = models.ForeignKey(Item, on_delete=models.CASCADE, blank=True, null=True)
transaction = models.ForeignKey(
"transactions.Transaction", on_delete=models.CASCADE, blank=True, null=True
)
type = models.PositiveSmallIntegerField(choices=TYPE_CHOICES)
image = models.ImageField(
upload_to="assets", storage=OverwriteStorage(), blank=True, null=True
)
data = models.JSONField(blank=True, null=True, encoder=RichyJSONEncoder)
updated = ModificationDateTimeField()
[docs]
@classmethod
def upload(cls, buffer, type, extension="png", item=None, transaction=None):
"""
Creates/updates item asset and uploads file (param buffer)
to the storage.
:param buffer: File-like object.
:param int type: Item asset type - see Asset.TYPE_CHOICES.
:param extension: Extensions string i.e. "png".
:param Item item: Item model instance.
"""
try:
asset = cls.objects.get(item=item, transaction=transaction, type=type)
except cls.DoesNotExist:
asset = cls(item=item, transaction=transaction, type=type)
type_name = asset.get_type_display().lower().replace(" ", "-")
if item:
asset.image.save(
"{}_{}.{}".format(asset.item.symbol, type_name, extension), File(buffer)
)
LOGGER.debug(
"{} for {} has been generated.".format(
asset.get_type_display(), item.symbol
)
)
elif transaction:
asset.image.save(
"{}_{}.{}".format(transaction.pk, type_name, extension), File(buffer)
)
LOGGER.debug(
"{} for transaction {} has been generated.".format(
asset.get_type_display(), transaction.pk
)
)
else:
asset.image.save("{}.{}".format(type_name, extension), File(buffer))
LOGGER.debug("{} has been generated.".format(asset.get_type_display()))
asset.save()
[docs]
class Price(models.Model):
item = models.ForeignKey(Item, blank=True, null=True, on_delete=models.CASCADE)
datetime = models.DateTimeField(db_index=True)
price = models.FloatField()
objects = PandasManager()
class Meta:
constraints = [
models.UniqueConstraint(
fields=["item", "datetime"], name="unique_item_datetime"
)
]
def get_timestamp(self):
return date_to_highcharts_timestamp(self.datetime)
class ItemData(models.Model):
item = models.OneToOneField(Item, on_delete=models.CASCADE)
data = models.JSONField(encoder=DjangoJSONEncoder)
def __str__(self):
return f"{self.item} data"
[docs]
class BaseDividend(models.Model):
ex_date = models.DateField()
payment_date = models.DateField(null=True)
record_date = models.DateField(null=True)
yield_value = models.FloatField()
amount_value = models.FloatField()
class Meta:
abstract = True
def is_in_future(self):
return self.payment_date > date.today()