import aiohttp from decimal import Decimal as D, ROUND_CEILING import dill as pickle import time import os import websockets import requests import urllib import json import aiostream import asyncio import pprint import yahoo_stream from utils import * from piecewise import * KALSHI_ROOT = "https://trading-api.kalshi.com/v1" if not DEMO else "https://demo-api.kalshi.co/v1" KALSHI_ORIGIN = "https://kalshi.com" if not DEMO else "https://demo.kalshi.co" KALSHI_AUTH = read_auth("kalshi") if not DEMO else read_auth("kalshi_demo") KALSHI_SESSION_FILE = ".kalshi_session" if not DEMO else ".kalshi_demo_session" COMMON_HEADERS = { "accept": "application/json", "accept-encoding": "gzip, deflate, br", "accept-language": "en-US,en;q=0.9", "origin": KALSHI_ORIGIN, "referer": "https://kalshi.com/" if not DEMO else "https://demo.kalshi.co/", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36", } #TODO: figure out what prices actually applies here def convert_contract_hundred_lower(data): rem = 0 ret = [] for p, s in data: s += rem if s//100: ret.append((p*100, s//100)) rem = s%100 return ret #TODO: unittests for this def convert_contract_avg(data): rem = 0 remp = 0 ret = [] for p, s in data: if rem: u = min(100-rem, s) remp += u*p rem += u s -= u if rem==100: ret.append((remp, 1)) rem=0 remp=0 if s: if s//100: ret.append((p*100, s//100)) if s%100: rem = s%100 remp = rem * p return ret def convert_sp_rulebook(data): if data['Above/Below/Between']=='above': return (cents(data["Value"])+1, None) if data['Above/Below/Between']=='below': return (None, cents(data["Value"])-1) return tuple(map(cents, data["Value"].split('-'))) def convert_nasdaq_rulebook(data): if data['Above/Below/Between']=='above': return (cents(data["Value"])+1, None) if data['Above/Below/Between']=='below': return (None, cents(data["Value"])-1) return tuple(map(cents, data["Value"].split(' and '))) def parse_orderbook_message(message): msg = message["msg"] mid = msg["market_id"] bids = [(int(p), s) for p, s in msg.get("yes", [])] asks = [(100 - int(p), s) for p, s in msg.get("no", [])] asks.sort() bids.sort() bids.reverse() return mid, bids, asks def calc_fee(p, cs, is_index): up = D(p) / D(cs) return int(( up * (D(100) - up) * D('0.035' if is_index else '0.07') * D(cs) / D(100) ).to_integral_value(rounding=ROUND_CEILING)) def apply_btic_shift(lims, btic, bid): leeway = 100 if bid else -100 leeway = 0 return ( None if lims[0] is None else lims[0]+btic-leeway, None if lims[1] is None else lims[1]+btic+leeway, ) def conv_ask_to_pc(lims, p, s, trans_hundred, is_index): fee = calc_fee(p, 100 if trans_hundred else 1, is_index) return (conv_lim_pc(*lims) + (-p -fee), s, p) def conv_bid_to_pc(lims, p, s, trans_hundred, is_index): fee = calc_fee(p, 100 if trans_hundred else 1, is_index) return (-conv_lim_pc(*lims) + (p - fee), s, p) class KalshiSession: def __init__(self, s): self.s = s def __repr__(self): return "kalshi" async def login(self): self.s.headers.clear() self.s.headers.update(COMMON_HEADERS) self.s.cookie_jar.clear() if os.path.exists(KALSHI_SESSION_FILE): with open(KALSHI_SESSION_FILE, 'rb') as f: self.user_id, cookies, csrf_token, self.frontend_session = pickle.load(f) self.s.headers['x-csrf-token'] = csrf_token self.s.cookie_jar._cookies = cookies else: print("doing new login") async with self.s.post( KALSHI_ROOT+"/log_in", json = KALSHI_AUTH, ) as resp: self.user_id = (await resp.json())["user_id"] self.s.headers['x-csrf-token'] = resp.headers['x-csrf-token'] self.frontend_session = await self.get_frontend_session() with open(KALSHI_SESSION_FILE, 'wb') as f: pickle.dump((self.user_id, self.s.cookie_jar._cookies, self.s.headers['x-csrf-token'], self.frontend_session), f) async def get_frontend_session(self): async with self.s.post( f"{KALSHI_ROOT}/users/{self.user_id}/frontend_sessions", json = {"cancelToken":{"promise":{}}}, ) as resp: return (await resp.json())['session_id'] def open_websocket(self): ws_url = ( "wss://" + KALSHI_ROOT.split("://")[1] + "/ws?csrfToken=" + urllib.parse.quote_plus(self.s.headers['x-csrf-token']) + "&frontend_session=" + urllib.parse.quote_plus(self.frontend_session) ) return websockets.connect( ws_url, extra_headers = { **self.s.headers, "Cookie": self.s.cookie_jar.filter_cookies(ws_url).output(header=''), }, origin = KALSHI_ORIGIN, ) async def markets_from_ticker(self, ticker, converter): lims = {} async with self.s.get(f"{KALSHI_ROOT}/events/{ticker}") as resp: for m in (await resp.json())['event']['markets']: lims[m["id"]] = converter(m['rulebook_variables']) return lims async def execute_order(self, mid, p, s, buy): async with self.s.post( f"{KALSHI_ROOT}/users/{self.user_id}/orders", json = { "count": s*100, "price": p//100 if buy else 100-p//100, "max_cost_cents": 0, "sell_position_capped": False, "expiration_unix_ts": int(time.time())+10, "market_id": mid, "side": "yes" if buy else "no" }, ) as resp: pprint.pprint("kalshi order") pprint.pprint(resp) pprint.pprint(await resp.json()) pprint.pprint("") async def orderbook_stream(self, ticker, converter, btic_ticker, is_index=True, trans_hundred=True): lims = await self.markets_from_ticker(ticker, converter) async with self.open_websocket() as ws: for i, mid in enumerate(lims): await ws.send(json.dumps({ "cmd": "subscribe", "id": i+1, "params": { "channels": ["orderbook"], "market_id": mid, }, })) book = {} async with aiostream.stream.ziplatest( ws, yahoo_stream.stream_ticker(btic_ticker), partial=False, ).stream() as streamer: async for message, btic_price in streamer: message = json.loads(message) if message["type"]=="subscribed": continue assert message["type"]=="orderbook_snapshot" mid, bids, asks = parse_orderbook_message(message) if trans_hundred: bids = convert_contract_hundred_lower(bids) asks = convert_contract_hundred_lower(asks) bid_shifed_lims = apply_btic_shift(lims[mid], btic_price, True) ask_shifed_lims = apply_btic_shift(lims[mid], btic_price, False) book[mid] = Digital( *lims[mid], bids = [conv_bid_to_pc(bid_shifed_lims, p, s, trans_hundred, is_index) for p, s in bids], asks = [conv_ask_to_pc(ask_shifed_lims, p, s, trans_hundred, is_index) for p, s in asks], exchange = self, market_id = mid, ) yield [*book.values()] async def main(): async with aiohttp.ClientSession() as ks: s = KalshiSession(ks) await s.login() await s.execute_order("05aed7ff-3506-4729-b78c-9e96c5b2f876", 2000, 1, False) # async for book in s.orderbook_stream("INXW-22NOV04", convert_sp_rulebook): # pprint.pprint(book) # pass if __name__=="__main__": asyncio.run(main())