Skip to content

Commit

Permalink
Allow users to set number of partitions in topic
Browse files Browse the repository at this point in the history
I have added new type, TopicConfig, and added it to options WithTopicConfig
in backwards-compatible way

For now it just has topic name and number of partitions, but it can be enhanced in the future with other options.

I wanted to also add a replication count, but there is just one broker in the cluster, so it makes no sense.
  • Loading branch information
karelbilek authored and orlangure committed Jun 4, 2024
1 parent 3ae68ce commit 60506f5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
10 changes: 10 additions & 0 deletions preset/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@ func WithVersion(version string) Option {

// WithTopics makes sure that the provided topics are available when Kafka is
// up and running.
// Both topics from WithTopics and WithTopicConfigs will be added to Kafka.
func WithTopics(topics ...string) Option {
return func(o *P) {
o.Topics = append(o.Topics, topics...)
}
}

// WithTopicConfigs makes sure that the provided topics with the given configs are available when Kafka is
// up and running. Unlike WithTopics, this allows to also set partition count.
// Both topics from WithTopics and WithTopicConfigs will be added to Kafka.
func WithTopicConfigs(topics ...TopicConfig) Option {
return func(o *P) {
o.TopicConfigs = append(o.TopicConfigs, topics...)
}
}

// WithMessages makes sure that these messages can be consumed during the test
// once the container is ready.
func WithMessages(messages ...Message) Option {
Expand Down
19 changes: 17 additions & 2 deletions preset/kafka/preset.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,20 @@ func Preset(opts ...Option) gnomock.Preset {
return p
}

type TopicConfig struct {
Topic string
NumPartitions int
}

// P is a Gnomock Preset implementation of Kafka.
type P struct {
Version string `json:"version"`
Topics []string `json:"topics"`
Messages []Message `json:"messages"`
MessagesFiles []string `json:"messages_files"`
UseSchemaRegistry bool `json:"use_schema_registry"`

TopicConfigs []TopicConfig `json:"topic_configs"`
}

// Image returns an image that should be pulled to create this container.
Expand Down Expand Up @@ -106,7 +113,7 @@ func (p *P) Options() []gnomock.Option {
gnomock.WithEnv("SAMPLEDATA=0"),
}

if len(p.Topics) > 0 || len(p.Messages) > 0 {
if len(p.Topics) > 0 || len(p.TopicConfigs) > 0 || len(p.Messages) > 0 {
opts = append(opts, gnomock.WithInit(p.initf))
}

Expand Down Expand Up @@ -226,7 +233,7 @@ func (p *P) ingestMessageFiles(ctx context.Context, c *gnomock.Container, conn *
p.Topics = append(p.Topics, topic)
}

topics := make([]kafka.TopicConfig, 0, len(p.Topics))
topics := make([]kafka.TopicConfig, 0, len(p.Topics)+len(p.TopicConfigs))

for _, topic := range p.Topics {
topics = append(topics, kafka.TopicConfig{
Expand All @@ -236,6 +243,14 @@ func (p *P) ingestMessageFiles(ctx context.Context, c *gnomock.Container, conn *
})
}

for _, topic := range p.TopicConfigs {
topics = append(topics, kafka.TopicConfig{
Topic: topic.Topic,
ReplicationFactor: 1, // cannot set more; cluster has just 1 node
NumPartitions: topic.NumPartitions,
})
}

if err := conn.CreateTopics(topics...); err != nil {
return fmt.Errorf("can't create topics: %w", err)
}
Expand Down
26 changes: 25 additions & 1 deletion preset/kafka/preset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ func testPreset(version string) func(t *testing.T) {
}

p := kafka.Preset(
kafka.WithTopics("topic-1", "topic-2"),
kafka.WithTopics("topic-1"),
kafka.WithTopicConfigs(kafka.TopicConfig{
Topic: "topic-2",
NumPartitions: 3,
}),
kafka.WithMessages(messages...),
kafka.WithVersion(version),
kafka.WithMessagesFile("./testdata/messages.json"),
Expand Down Expand Up @@ -89,6 +93,26 @@ func testPreset(version string) func(t *testing.T) {
c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort))
require.NoError(t, err)

// Test that topic-1 exists, and topic-2 has all 3 partitions
topicReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{
Brokers: []string{container.Address(kafka.BrokerPort)},
Topic: "topic-1",
})
_, err = topicReader.ReadLag(ctx)
require.NoError(t, err)
require.NoError(t, topicReader.Close())

for i := 0; i < 3; i++ {
topicReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{
Brokers: []string{container.Address(kafka.BrokerPort)},
Topic: "topic-2",
Partition: i,
})
_, err = topicReader.ReadLag(ctx)
require.NoError(t, err)
require.NoError(t, topicReader.Close())
}

require.NoError(t, c.DeleteTopics("topic-1", "topic-2"))
require.Error(t, c.DeleteTopics("unknown-topic"))

Expand Down

0 comments on commit 60506f5

Please sign in to comment.