import copy
from collections import defaultdict, deque
from datetime import datetime, timezone
from typing import List

import numpy as np

from internal_types.types import OHLC, BidAsk, Instrument, Position, Quote

TRADING_DAYS = 252
SEC_15_MINUTES = 15 * 60
SEC_30_MINUTES = 30 * 60
SEC_1_HOUR = 60 * 60
SEC_1_DAY = 24 * 60 * 60


def sign(x):
    """Return -1, 0 or 1 according to the sign of ``x``.

    e.g. sign(-5) = -1, sign(0) = 0, sign(5) = 1
    """
    # Written as a difference of comparisons rather than ``x // abs(x)``,
    # which would divide by zero for x == 0.
    return (x > 0) - (x < 0)


def max_abs(*xs):
    """Return the largest absolute value among the arguments."""
    return max(map(abs, xs))


def min_abs(*xs):
    """Return the smallest absolute value among the arguments."""
    return min(map(abs, xs))


def _annualized_sharpe(returns: np.ndarray, interval_sec: int) -> float:
    """Annualize the mean/std ratio of per-interval ``returns``.

    Returns NaN when fewer than two returns are available (the sample standard
    deviation is undefined) or when the returns have zero dispersion.
    """
    # With a single return, std(ddof=1) is NaN and numpy emits a RuntimeWarning;
    # short-circuit to the same NaN result without the warning.
    if returns.size < 2:
        return np.nan
    mean_r = returns.mean()
    std_r = returns.std(ddof=1)
    if std_r == 0:
        return np.nan
    # Annualization factor as defined by this module's constants:
    # TRADING_DAYS days of SEC_1_DAY seconds each, divided into intervals.
    periods_per_year = TRADING_DAYS * SEC_1_DAY / interval_sec
    return (mean_r / std_r) * np.sqrt(periods_per_year)


def simple_sharpe_ratio(historical_net_liquid_value: List[float], interval_sec: int) -> float:
    """Annualized Sharpe ratio computed from simple (arithmetic) returns.

    ``historical_net_liquid_value`` holds one value per ``interval_sec`` seconds.
    Returns NaN when there is not enough data or the returns do not vary.
    """
    if len(historical_net_liquid_value) < 2:
        return np.nan
    xs = np.asarray(historical_net_liquid_value, dtype=float)
    returns = xs[1:] / xs[:-1] - 1.0
    return _annualized_sharpe(returns, interval_sec)


def log_sharpe_ratio(historical_net_liquid_value: List[float], interval_sec: int) -> float:
    """Annualized Sharpe ratio computed from log returns.

    ``historical_net_liquid_value`` holds one value per ``interval_sec`` seconds.
    Returns NaN when there is not enough data or the returns do not vary.
    """
    if len(historical_net_liquid_value) < 2:
        return np.nan
    xs = np.asarray(historical_net_liquid_value, dtype=float)
    log_returns = np.log(xs[1:] / xs[:-1])
    return _annualized_sharpe(log_returns, interval_sec)


# def consolidate_positions(ps: List[Position]) -> Position:
#     # assumes that ps is not empty and contains the same instruments
#     qty = 0
#     outstanding_balance = 0
#     for p in ps:
#         qty += p.quantity
#         outstanding_balance += p.price * p.quantity
#     return Position(ps[0].instr, qty, outstanding_balance / qty if qty else 0)


def timestamp_to_str(timestamp: int) -> str:
    """Format a POSIX timestamp as ``'YYYY-MM-DD HH:MM:SS UTC'``."""
    # Convert explicitly to UTC: a naive fromtimestamp() yields *local* time,
    # which would contradict the 'UTC' suffix on any non-UTC host.
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')


def interval_idx(timestamp: int, interval_sec: int) -> int:
    """Return the index of the ``interval_sec``-wide bucket containing ``timestamp``."""
    return timestamp // interval_sec


def long(pos: Position) -> bool:
    """Return True if ``pos`` has a positive quantity."""
    return pos.quantity > 0


def unrealized_gains(pos: Position, at_price: float) -> float:
    """Return the gain of ``pos`` if it were valued at ``at_price``.

    Computed as price difference times quantity times the instrument multiplier.
    """
    return (at_price - pos.price) * pos.quantity * pos.instr.multiplier


