Storing Tick-by-Tick WebSocket Data in ClickHouse
Recently I picked up ClickHouse as a database to store tick-by-tick stock market data for algo trading. I looked in multiple places to understand how WebSocket data can be stored in ClickHouse, but did not find anything. Having finished the code, I thought this blog would be useful for others looking for a solution like this. Before getting into the details, some basics.
What is ClickHouse?
ClickHouse is a column-oriented database that lets its users generate powerful real-time analytics using SQL queries. Of all the databases I have seen so far, it seems to be the fastest. Below are a few of ClickHouse's use cases.
1. Real-Time Analytics
- Use Case: Tracking user behavior, website performance, or IoT device data in real-time.
- Example: Monitoring website traffic and user interactions with low-latency queries.
- Benefit: ClickHouse can process large volumes of data with minimal delay, making it ideal for real-time dashboards and monitoring systems.
2. Time-Series Data Storage
- Use Case: Storing and querying time-series data, such as metrics, logs, or financial data.
- Example: Cryptocurrency trading platforms storing price and volume data over time.
- Benefit: Its efficient compression and fast query capabilities make it suitable for time-series applications like IoT data or financial trading systems.
3. Big Data Analytics
- Use Case: Handling massive datasets for reporting, ad-hoc queries, and advanced analytics.
- Example: E-commerce platforms analyzing customer purchasing patterns and product performance.
- Benefit: It’s designed to handle billions of rows with efficient indexing, making it ideal for high-volume, big data scenarios.
Installing ClickHouse is super easy
1. Download the binary into a folder.
curl https://clickhouse.com/ | sh
2. Start the server
./clickhouse server
3. Start the client
./clickhouse client
If everything is successful, you will see a prompt like this:
local :)
Code:
1. Import libraries
import asyncio
import websockets
import json
from datetime import datetime
import clickhouse_connect
2. Set up the ClickHouse client and test the connection
client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='default',
    password='',
)

# Smoke-test the connection by asking the server for its version.
# A failing version query is reported but not raised, so the script
# continues either way.
try:
    response = client.command('SELECT version()')
except Exception as err:
    print(f"Error occurred: {err}")
else:
    print(f"Connected to ClickHouse. Server version: {response}")
