-
This commit is contained in:
316
utils/utils.py
Normal file
316
utils/utils.py
Normal file
@@ -0,0 +1,316 @@
|
||||
import copy
|
||||
from collections import defaultdict, deque
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
|
||||
from internal_types.types import OHLC, BidAsk, Instrument, Position, Quote
|
||||
|
||||
TRADING_DAYS = 252
|
||||
|
||||
SEC_15_MINUTES = 15 * 60
|
||||
SEC_30_MINUTES = 30 * 60
|
||||
SEC_1_HOUR = 60 * 60
|
||||
SEC_1_DAY = 24 * 60 * 60
|
||||
|
||||
|
||||
def sign(x):
|
||||
# return x // abs(x)
|
||||
# e.g. sign(-5) = -1, sign(0) = 0, sign(5) = 1
|
||||
return (x > 0) - (x < 0)
|
||||
|
||||
|
||||
def max_abs(*xs):
|
||||
return max(map(abs, xs))
|
||||
|
||||
|
||||
def min_abs(*xs):
|
||||
return min(map(abs, xs))
|
||||
|
||||
|
||||
def simple_sharpe_ratio(historical_net_liquid_value: List[float], interval_sec: int) -> float:
|
||||
if len(historical_net_liquid_value) < 2:
|
||||
return np.nan
|
||||
|
||||
xs = np.asarray(historical_net_liquid_value, dtype=float)
|
||||
returns = xs[1:] / xs[:-1] - 1.0
|
||||
mean_r = np.mean(returns)
|
||||
std_r = np.std(returns, ddof=1)
|
||||
|
||||
if std_r == 0:
|
||||
return np.nan
|
||||
|
||||
periods_per_year = TRADING_DAYS * SEC_1_DAY / interval_sec
|
||||
|
||||
return (mean_r / std_r) * np.sqrt(periods_per_year)
|
||||
|
||||
|
||||
def log_sharpe_ratio(historical_net_liquid_value: List[float], interval_sec: int) -> float:
|
||||
if len(historical_net_liquid_value) < 2:
|
||||
return np.nan
|
||||
|
||||
xs = np.asarray(historical_net_liquid_value, dtype=float)
|
||||
log_returns = np.log(xs[1:] / xs[:-1])
|
||||
mean_r = log_returns.mean()
|
||||
std_r = log_returns.std(ddof=1)
|
||||
|
||||
if std_r == 0:
|
||||
return np.nan
|
||||
|
||||
periods_per_year = TRADING_DAYS * SEC_1_DAY / interval_sec
|
||||
|
||||
return (mean_r / std_r) * np.sqrt(periods_per_year)
|
||||
|
||||
|
||||
# def consolidate_positions(ps: List[Position]) -> Position:
|
||||
# # assumes that ps is not empty and contains the same instruments
|
||||
# qty = 0
|
||||
# outstanding_balance = 0
|
||||
# for p in ps:
|
||||
# qty += p.quantity
|
||||
# outstanding_balance += p.price * p.quantity
|
||||
# return Position(ps[0].instr, qty, outstanding_balance / qty if qty else 0)
|
||||
|
||||
|
||||
def timestamp_to_str(timestamp: int) -> str:
|
||||
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
|
||||
|
||||
|
||||
def interval_idx(timestamp: int, interval_sec: int) -> int:
|
||||
return timestamp // interval_sec
|
||||
|
||||
|
||||
def long(pos: Position) -> bool:
|
||||
return pos.quantity > 0
|
||||
|
||||
|
||||
def unrealized_gains(pos: Position, at_price: float) -> float:
|
||||
return (at_price - pos.price) * pos.quantity * pos.instr.multiplier
|
||||
|
||||
|
||||
def crossover(quote: Quote, price: float) -> bool:
|
||||
# return true if quote crosses over price
|
||||
return (quote.bid if isinstance(quote, BidAsk) else quote.high) > price
|
||||
|
||||
|
||||
def crossunder(quote: Quote, price: float) -> bool:
|
||||
# return true if quote crosses under price
|
||||
return (quote.ask if isinstance(quote, BidAsk) else quote.low) < price
|
||||
|
||||
|
||||
class Portfolio:
|
||||
|
||||
def __init__(self):
|
||||
self._realized_gains = defaultdict(float)
|
||||
self.outstanding_pos = defaultdict(deque[Position])
|
||||
self.pos_history = defaultdict(list[Position])
|
||||
|
||||
def empty(self, instr: Instrument) -> bool:
|
||||
return len(self.outstanding_pos[instr]) == 0
|
||||
|
||||
def add_position(self, new_pos: Position):
|
||||
if new_pos.quantity == 0:
|
||||
return
|
||||
|
||||
instr = new_pos.instr
|
||||
|
||||
self.pos_history[instr].append(copy.deepcopy(new_pos))
|
||||
|
||||
if self.empty(instr) or long(self.outstanding_pos[instr][0]) == long(new_pos):
|
||||
self.outstanding_pos[instr].append(new_pos)
|
||||
return
|
||||
|
||||
while not self.empty(instr) and new_pos.quantity:
|
||||
old_pos = self.outstanding_pos[instr].popleft()
|
||||
min_abs_qty = min_abs(old_pos.quantity, new_pos.quantity)
|
||||
tmp_pos = Position(instr, min_abs_qty * sign(old_pos.quantity), old_pos.price)
|
||||
self._realized_gains[instr] += unrealized_gains(tmp_pos, new_pos.price)
|
||||
old_pos = Position(instr, old_pos.quantity - min_abs_qty * sign(old_pos.quantity),
|
||||
old_pos.price)
|
||||
new_pos = Position(instr, new_pos.quantity - min_abs_qty * sign(new_pos.quantity),
|
||||
new_pos.price)
|
||||
|
||||
if old_pos.quantity:
|
||||
self.outstanding_pos[instr].appendleft(old_pos)
|
||||
|
||||
if new_pos.quantity:
|
||||
self.outstanding_pos[instr].append(new_pos)
|
||||
|
||||
def liquidate_positions(self, instr: Instrument, price: float):
|
||||
self.add_position(Position(instr, -self.outstanding_shares(instr), price))
|
||||
|
||||
def outstanding_shares(self, instr: Instrument) -> int:
|
||||
# todo: handle fractional shares (crypto)
|
||||
return sum(pos.quantity for pos in self.outstanding_pos[instr])
|
||||
|
||||
def has_outstanding_shares(self, instr: Instrument) -> bool:
|
||||
return len(self.outstanding_pos[instr]) != 0
|
||||
|
||||
def consolidate_last_x_shares(self, instr: Instrument, x: int) -> Position:
|
||||
# todo: handle fractional shares (crypto)
|
||||
qty = 0
|
||||
outstanding_balance = 0
|
||||
|
||||
for pos in self.pos_history[instr][::-1]:
|
||||
if qty == x:
|
||||
break
|
||||
to_add = pos.quantity
|
||||
if (qty < x < qty + to_add) or (qty + to_add < x < qty):
|
||||
to_add = x - qty
|
||||
qty += to_add
|
||||
outstanding_balance += pos.price * to_add
|
||||
|
||||
# todo: should price be 0 or np.nan if qty == 0?
|
||||
# todo: take care of the case where qty != x
|
||||
|
||||
return Position(instr, qty, outstanding_balance / qty if qty else 0)
|
||||
|
||||
def realized_gains(self, instr: Instrument) -> float:
|
||||
return self._realized_gains[instr]
|
||||
|
||||
def unrealized_gains(self, instr: Instrument, at_price: float) -> float:
|
||||
return sum(unrealized_gains(pos, at_price) for pos in self.outstanding_pos[instr])
|
||||
|
||||
def total_gains(self, instr: Instrument, at_price: float) -> float:
|
||||
return self._realized_gains[instr] + self.unrealized_gains(instr, at_price)
|
||||
|
||||
|
||||
class SMA:
|
||||
|
||||
def __init__(self, interval_sec: int, window_sec: int):
|
||||
if interval_sec == 0:
|
||||
raise ValueError('interval_sec == 0')
|
||||
if window_sec < interval_sec:
|
||||
raise ValueError('window_sec < interval_sec')
|
||||
self.periods = window_sec // interval_sec
|
||||
self.cnt = 0
|
||||
self.xs: List[float] = [0 for _ in range(self.periods)]
|
||||
self.rolling_sum = 0
|
||||
self.sma = 0
|
||||
|
||||
def append(self, x: float):
|
||||
self.cnt += 1
|
||||
idx = (self.cnt - 1) % self.periods
|
||||
if self.has_val():
|
||||
self.rolling_sum -= self.xs[idx]
|
||||
self.xs[idx] = x
|
||||
self.rolling_sum += x
|
||||
self.sma = self.rolling_sum / self.periods
|
||||
|
||||
def has_val(self):
|
||||
return self.cnt >= self.periods
|
||||
|
||||
def val(self) -> float:
|
||||
if not self.has_val():
|
||||
raise RuntimeError('cnt < periods')
|
||||
return self.sma
|
||||
|
||||
|
||||
class EMA:
|
||||
|
||||
def __init__(self, interval_sec: int, window_sec: int):
|
||||
if interval_sec == 0:
|
||||
raise ValueError('interval_sec == 0')
|
||||
if window_sec < interval_sec:
|
||||
raise ValueError('window_sec < interval_sec')
|
||||
|
||||
self.periods = window_sec // interval_sec
|
||||
self.alpha = 2 / (self.periods + 1)
|
||||
self.cnt = 0
|
||||
self.tmp_sum = 0
|
||||
self.ema = 0
|
||||
|
||||
def append(self, x: float):
|
||||
self.cnt += 1
|
||||
|
||||
if self.cnt <= self.periods:
|
||||
self.tmp_sum += x
|
||||
if self.cnt == self.periods:
|
||||
self.ema = self.tmp_sum / self.periods
|
||||
else:
|
||||
self.ema = self.alpha * x + (1 - self.alpha) * self.ema
|
||||
|
||||
def has_val(self):
|
||||
return self.cnt >= self.periods
|
||||
|
||||
def val(self) -> float:
|
||||
if not self.has_val():
|
||||
raise RuntimeError('cnt < periods')
|
||||
return self.ema
|
||||
|
||||
|
||||
class BlendedOHLC:
|
||||
|
||||
def __init__(self, instr: Instrument, interval_sec: int):
|
||||
self.instr = instr
|
||||
self.interval_sec = interval_sec
|
||||
self.timestamps: List[int] = []
|
||||
self.opens: List[float] = []
|
||||
self.highs: List[float] = []
|
||||
self.lows: List[float] = []
|
||||
self.closes: List[float] = []
|
||||
self.volumes: List[int] = [] # todo: float for crypto
|
||||
self.incomplete_bar: OHLC | None = None
|
||||
|
||||
def __len__(self):
|
||||
return len(self.timestamps)
|
||||
|
||||
def __append(self, ohlc: OHLC):
|
||||
if self.incomplete_bar is not None:
|
||||
self.timestamps.append(self.incomplete_bar.timestamp)
|
||||
self.opens.append(self.incomplete_bar.open)
|
||||
self.highs.append(self.incomplete_bar.high)
|
||||
self.lows.append(self.incomplete_bar.low)
|
||||
self.closes.append(self.incomplete_bar.close)
|
||||
self.volumes.append(self.incomplete_bar.volume)
|
||||
self.incomplete_bar = ohlc
|
||||
|
||||
def __blend(self, ohlc: OHLC):
|
||||
if self.incomplete_bar is None:
|
||||
self.incomplete_bar = ohlc
|
||||
else:
|
||||
self.incomplete_bar = OHLC(instr=self.instr,
|
||||
timestamp=self.incomplete_bar.timestamp,
|
||||
open=self.incomplete_bar.open,
|
||||
high=max(self.incomplete_bar.high, ohlc.high),
|
||||
low=min(self.incomplete_bar.low, ohlc.low),
|
||||
close=ohlc.close,
|
||||
volume=self.incomplete_bar.volume + ohlc.volume)
|
||||
|
||||
def __to_blend(self, ohlc: OHLC):
|
||||
if not self.timestamps or self.incomplete_bar is None:
|
||||
return False
|
||||
last_interval_idx = interval_idx(self.incomplete_bar.timestamp, self.interval_sec)
|
||||
ohlc_interval_idx = interval_idx(ohlc.timestamp, self.interval_sec)
|
||||
return last_interval_idx == ohlc_interval_idx
|
||||
|
||||
def rolling_min(self, period: int) -> float:
|
||||
# todo: check if index is out of bound
|
||||
return min(self.lows[-period:])
|
||||
|
||||
def rolling_max(self, period: int) -> float:
|
||||
# todo: check if index is out of bound
|
||||
return max(self.highs[-period:])
|
||||
|
||||
def crossunder_x_period_min(self, window: int, quote: Quote) -> bool:
|
||||
return self.__len__() >= window and crossunder(quote, self.rolling_min(window))
|
||||
|
||||
def crossover_x_period_max(self, window: int, quote: Quote) -> bool:
|
||||
return self.__len__() >= window and crossover(quote, self.rolling_max(window))
|
||||
|
||||
def append(self, ohlc: OHLC):
|
||||
if self.__to_blend(ohlc):
|
||||
self.__blend(ohlc)
|
||||
else:
|
||||
self.__append(ohlc)
|
||||
|
||||
def blended_ohlc(self, idx: int) -> OHLC:
|
||||
# todo: throw runtime error if idx >= len(self)
|
||||
return OHLC(instr=self.instr,
|
||||
timestamp=self.timestamps[idx],
|
||||
open=self.opens[idx],
|
||||
high=self.highs[idx],
|
||||
low=self.lows[idx],
|
||||
close=self.closes[idx],
|
||||
volume=self.volumes[idx])
|
||||
Reference in New Issue
Block a user