import math from enum import Enum, auto from typing import override import numpy as np from internal_types.types import Instrument, Position, Quote from strategy.strategy import Strategy from utils.utils import BlendedCandlesticks, Portfolio, max_abs class State(Enum): DAY_0_19 = auto() DAY_20 = auto() LONG = auto() SHORT = auto() SHADOW_LONG = auto() SHADOW_SHORT = auto() class TurtleSystem1(Strategy): def __init__(self, init_balance: float, instr: Instrument): self.state = State.DAY_0_19 self.init_balance = init_balance self.N: float = 0 self.tradable_acct_balance = init_balance self.unit_size: int = 0 self.instr = instr self.unit_limit = 4 self.trade_next_20_day_breakout = True self.daily_ohlcv = BlendedCandlesticks(24 * 60 * 60) self.last_fill_price: float | None = None self.portfolio = Portfolio() self.shadow_portfolio = Portfolio() self.desired_pos: int = 0 def __true_range(self, day_i) -> float: prev_q = self.daily_ohlcv.blended_quote(day_i - 1) curr_q = self.daily_ohlcv.blended_quote(day_i) return max_abs(curr_q.high - curr_q.low, curr_q.high - prev_q.close, prev_q.close - curr_q.low) def __init_day_20_N(self): days = min(len(self.daily_ohlcv), 20) q_0 = self.daily_ohlcv.blended_quote(-days) self.N = \ float(np.mean([q_0.high - q_0.low, *(self.__true_range(i) for i in range(-days + 1, 0))])) def __adjust_unit_size(self): self.unit_size = \ math.floor(self.tradable_acct_balance * 0.01 / (self.N * self.instr.multiplier)) def __reach_unit_limit(self): return abs(self.portfolio.curr_position(self.instr)) >= self.unit_limit * self.unit_size def __shadow_reach_unit_limit(self): return abs(self.shadow_portfolio.curr_position(self.instr)) >= self.unit_limit * self.unit_size def __add_shadow_pos(self, quantity: int, price: float): self.last_fill_price = price self.shadow_portfolio.add_pos(Position(self.instr, quantity, price)) def __rolling_min(self, days: int) -> float: return min(self.daily_ohlcv.lows[-days:]) def __rolling_max(self, days: int) -> float: return max(self.daily_ohlcv.highs[-days:]) def __crossover_10_day_high(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 10 and quote.high > self.__rolling_max(10) def __crossunder_10_day_low(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 10 and quote.low < self.__rolling_min(10) def __crossover_20_day_high(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 20 and quote.high > self.__rolling_max(20) def __crossunder_20_day_low(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 20 and quote.low < self.__rolling_min(20) def __crossover_55_day_high(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 55 and quote.high > self.__rolling_max(55) def __crossunder_55_day_low(self, quote: Quote) -> bool: return len(self.daily_ohlcv) >= 55 and quote.low < self.__rolling_min(55) def __crossover_1_2_N(self, quote: Quote) -> bool: # this assumed that the 1st order was filled # todo: do a more robust check and return False if the 1st order after breakout wasn't filled return self.last_fill_price is not None and quote.high > self.last_fill_price + self.N / 2 def __crossunder_1_2_N(self, quote: Quote) -> bool: # this assumed that the 1st order was filled # todo: do a more robust check and return False if the 1st order after breakout wasn't filled return self.last_fill_price is not None and quote.low < self.last_fill_price - self.N / 2 def __crossover_2_N(self, quote: Quote) -> bool: # this assumed that the 1st order was filled # todo: do a more robust check and return False if the 1st order after breakout wasn't filled return self.last_fill_price is not None and quote.high > self.last_fill_price + self.N * 2 def __crossunder_2_N(self, quote: Quote) -> bool: # this assumed that the 1st order was filled # todo: do a more robust check and return False if the 1st order after breakout wasn't filled return self.last_fill_price is not None and quote.low < self.last_fill_price - self.N * 2 def __to_DAY_20_state(self): self.desired_pos = 0 self.shadow_portfolio = Portfolio() self.state = State.DAY_20 def __to_LONG_state(self): self.desired_pos = self.unit_size self.state = State.LONG def __to_SHORT_state(self): self.desired_pos = -self.unit_size self.state = State.SHORT def __to_SHADOW_LONG_state(self, quote: Quote): self.shadow_portfolio = Portfolio() self.__add_shadow_pos(self.unit_size, quote.close) self.state = State.SHADOW_LONG def __to_SHADOW_SHORT_state(self, quote: Quote): self.shadow_portfolio = Portfolio() self.__add_shadow_pos(-self.unit_size, quote.close) self.state = State.SHADOW_SHORT def __process_DAY_0_19_state(self, quote: Quote): if len(self.daily_ohlcv) >= 20: self.__init_day_20_N() self.tradable_acct_balance = self.net_liquid_value(quote.close) self.__adjust_unit_size() self.__to_DAY_20_state() def __process_DAY_20_state(self, quote: Quote): if self.__crossover_55_day_high(quote): self.__to_LONG_state() elif self.__crossunder_55_day_low(quote): self.__to_SHORT_state() elif self.__crossover_20_day_high(quote): if self.trade_next_20_day_breakout: self.__to_LONG_state() else: self.__to_SHADOW_LONG_state(quote) elif self.__crossunder_20_day_low(quote): if self.trade_next_20_day_breakout: self.__to_SHORT_state() else: self.__to_SHADOW_SHORT_state(quote) def __process_LONG_state(self, quote: Quote): if self.__crossunder_2_N(quote) or self.__crossunder_10_day_low(quote): self.trade_next_20_day_breakout = self.portfolio.unrealized_gains(self.instr, quote.close) < 0 self.__to_DAY_20_state() elif self.__crossover_1_2_N(quote) and not self.__reach_unit_limit(): self.desired_pos += self.unit_size def __process_SHORT_state(self, quote: Quote): if self.__crossover_2_N(quote) or self.__crossover_10_day_high(quote): self.trade_next_20_day_breakout = self.portfolio.unrealized_gains(self.instr, quote.close) < 0 self.__to_DAY_20_state() elif self.__crossunder_1_2_N(quote) and not self.__reach_unit_limit(): self.desired_pos -= self.unit_size def __process_SHADOW_LONG_state(self, quote: Quote): if self.__crossover_55_day_high(quote): self.__to_LONG_state() elif self.__crossunder_55_day_low(quote): self.__to_SHORT_state() elif self.__crossunder_2_N(quote) or self.__crossunder_10_day_low(quote): self.trade_next_20_day_breakout = \ self.shadow_portfolio.unrealized_gains(self.instr, quote.close) < 0 self.__to_DAY_20_state() elif self.__crossover_1_2_N(quote) and not self.__shadow_reach_unit_limit(): self.__add_shadow_pos(self.unit_size, quote.close) def __process_SHADOW_SHORT_state(self, quote: Quote): if self.__crossover_55_day_high(quote): self.__to_LONG_state() elif self.__crossunder_55_day_low(quote): self.__to_SHORT_state() elif self.__crossover_2_N(quote) or self.__crossover_10_day_high(quote): self.trade_next_20_day_breakout = \ self.shadow_portfolio.unrealized_gains(self.instr, quote.close) < 0 self.__to_DAY_20_state() elif self.__crossunder_1_2_N(quote) and not self.__shadow_reach_unit_limit(): self.__add_shadow_pos(-self.unit_size, quote.close) @override def curr_position(self) -> int: return self.portfolio.curr_position(self.instr) @override def desired_position(self) -> int: return self.desired_pos @override def order_filled(self, new_pos: Position): self.portfolio.add_pos(new_pos) self.last_fill_price = None if self.portfolio.empty(self.instr) else new_pos.price @override def process_quote(self, quote: Quote): prev_day_cnt = len(self.daily_ohlcv) self.daily_ohlcv.append(quote) curr_day_cnt = len(self.daily_ohlcv) if self.state == State.DAY_0_19: self.__process_DAY_0_19_state(quote) return if prev_day_cnt != curr_day_cnt: self.N = (19 * self.N + self.__true_range(-1)) / 20 net_liq = self.net_liquid_value(quote.close) while net_liq < self.tradable_acct_balance * 0.9: self.tradable_acct_balance *= 0.8 self.__adjust_unit_size() while net_liq > self.tradable_acct_balance / 0.8: self.tradable_acct_balance /= 0.8 self.__adjust_unit_size() match self.state: case State.DAY_20: self.__process_DAY_20_state(quote) case State.LONG: self.__process_LONG_state(quote) case State.SHORT: self.__process_SHORT_state(quote) case State.SHADOW_LONG: self.__process_SHADOW_LONG_state(quote) case State.SHADOW_SHORT: self.__process_SHADOW_SHORT_state(quote) case _: raise RuntimeError('invalid state') @override def net_liquid_value(self, closing_price: float) -> float: return self.init_balance + self.portfolio.total_gains(self.instr, closing_price)