2a. Create the ClickHouse table if it doesn’t exist. This is a one-off step.
# Create the ClickHouse table if it doesn't exist (a no-op once it does).
# Column type choices:
#   * event_time is DateTime64(3): the incoming timestamps are epoch
#     milliseconds and are formatted with '%f', but a plain DateTime column
#     keeps only whole seconds, so updates arriving within the same second
#     would collapse onto one timestamp.
#   * prices and volume are Float64: the values are Python floats (64-bit),
#     while Float32 has only ~7 significant digits, which leaves a five-digit
#     price with just 2-3 accurate decimal places.
#   * ORDER BY (pair, event_time) keeps each pair's rows together, which is
#     what per-pair time-range queries read.
# NOTE: IF NOT EXISTS leaves an already-created table untouched; drop or ALTER
# an existing table to pick up these column types.
client.command('''
CREATE TABLE IF NOT EXISTS binance_kline_data (
    pair String,
    event_time DateTime64(3),
    minute String,
    open_price Float64,
    close_price Float64,
    high_price Float64,
    low_price Float64,
    volume Float64
) ENGINE = MergeTree()
ORDER BY (pair, event_time)
''')
3. Function to store data into ClickHouse
async def store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume):
    """Insert one kline update as a single row of ``binance_kline_data``.

    Args:
        pair: Trading-pair symbol taken from the stream message.
        event_time: Event timestamp string; run_websocket formats it as
            '%Y-%m-%d %H:%M:%S.%f'.
        minute: Candle open time string; run_websocket formats it as '%H:%M'.
        open_price, close_price, high_price, low_price, volume: Candle values.

    The values are sent as bound query parameters instead of being pasted
    into the SQL text. Formatting a Python tuple's repr into the query breaks
    as soon as a string contains a quote or backslash, and it lets data
    received from the network inject SQL.

    ``client.command`` is a blocking call, so it runs in the default
    thread-pool executor instead of stalling the event loop (and with it the
    WebSocket) for the duration of the insert.
    """
    query = (
        "INSERT INTO binance_kline_data "
        "(pair, event_time, minute, open_price, close_price, high_price, low_price, volume) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    )
    values = (pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
    loop = asyncio.get_running_loop()
    # NOTE(review): this issues one INSERT per message; if the feed rate grows,
    # consider batching rows, as ClickHouse favours fewer, larger inserts.
    await loop.run_in_executor(None, lambda: client.command(query, parameters=values))
4. WebSocket function to receive data and store it
async def run_websocket(stream="btcusdt@kline_1m", reconnect_delay=5.0):
    """Stream Binance futures kline updates and store each one in ClickHouse.

    Args:
        stream: Binance stream name to subscribe to (default 'btcusdt@kline_1m').
        reconnect_delay: Seconds to wait before reconnecting after the
            connection closes or fails to open.

    Runs forever. The connection is re-opened whenever it closes or errors,
    so a dropped connection no longer ends the collector.
    """
    url = f"wss://fstream.binance.com/stream?streams={stream}"
    headers = {"method": "SUBSCRIBE", "params": [stream], "id": 1}
    dt_format = '%Y-%m-%d %H:%M:%S.%f'  # Keep the sub-second part of the timestamp
    while True:
        try:
            async with websockets.connect(url, ping_timeout=None, max_size=10000000) as websocket:
                await websocket.send(json.dumps(headers))
                async for msg in websocket:
                    raw_data = json.loads(msg)
                    data = raw_data.get('data')
                    if data is None:
                        # Not a stream event: the {"result": ...} reply to the
                        # SUBSCRIBE request above, or an error reply.
                        if 'result' not in raw_data:
                            print(f"Ignoring unexpected message: {raw_data}")
                        continue
                    pair = data['s']
                    candle = data['k']
                    # Timestamps arrive as epoch milliseconds, hence / 1000.
                    # NOTE(review): fromtimestamp() renders in this machine's local
                    # time zone; confirm it matches the ClickHouse server's.
                    event_time = datetime.fromtimestamp(data['E'] / 1000).strftime(dt_format)
                    minute = datetime.fromtimestamp(candle['t'] / 1000).strftime('%H:%M')
                    open_price = float(candle['o'])
                    close_price = float(candle['c'])
                    high_price = float(candle['h'])
                    low_price = float(candle['l'])
                    volume = float(candle['v'])
                    # Store the data in ClickHouse
                    await store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
                    print(f'[{event_time}] {pair} - minute: {minute} | open: {open_price} | close: {close_price} | '
                          f'high: {high_price} | low: {low_price} | volume: {volume}')
        except (websockets.WebSocketException, OSError, asyncio.TimeoutError) as exc:
            print(f"WebSocket error: {exc!r}")
        # Reached after a clean close as well as after an error.
        print(f"WebSocket disconnected; reconnecting in {reconnect_delay} s")
        await asyncio.sleep(reconnect_delay)
5. Finally, run the WebSocket connection
if __name__ == "__main__":
    # Script entry point: drive the collector coroutine on a fresh event loop.
    asyncio.run(main=run_websocket())
Here is the entire code
import asyncio
import websockets
import json
from datetime import datetime
import clickhouse_connect
# ClickHouse client setup
client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='default',
    password='',
)

# Smoke-test the connection by asking the server for its version.
# A failing version query is reported but not raised, so the script
# continues either way.
try:
    response = client.command('SELECT version()')
except Exception as err:
    print(f"Error occurred: {err}")
else:
    print(f"Connected to ClickHouse. Server version: {response}")
# Function to store data into ClickHouse
async def store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume):
    """Insert one kline update as a single row of ``binance_kline_data``.

    Args:
        pair: Trading-pair symbol taken from the stream message.
        event_time: Event timestamp string; run_websocket formats it as
            '%Y-%m-%d %H:%M:%S.%f'.
        minute: Candle open time string; run_websocket formats it as '%H:%M'.
        open_price, close_price, high_price, low_price, volume: Candle values.

    The values are sent as bound query parameters instead of being pasted
    into the SQL text. Formatting a Python tuple's repr into the query breaks
    as soon as a string contains a quote or backslash, and it lets data
    received from the network inject SQL.

    ``client.command`` is a blocking call, so it runs in the default
    thread-pool executor instead of stalling the event loop (and with it the
    WebSocket) for the duration of the insert.
    """
    query = (
        "INSERT INTO binance_kline_data "
        "(pair, event_time, minute, open_price, close_price, high_price, low_price, volume) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    )
    values = (pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
    loop = asyncio.get_running_loop()
    # NOTE(review): this issues one INSERT per message; if the feed rate grows,
    # consider batching rows, as ClickHouse favours fewer, larger inserts.
    await loop.run_in_executor(None, lambda: client.command(query, parameters=values))
# WebSocket function to receive data and store it
async def run_websocket(stream="btcusdt@kline_1m", reconnect_delay=5.0):
    """Stream Binance futures kline updates and store each one in ClickHouse.

    Args:
        stream: Binance stream name to subscribe to (default 'btcusdt@kline_1m').
        reconnect_delay: Seconds to wait before reconnecting after the
            connection closes or fails to open.

    Runs forever. The connection is re-opened whenever it closes or errors,
    so a dropped connection no longer ends the collector.
    """
    url = f"wss://fstream.binance.com/stream?streams={stream}"
    headers = {"method": "SUBSCRIBE", "params": [stream], "id": 1}
    dt_format = '%Y-%m-%d %H:%M:%S.%f'  # Keep the sub-second part of the timestamp
    while True:
        try:
            async with websockets.connect(url, ping_timeout=None, max_size=10000000) as websocket:
                await websocket.send(json.dumps(headers))
                async for msg in websocket:
                    raw_data = json.loads(msg)
                    data = raw_data.get('data')
                    if data is None:
                        # Not a stream event: the {"result": ...} reply to the
                        # SUBSCRIBE request above, or an error reply.
                        if 'result' not in raw_data:
                            print(f"Ignoring unexpected message: {raw_data}")
                        continue
                    pair = data['s']
                    candle = data['k']
                    # Timestamps arrive as epoch milliseconds, hence / 1000.
                    # NOTE(review): fromtimestamp() renders in this machine's local
                    # time zone; confirm it matches the ClickHouse server's.
                    event_time = datetime.fromtimestamp(data['E'] / 1000).strftime(dt_format)
                    minute = datetime.fromtimestamp(candle['t'] / 1000).strftime('%H:%M')
                    open_price = float(candle['o'])
                    close_price = float(candle['c'])
                    high_price = float(candle['h'])
                    low_price = float(candle['l'])
                    volume = float(candle['v'])
                    # Store the data in ClickHouse
                    await store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
                    print(f'[{event_time}] {pair} - minute: {minute} | open: {open_price} | close: {close_price} | '
                          f'high: {high_price} | low: {low_price} | volume: {volume}')
        except (websockets.WebSocketException, OSError, asyncio.TimeoutError) as exc:
            print(f"WebSocket error: {exc!r}")
        # Reached after a clean close as well as after an error.
        print(f"WebSocket disconnected; reconnecting in {reconnect_delay} s")
        await asyncio.sleep(reconnect_delay)
# Run the WebSocket connection
if __name__ == "__main__":
    # Script entry point: drive the collector coroutine on a fresh event loop.
    asyncio.run(main=run_websocket())
If everything goes well, you will see output like this:
Below is the query run in the ClickHouse client:
SELECT *
FROM binance_kline_data
ORDER BY event_time
Below is the database performance:
1439 rows in set. Elapsed: 0.005 sec. Processed 1.44 thousand rows, 77.71 KB (291.39 thousand rows/s., 15.74 MB/s.)
Peak memory usage: 79.13 KiB.
Enjoy coding!!!