Source code for richy.core.scraper

"""
Module for scraping market data from the internet.
Utilizes the Karpet, Rug and yfinance libraries.
"""

import dataclasses
import logging
from dataclasses import dataclass, field
from datetime import date
from typing import Dict

import pandas as pd
import requests
import yfinance as yf
from django.conf import settings
from karpet import Karpet
from requests.adapters import HTTPAdapter
from rug import AlphaQuery, BarChart, FinViz, StockAnalysis, TipRanks
from rug.exceptions import SymbolNotFound
from urllib3.util.retry import Retry

from .templatetags.utils import autofloatformat, coinautofloatformat

# Application logger (named "richy") used by the scraping code in this module.
LOGGER = logging.getLogger("richy")


@dataclass(frozen=True)
class CurrentPrice:
    """
    Immutable snapshot of an item's current market price.

    ``state`` defaults to ``"open"``. ``closed_market`` defaults to an empty
    dict; when it is non-empty, :meth:`round` expects it to carry the
    ``price``, ``change_value`` and ``change_percents`` keys.

    TODO: add a description for the use of current_market
    """

    price: float
    change_value: float
    change_percents: float
    state: str = "open"
    closed_market: Dict = field(default_factory=dict)

    def __bool__(self):
        """
        The structure is truthy exactly when its price is truthy.
        """

        return bool(self.price)

    def is_closed(self):
        """
        Tells whether the state is ``"closed"``.

        :return: True if the state equals ``"closed"``.
        :rtype: bool
        """

        return self.state == "closed"

    def round(self, item):
        """
        Builds a dict copy of the structure with all numbers formatted
        according to the item type.

        :param Item item: Item model instance.
        :return: Dict with formatted numbers or None if the structure is falsy.
        :rtype: dict
        """

        if not self:
            return None

        number_keys = ("price", "change_value", "change_percents")

        # asdict() copies nested containers too, so the frozen instance
        # is never touched by the in-place updates below.
        rounded = dataclasses.asdict(self)

        # Coins use their own formatter, everything else the common one.
        formatter = coinautofloatformat if item.is_coin() else autofloatformat

        for key in number_keys:
            rounded[key] = formatter(rounded[key])

        # Closed market numbers always go through the common formatter.
        closed = rounded["closed_market"]

        if closed:
            for key in number_keys:
                closed[key] = autofloatformat(closed[key])

        return rounded
