Messages stop being delivered to a consumer [2.10.24] #6290

niks3089 opened this issue Dec 20, 2024 · 9 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@niks3089

niks3089 commented Dec 20, 2024

Observed behavior

I have multiple consumers connecting to multiple leaf nodes across multiple regions. Some of these consumers stop processing messages even though the consumers are up and messages are available to process.

Information for Consumer project_stream > janus-edge-fra-prod-1_apikey_consumer created 2024-12-20T01:03:01+05:30

Configuration:

                    Name: janus-edge-fra-prod-1_apikey_consumer
               Pull Mode: true
         Filter Subjects: project_stream.apikey
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

Cluster Information:

                    Name: momentum-prod-leaf-us-east
                  Leader: momentum-prod-leaf-pitt-1
                 Replica: momentum-prod-leaf-ewr-1, current, seen 825ms ago

State:

  Last Delivered Message: Consumer sequence: 132,361 Stream sequence: 15,775,201 Last delivery: 5h30m4s ago
    Acknowledgment Floor: Consumer sequence: 132,361 Stream sequence: 15,775,201 Last Ack: 5h30m3s ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 161
           Waiting Pulls: 0 of maximum 512
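
For anyone reading output like the above, here is a minimal sketch of how the three State counters could be read together. `ConsumerState`, `Verdict`, and `classify` are hypothetical names made up for illustration; they are not part of the nats CLI or of any client library. A single snapshot is only a hint: with short-lived pull requests, the waiting-pulls counter can read 0 even on a healthy consumer.

```rust
/// Hypothetical copy of three counters from the "State" section of the
/// consumer info output shown above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConsumerState {
    pub unprocessed: u64,      // "Unprocessed Messages"
    pub outstanding_acks: u64, // "Outstanding Acks"
    pub waiting_pulls: u64,    // "Waiting Pulls"
}

#[derive(Debug, PartialEq, Eq)]
pub enum Verdict {
    /// Nothing is left to deliver for the first time.
    CaughtUp,
    /// A backlog exists, and messages are in flight or a pull is waiting.
    Active,
    /// A backlog exists, nothing is in flight, and no pull is waiting.
    NotPulling,
}

/// Classify a single snapshot. This is a heuristic, not a diagnosis.
pub fn classify(state: ConsumerState) -> Verdict {
    if state.unprocessed == 0 {
        Verdict::CaughtUp
    } else if state.outstanding_acks > 0 || state.waiting_pulls > 0 {
        Verdict::Active
    } else {
        Verdict::NotPulling
    }
}
```

With the numbers above (161 unprocessed, 0 outstanding acks, 0 waiting pulls), this sketch returns `Verdict::NotPulling`, which matches the reported symptom: messages are available but no client request is asking for them.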

Basically, the consumer stalls and doesn't make any progress. A consumer is created like this:

        let project_consumer: PullConsumer = jetstream
            .get_stream(&config.project_stream)
            .await?
            .get_or_create_consumer(
                &format!("{}_project_consumer", config.host),
                pull::Config {
                    durable_name: Some(format!("{}_project_consumer", config.host)),
                    ack_policy: AckPolicy::Explicit,
                    filter_subjects: vec![config.project_stream_filter.clone()],
                    ..Default::default()
                },
            )
            .await?;

The code that fetches the messages looks like this:

    async fn project_subscriber_task(&self) {
        const BATCH_SIZE: i32 = 1000;

        loop {
            tokio::select! {
                _ = self.shutdown_notify.notified() => {
                    break;
                }
                _ = async {
                    let mut messages = match self.project_consumer.fetch()
                        .max_messages(BATCH_SIZE as usize)
                        .messages()
                        .await
                    {
                        Ok(msgs) => msgs,
                        Err(e) => {
                            statsd_count!("execution.consumer_messages_error", 1);
                            error!("Error getting messages: {:?}", e);
                            return;
                        }
                    };

                    while let Some(message_result) = messages.next().await {
                        match message_result {
                            Ok(message) => {
                                self.handle_project_message(message, &keydb).await;
                            }
                            Err(e) => {
                                statsd_count!("execution.project_message_receive_error", 1);
                                error!("Error receiving project message: {:?}", e);
                            }
                        }
                    }
                } => {}
            }
        }
    }

When I add logging to the code above, messages.next() never returns any messages and the task keeps running in a busy loop. This is a severe problem. Is it a known issue? Am I missing something? Any help is appreciated. Thanks!

Note: I ack the message in handle_project_message. The same code works for other consumers; the consumer that gets stuck appears to be random.

Expected behavior

The consumer should process messages like the other, similar consumers connected to the same cluster and node. I don't know what triggers this issue.

Server and client version

nats-server: 2.10.24

 nats --version
0.1.5

Host environment

No response

Steps to reproduce

No response

@niks3089 niks3089 added the defect Suspected defect such as a bug or regression label Dec 20, 2024
@Jarema
Member

Jarema commented Dec 27, 2024

There are two issues here:

1. Why does the consumer not get the messages?

For that we need a bit more info. First of all, are you sure that the consumer name in the code matches the name of the consumer from the info output?

Maybe it would be useful to call consumer info from within the code to be sure.

2. Tight loop

fetch, as per docs:

    /// Returns a batch of specified number of messages, or if there are less messages on the
    /// [Stream] than requested, returns all available messages.

That means that if there are no messages for given consumer, it will close the Stream immediately.

Instead, you could use batch, which will respect the timeout/max messages and not return immediately.
This would avoid the tight loop.

However, probably the best way is to use messages(), which will handle looping over batches for you in an optimized way.
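
If the loop has to stay on fetch for now, one generic way to take the edge off the tight loop is to pause after an empty batch. This is not something the client library does for you, and it is not what is recommended above; it is a plain backoff sketch using only the standard library. `empty_batch_backoff`, `PollPacer`, and the 50 ms / 5 s constants are assumptions chosen for illustration.

```rust
use std::time::Duration;

/// Delay to wait before the next poll, given how many batches in a row
/// came back empty. Starts at 50 ms, doubles each time, caps at 5 s.
pub fn empty_batch_backoff(consecutive_empty: u32) -> Duration {
    const BASE_MS: u64 = 50;
    const MAX_MS: u64 = 5_000;
    if consecutive_empty == 0 {
        return Duration::ZERO;
    }
    // Clamp the shift so the left shift below cannot overflow.
    let shift = (consecutive_empty - 1).min(16);
    Duration::from_millis((BASE_MS << shift).min(MAX_MS))
}

/// Tracks empty batches across loop iterations.
#[derive(Debug, Default)]
pub struct PollPacer {
    consecutive_empty: u32,
}

impl PollPacer {
    /// Record how many messages the last batch yielded and return the
    /// delay to apply before polling again.
    pub fn record_batch(&mut self, messages_in_batch: usize) -> Duration {
        if messages_in_batch == 0 {
            self.consecutive_empty = self.consecutive_empty.saturating_add(1);
        } else {
            self.consecutive_empty = 0;
        }
        empty_batch_backoff(self.consecutive_empty)
    }
}
```

In the subscriber task, that would mean counting the messages yielded by `messages.next()` for each batch, passing the count to `record_batch`, and sleeping for the returned duration (for example with `tokio::time::sleep`) before the next iteration. Switching to batch or messages() removes the need for this entirely.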

@niks3089
Author

Thanks, will take a look.

That means that if there are no messages for given consumer, it will close the Stream immediately.

Where is this mentioned, and is it the same when using batch? I want to run a consumer in a loop forever without worrying about the stream closing. If batch does that while avoiding the busy loop and keeping the stream open, that would be best.

@Jarema
Member

Jarema commented Dec 27, 2024

Well, "or if there are less messages on the [Stream] than requested, returns all available messages" also applies when there are no messages. Will make the docs more explicit :).

With messages() you should not have to worry about the stream being closed: errors will be reported without closing it. But yes, batch() will work too and will require less refactoring.

@niks3089
Author

But yes, batch() will work too and will require less refactoring.

Ok, I am going to try with batch and will see if it works over the weekend.

@niks3089
Author

I tested with batch and I see the same issue. For example, I don't see any waiting pulls when the issue happens. This is one of the consumers:


Configuration:

                    Name: janus-edge-fra-prod-1_apikey_consumer
               Pull Mode: true
         Filter Subjects: project_stream.apikey
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

Cluster Information:

                    Name: momentum-prod-leaf-eu
                  Leader: momentum-prod-leaf-ams-1
                 Replica: momentum-prod-leaf-fra-1, current, seen 478ms ago

State:

  Last Delivered Message: Consumer sequence: 130,280 Stream sequence: 16,068,080 Last delivery: 22h6m54s ago
    Acknowledgment Floor: Consumer sequence: 130,280 Stream sequence: 16,057,495 Last Ack: 7m53s ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 6
           Waiting Pulls: 0 of maximum 512

@Jarema
Member

Jarema commented Dec 27, 2024

Seeing no waiting pulls is normal with fetch, as those pull requests are cancelled really quickly if there is nothing to process, but I would expect to see them with batch.

Could you provide the output of info().await from the point where it happens (no messages delivered), so we can get immediate state info that might help us?

@0xMavi

0xMavi commented Jan 2, 2025

@Jarema: I am a colleague of @niks3089. I just deployed some code on our end to debug this. I will let you know the outcome when we observe this again.
This is the debugging helper:

    async fn log_consumer_info(consumer: &Mutex<PullConsumer>, name: &str) {
        let mut consumer = consumer.lock().await;
        let previous_pending = consumer.cached_info().num_pending;
        let info = consumer.info().await;

        match info {
            Ok(info) => {
                // we consider a consumer stuck if it has pending messages for two consecutive periods
                let is_stuck = previous_pending > 0 && info.num_pending > 0;

                if is_stuck {
                    statsd_count!("execution.nats.stuck_consumer_detected", 1);
                    error!("consumer info for {}: {:?}", name, info);
                }
            }
            Err(e) => {
                error!("getting consumer info for {} failed with {}", name, e);
            }
        }
    }
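
One caveat with the heuristic above: a consumer that always has some backlog but keeps delivering would also have pending messages in two consecutive periods. A possible refinement, sketched below with the standard library only, is to also require that the last delivered stream sequence did not move between samples. `Sample` and `made_no_progress` are hypothetical names, and the sketch assumes both values can be read from the client's consumer info.

```rust
/// Hypothetical pair of values sampled from consumer info once per period.
#[derive(Debug, Clone, Copy)]
pub struct Sample {
    /// Messages not yet delivered for the first time.
    pub num_pending: u64,
    /// Stream sequence of the last delivered message.
    pub delivered_stream_seq: u64,
}

/// True when there was a backlog in both samples and nothing was
/// delivered in between.
pub fn made_no_progress(previous: Sample, current: Sample) -> bool {
    previous.num_pending > 0
        && current.num_pending > 0
        && current.delivered_stream_seq == previous.delivered_stream_seq
}
```

A busy consumer whose delivered sequence advances between samples is then not reported, while a consumer whose delivered sequence stays frozen with messages still pending is.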

@wallyqs wallyqs changed the title from "Messages stop being delivered to a consumer 2.10.24" to "Messages stop being delivered to a consumer [2.10.24]" Jan 2, 2025
@wallyqs
Member

wallyqs commented Jan 2, 2025

In the natscli from main there are a couple of commands that would be good to try to check the state of your consumers:

 nats server stream-check --unsynced --expected N # <-- number of servers
 nats server consumer-check --unsynced --expected N # <-- number of servers

@wallyqs
Member

wallyqs commented Jan 2, 2025

Could you share the stream info as well?

4 participants