- Overall
- How to
- REST API
- SDKs
- Schema Registry
- Connectors
- Integrations
- Monitoring
- Help
Kafka API
Upstash uses Apache Kafka for deployments and provides a serverless Kafka cluster access using both native Kafka clients (over TCP) and REST API (over HTTP). As a consequence of this flexible model, there are some restrictions when using Kafka protocol, mainly for administrative Kafka APIs.
Currently following Kafka Protocol APIs are supported by Upstash:
NAME | KEY | NAME | KEY | NAME | KEY |
---|---|---|---|---|---|
Produce | 0 | DescribeGroups | 15 | EndTxn | 26 |
Fetch | 1 | ListGroups | 16 | TxnOffsetCommit | 28 |
ListOffsets | 2 | SaslHandshake | 17 | DescribeConfigs | 32 |
Metadata | 3 | ApiVersions | 18 | AlterConfigs | 33 |
OffsetCommit | 8 | CreateTopics | 19 | DescribeLogDirs | 35 |
OffsetFetch | 9 | DeleteTopics | 20 | SaslAuthenticate | 36 |
FindCoordinator | 10 | DeleteRecords | 21 | CreatePartitions | 37 |
JoinGroup | 11 | InitProducerId | 22 | DeleteGroups | 42 |
Heartbeat | 12 | OffsetForLeaderEpoch | 23 | IncrementalAlterConfigs | 44 |
LeaveGroup | 13 | AddPartitionsToTxn | 24 | OffsetDelete | 47 |
SyncGroup | 14 | AddOffsetsToTxn | 25 | DescribeCluster | 60 |
Some of the unsupported Kafka APIs are in our roadmap to make them available. If you need an API that we do not support at the moment, please drop a note to support@upstash.com. So we can inform you when we are planning to support it.
Connect Using Kafka Clients
Connecting to Upstash Kafka using any Kafka client is very straightforward. If you do not have a Kafka cluster and/or topic already, follow these steps to create one.
After creating a cluster and a topic, just go to cluster details page on the Upstash Console and copy bootstrap endpoint, username and password.
Then replace following parameters in the code snippets of your favourite Kafka client or language below.
{{ BOOTSTRAP_ENDPOINT }}
{{ UPSTASH_KAFKA_USERNAME }}
{{ UPSTASH_KAFKA_PASSWORD }}
{{ TOPIC_NAME }}
Create a Topic
class CreateTopic {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
try (var admin = Admin.create(props)) {
admin.createTopics(
Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
).all().get();
}
}
}
class CreateTopic {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
try (var admin = Admin.create(props)) {
admin.createTopics(
Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
).all().get();
}
}
}
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const admin = kafka.admin();
const createTopic = async () => {
await admin.connect();
await admin.createTopics({
validateOnly: false,
waitForLeaders: true,
topics: [
{
topic: "{{ TOPIC_NAME }}",
numPartitions: partitions,
replicationFactor: replicationFactor,
},
],
});
await admin.disconnect();
};
createTopic();
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
admin = KafkaAdminClient(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
)
admin.create_topics([NewTopic(name='{{ TOPIC_NAME }}', num_partitions=partitions, replication_factor=replicationFactor)])
admin.close()
import (
"context"
"crypto/tls"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
conn, err := dialer.Dial("tcp", "{{ BOOTSTRAP_ENDPOINT }}")
if err != nil {
log.Fatalln(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
log.Fatalln(err)
}
controllerConn, err := dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
log.Fatalln(err)
}
defer controllerConn.Close()
err = controllerConn.CreateTopics(kafka.TopicConfig{
Topic: "{{ TOPIC_NAME }}",
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
})
if err != nil {
log.Fatalln(err)
}
}
Produce a Message
class Produce {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (var producer = new KafkaProducer<String, String>(props)) {
producer.send(new ProducerRecord<>("{{ TOPIC_NAME }}", "Hello Upstash!"));
producer.flush();
}
}
}
class Produce {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (var producer = new KafkaProducer<String, String>(props)) {
producer.send(new ProducerRecord<>("{{ TOPIC_NAME }}", "Hello Upstash!"));
producer.flush();
}
}
}
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const producer = kafka.producer();
const produce = async () => {
await producer.connect();
await producer.send({
topic: "{{ TOPIC_NAME }}",
messages: [{ value: "Hello Upstash!" }],
});
await producer.disconnect();
};
produce();
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
)
future = producer.send('{{ TOPIC_NAME }}', b'Hello Upstash!')
record_metadata = future.get(timeout=10)
print (record_metadata)
producer.close()
import (
"context"
"crypto/tls"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"{{ BOOTSTRAP_ENDPOINT }}"},
Topic: "{{ TOPIC_NAME }}",
Dialer: dialer,
})
defer w.Close()
err = w.WriteMessages(context.Background(),
kafka.Message{
Value: []byte("Hello Upstash!"),
},
)
if err != nil {
log.Fatalln("failed to write messages:", err)
}
}
Consume Messages
class Consume {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", "{{ GROUP_NAME }}");
try(var consumer = new KafkaConsumer<String, String>(props)) {
consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
var records = consumer.poll(Duration.ofSeconds(10));
for (var record : records) {
System.out.println(record);
}
}
}
}
class Consume {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", "{{ GROUP_NAME }}");
try(var consumer = new KafkaConsumer<String, String>(props)) {
consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
var records = consumer.poll(Duration.ofSeconds(10));
for (var record : records) {
System.out.println(record);
}
}
}
}
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
const consume = async () => {
await consumer.connect();
await consumer.subscribe({ topic: "{{ TOPIC_NAME }}", fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic: topic,
partition: partition,
message: JSON.stringify(message),
});
},
});
};
consume();
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
group_id='{{ GROUP_NAME }}',
auto_offset_reset='earliest',
)
consumer.subscribe(['{{ TOPIC_NAME }}'])
records = consumer.poll(timeout_ms=10000)
print(records)
consumer.close()
import (
"context"
"crypto/tls"
"log"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"{{ BOOTSTRAP_ENDPOINT }}"},
GroupID: "{{ GROUP_NAME }}",
Topic: "{{ TOPIC_NAME }}",
Dialer: dialer,
})
defer r.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
m, err := r.ReadMessage(ctx)
if err != nil {
log.Fatalln(err)
}
log.Printf("%+v\n", m)
}
Was this page helpful?