class Manager:
    """
    Main manager class for scraping.
    All scraping methods are placed here.
    """

    def __repr__(self):
        from pprint import pformat

        return "<" + type(self).__name__ + "> " + pformat(vars(self), indent=4)

    @staticmethod
    def get_share_basic_info(share):
        """
        Fetches share basic info via rug library (StockAnalysis).
        When the symbol isn't found there, yfinance is used as a fallback.

        :param Share share: Share model instance.
        :return: Basic info as a dict - empty dict on failure.
        :rtype: dict
        """

        api = StockAnalysis(share.symbol)

        try:
            return api.get_basic_info()
        except SymbolNotFound:
            # Fallback to yfinance.
            try:
                info = yf.Ticker(share.symbol).get_info()

                return {
                    "company_name": info["shortName"],
                    "market": "",
                    "description": info["longBusinessSummary"],
                    "market_cap": info["marketCap"],
                    # Assumed to be missing for shares without dividends,
                    # hence .get() - a missing key means no dividends.
                    "has_dividends": bool(info.get("dividendYield")),
                    "year_low": info["fiftyTwoWeekLow"],
                    "year_high": info["fiftyTwoWeekHigh"],
                    "pe_ratio": info["forwardPE"],
                    "eps": info["forwardEps"],
                    "similar_items": [],
                }
            except Exception:
                # Log the fallback failure itself (with its traceback)
                # instead of silently swallowing it.
                LOGGER.exception(
                    "Basic info wasn't downloaded", extra={"share": share}
                )
        except Exception:
            LOGGER.exception("Basic info wasn't downloaded", extra={"share": share})

        return {}

    @staticmethod
    def get_etf_basic_info(etf):
        """
        Fetches etf basic info via rug library (BarChart).

        :param Etf etf: Etf model instance.
        :return: Basic info as a dict - empty dict on failure.
        :rtype: dict
        """

        api = BarChart(etf.symbol)

        try:
            return api.get_etf_basic_info()
        except SymbolNotFound:
            LOGGER.info(f"Basic info wasn't downloaded - symbol {etf} wasn't found.")
        except Exception:
            LOGGER.exception("Basic info wasn't downloaded", extra={"etf": etf})

        return {}

    @staticmethod
    def get_coin_basic_info(coin):
        """
        Fetches coin basic info via karpet library.
        The coin is looked up by its ``coin_id`` (as slug) when set,
        by its symbol otherwise.

        :param Coin coin: Coin model instance.
        :return: Basic info as a dict.
        :rtype: dict
        """

        api = Karpet()

        if coin.coin_id:
            return api.get_basic_info(slug=coin.coin_id)

        return api.get_basic_info(symbol=coin.symbol)

    @staticmethod
    def get_dividends(share_or_etf):
        """
        Fetches share/etf dividends via rug library (TipRanks).

        :param Share or Etf share_or_etf: Share or Etf model instance.
        :return: Dividends as a list - empty list on failure.
        :rtype: list
        """

        # The API takes the symbol (as everywhere else in this class),
        # not the model instance.
        api = TipRanks(share_or_etf.symbol)

        try:
            return api.get_dividends()
        except SymbolNotFound:
            LOGGER.debug(
                f"Dividends weren't downloaded - symbol {share_or_etf} wasn't found."
            )
        except Exception:
            LOGGER.exception(
                "Dividends weren't downloaded",
                extra={"share_or_etf": share_or_etf},
            )

        return []

    @staticmethod
    def get_current_price_and_change(item):
        """
        Fetches current market price, market state and price change
        in value and percents.

        :param Item item: Item model instance to be fetched price for.
        :return: Dataclass with price, state and change values.
            None for items that are neither coin, share nor etf.
        :rtype: CurrentPrice
        """

        def for_share_or_etf(symbol):
            def dig_price_data(price, state=None):
                """
                Digs out and sanitize price data from the given price data structure.
                Digging is done by current market state or by the given one. That
                can be used in case current market state is "post-market", but
                data needed are "current-market".

                :param dict price: Price data provided by rug library.
                :param str state: The desired price data - defaults to actually current market state.
                :return: Price data wrapped in a dict - empty dict on failure.
                :rtype: dict
                """

                try:
                    price_key = {
                        "pre-market": "pre_market",
                        "open": "current_market",
                        "closed": "current_market",
                        "post-market": "post_market",
                    }[state or price["state"]]

                    return {
                        "price": float(price[price_key]["value"]),
                        "state": price["state"],
                        "change_value": price[price_key]["change"]["value"],
                        "change_percents": price[price_key]["change"]["percents"],
                    }
                except Exception:
                    LOGGER.exception("Couldn't dig price data.")

                return {}

            price = TipRanks(symbol).get_current_price_change()
            LOGGER.debug(f"Current price data for {symbol} is {price}.")
            struct_data = dig_price_data(price)

            # NOTE: if digging failed, struct_data is empty and the lookup
            # below raises KeyError (behavior kept for callers).
            # Outside regular hours the regular-session data are attached too.
            if struct_data["state"] in ("pre-market", "post-market") and (
                closed_market := dig_price_data(price, "closed")
            ):
                struct_data["closed_market"] = {
                    "price": closed_market["price"],
                    "change_value": closed_market["change_value"],
                    "change_percents": closed_market["change_percents"],
                }

            return CurrentPrice(**struct_data)

        def for_coin(coin):
            if coin.coin_id:
                kwargs = {"slug": coin.coin_id}
            else:
                kwargs = {"symbol": coin.symbol}

            data = Karpet().get_basic_info(**kwargs)
            LOGGER.debug(f"Current price data for {coin.symbol} is {data}.")

            return CurrentPrice(
                price=float(data["current_price"]),
                state="open",  # crypto market is always open
                change_value=data["price_change_24"],
                change_percents=data["price_change_24_percents"],
            )

        if item.is_coin():
            return for_coin(item.coin)

        if item.is_share() or item.is_etf():
            return for_share_or_etf(item.symbol)

        # if item.is_index():
        #     return for_share_or_index_or_etf(f"^{item.symbol}")

        return None

    @staticmethod
    def fetch_price_ratings(share):
        """
        Fetches share price ratings data and directly updates
        them in the database for the given share.

        :param Share share: Share which price ratings will be downloaded for.
        """

        from .models import Asset

        fv = FinViz(share.symbol)

        try:
            Asset.objects.update_or_create(
                item=share,
                type=Asset.PRICE_RATINGS,
                defaults={"data": fv.get_price_ratings()},
            )
        except SymbolNotFound:
            LOGGER.debug(
                f"Price ratings weren't downloaded - symbol {share} wasn't found."
            )
        except Exception:
            LOGGER.exception(
                "Price ratings weren't downloadeded", extra={"share": share}
            )

    @staticmethod
    def fetch_financials(share):
        """
        Fetches all the share financials data (revenues, earnings, EPS)
        and directly updates them in the database for the given share.
        Each data set is handled independently - failure of one
        doesn't prevent the others from being downloaded.

        :param Share share: Share which financials will be downloaded for.
        """

        from .models import Asset

        query = AlphaQuery(share.symbol)

        # (asset type, fetcher, label used in messages, failure message)
        jobs = (
            (
                Asset.REVENUES_DATA,
                query.get_revenues,
                "Revenues",
                "Revenues weren't downloaded",
            ),
            (
                Asset.EARNINGS_DATA,
                query.get_earnings,
                "Earnings",
                "Earnings weren't downloaded",
            ),
            (
                Asset.EPS_DATA,
                query.get_eps,
                "EPS",
                "EPS wasn't downloaded",
            ),
        )

        for asset_type, fetch, label, failure_message in jobs:
            try:
                Asset.objects.update_or_create(
                    item=share,
                    type=asset_type,
                    defaults={"data": fetch()},
                )
                LOGGER.debug(f"{label} for {share} has been downloaded")
            except SymbolNotFound:
                LOGGER.debug(
                    f"{label} weren't downloaded - symbol {share} wasn't found."
                )
            except Exception:
                LOGGER.exception(
                    failure_message,
                    extra={"share": share},
                )

    @staticmethod
    def fetch_ratings(share):
        """
        Fetches analyst ratings and saves it as Asset model record.

        :param Share share: Share which ratings will be downloaded for.
        """

        from .models import Asset

        bar = BarChart(share.symbol)

        try:
            Asset.objects.update_or_create(
                item=share,
                type=Asset.RATINGS_DATA,
                defaults={"data": bar.get_ratings()},
            )
        except Exception:
            LOGGER.exception("Ratings weren't downloadeded", extra={"share": share})

    @staticmethod
    def fetch_share_prices(share, history="max"):
        """
        Downloads prices for the share.
        Returns dataframe with following columns:

        - Date (index)
        - Open
        - High
        - Low
        - Close
        - Volume
        - Dividends
        - Stock Splits

        :param Share share: Share model instance we want prices for.
        :param str history: Period passed to yfinance ``history()`` - defaults to "max".
        :return: Pandas dataframe - empty one on failure.
        :rtype: pandas.DataFrame
        """

        try:
            ticker = yf.Ticker(share.symbol)
            df = ticker.history(period=history)
            LOGGER.debug(f"Share prices successfully downloaded for {share}.")
        except Exception:
            LOGGER.exception("Couldn't fetch share prices.")
            return pd.DataFrame()

        return df

    @staticmethod
    def fetch_etf_prices(etf):
        """
        Downloads all prices for the etf.
        Returns dataframe with following columns:

        - Date (index)
        - Open
        - High
        - Low
        - Close
        - Volume
        - Dividends
        - Stock Splits

        :param Etf etf: Etf model instance we want prices for.
        :return: Pandas dataframe - empty one on failure.
        :rtype: pandas.DataFrame
        """

        try:
            ticker = yf.Ticker(etf.symbol)
            df = ticker.history("max")
            LOGGER.debug(f"Etf prices successfully downloaded for {etf}.")
        except Exception:
            LOGGER.exception("Couldn't fetch etf prices.")
            return pd.DataFrame()

        return df

    @staticmethod
    def fetch_index_prices(index):
        """
        Downloads all prices for the index.
        Returns dataframe with following columns:

        - Date (index)
        - Open
        - High
        - Low
        - Close

        :param Index index: Index model instance we want prices for.
        :return: Pandas dataframe - empty one on failure.
        :rtype: pandas.DataFrame
        """

        try:
            # Index symbols are prefixed with a caret for yfinance.
            ticker = yf.Ticker(f"^{index.symbol}")
            df = ticker.history("max")
            # Drop 0 value columns.
            df = df.drop(["Volume", "Dividends", "Stock Splits"], axis=1)
            LOGGER.debug(f"Index prices successfully downloaded for {index}.")
        except Exception:
            LOGGER.exception("Couldn't fetch index prices.")
            return pd.DataFrame()

        return df

    @staticmethod
    def fetch_coin_prices(coin):
        """
        Downloads all prices for the coin since settings.COIN_EPOCH.
        Returns dataframe with following columns:

        - date (index)
        - price
        - market_cap
        - total_volume

        :param Coin coin: Coin model instance we want prices for.
        :return: Pandas dataframe sorted by index - empty one on failure.
        :rtype: pandas.DataFrame
        """

        LOGGER.debug(f"Downloading prices for {coin.symbol}.")

        # Try to download historical data.
        try:
            karpet = Karpet(settings.COIN_EPOCH, date.today())
            df = karpet.fetch_crypto_historical_data(coin.symbol, coin.coin_id)
            LOGGER.debug(f"Prices successfully downloaded for {coin.symbol}.")
        except Exception:
            LOGGER.exception(f"Couldn't download historical data for {coin.symbol}.")
            return pd.DataFrame()

        # Sort the dataframe.
        df = df.sort_index()

        return df

    @staticmethod
    def fetch_intraday_prices(item):
        """
        Fetches market (intraday) data prices for shares, indexes and ETFs.
        For coins past 24 hours prices are fetched in 30 minutes interval.

        :param Item item: Item model instance we want prices for.
        :return: Tuple of (dataframe, market open time, market close time).
            Open/close are None for coins and when not known.
            Empty tuple when share/index/etf download fails,
            None for any other item type.
        :rtype: tuple
        """

        def for_share_or_index_or_etf(item):
            """
            Downloads all (including pre/post market) prices
            for the share/index/etf in 5 minute intervals.
            The dataframe has following columns:

            - Date (index)
            - Open
            - High
            - Low
            - Close
            - Volume
            - Dividends
            - Stock Splits

            :param Item item: Item model instance we want prices for.
            :return: Tuple of (dataframe, open, close) - empty tuple on failure.
            :rtype: tuple
            """

            market_open = None
            market_close = None

            try:
                ticker = yf.Ticker(
                    f"^{item.symbol}" if item.is_index() else item.symbol
                )
                df = ticker.history("1d", interval="5m")
                df_pp = ticker.history("1d", interval="5m", prepost=True)

                # Is market open yet?
                if len(df) and df.index[0] > df_pp.index[0]:
                    market_open = df.index[0]

                # Is market closed yet?
                if len(df) and df.index[-1] < df_pp.index[-1]:
                    market_close = df.index[-1]

                LOGGER.debug(
                    f"Share intraday prices successfully downloaded for {item.symbol}."
                )
            except Exception:
                LOGGER.exception("Couldn't fetch share/index/etf intraday prices.")
                # NOTE: empty tuple (not a 3-tuple) is kept for callers.
                return ()

            return df_pp, market_open, market_close

        # Plain nested function - a @staticmethod decorator here would make
        # it non-callable on Python versions older than 3.10.
        def for_coin(coin):
            """
            Fetches market prices for past 24 hours with 30 minutes interval.
            Returns dataframe with following columns:

            - date time (index)
            - open
            - high
            - low
            - close

            :param Coin coin: Coin model instance we want prices for.
            :return: Pandas dataframe converted to settings.TIME_ZONE.
            :rtype: pandas.DataFrame
            """

            k = Karpet()

            if coin.coin_id:
                df = k.fetch_crypto_live_data(slug=coin.coin_id)
            else:
                df = k.fetch_crypto_live_data(symbol=coin.symbol)

            # Data need timezone localization.
            df = df.tz_localize("UTC")

            return df.tz_convert(settings.TIME_ZONE)

        if item.is_coin():
            return for_coin(item.coin), None, None

        if item.is_share() or item.is_index() or item.is_etf():
            return for_share_or_index_or_etf(item)

        return None

    @staticmethod
    def fetch_etf_holdings(etf):
        """
        Fetches ETF holdings with rug library (FinViz).

        :param Etf etf: Etf model instance we want holdings for.
        :return: List of objects with ``name``, ``symbol`` (can be None),
            ``instrument``, ``weight`` (in %) keys - empty list on failure.
        :rtype: list
        """

        api = FinViz(etf.symbol)

        try:
            return api.get_etf_holdings()
        except SymbolNotFound:
            LOGGER.debug(f"Holdings weren't downloaded - symbol {etf} wasn't found.")
        except Exception:
            LOGGER.exception("Holdings weren't downloaded", extra={"etf": etf})

        return []