def crossover(quote: Quote, price: float) -> bool:
    """Return True if ``quote`` crosses over ``price``.

    The bid is compared for a ``BidAsk`` quote, the high for any other quote.
    """
    level = quote.bid if isinstance(quote, BidAsk) else quote.high
    return level > price


def crossunder(quote: Quote, price: float) -> bool:
    """Return True if ``quote`` crosses under ``price``.

    The ask is compared for a ``BidAsk`` quote, the low for any other quote.
    """
    level = quote.ask if isinstance(quote, BidAsk) else quote.low
    return level < price


class Portfolio:
    """Tracks open positions per instrument and the gains realized by closing them.

    Open positions are kept in a per-instrument queue and are closed
    oldest-first when a position in the opposite direction is added.
    """

    def __init__(self):
        # Realized gains accumulated per instrument.
        self._realized_gains = defaultdict(float)
        # Currently open positions per instrument, oldest at the left.
        self.outstanding_pos = defaultdict(deque)
        # Every non-zero position ever added, in insertion order.
        self.pos_history = defaultdict(list)

    def empty(self, instr: Instrument) -> bool:
        """Return True if there is no open position for ``instr``."""
        return len(self.outstanding_pos[instr]) == 0

    def add_position(self, new_pos: Position):
        """Add ``new_pos`` to the portfolio.

        A zero-quantity position is ignored. A position in the same direction
        as the open ones (or the first position) is simply queued. A position
        in the opposite direction closes open positions oldest-first, booking
        realized gains at ``new_pos.price``; any remainder is queued.
        """
        if new_pos.quantity == 0:
            return

        instr = new_pos.instr
        # Store a copy so later changes to the caller's object do not alter history.
        self.pos_history[instr].append(copy.deepcopy(new_pos))

        if self.empty(instr) or long(self.outstanding_pos[instr][0]) == long(new_pos):
            self.outstanding_pos[instr].append(new_pos)
            return

        while not self.empty(instr) and new_pos.quantity:
            old_pos = self.outstanding_pos[instr].popleft()
            # Quantity closed in this step: the smaller of the two magnitudes.
            min_abs_qty = min_abs(old_pos.quantity, new_pos.quantity)
            tmp_pos = Position(instr, min_abs_qty * sign(old_pos.quantity), old_pos.price)
            self._realized_gains[instr] += unrealized_gains(tmp_pos, new_pos.price)

            old_pos = Position(instr, old_pos.quantity - min_abs_qty * sign(old_pos.quantity), old_pos.price)
            new_pos = Position(instr, new_pos.quantity - min_abs_qty * sign(new_pos.quantity), new_pos.price)

            # A partially closed position goes back to the front of the queue.
            if old_pos.quantity:
                self.outstanding_pos[instr].appendleft(old_pos)

        # Whatever is left of the new position after closing everything opens
        # a position in the new direction.
        if new_pos.quantity:
            self.outstanding_pos[instr].append(new_pos)

    def liquidate_positions(self, instr: Instrument, price: float):
        """Close all open positions for ``instr`` at ``price``."""
        self.add_position(Position(instr, -self.outstanding_shares(instr), price))

    def outstanding_shares(self, instr: Instrument) -> int:
        """Return the net open quantity for ``instr``."""
        # todo: handle fractional shares (crypto)
        return sum(pos.quantity for pos in self.outstanding_pos[instr])

    def has_outstanding_shares(self, instr: Instrument) -> bool:
        """Return True if there is at least one open position for ``instr``."""
        return len(self.outstanding_pos[instr]) != 0

    def consolidate_last_x_shares(self, instr: Instrument, x: int) -> Position:
        """Merge the most recent history entries for ``instr`` into one position of up to ``x`` shares.

        Walks the position history newest-first, accumulating quantities until
        ``x`` is reached; the entry that would overshoot ``x`` is truncated.
        The resulting price is the quantity-weighted average, or 0 when the
        accumulated quantity is 0.
        """
        # todo: handle fractional shares (crypto)
        qty = 0
        outstanding_balance = 0
        for pos in reversed(self.pos_history[instr]):
            if qty == x:
                break
            to_add = pos.quantity
            # Truncate the entry if adding it whole would step past x
            # (checked for both positive and negative directions).
            if (qty < x < qty + to_add) or (qty + to_add < x < qty):
                to_add = x - qty
            qty += to_add
            outstanding_balance += pos.price * to_add
        # todo: should price be 0 or np.nan if qty == 0?
        # todo: take care of the case where qty != x
        return Position(instr, qty, outstanding_balance / qty if qty else 0)

    def realized_gains(self, instr: Instrument) -> float:
        """Return the gains realized so far for ``instr``."""
        return self._realized_gains[instr]

    def unrealized_gains(self, instr: Instrument, at_price: float) -> float:
        """Return the gain of all open positions for ``instr`` valued at ``at_price``."""
        return sum(unrealized_gains(pos, at_price) for pos in self.outstanding_pos[instr])

    def total_gains(self, instr: Instrument, at_price: float) -> float:
        """Return realized plus unrealized gains for ``instr`` at ``at_price``."""
        return self._realized_gains[instr] + self.unrealized_gains(instr, at_price)


