Storing tick-by-tick WebSocket data in ClickHouse

Ravindra Elicherla
Oct 21, 2024

Recently I picked up ClickHouse as a database to store tick-by-tick stock market data for algo trading. I looked in multiple places to understand how WebSocket data can be stored in ClickHouse, but did not find anything. In the end I worked out the code myself, and I thought this blog would be useful for others looking for a similar solution. Before getting into the details, some basics.

What is ClickHouse?

ClickHouse is a column-oriented database that lets users run powerful analytics with SQL queries in real time. Of all the databases I have used so far, it seems to be the fastest. Below are a few use cases for ClickHouse.

1. Real-Time Analytics

  • Use Case: Tracking user behavior, website performance, or IoT device data in real-time.
  • Example: Monitoring website traffic and user interactions with low-latency queries.
  • Benefit: ClickHouse can process large volumes of data with minimal delay, making it ideal for real-time dashboards and monitoring systems.

2. Time-Series Data Storage

  • Use Case: Storing and querying time-series data, such as metrics, logs, or financial data.
  • Example: Cryptocurrency trading platforms storing price and volume data over time.
  • Benefit: Its efficient compression and fast query capabilities make it suitable for time-series applications like IoT data or financial trading systems.

3. Big Data Analytics

  • Use Case: Handling massive datasets for reporting, ad-hoc queries, and advanced analytics.
  • Example: E-commerce platforms analyzing customer purchasing patterns and product performance.
  • Benefit: It’s designed to handle billions of rows with efficient indexing, making it ideal for high-volume, big data scenarios.

Installing ClickHouse is super easy

1. Download the binary into a folder.

curl https://clickhouse.com/ | sh

2. Start the server

./clickhouse server

3. Start the client

./clickhouse client

If everything is successful, you will see a prompt like this:

local :)
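Since the Python code later in this post talks to the server over HTTP on port 8123, it can help to check that port before going further. Here is a minimal sketch that calls the `/ping` endpoint of the ClickHouse HTTP interface, which answers with `Ok.` when the server is healthy. The function name `clickhouse_is_up` and the default host and port are my own assumptions for a local install.

```python
import urllib.error
import urllib.request


def clickhouse_is_up(host="localhost", port=8123, timeout=2.0):
    """Return True if the ClickHouse HTTP interface answers /ping with 'Ok.'."""
    url = f"http://{host}:{port}/ping"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.read().decode().strip() == "Ok."
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, and similar errors
        return False


if __name__ == "__main__":
    print("ClickHouse reachable:", clickhouse_is_up())
```

If this prints False, check that `./clickhouse server` is still running in another terminal.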

Code:

1. Import libraries

import asyncio
import websockets
import json
from datetime import datetime
import clickhouse_connect

2. ClickHouse client setup and test the connection

client = clickhouse_connect.get_client(host='localhost', port=8123, username='default', password='')

try:
    # Test connection by running a simple query
    response = client.command('SELECT version()')
    print(f"Connected to ClickHouse. Server version: {response}")
except Exception as e:
    print(f"Error occurred: {e}")

2a. Create the ClickHouse table if it doesn’t exist. This is a one-off step.

# Create ClickHouse table if it doesn't exist
client.command('''
    CREATE TABLE IF NOT EXISTS binance_kline_data (
        pair String,
        event_time DateTime,
        minute String,
        open_price Float32,
        close_price Float32,
        high_price Float32,
        low_price Float32,
        volume Float32
    ) ENGINE = MergeTree()
    ORDER BY event_time
''')
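One thing to be aware of with the table above: Float32 keeps only about 7 significant decimal digits, so a price like 67890.12 is not stored exactly to the cent, and DateTime has one-second resolution while the stream's event timestamp is in milliseconds. Below is a sketch of an alternative schema, not what I used for the results in this post. The helper name `kline_table_ddl` and the table name `binance_kline_data_v2` are hypothetical, and the sketch assumes `event_time` is inserted as a datetime object rather than a string.

```python
def kline_table_ddl(table="binance_kline_data_v2"):
    """Build a CREATE TABLE statement with millisecond timestamps and Float64 values."""
    return f"""
    CREATE TABLE IF NOT EXISTS {table} (
        pair String,
        event_time DateTime64(3),  -- millisecond precision
        minute String,
        open_price Float64,
        close_price Float64,
        high_price Float64,
        low_price Float64,
        volume Float64
    ) ENGINE = MergeTree()
    ORDER BY (pair, event_time)
    """


if __name__ == "__main__":
    print(kline_table_ddl())
```

It would be run the same way as the original statement, with `client.command(kline_table_ddl())`. Sorting by `(pair, event_time)` is a design choice that keeps each pair's rows together on disk once more than one symbol is stored.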

3. Function to store data into ClickHouse

async def store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume):
    query = "INSERT INTO binance_kline_data (pair, event_time, minute, open_price, close_price, high_price, low_price, volume) VALUES"
    # The repr of this tuple, e.g. ('BTCUSDT', '2024-10-21 ...', ...), doubles as the SQL VALUES list.
    # This works here because none of the strings contain quote characters.
    values = (pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
    client.command(f"{query} {values}")
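The store_data function above sends one INSERT per message. ClickHouse generally handles fewer, larger inserts better, so here is a minimal sketch of a buffer that collects rows and writes them in batches with the `insert` method of the clickhouse_connect client. The class name `TickBuffer` and the `batch_size` default are my own assumptions, and with `insert` the `event_time` value should be a datetime object rather than a formatted string.

```python
COLUMNS = ["pair", "event_time", "minute", "open_price",
           "close_price", "high_price", "low_price", "volume"]


class TickBuffer:
    """Collect rows in memory and write them to ClickHouse in batches."""

    def __init__(self, client, table="binance_kline_data", batch_size=100):
        self.client = client          # a clickhouse_connect client (or anything with .insert)
        self.table = table
        self.batch_size = batch_size
        self.rows = []

    def add(self, row):
        """Queue one row; flush once batch_size rows have accumulated."""
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def flush(self):
        """Send all queued rows in a single insert call."""
        if not self.rows:
            return
        self.client.insert(self.table, self.rows, column_names=COLUMNS)
        self.rows = []
```

Usage would look like `buffer = TickBuffer(client)` once at startup, `buffer.add(row)` per message, and `buffer.flush()` on shutdown so the last partial batch is not lost. The insert call is blocking, so a large batch briefly pauses the event loop.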

4. WebSocket function to receive data and store it

async def run_websocket():
    url = "wss://fstream.binance.com/stream?streams=btcusdt@kline_1m"
    async with websockets.connect(url, ping_timeout=None, max_size=10000000) as websocket:
        headers = {"method": "SUBSCRIBE", "params": ["btcusdt@kline_1m"], "id": 1}
        await websocket.send(json.dumps(headers))

        while True:
            msg = await websocket.recv()
            raw_data = json.loads(msg)
            if 'result' in raw_data:
                continue

            data = raw_data['data']
            pair = data['s']
            candle = data['k']
            dt_format = '%Y-%m-%d %H:%M:%S.%f'  # Updated format with a dot before microseconds
            event_time = datetime.fromtimestamp(data['E'] / 1000).strftime(dt_format)
            minute = datetime.fromtimestamp(candle['t'] / 1000).strftime('%H:%M')

            open_price = float(candle['o'])
            close_price = float(candle['c'])
            high_price = float(candle['h'])
            low_price = float(candle['l'])
            volume = float(candle['v'])

            # Store the data in ClickHouse
            await store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume)

            print(f'[{event_time}] {pair} - minute: {minute} | open: {open_price} | close: {close_price} | '
                  f'high: {high_price} | low: {low_price} | volume: {volume}')
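The parsing steps inside the loop above can also be pulled out into a small pure function, which makes them easy to try without a live connection. This is only a sketch: the name `parse_kline` is hypothetical, the sample message contains made-up values with just the fields the code above reads, and unlike the original it returns a timezone-aware UTC datetime instead of a local-time string.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_kline(msg):
    """Turn one combined-stream message into a row tuple, or None for acks."""
    raw = json.loads(msg)
    if "data" not in raw:  # e.g. the {"result": null, "id": 1} subscription ack
        return None
    data = raw["data"]
    candle = data["k"]
    # Millisecond epoch timestamps -> timezone-aware UTC datetimes
    event_time = EPOCH + timedelta(milliseconds=data["E"])
    minute = (EPOCH + timedelta(milliseconds=candle["t"])).strftime("%H:%M")
    return (
        data["s"], event_time, minute,
        float(candle["o"]), float(candle["c"]),
        float(candle["h"]), float(candle["l"]), float(candle["v"]),
    )


# Made-up sample message, for illustration only
sample = json.dumps({
    "stream": "btcusdt@kline_1m",
    "data": {"E": 1729468800123, "s": "BTCUSDT",
             "k": {"t": 1729468800000, "o": "67000.10", "c": "67010.50",
                   "h": "67020.00", "l": "66990.00", "v": "12.345"}},
})
row = parse_kline(sample)
print(row)
```

The returned tuple has the same column order as the binance_kline_data table, so it can be passed on to whatever function does the insert.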

5. Finally, run the WebSocket connection

if __name__ == "__main__":
    asyncio.run(run_websocket())
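A long-running WebSocket connection will drop sooner or later, and as written the script exits when that happens. Here is a minimal sketch of a retry wrapper with exponential backoff. The helper `run_with_reconnect` and its parameters are hypothetical, and the attempt counter is never reset, so a session that ran fine for hours still counts as one failed attempt when it finally drops.

```python
import asyncio


async def run_with_reconnect(session, max_attempts=5, base_delay=1.0,
                             max_delay=30.0, sleep=asyncio.sleep):
    """Run session() and restart it with exponential backoff when it raises."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await session()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            print(f"attempt {attempt} failed ({exc!r}); retrying in {delay:.1f}s")
            await sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to max_delay
```

With this in place, the last line of the script would become `asyncio.run(run_with_reconnect(run_websocket))`.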

Here is the entire code

import asyncio
import websockets
import json
from datetime import datetime
import clickhouse_connect

# ClickHouse client setup
client = clickhouse_connect.get_client(host='localhost', port=8123, username='default', password='')

try:
    # Test connection by running a simple query
    response = client.command('SELECT version()')
    print(f"Connected to ClickHouse. Server version: {response}")
except Exception as e:
    print(f"Error occurred: {e}")

# Function to store data into ClickHouse
async def store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume):
    query = "INSERT INTO binance_kline_data (pair, event_time, minute, open_price, close_price, high_price, low_price, volume) VALUES"
    values = (pair, event_time, minute, open_price, close_price, high_price, low_price, volume)
    client.command(f"{query} {values}")

# WebSocket function to receive data and store it
async def run_websocket():
    url = "wss://fstream.binance.com/stream?streams=btcusdt@kline_1m"
    async with websockets.connect(url, ping_timeout=None, max_size=10000000) as websocket:
        headers = {"method": "SUBSCRIBE", "params": ["btcusdt@kline_1m"], "id": 1}
        await websocket.send(json.dumps(headers))

        while True:
            msg = await websocket.recv()
            raw_data = json.loads(msg)
            if 'result' in raw_data:
                continue

            data = raw_data['data']
            pair = data['s']
            candle = data['k']
            dt_format = '%Y-%m-%d %H:%M:%S.%f'  # Updated format with a dot before microseconds
            event_time = datetime.fromtimestamp(data['E'] / 1000).strftime(dt_format)
            minute = datetime.fromtimestamp(candle['t'] / 1000).strftime('%H:%M')

            open_price = float(candle['o'])
            close_price = float(candle['c'])
            high_price = float(candle['h'])
            low_price = float(candle['l'])
            volume = float(candle['v'])

            # Store the data in ClickHouse
            await store_data(pair, event_time, minute, open_price, close_price, high_price, low_price, volume)

            print(f'[{event_time}] {pair} - minute: {minute} | open: {open_price} | close: {close_price} | '
                  f'high: {high_price} | low: {low_price} | volume: {volume}')


# Run the WebSocket connection
if __name__ == "__main__":
    asyncio.run(run_websocket())

If everything goes well, the script prints one line per kline update, showing the event time, pair, minute, open, close, high, low and volume.

Below is the query, run from the ClickHouse client:

SELECT *
FROM binance_kline_data
ORDER BY event_time

Below is the query performance reported by the client:

1439 rows in set. Elapsed: 0.005 sec. Processed 1.44 thousand rows, 77.71 KB (291.39 thousand rows/s., 15.74 MB/s.)

Peak memory usage: 79.13 KiB.
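The same data can be read back from Python as well. Below is a small sketch using the `query` method of the clickhouse_connect client with a server-side bound parameter for the pair. The function name `latest_rows` and the `limit` default are my own assumptions.

```python
def latest_rows(client, pair, limit=10):
    """Return the newest `limit` rows for `pair`, newest first."""
    sql = (
        "SELECT event_time, open_price, close_price, high_price, low_price, volume "
        "FROM binance_kline_data "
        "WHERE pair = {pair:String} "   # bound by the server, not string-formatted
        "ORDER BY event_time DESC "
        "LIMIT " + str(int(limit))      # int() guards against non-numeric input
    )
    result = client.query(sql, parameters={"pair": pair})
    return result.result_rows
```

For example, `latest_rows(client, "BTCUSDT", limit=5)` would return the five most recent rows as a list of tuples.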

Enjoy coding!!!
