diff --git a/pyalgotrade/__init__.py b/pyalgotrade/__init__.py index 7935e8ca4..7f7a16663 100644 --- a/pyalgotrade/__init__.py +++ b/pyalgotrade/__init__.py @@ -20,3 +20,27 @@ name = "PyAlgoTrade" __version__ = "0.20" + + +class Frequency(object): + + """Enum like class for bar frequencies. Valid values are: + + * **Frequency.TRADE**: The bar represents a single trade. + * **Frequency.SECOND**: The bar summarizes the trading activity during 1 second. + * **Frequency.MINUTE**: The bar summarizes the trading activity during 1 minute. + * **Frequency.HOUR**: The bar summarizes the trading activity during 1 hour. + * **Frequency.DAY**: The bar summarizes the trading activity during 1 day. + * **Frequency.WEEK**: The bar summarizes the trading activity during 1 week. + * **Frequency.MONTH**: The bar summarizes the trading activity during 1 month. + """ + + # It is important for frequency values to get bigger for bigger windows. + TRADE = -1 + QUOTE = -1 + SECOND = 1 + MINUTE = 60 + HOUR = 60*60 + DAY = 24*60*60 + WEEK = 24*60*60*7 + MONTH = 24*60*60*31 \ No newline at end of file diff --git a/pyalgotrade/alpaca/__init__.py b/pyalgotrade/alpaca/__init__.py new file mode 100644 index 000000000..346270fc8 --- /dev/null +++ b/pyalgotrade/alpaca/__init__.py @@ -0,0 +1,19 @@ +# PyAlgoTrade +# +# Copyright 2011-2018 Gabriel Martin Becedillas Ruiz +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +.. moduleauthor:: Gabriel Martin Becedillas Ruiz +""" diff --git a/pyalgotrade/alpaca/common.py b/pyalgotrade/alpaca/common.py new file mode 100644 index 000000000..fcef1cd99 --- /dev/null +++ b/pyalgotrade/alpaca/common.py @@ -0,0 +1,83 @@ +# PyAlgoTrade +# +# Copyright 2011-2018 Gabriel Martin Becedillas Ruiz +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +.. moduleauthor:: Robert Lee +""" +import os +from datetime import datetime + +import msgpack +import pandas as pd + +import alpaca_trade_api as tradeapi +from alpaca_trade_api.rest_async import AsyncRest +from alpaca_trade_api.stream import Stream + +import pyalgotrade.logger +from pyalgotrade import broker + + +logger = pyalgotrade.logger.getLogger("alpaca") + +def make_connection(connection_type, api_key_id = None, api_secret_key = None, live = False): + """Makes a connection to Alpaca. + + https://alpaca.markets/docs/api-documentation/api-v2/ + + Args: + connection_type: The connection to make to Alpaca. One of [rest, async_rest, stream]. + api_key_id (str, optional): If none, looks at the environment variable ALPACA_API_KEY_ID. + Defaults to None. + api_secret_key (str, optional): If none, looks at the environment variable ALPACA_API_SECRET_KEY. + Defaults to None. + """ + + # credentials + if live: + api_key_id = api_key_id or os.environ.get('ALPACA_API_KEY_ID') + api_secret_key = api_secret_key or os.environ.get('ALPACA_API_SECRET_KEY') + else: + api_key_id = api_key_id or os.environ.get('ALPACA_API_KEY_ID_PAPER') + api_secret_key = api_secret_key or os.environ.get('ALPACA_API_SECRET_KEY_PAPER') + + if api_key_id is None: + logger.error('Unable to retrieve API Key ID.') + if api_key_id is None: + logger.error('Unable to retrieve API Secret Key.') + + if connection_type == 'async_rest': + connection = AsyncRest(key_id=api_key_id, secret_key=api_secret_key) + elif connection_type == 'rest': + connection = tradeapi.REST(key_id = api_key_id, secret_key = api_secret_key) + elif connection_type == 'stream': + connection = Stream(data_feed = 'IEX', key_id=api_key_id, secret_key=api_secret_key, raw_data = True) + + return connection + +def json_serializer(obj): + if isinstance(obj, datetime): + return {'_isoformat': obj.isoformat()} + elif isinstance(obj, msgpack.ext.Timestamp): + return {'_unix_nano': obj.to_unix_nano()} + raise TypeError('...') + +def json_deserializer(obj): + if (_isoformat := obj.get('_isoformat')) is not None: + return datetime.fromisoformat(_isoformat) + elif (_unix_nano := obj.get('_unix_nano')) is not None: + return pd.to_datetime(_unix_nano) + return obj \ No newline at end of file diff --git a/pyalgotrade/alpaca/historicaldata.py b/pyalgotrade/alpaca/historicaldata.py new file mode 100644 index 000000000..21bf96f5a --- /dev/null +++ b/pyalgotrade/alpaca/historicaldata.py @@ -0,0 +1,193 @@ +# PyAlgoTrade +# +# Copyright 2011-2018 Gabriel Martin Becedillas Ruiz +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +.. moduleauthor:: Robert Lee +https://github.com/alpacahq/alpaca-trade-api-python/blob/master/examples/historic_async.py + +Example usage: + + In a script: + from pyalgotrade.alpaca.common import make_connection + from pyalgotrade.alpaca.historicaldata import get_historical_data + + async_rest = make_connection(connection_type = 'async_rest') + results = await get_historical_data(async_rest, ['AAPL', 'IBM'], '2021-01-01', '2021-01-10, 'QUOTES') + # results = [('AAPL', pandas.DataFrame()), ('IBM', pandas.DataFrame)] + + In command line: + $ python /home/jibi/Documents/repos/pyalgotrade/pyalgotrade/alpaca/historicaldata.py + --symbols AAPL IBM + --start-date 2021-01-01 + --end-date 2021-01-10 + --storage /home/jibi/Documents/mochi/sample_data/test.csv + + +""" + +import sys +import asyncio +import argparse + +import pandas as pd + +from alpaca_trade_api.rest_async import gather_with_concurrency + +from pyalgotrade.alpaca import common + + +NY = 'America/New_York' + +async def get_historical_data(async_rest, symbols, start_date, end_date, + data_type = 'BARS', timeframe = '1Day'): + """ + Retrieve historical data for multiple symbols using Alpaca's get_[datatype]_async + from the AsyncRest object. + + Args: + async_rest (Alpaca AsyncRest object): See alpaca_trade_api.rest_async.AsyncRest. + symbols (list): A list of symbols for which to get data. + start_date (str): Start date of time period of data request. + end_date (str): End date of time period of data request. + data_type (str, optional): One of 'BARS', 'TRADES', or 'QUOTES'. Defaults to 'BARS'. + timeframe (str): Frequency of data requested. Format as [amount][unit], + where [amount]is an integer, and [unit] is one of Min, Hour, or Day. Defaults to 1Day. + Ignored if data_type is not 'BARS'. + + Returns: + [(symbol, df),]: List of tuples of (symbol, pandas DataFrame) + """ + # Check Python version + major = sys.version_info.major + minor = sys.version_info.minor + if major < 3 or minor < 6: + raise Exception('asyncio is not support in your python version') + msg = f"Getting {data_type} data for {len(symbols)} symbols" + msg += f", timeframe: {timeframe}" if timeframe else "" + msg += f" between dates: start={start_date}, end={end_date}" + common.logger.info(msg) + + # define what data we're trying to get + if data_type.upper() == 'BARS': + get_data_method = async_rest.get_bars_async + elif data_type.upper() == 'TRADES': + get_data_method = async_rest.get_trades_async + elif data_type.upper() == 'QUOTES': + get_data_method = async_rest.get_quotes_async + else: + raise Exception(f"Unsupoported data type: {data_type}") + + # Time period of data request + start_date = pd.Timestamp(start_date, tz=NY).date().isoformat() + end_date = pd.Timestamp(end_date, tz=NY).date().isoformat() + + # ignore timeframe argument if data_type is not 'BARS' + if data_type.upper() != 'BARS': + timeframe = None + + # Create one task for each symbol + # execute up to 1000 tasks each loop + step_size = 1000 + results = [] + for i in range(0, len(symbols), step_size): + tasks = [] + for symbol in symbols[i:i+step_size]: + args = [symbol, start_date, end_date, timeframe] if timeframe else \ + [symbol, start_date, end_date] + tasks.append(get_data_method(*args)) + + if minor >= 8: + results.extend(await asyncio.gather(*tasks, return_exceptions=True)) + else: + results.extend(await gather_with_concurrency(500, *tasks)) + + # notify the user of any bad reuests + bad_requests = 0 + for response in results: + if isinstance(response, Exception): + common.logger.error(f"Got an error: {response}") + elif not len(response[1]): + bad_requests += 1 + + common.logger.info(f"Total of {len(results)} {data_type}, and {bad_requests} " + f"empty responses.") + + return results + +if __name__ == '__main__': + + # Get parameters + parser = argparse.ArgumentParser(description="Alpaca Rest Datafeed") + + # data request + parser.add_argument("--symbols", required = True, nargs = '+', + help = "One or more symbols for which to download data.") + parser.add_argument("--start-date", required=True, + type=str, help="Start date of data.") + parser.add_argument("--end-date", required=True, + type=str, help="End date of data.") + parser.add_argument("--datatype", required = False, default="bars", + choices = ['bars', 'trades', 'quotes'], + help="The type of data to request. One of bars, trades, or quotes.") + parser.add_argument("--timeframe", required = False, default="1Day", + help="The frequency of the bars, in format [n]Min, [n]Hour, or [n]Day.") + # credentials + parser.add_argument("--api-key-id", required=False, + help="Alpaca Key ID if it is not saved as an environment variable.") + parser.add_argument("--api-secret-key", required=False, + help="Alpaca secret key if it is not saved as an environment variable.") + # storage + parser.add_argument("--storage", required=True, + help="The path were the files will be downloaded to") + # parser.add_argument("--force-download", action='store_false', + # help="Force downloading even if the files exist") + # Set up variables + args = parser.parse_args() + + # make rest connection to API + async_rest = common.make_connection('async_rest', args.api_key_id, args.api_secret_key) + + # storage + # if not os.path.exists(args.storage): + # common.logger.info("Creating %s directory" % (args.storage)) + # os.mkdir(args.storage) + storage = args.storage + + # rest of data request + symbols = args.symbols + start_date = args.start_date + end_date = args.end_date + datatype = args.datatype.upper() + timeframe = args.timeframe + + # Request the data + loop = asyncio.get_event_loop() + results = loop.run_until_complete( + get_historical_data(async_rest, symbols, start_date, end_date, datatype, timeframe) + ) + # Stack the results into 1 dataframe + # Current it is in [(symbol0, df0), (symbol1, df1)] format + result = None + for symbol_i, df_i in results: + df_i['symbol'] = symbol_i + df_i = df_i.reset_index().set_index(['symbol', 'timestamp']) + if result is None: + result = df_i + else: + result = pd.concat([result, df_i], axis = 0, ignore_index = False) + + # save to csv + result.to_csv(storage) \ No newline at end of file diff --git a/pyalgotrade/alpaca/livebroker.py b/pyalgotrade/alpaca/livebroker.py new file mode 100644 index 000000000..5b4f8e169 --- /dev/null +++ b/pyalgotrade/alpaca/livebroker.py @@ -0,0 +1,803 @@ +# PyAlgoTrade +# +# Copyright 2011-2018 Gabriel Martin Becedillas Ruiz +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +.. moduleauthor:: Robert Lee +""" + +from os import kill +import threading +import time +import alpaca + +from six.moves import queue +from ws4py.websocket import EchoWebSocket + +import zmq + +from pyalgotrade import broker +from pyalgotrade.alpaca import httpclient +from pyalgotrade.alpaca import common +from alpaca.livefeed import EventQueuer + +from observer import Event + +class LiveBroker(broker.Broker): + """An Alpaca live broker. + + The live broker listens to a ZMQ SUB socket for trade updates, + and uses a rest connection to get account info and place trades. + + :param liveFeedAddress: Address to which the ZMQ SUB socket should be connected. + :type liveFeedAddress: string. + :param restConnection: An Alpaca rest connection from alpaca_trade_api. + :type restConnection: string. + """ + + QUEUE_TIMEOUT = 0.01 + + def __init__(self, liveFeedAddress, restConnection): + super(LiveBroker, self).__init__() + + self._restConnection = restConnection + + self.__tradeMonitor = EventQueuer(liveFeedAddress) + self.__stop = False + + def __getattr__(self, name): + """Transfer methods of the underlying api rest connection to the live broker. + """ + if hasattr(self._restConnection, name): + return getattr(self._restConnection, name) + else: + raise AttributeError + + @property + def account(self): + return self._restConnection.getAccount() + + @property + def cash(self): + return self.account['cash'] + + @property + def openPositions(self): + return self._restConnection.list_positions() + + @property + def openOrders(self): + orders = self._restConnection.list_orders(status = 'open') + orders = map(fromAlpacaOrder, orders) + return {order['client_order_id']: order for order in orders} + + def _startTradeMonitor(self): + self.__stop = True # Stop running in case of errors. + common.logger.info("Initializing trade monitor.") + self.__tradeMonitor.start() + self.__stop = False # No errors. Keep running. + + + # BEGIN observer.Subject interface + def start(self): + super(LiveBroker, self).start() + + def stop(self): + self.__stop = True + common.logger.info("Shutting down trade monitor.") + self.__tradeMonitor.stop() + + def join(self): + pass + + def eof(self): + return self.__stop + + def dispatch(self): + try: + update = self.__tradeMonitor.getQueue().get( + block = True, timeout = LiveBroker.QUEUE_TIMEOUT) + order = update['order'] + self.notifyOrderEvent(fromAlpacaOrder(order)) + + except queue.Empty: + pass + + def peekDateTime(self): + # Return None since this is a realtime subject. + return None + + # END observer.Subject interface + + # BEGIN broker.Broker interface + + def getCash(self, includeShort=True): + return self.cash + + def getInstrumentTraits(self, instrument): + return broker.IntegerTraits() + + def getShares(self, instrument): + return [pos for pos in self.openPositions if pos['symbol'] == instrument] + + def getPositions(self): + return self.openPositions + + def getActiveOrders(self, instrument=None): + if instrument is not None: + return [openOrder for openOrder in self.openOrders if openOrder.instrument == instrument] + else: + return self.openOrders + + def submitOrder(self, order): + self._restConnection.submit_order(**toAlpacaOrder(order)) + + def cancelOrder(self, order): + self._restConnection.cancel_order(order.orderId) + + # Notify that the order was canceled. + self.notifyOrderEvent(AlpacaOrder.OrderEvent(order, AlpacaOrder.OrderEvent.Type.CANCELED, "User requested cancellation")) + + # END broker.Broker interface + + def getClock(self): + return self._restConnection.get_clock() + + def getCalendar(self, start = None, end = None): + return self._restConnection.get_calendar(start = start, end = end) + + def getPortfolioHistory(self, + dateStart = None, dateEnd = None, period = None, + timeframe = None, extendedHours = None): + return self._restConnection.get_portfolio_history( + date_start = dateStart, + date_end = dateEnd, + period = period, + timeframe = timeframe, + extended_hours = extendedHours + ) + +# Types of orders +class AlpacaOrder(broker.Order): + """Base class for Alpaca orders. + Contains a few more fields than the broker.Order class. + """ + + class State: + # https://alpaca.markets/docs/trading-on-alpaca/orders/#order-lifecycle + + # Typical states + NEW = 1 + PARTIALLY_FILLED = 2 + FILLED = 3 + DONE_FOR_DAY = 4 + CANCELED = 5 + EXPIRED = 6 + REPLACED = 7 + PENDING_CANCEL = 8 + PENDING_REPLACE = 9 + + # Less common states + ACCEPTED = 101 + PENDING_NEW = 102 + ACCEPTED_FOR_BIDDING = 103 + STOPPED = 104 + REJECTED = 105 + SUSPENDED = 106 + CALCULATED = 107 + + @classmethod + def toString(cls, state): + if state == cls.NEW: + return 'new' + elif state == cls.PARTIALLY_FILLED: + return 'partially_filled' + elif state == cls.FILLED: + return 'filled' + elif state == cls.DONE_FOR_DAY: + return 'done_for_day' + elif state == cls.CANCELED: + return 'canceled' + elif state == cls.EXPIRED: + return 'expired' + elif state == cls.REPLACED: + return 'replaced' + elif state == cls.PENDING_CANCEL: + return 'pending_cancel' + elif state == cls.PENDING_REPLACE: + return 'pending_replace' + elif state == cls.ACCEPTED: + return 'accepted' + elif state == cls.PENDING_NEW: + return 'pending_new' + elif state == cls.ACCEPTED_FOR_BIDDING: + return 'accepted_for_bidding' + elif state == cls.STOPPED: + return 'stopped' + elif state == cls.REJECTED: + return 'rejected' + elif state == cls.SUSPENDED: + return 'suspended' + elif state == cls.CALCULATED: + return 'calculated' + else: + raise Exception("Invalid state") + + @classmethod + def fromString(cls, strState): + if strState == 'new': + return cls.NEW + elif strState == 'partially_filled': + return cls.PARTIALLY_FILLED + elif strState == 'filled': + return cls.FILLED + elif strState == 'done_for_day': + return cls.DONE_FOR_DAY + elif strState == 'canceled': + return cls.CANCELED + elif strState == 'expired': + return cls.EXPIRED + elif strState == 'replaced': + return cls.REPLACED + elif strState == 'pending_cancel': + return cls.PENDING_CANCEL + elif strState == 'pending_replace': + return cls.PENDING_REPLACE + elif strState == 'accepted': + return cls.ACCEPTED + elif strState == 'pending_new': + return cls.PENDING_NEW + elif strState == 'accepted_for_bidding': + return cls.ACCEPTED_FOR_BIDDING + elif strState == 'stopped': + return cls.STOPPED + elif strState == 'rejected': + return cls.REJECTED + elif strState == 'suspended': + return cls.SUSPENDED + elif strState == 'calculated': + return cls.CALCULATED + else: + raise Exception('Invalid order state') + + class Type(broker.Order.Type): + # MARKET = 1 + # LIMIT = 2 + # STOP = 3 + # STOP_LIMIT = 4 + TRAILING_STOP = 5 + + @classmethod + def toString(cls, type_): + if type_ == 'market': + return cls.MARKET + elif type_ == 'limit': + return cls.LIMIT + elif type_ == 'stop': + return cls.STOP + elif type_ == 'stop_limit': + return cls.STOP_LIMIT + elif type == 'trailing_stop': + return cls.TRAILING_STOP + else: + raise Exception('Inavlid order type') + + @classmethod + def fromString(cls, strType): + if strType == cls.MARKET: + return 'market' + elif strType == cls.LIMIT: + return 'limit' + elif strType == cls.STOP: + return 'stop' + elif strType == cls.STOP_LIMIT: + return 'stop_limit' + elif strType == cls.TRAILING_STOP: + return 'trailing_stop' + else: + raise Exception('Invalid order type') + + class OrderClass(object): + SIMPLE = 1 + BRACKET = 2 + OCO = 3 + OTO = 4 + + @classmethod + def toString(cls, orderclass): + if orderclass == cls.SIMPLE: + return 'simple' + elif orderclass == cls.BRACKET: + return 'bracket' + elif orderclass == cls.OCO: + return 'oco' + elif orderclass == cls.OTO: + return 'oto' + else: + raise Exception('Inavlid order class') + + @classmethod + def fromString(cls, strOrderClass): + if strOrderClass == 'simple': + return cls.SIMPLE + elif strOrderClass == 'bracket': + return cls.BRACKET + elif strOrderClass == 'oco': + return cls.OCO + elif strOrderClass == 'oto': + return cls.OTO + else: + raise Exception('Inavlid order class') + + class Action(broker.Order.Action): + + @classmethod + def toString(cls, action): + if action == 'buy': + return cls.BUY + elif action == 'sell': + return cls.SELL + else: + raise Exception('Inavlid order action') + + @classmethod + def fromString(cls, strAction): + if strAction == cls.BUY: + return 'buy' + elif strAction == cls.BUY_TO_COVER: + return 'buy' + elif strAction == cls.SELL: + return 'sell' + elif strAction == cls.SELL_SHORT: + return 'sell' + else: + raise Exception('Inavlid order action') + + class TimeInForce(object): + # https://alpaca.markets/docs/trading-on-alpaca/orders/#time-in-force + + DAY = 1 # good for day + GTC = 2 # good till canceled + OPG = 3 # market on open / limit on open + CLS = 4 # market on close / limt on close + IOC = 5 # immediate or cancel + FOK = 6 # fill or kill + + @classmethod + def toString(cls, timeInForce): + if timeInForce == cls.DAY: + return 'day' + elif timeInForce == cls.GTC: + return 'gtc' + elif timeInForce == cls.OPG: + return 'opg' + elif timeInForce == cls.CLS: + return 'cls' + elif timeInForce == cls.IOC: + return 'ioc' + elif timeInForce == cls.FOK: + return 'fok' + else: + raise Exception('Inavlid order time in force') + + @classmethod + def fromString(cls, strTimeInForce): + if strTimeInForce == 'day': + return cls.DAY + elif strTimeInForce == 'gtc': + return cls.GTC + elif strTimeInForce == 'opg': + return cls.OPG + elif strTimeInForce == 'cls': + return cls.CLS + elif strTimeInForce == 'ioc': + return cls.IOC + elif strTimeInForce == 'fok': + return cls.fok + else: + raise Exception('Inavlid order time in force') + + + def __init__( + self, + # broker.Order attributes + type_, + action, + instrument, + quantity, + instrumentTraits = broker.InstrumentTraits(), + # Alpaca-specific attributes + orderId = None, + clientOrderId = None, + createdAt = None, + updatedAt = None, + submittedAt = None, + filledAt = None, + expiredAt = None, + canceledAt = None, + failedAt = None, + replacedAt = None, + replacedBy = None, + replaces = None, + assetId = None, + assetClass = None, + notional = None, + filledQuantity = None, + filledAveragePrice = None, + orderClass = None, + timeInForce = None, + limiPrice = None, + stopPrice = None, + extendedHours = False, + legs = None, + trailPercent = None, + trailPrice = None, + hwm = None, + # for bracket orders + takeProfit = None, + stopLossStop = None, + stopLossLimit = None + ): + super(AlpacaOrder, self).__init__(type_, action, instrument, quantity, instrumentTraits) + self.orderId = orderId + self.clientOrderId = clientOrderId + self.createdAt = createdAt + self.updatedAt = updatedAt + self.submittedAt = submittedAt + self.filledAt = filledAt + self.expiredAt = expiredAt + self.canceledAt = canceledAt + self.failedAt = failedAt + self.replacedAt = replacedAt + self.replacedBy = replacedBy + self.replaces = replaces + self.assetId = assetId + self.assetClass = assetClass + self.notional = notional + self.filledQuantity = filledQuantity + self.filledAveragePrice = filledAveragePrice + self.orderClass = orderClass + self.timeInForce = timeInForce + self.limiPrice = limiPrice + self.stopPrice = stopPrice + self.extendedHours = extendedHours + self.legs = legs + self.trailPercent = trailPercent + self.trailPrice = trailPrice + self.hwm = hwm + # for bracket orders + self.takeProfit = takeProfit + self.stopLossStop = stopLossStop + self.stopLossLimit = stopLossLimit + +class MarketOrder(AlpacaOrder): + """A market order is a request to buy or sell a security at the + currently available market price. + """ + def __init__(self, action, instrument, quantity = None, notional = None, **kwargs): + super(MarketOrder, self).__init__( + type_ = AlpacaOrder.Type.MARKET, + action = action, + instrument = instrument, + quantity = quantity, + notional = notional, + **kwargs + ) + +class LimitOrder(AlpacaOrder): + """A limit order is an order to buy or sell at a specified price or better. + """ + def __init__(self, action, instrument, limitPrice, quantity = None, notional = None, **kwargs): + super(LimitOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, + action = action, + instrument = instrument, + limitPrice = limitPrice, + quantity = quantity, + notional = notional, + **kwargs + ) + +class StopOrder(AlpacaOrder): + """A stop (market) order is an order to buy or sell a security + when its price moves past a particular point, + ensuring a higher probability of achieving a predetermined + entry or exit price. + + NOTE: Alpaca converts buy stop orders into stop limit orders + with a limit price that is 4% higher than a stop price < $50 + (or 2.5% higher than a stop price >= $50). + """ + def __init__(self, action, instrument, stopPrice, quantity = None, notional = None, **kwargs): + super(LimitOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, + action = action, + instrument = instrument, + stopPrice = stopPrice, + quantity = quantity, + notional = notional, + **kwargs + ) + +class StopLimitOrder(AlpacaOrder): + """A stop-limit order is a conditional trade over a set time frame + that combines the features of a stop order with those of a limit order + and is used to mitigate risk. + """ + def __init__(self, action, instrument, stopPrice, limitPrice, quantity = None, notional = None, **kwargs): + super(LimitOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, + action = action, + instrument = instrument, + stopPrice = stopPrice, + limitPrice = limitPrice, + quantity = quantity, + notional = notional, + **kwargs + ) + +class MarketOnOpenOrder(MarketOrder): + def __init__(self, action, instrument, quantity = None, notional = None, **kwargs): + super(MarketOrder, self).__init__( + type_ = AlpacaOrder.Type.MARKET, + action = action, + instrument = instrument, + quantity = quantity, + notional = notional, + timeInForce = AlpacaOrder.TimeInForce.OPG + **kwargs + ) + +class MarketOnCloseOrder(MarketOrder): + def __init__(self, action, instrument, quantity = None, notional = None, **kwargs): + super(MarketOrder, self).__init__( + type_ = AlpacaOrder.Type.MARKET, + action = action, + instrument = instrument, + quantity = quantity, + notional = notional, + timeInForce = AlpacaOrder.TimeInForce.CLS + **kwargs + ) + +class LimitOnOpenOrder(LimitOrder): + def __init__(self, action, instrument, limitPrice, quantity = None, notional = None, **kwargs): + super(LimitOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, + action = action, + instrument = instrument, + limitPrice = limitPrice, + quantity = quantity, + notional = notional, + timeInForce = AlpacaOrder.TimeInForce.OPG + **kwargs + ) + +class LimitOnCloseOrder(LimitOrder): + def __init__(self, action, instrument, limitPrice, quantity = None, notional = None, **kwargs): + super(LimitOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, + action = action, + instrument = instrument, + limitPrice = limitPrice, + quantity = quantity, + notional = notional, + timeInForce = AlpacaOrder.TimeInForce.CLS + **kwargs + ) + +class BracketOrder(AlpacaOrder): + """A bracket order is a chain of three orders that can be used to + manage your position entry and exit. + + Args: + openingOrder (AlpacaOrder): The opening order for the bracket order. + takeProfitLimit (numeric): The limit price for the exiting take profit limit order. + stopLossStop (numeric): The price to trigger the exiting stop loss order. + stopLossLimit (numeric, Optional): The limit price for the exiting stop loss order + if the stop loss order is a limit order. + + Returns: + AlpacaOrder: A bracket order. + """ + + def __new__(cls, openingOrder, takeProfitLimit, stopLossStop, stopLossLimit = None): + + # check exiting order conditions + if openingOrder.action == AlpacaOrder.Action.BUY: + assert takeProfitLimit > stopLossStop, \ + 'Take profit price must be greater than stop price for buy orders.' + elif openingOrder.action == AlpacaOrder.Action.SELL: + assert takeProfitLimit < stopLossStop, \ + 'take profit price must be less than stop price for sell orders.' + else: + raise Exception('Invalid order action: {openingOrder.action}') + + if openingOrder.extendedHours: + raise Exception( + 'Extended hours are not supported for bracket orders, ' + \ + 'converting to regular hours order.' + ) + + if openingOrder.TimeInForce not in [AlpacaOrder.TimeInForce.DAY, AlpacaOrder.TimeInForce.GTC]: + raise Exception( + 'Time in force must be "day" or "gtc".' + ) + + order = openingOrder + order.takeProfitLimit = takeProfitLimit + order.stopLossStop = stopLossStop + order.stopLossLimit = stopLossLimit + order.orderClass = AlpacaOrder.OrderClass.BRACKET + + return order + +class OneCancelsOtherOrder(AlpacaOrder): + """This is a set of two orders with the same side (buy/buy or sell/sell) and + currently only exit order is supported. + """ + def __init__(self, action, instrument, takeProfitLimit, stopLossStop, stopLossLimit = None): + super(OneCancelsOtherOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, # OCO orders must be placed as limit orders + action = action, + instrument = instrument, + takeProfitLimit = takeProfitLimit, + stopLossStop = stopLossStop, + stopLossLimit = stopLossLimit + ) + +class OneTriggersOther(AlpacaOrder): + """OTO (One-Triggers-Other) is a variant of bracket order. + It takes one of the take-profit or stop-loss order in addition to the entry order. + """ + def __init__(self, action, instrument, takeProfitLimit, stopLossStop, stopLossLimit = None): + super(OneCancelsOtherOrder, self).__init__( + type_ = AlpacaOrder.Type.LIMIT, # OCO orders must be placed as limit orders + action = action, + instrument = instrument, + takeProfitLimit = takeProfitLimit, + stopLossStop = stopLossStop, + stopLossLimit = stopLossLimit + ) + + +class OneTriggersOther(AlpacaOrder): + """OTO (One-Triggers-Other) is a variant of bracket order. + It takes one of the take-profit or stop-loss order in addition to the entry order. + + Args: + openingOrder (AlpacaOrder): The opening order for the bracket order. + takeProfitLimit (numeric, Optional): The limit price for the exiting take profit limit order. + stopLossStop (numeric, Optional): The price to trigger the exiting stop loss order. + stopLossLimit (numeric, Optional): The limit price for the exiting stop loss order + if the stop loss order is a limit order. + + Returns: + AlpacaOrder: An OTO order. + """ + + def __new__(cls, openingOrder, takeProfitLimit = None, stopLossStop = None, stopLossLimit = None): + + # check exiting order conditions + if openingOrder.extendedHours: + raise Exception( + 'Extended hours are not supported for bracket orders, ' + \ + 'converting to regular hours order.' + ) + + if openingOrder.TimeInForce not in [AlpacaOrder.TimeInForce.DAY, AlpacaOrder.TimeInForce.GTC]: + raise Exception( + 'Time in force must be "day" or "gtc".' + ) + + assert (takeProfitLimit or stopLossStop) is not None, \ + 'One of takeProfitLimit or stopLossStop must be present' + + order = openingOrder + order.takeProfitLimit = takeProfitLimit + order.stopLossStop = stopLossStop + order.stopLossLimit = stopLossLimit + order.orderClass = AlpacaOrder.OrderClass.OTO + + return order + + + +# Order constructers +def fromAlpacaOrder(alpacaOrderEntity): + # https://alpaca.markets/docs/api-documentation/api-v2/orders/#order-entity + order = AlpacaOrder( + type_ = AlpacaOrder.Type.fromString(alpacaOrderEntity['type']), + action = AlpacaOrder.Action.fromString(alpacaOrderEntity['side']), + instrument = alpacaOrderEntity['symbol'], + quantity = alpacaOrderEntity['qty'], + instrumentTraits = broker.IntegerTraits(), + orderId = alpacaOrderEntity['order_id'], + clientOrderId = alpacaOrderEntity['client_order_id'], + createdAt = alpacaOrderEntity['created_at'], + updatedAt = alpacaOrderEntity['updated_at'], + submittedAt = alpacaOrderEntity['submitted_at'], + filledAt = alpacaOrderEntity['filled_at'], + expiredAt = alpacaOrderEntity['expired_at'], + canceledAt = alpacaOrderEntity['canceled_at'], + failedAt = alpacaOrderEntity['failed_at'], + replacedAt = alpacaOrderEntity['replaced_at'], + replacedBy = alpacaOrderEntity['replaced_by'], + replaces = alpacaOrderEntity['replaces'], + assetId = alpacaOrderEntity['asset_id'], + assetClass = alpacaOrderEntity['asset_class'], + filledQuantity = alpacaOrderEntity['filled_qty'], + filledAveragePrice = alpacaOrderEntity['filled_avg_price'], + orderClass = alpacaOrderEntity['order_class'], + timeInForce = AlpacaOrder.TimeInForce.fromString(alpacaOrderEntity['time_in_force']), + limitPrice = alpacaOrderEntity['limit_price'], + stopPrice = alpacaOrderEntity['stop_price'], + extendedHours = alpacaOrderEntity['extended_hours'], + legs = alpacaOrderEntity['legs'], + trailPercent = alpacaOrderEntity['trail_percent'], + trailPrice = alpacaOrderEntity['trail_price'], + hwm = alpacaOrderEntity['hwm'], + takeProfit = alpacaOrderEntity['take_profit']['limit_price'], + stopLossStop = alpacaOrderEntity['stop_loss']['stop_price'], + stopLossLimit = alpacaOrderEntity['stop_loss']['limit_price'] + ) + return order + +def toAlpacaOrder(order): + """ order should be a AlpacaOrder object. + + see https://alpaca.markets/docs/api-documentation/api-v2/orders/#order-entity + for details. + """ + + alpacaOrder = { + 'order_id': order.orderId, + 'symbol': order.instrument, + 'qty': order.quantity, + 'notional': order.notional, + 'side': AlpacaOrder.Action.toString(order.action), + 'type': AlpacaOrder.Type.toString(order.type_), + 'time_in_force': AlpacaOrder.TimeInForce.toString(order.timeInForce), + 'limit_price': order.limitPrice, + 'stop_price': order.stopPrice, + 'trail_price': order.trailPrice, + 'trail_percent': order.trailPercent, + 'extended_hours': order.extendedHours, + 'client_order_id': order.clientOrderId, + 'order_class': AlpacaOrder.OrderClass.toString(order.orderClass), + 'take_profit': {'limit_price': order.takeProfit}, + 'stop_loss':{'stop_price': order.stopLossStop} + } + + # for bracket / OTO orders + if order.takeProfit is not None: + alpacaOrder['take_profit'] = {'limit_price': order.takeProfit} + if order.stopLossStop is not None: + alpacaOrder['stop_loss'] = {'stop_price': order.stopLossStop} + if order.stopLossLimit is not None: + alpacaOrder['stop_loss']['limit_price'] = order.stopLossLimit + + # omit items that are None + alpacaOrder = {k: v for k, v in alpacaOrder.items() if v is not None} + + return alpacaOrder + + +class AlpacaOrderEvent(broker.OrderEvent): + """Adds Alpaca specific order states to broker.OrderEvent. + """ + class Type(AlpacaOrder.State): + # use order states + pass \ No newline at end of file diff --git a/pyalgotrade/alpaca/livefeed.py b/pyalgotrade/alpaca/livefeed.py new file mode 100644 index 000000000..f12107e94 --- /dev/null +++ b/pyalgotrade/alpaca/livefeed.py @@ -0,0 +1,306 @@ +# PyAlgoTrade +# +# Copyright 2011-2018 Gabriel Martin Becedillas Ruiz +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +.. moduleauthor:: Robert Lee + Splits out LiveFeed to allow for both live bar feeds and live trade feeds. + +""" + +import abc +import datetime +import time +import queue +import threading +import asyncio +from alpaca_trade_api.entity import Quote + +import zmq +import json +import pandas as pd + +from pyalgotrade import feed +from pyalgotrade import dataseries +from pyalgotrade import bar, quote, trade + +from pyalgotrade.dataseries import bards, tradeds, quoteds +from pyalgotrade.alpaca import common + + +class LiveFeed(threading.Thread): + """A thread that takes incoming messages from Alapaca and publishes them on ZMQ sockets. + """ + + def __init__(self, publishing_address = 'tcp://*:34567', api_key_id = None, api_secret_key = None): + + # Create sockets + self.__zmq_context = zmq.Context() + + # for publishing data from the websocket + # (from Alpaca's stream.TradingStream and stream.DataStream) + self._publishing_address = publishing_address + self.__socket = self.__zmq_context.socket(zmq.PUB) + self.__socket.bind(self._publishing_address) + common.logger.info( + f'Live feed publishing data at {self._publishing_address}' + ) + + # threading stuff + self.__stop = False + self.__stopped = False + + # make connection + # use the non-paper connection to get data + self.stream = common.make_connection( + connection_type = 'stream', api_key_id = api_key_id, api_secret_key = api_secret_key, live = True) + + + def start(self): + self.stream.run() + + def stop(self): + self.__zmq_context.term() + + + # for dynamic subscription + # See https://github.com/alpacahq/alpaca-trade-api-python/blob/master/examples/websockets/dynamic_subscription_example.py + # def consumer_thread(): + # try: + # # make sure we have an event loop, if not create a new one + # loop = asyncio.get_event_loop() + # loop.set_debug(True) + # except RuntimeError: + # asyncio.set_event_loop(asyncio.new_event_loop()) + + + # Functions to handle incoming messages + async def publish(self, topic, messages): + for message in messages: + self.__socket.send_multipart([topic.encode(), message]) + + def publish_with_topic(self, topic): + async def publish(message): + msg = json.dumps(message, default = common.json_serializer) + self.__socket.send_multipart([topic.encode(), msg.encode()]) + return publish + # async def print_stuff(message): + # print(pd.to_datetime(message['t'].to_unix_nano())) + # return print_stuff + + # subscribe to real time data + def subscribe_trade_updates(self): + self.stream.subscribe_trade_updates(self.publish_with_topic('BROKER')) + + def subscribe_trades(self, *symbols, handler_cancel_errors = None, handler_corrections = None): + self.stream.subscribe_trades(self.publish_with_topic('TRADES'), *symbols, handler_cancel_errors, handler_corrections) + + def subscribe_quotes(self, *symbols): + self.stream.subscribe_quotes(self.publish_with_topic('QUOTES'), *symbols) + + def subscribe_bars(self, *symbols): + self.stream.subscribe_bars(self.publish_with_topic('BARS'), *symbols) + + def subscribe_dailiy_bars(self, *symbols): + self.stream.subscribe_daily_bars(self.publish_with_topic('BARS'), *symbols) + + def subscribe_statuses(self, *symbols): + self.stream.subscribe_statuses(self.publish_with_topic('STATUSES'), *symbols) + + def subscribe_lulds(self, *symbols): + self.stream.subscribe_lulds(self.publish_with_topic('LULDS'), *symbols) + + def subscribe_crypto_trades(self, *symbols): + # self.stream.subscribe_crypto_trades(self.publish_with_topic('TRADES'), symbols) + self.stream.subscribe_crypto_trades(self.publish_with_topic('TRADES'), *symbols) + + def subscribe_crypto_quotes(self, *symbols): + self.stream.subscribe_crypto_quotes(self.publish_with_topic('QUOTES'), *symbols) + + def subscribe_crypto_bars(self, *symbols): + self.stream.subscribe_crypto_bars(self.publish_with_topic('BARS'), *symbols) + + def subscribe_crypto_daily_bars(self, *symbols): + self.stream.subscribe_crypto_daily_bars(self.publish_with_topic('BARS'), *symbols) + +class EventQueuer(threading.Thread): + """A thread that checks a ZMQ SUB socket for streaming data. + """ + POLL_FREQUENCY = 0.5 + + def __init__(self, liveFeedAddress = 'tcp://localhost:34567', topic = ''): + super(EventQueuer, self).__init__() + + self.__zmq_context = zmq.Context() + self.__event_socket = self.__zmq_context.socket(zmq.SUB) + self.__event_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) + self.__event_socket.connect(liveFeedAddress) + self.__queue = queue.Queue() + self.__stop = False + + def _getNewEvent(self): + try: + topic, update = self.__event_socket.recv_multipart(zmq.NOBLOCK) + update = update.decode() + update = json.loads(update, object_hook = common.json_deserializer) + return update + except zmq.ZMQError as exc: + if exc.errno == zmq.EAGAIN: + # nothing to get + return + else: + raise + + def getQueue(self): + return self.__queue + + def start(self): + if (newEvent:= self._getNewEvent()): + self.__queue.put(newEvent) + # common.logger.info(f'New Event: {newEvent}') + super(EventQueuer, self).start() + + def run(self): + while not self.__stop: + try: + if (newEvent:= self._getNewEvent()): + self.__queue.put(newEvent) + # common.logger.info(f'New Event: {newEvent}') + else: + time.sleep(EventQueuer.POLL_FREQUENCY) + except Exception as e: + common.logger.critical("Error retrieving new events", exc_info=e) + + def stop(self): + self.__stop = True + +class BaseLiveDataFeed(feed.BaseFeed): + + QUEUE_TIMEOUT = 0.01 + + def __init__(self, liveFeedAddress, topic, maxLen = None): + super(BaseLiveDataFeed, self).__init__(maxLen) + + # Queue to get data from + self.__dataQueuer = EventQueuer(liveFeedAddress, topic) + + # keep track of most recent data + self.__currentData = None + self.__lastData = None + + # BEGIN feed.BaseFeed interface + def reset(self): + self.__currentData = None + self.__lastData = {} + super(BaseLiveDataFeed, self).reset() + + @abc.abstractmethod + def createDataSeries(self, key, maxLen): + pass + + def getNextValues(self): + # from barfeed.BaseBarFeed.getNextValues + dateTime = None + data = self.getNextData() + if data is not None: + dateTime = data.getDateTime + + self.__currentData = data + for instrument in data.getInstruments(): + self.__lastData[instrument] = data[instrument] + + return (dateTime, data) + + # END feed.BaseFeed interface + + def getNextData(self): + ret = None + try: + ret = self.__dataQueuer.getQueue().get(block = True, timeout = BaseLiveDataFeed.QUEUE_TIMEOUT) + return ret + except: + return False + +class LiveBarFeed(BaseLiveDataFeed): + def __init__(self, liveFeedAddress, maxLen = None): + super(LiveBarFeed, self).__init__(liveFeedAddress, 'BARS') + + def createDataSeries(self, key, maxLen): + ret = bards.BarDataSeries(maxLen) + # real time objects do not use adjusted values + ret.setUseAdjustedValues(False) + return ret + +class LiveTradeFeed(BaseLiveDataFeed): + def __init__(self, liveFeedAddress, maxLen = None): + super(LiveBarFeed, self).__init__(liveFeedAddress, 'TRADES') + + def createDataSeries(self, key, maxLen): + ret = tradeds.TradeDataSeries(maxLen) + return ret + +class LiveQuoteFeed(BaseLiveDataFeed): + def __init__(self, liveFeedAddress, maxLen = None): + super(LiveBarFeed, self).__init__(liveFeedAddress, 'QUOTES') + + def createDataSeries(self, key, maxLen): + ret = quoteds.QuoteDataSeries(maxLen) + return ret + +def fromAlpacaData(alpacaDataPoint): + # see https://alpaca.markets/docs/api-documentation/api-v2/market-data/alpaca-data-api-v2/real-time/ + # bars + if alpacaDataPoint['T'] == 'b': + data = bar.Bar( + dateTime = alpacaDataPoint.get('t'), + open_ = alpacaDataPoint.get('o'), + high = alpacaDataPoint.get('h'), + low = alpacaDataPoint.get('l'), + close = alpacaDataPoint.get('c'), + volume = alpacaDataPoint.get('v'), + adjClose = alpacaDataPoint.get('c'), + frequency = None, + extra = {'symbol': alpacaDataPoint.get('S')} + ) + # trades + elif alpacaDataPoint.get('T') == 't': + data = trade.Trade( + dateTime = alpacaDataPoint.get('t'), + tradeId = alpacaDataPoint.get('i'), + price = alpacaDataPoint.get('p'), + size = alpacaDataPoint.get('s'), + exchange = alpacaDataPoint.get('x'), + condition = alpacaDataPoint.get('c'), + tape = alpacaDataPoint.get('z'), + takerSide = alpacaDataPoint.get('tks'), + extra = {'symbol': alpacaDataPoint.get('S')} + ) + # quotes + elif alpacaDataPoint.get('T') == 'q': + data = quote.Quote( + dateTime = alpacaDataPoint.get('t'), + askExchange = alpacaDataPoint.get('ax'), + askPrice = alpacaDataPoint.get('ap'), + askSize = alpacaDataPoint.get('as'), + bidExchange = alpacaDataPoint.get('bx'), + bidPrice = alpacaDataPoint.get('bp'), + bidSize = alpacaDataPoint.get('bs'), + condition = alpacaDataPoint.get('c'), + tape = alpacaDataPoint.get('z'), + extra = {'symbol': alpacaDataPoint.get('S')} + ) + + return data + diff --git a/pyalgotrade/bar.py b/pyalgotrade/bar.py index cd9e13cd9..92c6b82b1 100644 --- a/pyalgotrade/bar.py +++ b/pyalgotrade/bar.py @@ -38,6 +38,7 @@ class Frequency(object): # It is important for frequency values to get bigger for bigger windows. TRADE = -1 + QUOTE = -1 SECOND = 1 MINUTE = 60 HOUR = 60*60 @@ -241,7 +242,6 @@ def getPrice(self): def getExtraColumns(self): return self.__extra - class Bars(object): """A group of :class:`Bar` objects. diff --git a/pyalgotrade/dataseries/quoteds.py b/pyalgotrade/dataseries/quoteds.py new file mode 100644 index 000000000..64147cb86 --- /dev/null +++ b/pyalgotrade/dataseries/quoteds.py @@ -0,0 +1,95 @@ +""" +.. moduleauthor:: Robert Lee +""" + +from pyalgotrade import dataseries + +import six + + +class QuoteDataSeries(dataseries.SequenceDataSeries): + """A DataSeries of :class:`pyalgotrade.bar.Quote` instances. + + :param maxLen: The maximum number of values to hold. + Once a bounded length is full, when new items are added, a corresponding number of items are discarded from the + opposite end. If None then dataseries.DEFAULT_MAX_LEN is used. + :type maxLen: int. + """ + + def __init__(self, maxLen=None): + super(QuoteDataSeries, self).__init__(maxLen) + self.__askExchangeDS = dataseries.SequenceDataSeries(maxLen) + self.__askPriceDS = dataseries.SequenceDataSeries(maxLen) + self.__askSizeDS = dataseries.SequenceDataSeries(maxLen) + self.__bidExchangeDS = dataseries.SequenceDataSeries(maxLen) + self.__bidPriceDS = dataseries.SequenceDataSeries(maxLen) + self.__bidSizeDS = dataseries.SequenceDataSeries(maxLen) + self.__quoteConditionDS = dataseries.SequenceDataSeries(maxLen) + self.__tapeDS = dataseries.SequenceDataSeries(maxLen) + self.__extraDS = {} + + def __getOrCreateExtraDS(self, name): + ret = self.__extraDS.get(name) + if ret is None: + ret = dataseries.SequenceDataSeries(self.getMaxLen()) + self.__extraDS[name] = ret + return ret + + def append(self, trade): + self.appendWithDateTime(trade.getDateTime(), trade) + + def appendWithDateTime(self, dateTime, quote): + assert(dateTime is not None) + assert(quote is not None) + + super(QuoteDataSeries, self).appendWithDateTime(dateTime, quote) + + self.__askExchangeDS.appendWithDateTime(dateTime, quote.getAskExchange()) + self.__askPriceDS.appendWithDateTime(dateTime, quote.getAskPrice()) + self.__askSizeDS.appendWithDateTime(dateTime, quote.getAskSize()) + self.__bidExchangeDS.appendWithDateTime(dateTime, quote.getBidExchange()) + self.__bidPriceDS.appendWithDateTime(dateTime, quote.getBidPrice()) + self.__bidSizeDS.appendWithDateTime(dateTime, quote.getBidSize()) + self.__quoteConditionDS.appendWithDateTime(dateTime, quote.getQuoteCondition()) + self.__tapeDS.appendWithDateTime(dateTime, quote.getTape()) + + # Process extra columns. + for name, value in six.iteritems(quote.getExtraColumns()): + extraDS = self.__getOrCreateExtraDS(name) + extraDS.appendWithDateTime(dateTime, value) + + def getAskExchangeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the ask exchanges.""" + return self.__askExchangeDS + + def getAskPriceDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the ask prices.""" + return self.__askPriceDS + + def getAskSizeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the ask sizes.""" + return self.__askSizeDS + + def getBidExchangeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the bid exchanges.""" + return self.__bidExchangeDS + + def getBidPriceDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the bid prices.""" + return self.__bidPriceDS + + def getBidSizeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the bid sizes.""" + return self.__bidSizeDS + + def getQuoteConditionDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the quote conditions.""" + return self.__quoteConditionDS + + def getTapeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the quote tapes.""" + return self.__tapeDS + + def getExtraDataSeries(self, name): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` for an extra column.""" + return self.__getOrCreateExtraDS(name) \ No newline at end of file diff --git a/pyalgotrade/dataseries/tradeds.py b/pyalgotrade/dataseries/tradeds.py new file mode 100644 index 000000000..097185a36 --- /dev/null +++ b/pyalgotrade/dataseries/tradeds.py @@ -0,0 +1,71 @@ +""" +.. moduleauthor:: Robert Lee +""" + +from pyalgotrade import dataseries + +import six + + +class TradeDataSeries(dataseries.SequenceDataSeries): + """A DataSeries of :class:`pyalgotrade.bar.Trade` instances. + + :param maxLen: The maximum number of values to hold. + Once a bounded length is full, when new items are added, a corresponding number of items are discarded from the + opposite end. If None then dataseries.DEFAULT_MAX_LEN is used. + :type maxLen: int. + """ + + def __init__(self, maxLen=None): + super(TradeDataSeries, self).__init__(maxLen) + self.__tradeIdDS = dataseries.SequenceDataSeries(maxLen) + self.__priceDS = dataseries.SequenceDataSeries(maxLen) + self.__sizeDS = dataseries.SequenceDataSeries(maxLen) + self.__isBuyDS = dataseries.SequenceDataSeries(maxLen) + self.__extraDS = {} + + def __getOrCreateExtraDS(self, name): + ret = self.__extraDS.get(name) + if ret is None: + ret = dataseries.SequenceDataSeries(self.getMaxLen()) + self.__extraDS[name] = ret + return ret + + def append(self, trade): + self.appendWithDateTime(trade.getDateTime(), trade) + + def appendWithDateTime(self, dateTime, trade): + assert(dateTime is not None) + assert(trade is not None) + + super(TradeDataSeries, self).appendWithDateTime(dateTime, trade) + + self.__tradeIdDS.appendWithDateTime(dateTime, trade.getTradeId()) + self.__priceDS.appendWithDateTime(dateTime, trade.getPrice()) + self.__sizeDS.appendWithDateTime(dateTime, trade.getSize()) + self.__isBuyDS.appendWithDateTime(dateTime, trade.getIsBuy()) + + # Process extra columns. + for name, value in six.iteritems(trade.getExtraColumns()): + extraDS = self.__getOrCreateExtraDS(name) + extraDS.appendWithDateTime(dateTime, value) + + def getTradeIdDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the trade Ids.""" + return self.__tradeIdDs + + def getPriceDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the trade prices.""" + return self.__priceDS + + def getSizeDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the trade sizes.""" + return self.__sizeDS + + def getIsBuyDataSeries(self): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` with whether the trades are buys.""" + return self.__isBuyDS + + def getExtraDataSeries(self, name): + """Returns a :class:`pyalgotrade.dataseries.DataSeries` for an extra column.""" + return self.__getOrCreateExtraDS(name) diff --git a/pyalgotrade/quote.py b/pyalgotrade/quote.py new file mode 100644 index 000000000..fb444364f --- /dev/null +++ b/pyalgotrade/quote.py @@ -0,0 +1,159 @@ +""" +.. moduleauthor:: Robert Lee +""" + +from pyalgotrade import bar + +Frequency = bar.Frequency + +class Quote(object): + # TODO: add __str__ and __repr__ method? + # Optimization to reduce memory footprint. + __slots__ = ( + '__dateTime', + '__ask_exchange', + '__ask_price', + '__ask_size', + '__bid_exchange', + '__bid_price', + '__bid_size', + '__condition', + '__tape', + '__extra' + ) + + def __init__(self, dateTime, + askExchange, askPrice, askSize, + bidExchange, bidPrice, bidSize, + condition, tape, extra = {}): + + self.__dateTime = dateTime + self.__askExchange = askExchange + self.__askPrice = askPrice + self.__askSize = askSize + self.__bidExchange = bidExchange + self.__bidPrice = bidPrice + self.__bidSize = bidSize + self.__condition = condition + self.__tape = tape + self.__extra = extra + + def __setstate__(self, state): + (self.__dateTime, + self.__askExchange, + self.__askPrice, + self.__askSize, + self.__bidExchange, + self.__bidPrice, + self.__bidSize, + self.__condition, + self.__tape, + self.__extra + ) = state + + def __getstate__(self): + return ( + self.__dateTime, + self.__askExchange, + self.__askPrice, + self.__askSize, + self.__bidExchange, + self.__bidPrice, + self.__bidSize, + self.__condition, + self.__tape, + self.__extra + ) + + def getFrequency(self): + return Frequency.QUOTE + + def getDateTime(self): + return self.__dateTime + + def getAskExchange(self): + return self.__askExchange + + def getAskPrice(self): + return self.__askPrice + + def getAskSize(self): + return self.__askSize + + def getBidExchange(self): + return self.__bidExchange + + def getBidPrice(self): + return self.__bidPrice + + def getBidSize(self): + return self.__bidSize + + def getCondition(self): + return self.__condition + + def getTape(self): + return self.__tape + + def getExtraColumns(self): + return self.__extra + +class Quotes(object): + + """A group of :class:`Quote` objects. + + :param quoteDict: A map of instrument to :class:`Quote` objects. + :type quoteDict: map. + + .. note:: + All bars must have the same datetime. + """ + + def __init__(self, quoteDict): + if len(quoteDict) == 0: + raise Exception("No quotes supplied") + + # Check that bar datetimes are in sync + firstDateTime = None + firstInstrument = None + for instrument, currentQuote in six.iteritems(quoteDict): + if firstDateTime is None: + firstDateTime = currentQuote.getDateTime() + firstInstrument = instrument + elif currentQuote.getDateTime() != firstDateTime: + raise Exception("Quote data times are not in sync. %s %s != %s %s" % ( + instrument, + currentQuote.getDateTime(), + firstInstrument, + firstDateTime + )) + + self.__quoteDict = quoteDict + self.__dateTime = firstDateTime + + def __getitem__(self, instrument): + """Returns the :class:`pyalgoquote.bar.Quote` for the given instrument. + If the instrument is not found an exception is raised.""" + return self.__quoteDict[instrument] + + def __contains__(self, instrument): + """Returns True if a :class:`pyalgoquote.bar.Quote` for the given instrument is available.""" + return instrument in self.__quoteDict + + def items(self): + return list(self.__quoteDict.items()) + + def keys(self): + return list(self.__quoteDict.keys()) + + def getInstruments(self): + """Returns the instrument symbols.""" + return list(self.__quoteDict.keys()) + + def getDateTime(self): + """Returns the :class:`datetime.datetime` for this set of quotes.""" + return self.__dateTime + + def getQuote(self, instrument): + """Returns the :class:`pyalgoquote.bar.Quote` for the given instrument or None if the instrument is not found.""" + return self.__quoteDict.get(instrument, None) \ No newline at end of file diff --git a/pyalgotrade/tools/quandl.py b/pyalgotrade/tools/quandl.py index 5014a2b70..90f845394 100644 --- a/pyalgotrade/tools/quandl.py +++ b/pyalgotrade/tools/quandl.py @@ -31,10 +31,15 @@ import pyalgotrade.logger -# http://www.quandl.com/help/api +# https://docs.data.nasdaq.com/docs/in-depth-usage + +base_url = 'https://data.nasdaq.com/api/v3/datasets' + def download_csv(sourceCode, tableCode, begin, end, frequency, authToken): - url = "http://www.quandl.com/api/v1/datasets/%s/%s.csv" % (sourceCode, tableCode) + + url = f'{base_url}/{sourceCode}/{tableCode}/data.csv' + params = { "trim_start": begin.strftime("%Y-%m-%d"), "trim_end": end.strftime("%Y-%m-%d"), @@ -61,7 +66,8 @@ def download_daily_bars(sourceCode, tableCode, year, csvFile, authToken=None): :type authToken: string. """ - bars = download_csv(sourceCode, tableCode, datetime.date(year, 1, 1), datetime.date(year, 12, 31), "daily", authToken) + bars = download_csv(sourceCode, tableCode, datetime.date( + year, 1, 1), datetime.date(year, 12, 31), "daily", authToken) f = open(csvFile, "w") f.write(bars) f.close() @@ -82,8 +88,10 @@ def download_weekly_bars(sourceCode, tableCode, year, csvFile, authToken=None): :type authToken: string. """ - begin = dt.get_first_monday(year) - datetime.timedelta(days=1) # Start on a sunday - end = dt.get_last_monday(year) - datetime.timedelta(days=1) # Start on a sunday + begin = dt.get_first_monday( + year) - datetime.timedelta(days=1) # Start on a sunday + end = dt.get_last_monday( + year) - datetime.timedelta(days=1) # Start on a sunday bars = download_csv(sourceCode, tableCode, begin, end, "weekly", authToken) f = open(csvFile, "w") f.write(bars) @@ -145,37 +153,51 @@ def build_feed(sourceCode, tableCodes, fromYear, toYear, storage, frequency=bar. for year in range(fromYear, toYear+1): for tableCode in tableCodes: - fileName = os.path.join(storage, "%s-%s-%d-quandl.csv" % (sourceCode, tableCode, year)) + fileName = os.path.join( + storage, "%s-%s-%d-quandl.csv" % (sourceCode, tableCode, year)) if not os.path.exists(fileName) or forceDownload: - logger.info("Downloading %s %d to %s" % (tableCode, year, fileName)) + logger.info("Downloading %s %d to %s" % + (tableCode, year, fileName)) try: if frequency == bar.Frequency.DAY: - download_daily_bars(sourceCode, tableCode, year, fileName, authToken) + download_daily_bars( + sourceCode, tableCode, year, fileName, authToken) else: assert frequency == bar.Frequency.WEEK, "Invalid frequency" - download_weekly_bars(sourceCode, tableCode, year, fileName, authToken) + download_weekly_bars( + sourceCode, tableCode, year, fileName, authToken) except Exception as e: if skipErrors: logger.error(str(e)) continue else: raise e - ret.addBarsFromCSV(tableCode, fileName, skipMalformedBars=skipMalformedBars) + ret.addBarsFromCSV(tableCode, fileName, + skipMalformedBars=skipMalformedBars) return ret def main(): parser = argparse.ArgumentParser(description="Quandl utility") - parser.add_argument("--auth-token", required=False, help="An authentication token needed if you're doing more than 50 calls per day") - parser.add_argument("--source-code", required=True, help="The dataset source code") - parser.add_argument("--table-code", required=True, help="The dataset table code") - parser.add_argument("--from-year", required=True, type=int, help="The first year to download") - parser.add_argument("--to-year", required=True, type=int, help="The last year to download") - parser.add_argument("--storage", required=True, help="The path were the files will be downloaded to") - parser.add_argument("--force-download", action='store_true', help="Force downloading even if the files exist") - parser.add_argument("--ignore-errors", action='store_true', help="True to keep on downloading files in case of errors") - parser.add_argument("--frequency", default="daily", choices=["daily", "weekly"], help="The frequency of the bars. Only daily or weekly are supported") + parser.add_argument("--auth-token", required=False, + help="An authentication token needed if you're doing more than 50 calls per day") + parser.add_argument("--source-code", required=True, + help="The dataset source code") + parser.add_argument("--table-code", required=True, + help="The dataset table code") + parser.add_argument("--from-year", required=True, + type=int, help="The first year to download") + parser.add_argument("--to-year", required=True, type=int, + help="The last year to download") + parser.add_argument("--storage", required=True, + help="The path were the files will be downloaded to") + parser.add_argument("--force-download", action='store_true', + help="Force downloading even if the files exist") + parser.add_argument("--ignore-errors", action='store_true', + help="True to keep on downloading files in case of errors") + parser.add_argument("--frequency", default="daily", choices=[ + "daily", "weekly"], help="The frequency of the bars. Only daily or weekly are supported") args = parser.parse_args() @@ -186,15 +208,19 @@ def main(): os.mkdir(args.storage) for year in range(args.from_year, args.to_year+1): - fileName = os.path.join(args.storage, "%s-%s-%d-quandl.csv" % (args.source_code, args.table_code, year)) + fileName = os.path.join(args.storage, "%s-%s-%d-quandl.csv" % + (args.source_code, args.table_code, year)) if not os.path.exists(fileName) or args.force_download: - logger.info("Downloading %s %d to %s" % (args.table_code, year, fileName)) + logger.info("Downloading %s %d to %s" % + (args.table_code, year, fileName)) try: if args.frequency == "daily": - download_daily_bars(args.source_code, args.table_code, year, fileName, args.auth_token) + download_daily_bars( + args.source_code, args.table_code, year, fileName, args.auth_token) else: assert args.frequency == "weekly", "Invalid frequency" - download_weekly_bars(args.source_code, args.table_code, year, fileName, args.auth_token) + download_weekly_bars( + args.source_code, args.table_code, year, fileName, args.auth_token) except Exception as e: if args.ignore_errors: logger.error(str(e)) diff --git a/pyalgotrade/trade.py b/pyalgotrade/trade.py new file mode 100644 index 000000000..595181f36 --- /dev/null +++ b/pyalgotrade/trade.py @@ -0,0 +1,151 @@ +""" +.. moduleauthor:: Robert Lee +""" + +from pyalgotrade import bar + +Frequency = bar.Frequency + +class Trade(object): + # TODO: add __str__ and __repr__ method? + # Optimization to reduce memory footprint. + __slots__ = ( + '__dateTime', + '__tradeId', + '__price', + '__size', + '__exchange', + '__condition', + '__tape', + '__takerSide', + '__extra' + ) + + def __init__(self, dateTime, + tradeId, price, size, + exchange, condition, tape, takerSide, + extra = {}): + + self.__dateTime = dateTime + self.__tradeId = tradeId + self.__price = price + self.__size = size + self.__exchange = exchange + self.__condition = condition + self.__tape = tape + self.__takerSide = takerSide + self.__extra = extra + + def __setstate__(self, state): + (self.__dateTime, + self.__tradeId, + self.__price, + self.__size, + self.__exchange, + self.__condition, + self.__tape, + self.__takerSide, + self.__extra) = state + + def __getstate__(self): + return ( + self.__dateTime, + self.__tradeId, + self.__price, + self.__size, + self.__exchange, + self.__condition, + self.__tape, + self.__takerSide, + self.__extra + ) + + def getFrequency(self): + return Frequency.TRADE + + def getDateTime(self): + return self.__dateTime + + def getTradeId(self): + return self.__tradeId + + def getPrice(self): + return self.__price + + def getSize(self): + return self.__size + + def getExchange(self): + return self.__exchange + + def getCondition(self): + return self.__condition + + def getTape(self): + return self.__tape + + def getTakerSide(self): + return self.__takerSide + + def getExtraColumns(self): + return self.__extra + +class Trades(object): + + """A group of :class:`Trade` objects. + + :param tradeDict: A map of instrument to :class:`Trade` objects. + :type tradeDict: map. + + .. note:: + All bars must have the same datetime. + """ + + def __init__(self, tradeDict): + if len(tradeDict) == 0: + raise Exception("No trades supplied") + + # Check that bar datetimes are in sync + firstDateTime = None + firstInstrument = None + for instrument, currentTrade in six.iteritems(tradeDict): + if firstDateTime is None: + firstDateTime = currentTrade.getDateTime() + firstInstrument = instrument + elif currentTrade.getDateTime() != firstDateTime: + raise Exception("Trade data times are not in sync. %s %s != %s %s" % ( + instrument, + currentTrade.getDateTime(), + firstInstrument, + firstDateTime + )) + + self.__tradeDict = tradeDict + self.__dateTime = firstDateTime + + def __getitem__(self, instrument): + """Returns the :class:`pyalgotrade.bar.Trade` for the given instrument. + If the instrument is not found an exception is raised.""" + return self.__tradeDict[instrument] + + def __contains__(self, instrument): + """Returns True if a :class:`pyalgotrade.bar.Trade` for the given instrument is available.""" + return instrument in self.__tradeDict + + def items(self): + return list(self.__tradeDict.items()) + + def keys(self): + return list(self.__tradeDict.keys()) + + def getInstruments(self): + """Returns the instrument symbols.""" + return list(self.__tradeDict.keys()) + + def getDateTime(self): + """Returns the :class:`datetime.datetime` for this set of trades.""" + return self.__dateTime + + def getTrade(self, instrument): + """Returns the :class:`pyalgotrade.bar.Trade` for the given instrument or None if the instrument is not found.""" + return self.__tradeDict.get(instrument, None)