class Downloader:
    """
    Common application downloader based on requests library
    """

    DEFAULT_TIMEOUT = 10  # seconds
    DEFAULT_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/125.0.0.0 Safari/537.36"
    )

    class TimeoutHTTPAdapter(HTTPAdapter):
        """HTTPAdapter that applies a default timeout if the caller doesn't set one."""

        def __init__(self, *args, timeout: float | None = None, **kwargs):
            # None means "use the application-wide default".
            self.timeout = Downloader.DEFAULT_TIMEOUT if timeout is None else timeout
            super().__init__(*args, **kwargs)

        def send(self, request, **kwargs):
            # A missing timeout and an explicit ``timeout=None`` are both
            # replaced by the adapter default.
            if kwargs.get("timeout") is None:
                kwargs["timeout"] = self.timeout

            return super().send(request, **kwargs)

    @staticmethod
    def get_client(
        retries: int = 5,
        backoff_factor: float = 1.0,
        status_forcelist: tuple = (429, 500, 502, 503, 504),
        timeout: float = DEFAULT_TIMEOUT,
    ) -> requests.Session:
        """
        Builds a requests.Session with automatic retries, exponential backoff,
        and a default per-request timeout.

        Backoff follows urllib3's ``Retry``: there is no sleep before the
        first retry, afterwards the sleep is

            {backoff_factor} * (2 ** {number of previous retries})

        e.g. with backoff_factor=1.0 -> 0s, 2s, 4s, 8s, 16s, ...
        (capped by urllib3's maximum backoff).

        The Retry-After response header is respected
        (``respect_retry_after_header=True``). Once retries on a status from
        ``status_forcelist`` are exhausted the last response is returned
        instead of raising (``raise_on_status=False``).

        Note: uses urllib3's default allowed_methods, which retries idempotent
        methods only (HEAD, GET, PUT, DELETE, OPTIONS, TRACE). POST/PATCH are
        NOT retried by default.

        :param int retries: Maximum number of retries (total and per
            connect/read/status category).
        :param float backoff_factor: Backoff multiplier, see above.
        :param tuple status_forcelist: HTTP status codes that trigger a retry.
        :param float timeout: Default timeout in seconds for requests
            that don't set their own.
        :return: Configured session with the default User-Agent header set.
        :rtype: requests.Session
        """

        retry = Retry(
            total=retries,
            connect=retries,
            read=retries,
            status=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            respect_retry_after_header=True,
            raise_on_status=False,
        )
        adapter = Downloader.TimeoutHTTPAdapter(max_retries=retry, timeout=timeout)

        session = requests.Session()

        # The same adapter (retry policy + timeout) serves both schemes.
        for prefix in ("http://", "https://"):
            session.mount(prefix, adapter)

        session.headers.update({"User-Agent": Downloader.DEFAULT_USER_AGENT})

        return session