
ksqlDB

ksqlDB is a streaming SQL engine built on Kafka Streams that translates SQL statements into Kafka Streams topologies, enabling stream processing without JVM programming.

Key Facts

  • Part of Confluent Platform (not Apache Kafka core)
  • Runs as JVM application connecting to Kafka cluster; each persistent query runs as a Kafka Streams application
  • SQL statements are converted into Kafka Streams topologies under the hood
  • STREAM = partitioned, immutable, append-only collection (maps to KStream)
  • TABLE = mutable, partitioned collection where updates overwrite previous values (maps to KTable)
  • Push queries: run continuously, emit results as new data arrives (EMIT CHANGES)
  • Pull queries: one-time, point-in-time lookups against materialized state (like a traditional SQL SELECT), typically on a table
  • Deployment modes: Interactive (CLI/REST) and Headless/Autonomous (queries from file only, production mode)
  • Supported formats: JSON, JSON_SR, AVRO, PROTOBUF, DELIMITED (CSV), KAFKA (primitive values written with Kafka's standard serializers)
  • Integrates with [[schema-registry]] automatically when using AVRO, PROTOBUF, or JSON_SR
  • Default port: 8088 (REST API)
  • ksqlDB is NOT a replacement for a database - it processes streams
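The following Python model makes the STREAM/TABLE distinction concrete. It is not ksqlDB code. It replays the same made-up (key, value) records both ways. A `None` value is treated as a tombstone that deletes the key, which is how a table's changelog behaves.

```python
from typing import Dict, List, Optional, Tuple

# A record as it might sit on a Kafka topic: (key, value); a None value models a tombstone.
Record = Tuple[str, Optional[str]]

def as_stream(records: List[Record]) -> List[Record]:
    """STREAM semantics: every record is a separate, immutable event; nothing is overwritten."""
    return list(records)

def as_table(records: List[Record]) -> Dict[str, str]:
    """TABLE semantics: the latest value per key wins; a tombstone (None) deletes the key."""
    table: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table

# Hypothetical records for illustration only.
records: List[Record] = [("u1", "Region_1"), ("u2", "Region_2"), ("u1", "Region_3"), ("u2", None)]
print(len(as_stream(records)))  # 4 events, all kept
print(as_table(records))        # {'u1': 'Region_3'}
```

Reading `as_table(...)` once is roughly what a pull query does. A push query instead keeps emitting each change as it arrives.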

Patterns

DDL - Create Stream and Table

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');

CREATE TABLE users (
  userid VARCHAR PRIMARY KEY,
  regionid VARCHAR,
  gender VARCHAR
) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
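Here is a rough Python sketch of how a record on the `users` topic becomes a row of the `users` table. It assumes the default KAFKA key format, where a VARCHAR key is the record key's UTF-8 bytes. It also assumes JSON field names that match the declared columns; the sample records are invented.

- The PRIMARY KEY column is read from the Kafka record key.
- The remaining columns come from the JSON value.
- A missing field becomes NULL.
- The `pageviews` stream declares no KEY column, so its record keys are not exposed as columns.

```python
import json
from typing import Any, Dict, Optional

# Value columns from the CREATE TABLE users statement (assumed to match the JSON field names).
USERS_VALUE_COLUMNS = ("regionid", "gender")

def users_row(key: bytes, value: bytes) -> Dict[str, Optional[Any]]:
    """Model how one Kafka record on the 'users' topic maps to a row of the users TABLE."""
    row: Dict[str, Optional[Any]] = {"userid": key.decode("utf-8")}  # PRIMARY KEY <- record key
    payload = json.loads(value)
    for column in USERS_VALUE_COLUMNS:
        row[column] = payload.get(column)  # absent JSON field -> NULL (None)
    return row

print(users_row(b"User_1", b'{"regionid": "Region_4", "gender": "FEMALE"}'))
# {'userid': 'User_1', 'regionid': 'Region_4', 'gender': 'FEMALE'}
print(users_row(b"User_2", b'{"regionid": "Region_9"}'))
# {'userid': 'User_2', 'regionid': 'Region_9', 'gender': None}
```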

Persistent Queries (Continuous Processing)

-- Continuous aggregation with join
CREATE TABLE pageviews_per_region AS
  SELECT regionid, COUNT(*) AS numviews
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid
  GROUP BY regionid
  EMIT CHANGES;

-- Enrich pageviews with user data, then filter (the WHERE on gender also drops rows with no matching user)
CREATE STREAM pageviews_female AS
  SELECT * FROM pageviews
  LEFT JOIN users ON pageviews.userid = users.userid
  WHERE gender = 'FEMALE'
  EMIT CHANGES;
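The two persistent queries above can be approximated in Python to show what the LEFT JOIN and WHERE do to each row. This is a batch simplification with invented sample data.

- ksqlDB evaluates each pageview against the `users` table as it is at processing time and emits updates incrementally. Here the table is a fixed dict.
- Rows with a null `regionid` are skipped in the count. This assumes a null grouping key cannot be aggregated.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

# Hypothetical data. users models the TABLE: userid -> (regionid, gender).
users: Dict[str, Tuple[str, str]] = {
    "User_1": ("Region_1", "FEMALE"),
    "User_2": ("Region_2", "MALE"),
}
# pageviews models the STREAM: (viewtime, userid, pageid) events in arrival order.
pageviews: List[Tuple[int, str, str]] = [
    (1000, "User_1", "Page_10"),
    (1001, "User_2", "Page_11"),
    (1002, "User_9", "Page_12"),  # no matching user row
    (1003, "User_1", "Page_13"),
]

def left_join(view: Tuple[int, str, str]) -> Dict[str, Optional[str]]:
    _viewtime, userid, pageid = view
    regionid, gender = users.get(userid, (None, None))  # LEFT JOIN: NULLs when there is no match
    return {"userid": userid, "pageid": pageid, "regionid": regionid, "gender": gender}

joined = [left_join(v) for v in pageviews]

# pageviews_female: WHERE gender = 'FEMALE' discards unmatched (None) and non-matching rows.
female = [row for row in joined if row["gender"] == "FEMALE"]

# pageviews_per_region: COUNT(*) per regionid, skipping rows whose grouping key is None.
per_region = Counter(row["regionid"] for row in joined if row["regionid"] is not None)

print([row["pageid"] for row in female])  # ['Page_10', 'Page_13']
print(dict(per_region))                   # {'Region_1': 2, 'Region_2': 1}
```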

Windowed Aggregations

CREATE TABLE order_count AS
  SELECT customerid, COUNT(*) AS cnt, SUM(amount) AS total
  FROM orders
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY customerid
  EMIT CHANGES;

-- Hopping window with retention
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS,
                RETENTION 7 DAYS, GRACE PERIOD 30 MINUTES)

-- Session window
WINDOW SESSION (5 MINUTES)
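The window types differ in how a timestamp is mapped to windows. The Python sketch below models that mapping with epoch-aligned windows, which is the Kafka Streams convention.

- `order_count` mirrors the tumbling query above on invented data.
- The session function assumes a single key and sees all of that key's timestamps at once rather than incrementally.
- Retention and grace period are left out.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS

def tumbling_start(ts_ms: int, size_ms: int) -> int:
    """Epoch-aligned tumbling window: round the timestamp down to a multiple of the size."""
    return ts_ms - (ts_ms % size_ms)

def hopping_starts(ts_ms: int, size_ms: int, advance_ms: int) -> List[int]:
    """Starts of every hopping window [start, start + size) containing ts_ms (multiples of advance, >= 0)."""
    starts = []
    start = ts_ms - (ts_ms % advance_ms)  # latest window starting at or before ts
    while start >= 0 and start > ts_ms - size_ms:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)

def sessions(ts_list: List[int], gap_ms: int) -> List[Tuple[int, int]]:
    """Split one key's event times into [first, last] sessions; a gap larger than gap_ms starts a new one."""
    out: List[Tuple[int, int]] = []
    for ts in sorted(ts_list):
        if out and ts - out[-1][1] <= gap_ms:
            out[-1] = (out[-1][0], ts)  # extend the current session
        else:
            out.append((ts, ts))        # open a new session
    return out

def order_count(orders: List[Tuple[int, str, float]]) -> Dict[Tuple[str, int], Tuple[int, float]]:
    """COUNT(*) and SUM(amount) per (customerid, 1-hour tumbling window start)."""
    acc: Dict[Tuple[str, int], Tuple[int, float]] = defaultdict(lambda: (0, 0.0))
    for ts, customerid, amount in orders:
        key = (customerid, tumbling_start(ts, HOUR_MS))
        cnt, total = acc[key]
        acc[key] = (cnt + 1, total + amount)
    return dict(acc)

orders = [(0, "c1", 10.0), (HOUR_MS - 1, "c1", 5.0), (HOUR_MS, "c1", 7.5)]
print(order_count(orders))  # {('c1', 0): (2, 15.0), ('c1', 3600000): (1, 7.5)}
print(hopping_starts(25_000, size_ms=30_000, advance_ms=10_000))  # [0, 10000, 20000]
print(sessions([0, 2 * MINUTE_MS, 6 * MINUTE_MS, 20 * MINUTE_MS], gap_ms=5 * MINUTE_MS))
# [(0, 360000), (1200000, 1200000)]
```

With SIZE 30 SECONDS and ADVANCE BY 10 SECONDS, each record lands in up to three overlapping windows. A tumbling window always holds it in exactly one.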

Joins

-- Stream-Table join (enrichment, no window needed)
CREATE STREAM enriched_orders AS
  SELECT o.customerid, o.orderid, o.amount, c.name, c.region
  FROM orders o LEFT JOIN customers c ON o.customerid = c.id
  EMIT CHANGES;

-- Stream-Stream join (must be windowed)
CREATE STREAM combined AS
  SELECT * FROM stream1 s1
  INNER JOIN stream2 s2 WITHIN 1 HOUR ON s1.id = s2.id
  EMIT CHANGES;
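The difference between the two join types can be sketched in Python.

- In the stream-table model, only stream events produce output. Each one is matched against the table as it stands at that moment.
- In the stream-stream model, both sides are buffered. Any pair with equal ids inside the window matches.

Data and names are hypothetical, and the window boundary is treated as inclusive. The nested loop stands in for the windowed state stores a real join uses.

```python
from typing import Dict, List, Optional, Tuple

HOUR_MS = 60 * 60 * 1000

def stream_table_left_join(events: List[Tuple[str, str, str]]) -> List[Tuple[str, Optional[str]]]:
    """Interleaved input: ('customer', id, name) updates the table; ('order', orderid, customerid) is a stream event."""
    customers: Dict[str, str] = {}
    out: List[Tuple[str, Optional[str]]] = []
    for kind, a, b in events:
        if kind == "customer":
            customers[a] = b                   # table update: state changes, nothing emitted
        else:
            out.append((a, customers.get(b)))  # stream event: look up the current row (None if absent)
    return out

def stream_stream_inner_join(left: List[Tuple[str, int]], right: List[Tuple[str, int]],
                             within_ms: int) -> List[Tuple[str, int, int]]:
    """Pair events with equal ids whose timestamps are at most within_ms apart, in either order."""
    return [
        (lid, lts, rts)
        for lid, lts in left
        for rid, rts in right
        if lid == rid and abs(lts - rts) <= within_ms
    ]

events = [
    ("order", "o1", "c1"),         # c1 is not in the table yet
    ("customer", "c1", "Alice"),
    ("order", "o2", "c1"),
    ("customer", "c1", "Alicia"),  # does not rewrite o2's already-emitted result
]
print(stream_table_left_join(events))  # [('o1', None), ('o2', 'Alice')]
print(stream_stream_inner_join([("a", 0), ("b", 0)], [("a", HOUR_MS), ("b", HOUR_MS + 1)], HOUR_MS))
# [('a', 0, 3600000)]
```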

REST API

# Execute statement
curl -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}'

# Run a query (a push query keeps streaming results until the connection is closed)
curl -X POST http://localhost:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SELECT * FROM pageviews EMIT CHANGES;"}'
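The same `/ksql` call can be made from Python with only the standard library. This minimal sketch assumes a server on the default port 8088. It builds and sends the request body shown in the curl example and does not handle the streaming responses returned by `/query`. The helper names `build_request` and `run_statement` are hypothetical.

```python
import json
import urllib.request

KSQL_URL = "http://localhost:8088"  # assumption: local server on the default REST port
CONTENT_TYPE = "application/vnd.ksql.v1+json"

def build_request(endpoint: str, statement: str, properties=None) -> urllib.request.Request:
    """Build a POST carrying {"ksql": ..., "streamsProperties": ...}, as in the curl example."""
    body = json.dumps({"ksql": statement, "streamsProperties": properties or {}}).encode("utf-8")
    return urllib.request.Request(
        KSQL_URL + endpoint,
        data=body,
        headers={"Content-Type": CONTENT_TYPE, "Accept": CONTENT_TYPE},
        method="POST",
    )

def run_statement(statement: str):
    """Send a non-query statement (e.g. SHOW STREAMS;) to /ksql and return the decoded JSON reply."""
    with urllib.request.urlopen(build_request("/ksql", statement)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `run_statement("SHOW STREAMS;")` against a running server returns the JSON array the server sends back.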

Useful Commands

SHOW TOPICS;                    SHOW STREAMS;
SHOW TABLES;                    SHOW QUERIES;
DESCRIBE stream_name;           DESCRIBE EXTENDED stream_name;
PRINT 'topic_name' FROM BEGINNING;
DROP STREAM stream_name;        TERMINATE query_id;
INSERT INTO stream_name (col1) VALUES ('val1');

Event Metadata Pseudo-Columns

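SELECT ROWTIME, ROWPARTITION, ROWOFFSET FROM pageviews EMIT CHANGES;
-- Headers are not a pseudo-column: declare a column such as hdrs ARRAY<STRUCT<key STRING, value BYTES>> HEADERS in CREATE STREAM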

Kafka Connect Integration

-- External Connect cluster
-- Server config: ksql.connect.url=http://localhost:8083

-- Or embedded mode: ksql.connect.worker.config=...

CREATE SOURCE CONNECTOR `postgres-source` WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:postgresql://localhost:5432/db',
  'table.whitelist' = 'orders',
  'mode' = 'incrementing'
);
SHOW CONNECTORS;
DESCRIBE CONNECTOR `postgres-source`;

Functions

  • Built-in scalar: SUBSTRING, CONCAT, CAST, TIMESTAMPTOSTRING, LEN, ABS, UCASE, LCASE
  • Aggregate: COUNT, SUM, MIN, MAX, AVG, COLLECT_LIST, COLLECT_SET, TOPK, TOPKDISTINCT
  • Custom functions (UDFs, UDAFs, UDTFs): implement in Java and deploy as JAR files in the directory set by ksql.extension.dir
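The collection and top-K aggregates are easy to confuse. This Python sketch models what each returns for one group of values. It is an approximation: ksqlDB does not guarantee COLLECT_SET's element order, and descending order for TOPK/TOPKDISTINCT is assumed here.

```python
from typing import List

def collect_list(values: List[int]) -> List[int]:
    return list(values)                           # keeps duplicates, in arrival order

def collect_set(values: List[int]) -> List[int]:
    return list(dict.fromkeys(values))            # drops duplicates (order not guaranteed in ksqlDB)

def topk(values: List[int], k: int) -> List[int]:
    return sorted(values, reverse=True)[:k]       # k largest values, duplicates allowed

def topk_distinct(values: List[int], k: int) -> List[int]:
    return sorted(set(values), reverse=True)[:k]  # k largest distinct values

amounts = [5, 20, 20, 3, 12]  # hypothetical values for one GROUP BY key
print(collect_list(amounts))      # [5, 20, 20, 3, 12]
print(topk(amounts, 2))           # [20, 20]
print(topk_distinct(amounts, 2))  # [20, 12]
```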

Gotchas

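  • Pull queries need materialized state - historically they worked only on tables materialized by a persistent query (e.g. an aggregate), not on raw streams; newer ksqlDB releases relax this, so check the version in use
  • Join inputs must be co-partitioned - same key type, same partition count, and same partitioning strategy; ksqlDB repartitions automatically when joining on a non-key column, but it does not reconcile differing partition counts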
  • Each persistent query creates internal topics and state stores - resource usage scales with query count
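  • Headless mode for production - ksql.queries.file=/path/to/query.sql runs only the queries in that file, disables the REST API, and prevents runtime modification
  • Grace period defaults to 24 hours - windows keep accepting late records (and their state stays in the state stores) for a full day after they end, which can use significant memory and disk for windowed aggregations; set GRACE PERIOD explicitly based on actual late-data patterns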
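The following Python model shows how the grace period decides whether a late record still counts. It follows the Kafka Streams rule: a window closes once stream time (the highest timestamp seen so far) reaches the window's end plus the grace period. The arrival order below is invented. With a 30-minute grace, the record at minute 55 arrives after stream time has reached minute 100, so it is dropped. The 24-hour default would still accept it.

```python
from typing import List, Tuple

MINUTE_MS = 60 * 1000

def apply_grace(timestamps: List[int], size_ms: int, grace_ms: int) -> Tuple[List[int], List[int]]:
    """Return (accepted, dropped) for records arriving in the given order into tumbling windows."""
    stream_time = -1
    accepted: List[int] = []
    dropped: List[int] = []
    for ts in timestamps:
        stream_time = max(stream_time, ts)          # stream time only moves forward
        window_end = ts - (ts % size_ms) + size_ms  # end of the window this record belongs to
        if window_end > stream_time - grace_ms:     # window still open
            accepted.append(ts)
        else:
            dropped.append(ts)                      # too late: window already closed
    return accepted, dropped

arrivals = [m * MINUTE_MS for m in (10, 70, 50, 100, 55)]
print(apply_grace(arrivals, size_ms=60 * MINUTE_MS, grace_ms=30 * MINUTE_MS)[1])       # [3300000]
print(apply_grace(arrivals, size_ms=60 * MINUTE_MS, grace_ms=24 * 60 * MINUTE_MS)[1])  # []
```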

See Also

  • [[kafka-streams]] - underlying library ksqlDB is built on
  • [[kafka-streams-windowing]] - window types and time semantics
  • [[kafka-connect]] - connector integration from ksqlDB
  • [[schema-registry]] - schema management for AVRO/PROTOBUF formats
  • ksqlDB Documentation