def _periods_in_window(interval_sec: int, window_sec: int) -> int:
    """Validate the arguments and return how many whole intervals fit in the window.

    Raises:
        ValueError: if ``interval_sec`` is zero or negative, or if
            ``window_sec`` is shorter than ``interval_sec``.
    """
    if interval_sec == 0:
        raise ValueError('interval_sec == 0')
    # A negative interval would yield a negative/meaningless period count.
    if interval_sec < 0:
        raise ValueError('interval_sec < 0')
    if window_sec < interval_sec:
        raise ValueError('window_sec < interval_sec')
    return window_sec // interval_sec


class SMA:
    """Simple moving average over the last ``window_sec // interval_sec`` samples."""

    def __init__(self, interval_sec: int, window_sec: int):
        """Raise ValueError if ``interval_sec <= 0`` or ``window_sec < interval_sec``."""
        self.periods = _periods_in_window(interval_sec, window_sec)
        self.cnt = 0
        # Ring buffer of the most recent ``periods`` samples.
        self.xs: List[float] = [0] * self.periods
        self.rolling_sum = 0
        self.sma = 0

    def append(self, x: float):
        """Add sample ``x``, evicting the oldest one once the window is full."""
        self.cnt += 1
        idx = (self.cnt - 1) % self.periods
        # Slots start at 0, so subtracting the slot being overwritten is also
        # correct on the very sample that first fills the window.
        if self.has_val():
            self.rolling_sum -= self.xs[idx]
        self.xs[idx] = x
        self.rolling_sum += x
        self.sma = self.rolling_sum / self.periods

    def has_val(self):
        """Return True once at least ``periods`` samples have been appended."""
        return self.cnt >= self.periods

    def val(self) -> float:
        """Return the current average; raise RuntimeError if the window is not full yet."""
        if not self.has_val():
            raise RuntimeError('cnt < periods')
        return self.sma


class EMA:
    """Exponential moving average with ``alpha = 2 / (periods + 1)``.

    The first ``periods`` samples are averaged to seed the EMA; every later
    sample is blended in with weight ``alpha``.
    """

    def __init__(self, interval_sec: int, window_sec: int):
        """Raise ValueError if ``interval_sec <= 0`` or ``window_sec < interval_sec``."""
        self.periods = _periods_in_window(interval_sec, window_sec)
        self.alpha = 2 / (self.periods + 1)
        self.cnt = 0
        # Sum of the seed samples, used only until the window is first filled.
        self.tmp_sum = 0
        self.ema = 0

    def append(self, x: float):
        """Add sample ``x`` to the average."""
        self.cnt += 1
        if self.cnt <= self.periods:
            self.tmp_sum += x
            if self.cnt == self.periods:
                self.ema = self.tmp_sum / self.periods
        else:
            self.ema = self.alpha * x + (1 - self.alpha) * self.ema

    def has_val(self):
        """Return True once at least ``periods`` samples have been appended."""
        return self.cnt >= self.periods

    def val(self) -> float:
        """Return the current EMA; raise RuntimeError if not enough samples were appended."""
        if not self.has_val():
            raise RuntimeError('cnt < periods')
        return self.ema


