I looked last time at the very bare basics of writing a Kafka producer using Go. It worked, but only with everything lined up and pointing the right way. There was no error handling of any sorts. Let’s see about fixing this now.
A bit of code tidying
To make the code more readable to me, I split out the configuration into a new variable that’s then passed to the NewProducer
, so instead of the more compact but possibly less readable
p, _ := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092"})
I have this:
c := kafka.ConfigMap{
"boostrap.servers": "localhost:9092"}
p, _ := kafka.NewProducer(&c)
Catching errors from NewProducer()
Instead of p,
when invoking NewProducer
(the denoting an empty placeholder variable), we supply a variable into which the error can be stored:
p, e := kafka.NewProducer(&c)
Now we can check it and report back if there’s a problem:
if e != nil {
fmt.Printf("😢Oh noes, there's an error creating the Producer! %v", e)
} else {
// do producing stuff
}
If I make a deliberate mistake so that the NewProducer
returns an error we get this
😢 Oh noes, there's an error creating the Producer! No such configuration property: "boostrap.servers"
Be Assertive!
We can take this a step further and look at the type of error that’s returned. Whilst e
is a generic interface for holding an error, we can try casting it to a kafka.Error
. Why would we want to do this? Well, kafka.Error
exposes properties including an ErrorCode that describes the type of error. From that code we can handle the error in a more useful way than just dumping it to the screen. For example, if it’s an error about configuration properties (as in the example above) we could tell the user where to find the reference information for this; but including that in an error to the user if it’s not a problem in this area would be redundant (and possibly confusing).
To cast it to a kafka.Error
we use a type assertion:
ke := e.(kafka.Error)
We also check for success when we do it (just in case it’s not actually a kafka.Error
)
if ke, ok := e.(kafka.Error); ok==true {
// it's a kafka.Error
} else {
// it's an error, but not a kafka.Error
}
Once we’ve established that it’s a kafka.Error
we can use the Code()
function to access the ErrorCode
and handle it with a switch
:
if e != nil {
if ke, ok := e.(kafka.Error); ok == true {
switch ec := ke.Code(); ec {
case kafka.ErrInvalidArg:
fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
default:
fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
}
} else {
// It's not a kafka.Error
fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
}
So now the same mistake as before in configuring bootstrap.servers
is caught and reported like this:
😢 Can't create the producer because you've configured it wrong (code: -186)!
No such configuration property: "boostrap.servers"
To see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Handling Producer Errors
When it comes to the Produce
I’ve done the same as above - split out the creation of the message into a new variable for clarity, and added a check for error:
// Build the message
m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
Value: []byte("Hello world")}
// Produce the message
if e := p.Produce(&m, nil); e != nil {
fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
}
But if I kill my Kafka broker and run my code, I don’t get an error. Why not? Because if you look at the documentation for Produce
you’ll see that it says
This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately
So all we’re doing is "fire and forget". Put it on an internal queue, and we’re done. We still don’t know if it was delivered.
Producer Events
Since the Producer is called asychronously, it uses Go Channels to provide events back to the calling application about what’s going on. These can be of different types, covering both errors, and the status of messages that have been sent for producing.
You can consume these events in two different ways:
-
Poll the producer’s
Events()
channel and triage by type -
Create a dedicated delivery report channel, and poll
Events()
for errors only
Here’s an example of the first option, in which we use a Go Routine to listen to all events and handle them based on type:
go func() {
for {
// The `select` blocks until one of the `case` conditions
// are met - therefore we run it in a Go Routine.
select {
case ev := <-p.Events():
// Look at the type of Event we've received
switch ev.(type) {
case *kafka.Message:
// It's a delivery report
km := ev.(*kafka.Message)
if km.TopicPartition.Error != nil {
fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Error)
} else {
fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Partition,
km.TopicPartition.Offset)
}
case kafka.Error:
// It's an error
em := ev.(kafka.Error)
fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
default:
// It's not anything we were expecting
fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)
}
}
}
}()
Now when we produce a message successfully we receive a kafka.Message
with a nil
value in TopicPartition.Error
and the offset of the produced message in TopicPartition.Offset
:
✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 0)
And if there’s a problem we get full details of it
☠️ Uh oh, caught an error:
foobar:9092/1: Failed to resolve 'foobar:9092': nodename nor servname provided, or not known (after 64ms in state CONNECT)
Phantom events
If you run the code as-is you’ll notice you get this
✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 3)
--
✨ All messages flushed from the queue
Got an event that's not a Message or Error 👻
<nil>
Got an event that's not a Message or Error 👻
<nil>
Got an event that's not a Message or Error 👻
<nil>
Got an event that's not a Message or Error 👻
<nil>
Got an event that's not a Message or Error 👻
<nil>
…
After the Close()
is called, there are still events being consumed by our Go Routine event handler. We don’t want to be doing this (if the Producer is closed, then its Events()
channel is meaningless), so use this pattern (inspired by this code) to avoid it:
// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)
go func() {
doTerm := false
for !doTerm {
select {
// channels that we're listening to
case <-termChan:
doTerm = true
}
}
close(doneChan)
}()
// …
// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
p.Close()
Make sure you Flush()
Once we’ve sent our message to the Producer, we get control back straight away, because it’s an asynchronous process. If we don’t put anything else in place the code will run on through to the Close()
and exit. We want to make sure we’ve sent all the messages successfully - or not. To do this we use the Flush()
function with a timeout of how long we’ll wait before considering sending messages to have failed.
// Flush the Producer queue
if r := p.Flush(10000); r > 0 {
fmt.Printf("\n--\n⚠️ Failed to flush all messages after 10 seconds. %d message(s) remain\n", r)
} else {
fmt.Println("\n--\n✨ All messages flushed from the queue")
}
With this in place we get a confirmation on exit of success:
✨ All messages flushed from the queue
or failure:
⚠️ Failed to flush all messages after 10 seconds. 1 message(s) remain
The finished result
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
// --
// The topic is passed as a pointer to the Producer, so we can't
// use a hard-coded literal. And a variable is a nicer way to do
// it anyway ;-)
topic := "test_topic_02"
// --
// Create Producer instance
// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewProducer
// Store the config
c := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092"}
// Variable p holds the new Producer instance.
p, e := kafka.NewProducer(&c)
// Check for errors in creating the Producer
if e != nil {
if ke, ok := e.(kafka.Error); ok == true {
switch ec := ke.Code(); ec {
case kafka.ErrInvalidArg:
fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
default:
fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
}
} else {
// It's not a kafka.Error
fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
}
} else {
// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)
// --
// Send a message using Produce()
// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.Produce
//
// Build the message
m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
Value: []byte("Hello world")}
// Handle any events that we get
go func() {
doTerm := false
for !doTerm {
// The `select` blocks until one of the `case` conditions
// are met - therefore we run it in a Go Routine.
select {
case ev := <-p.Events():
// Look at the type of Event we've received
switch ev.(type) {
case *kafka.Message:
// It's a delivery report
km := ev.(*kafka.Message)
if km.TopicPartition.Error != nil {
fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Error)
} else {
fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Partition,
km.TopicPartition.Offset)
}
case kafka.Error:
// It's an error
em := ev.(kafka.Error)
fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
default:
// It's not anything we were expecting
fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)
}
case <-termChan:
doTerm = true
}
}
close(doneChan)
}()
// Produce the message
if e := p.Produce(&m, nil); e != nil {
fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
}
// --
// Flush the Producer queue
t := 10000
if r := p.Flush(t); r > 0 {
fmt.Printf("\n--\n⚠️ Failed to flush all messages after %d milliseconds. %d message(s) remain\n", t, r)
} else {
fmt.Println("\n--\n✨ All messages flushed from the queue")
}
// --
// Stop listening to events and close the producer
// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
p.Close()
}
}