class BlendedOHLC:
    """Aggregates incoming OHLC bars into bars of ``interval_sec`` seconds.

    Bars whose timestamps fall into the same interval bucket are blended into
    one in-progress bar. The in-progress bar is moved to the completed series
    (``timestamps``, ``opens``, ...) when a bar from a different bucket arrives.
    ``len()`` and the rolling helpers only consider completed bars.
    """

    def __init__(self, instr: Instrument, interval_sec: int):
        self.instr = instr
        self.interval_sec = interval_sec
        # Parallel lists describing the completed bars.
        self.timestamps: List[int] = []
        self.opens: List[float] = []
        self.highs: List[float] = []
        self.lows: List[float] = []
        self.closes: List[float] = []
        self.volumes: List[int] = []  # todo: float for crypto
        # Bar currently being built; None until the first bar is appended.
        self.incomplete_bar: OHLC | None = None

    def __len__(self):
        """Return the number of completed bars."""
        return len(self.timestamps)

    def __append(self, ohlc: OHLC):
        """Finalize the in-progress bar (if any) and start a new one from ``ohlc``."""
        done = self.incomplete_bar
        if done is not None:
            self.timestamps.append(done.timestamp)
            self.opens.append(done.open)
            self.highs.append(done.high)
            self.lows.append(done.low)
            self.closes.append(done.close)
            self.volumes.append(done.volume)
        self.incomplete_bar = ohlc

    def __blend(self, ohlc: OHLC):
        """Merge ``ohlc`` into the in-progress bar.

        Keeps the original timestamp and open, widens high/low, takes the
        newest close and sums the volumes.
        """
        current = self.incomplete_bar
        if current is None:
            self.incomplete_bar = ohlc
            return
        self.incomplete_bar = OHLC(instr=self.instr,
                                   timestamp=current.timestamp,
                                   open=current.open,
                                   high=max(current.high, ohlc.high),
                                   low=min(current.low, ohlc.low),
                                   close=ohlc.close,
                                   volume=current.volume + ohlc.volume)

    def __to_blend(self, ohlc: OHLC):
        """Return True if ``ohlc`` belongs to the same interval as the in-progress bar."""
        # Only the in-progress bar matters here. Also requiring completed bars
        # to exist would stop the very first bucket from ever being blended,
        # emitting its first sub-bar as a separate completed bar.
        if self.incomplete_bar is None:
            return False
        last_interval_idx = interval_idx(self.incomplete_bar.timestamp, self.interval_sec)
        ohlc_interval_idx = interval_idx(ohlc.timestamp, self.interval_sec)
        return last_interval_idx == ohlc_interval_idx

    def rolling_min(self, period: int) -> float:
        """Return the lowest low over the last ``period`` completed bars.

        If fewer than ``period`` bars are completed, all of them are used.

        Raises:
            ValueError: if ``period`` is less than 1 or no bar is completed.
        """
        # period == 0 would slice the whole history ([-0:]) and a negative
        # period would slice from the front, so reject both explicitly.
        if period < 1:
            raise ValueError('period < 1')
        if not self.lows:
            raise ValueError('no completed bars')
        return min(self.lows[-period:])

    def rolling_max(self, period: int) -> float:
        """Return the highest high over the last ``period`` completed bars.

        If fewer than ``period`` bars are completed, all of them are used.

        Raises:
            ValueError: if ``period`` is less than 1 or no bar is completed.
        """
        # See rolling_min for why non-positive periods are rejected.
        if period < 1:
            raise ValueError('period < 1')
        if not self.highs:
            raise ValueError('no completed bars')
        return max(self.highs[-period:])

    def crossunder_x_period_min(self, window: int, quote: Quote) -> bool:
        """Return True if ``window`` bars are completed and ``quote`` crosses under their minimum low."""
        return len(self) >= window and crossunder(quote, self.rolling_min(window))

    def crossover_x_period_max(self, window: int, quote: Quote) -> bool:
        """Return True if ``window`` bars are completed and ``quote`` crosses over their maximum high."""
        return len(self) >= window and crossover(quote, self.rolling_max(window))

    def append(self, ohlc: OHLC):
        """Feed a new bar: blend it into the in-progress bar or start a new interval."""
        if self.__to_blend(ohlc):
            self.__blend(ohlc)
        else:
            self.__append(ohlc)

    def blended_ohlc(self, idx: int) -> OHLC:
        """Return the completed bar at position ``idx`` (IndexError if out of range)."""
        # todo: throw runtime error if idx >= len(self)
        return OHLC(instr=self.instr,
                    timestamp=self.timestamps[idx],
                    open=self.opens[idx],
                    high=self.highs[idx],
                    low=self.lows[idx],
                    close=self.closes[idx],
                    volume=self.volumes[idx])