From 9ece99e62d21453290e5259fa5b0e532ad2919e7 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 10 May 2024 23:24:28 -0700 Subject: [PATCH 1/7] Added the first C example: pull consumer --- .gitignore | 1 + cmd/nbe/docker.go | 1 + docker/c/Dockerfile | 23 ++++++ docker/c/install-nats-c.sh | 15 ++++ .../jetstream/pull-consumer/c/CMakeLists.txt | 8 ++ examples/jetstream/pull-consumer/c/main.c | 79 +++++++++++++++++++ 6 files changed, 127 insertions(+) create mode 100644 docker/c/Dockerfile create mode 100755 docker/c/install-nats-c.sh create mode 100644 examples/jetstream/pull-consumer/c/CMakeLists.txt create mode 100644 examples/jetstream/pull-consumer/c/main.c diff --git a/.gitignore b/.gitignore index e97bbf17..76f3bb99 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ dist node_modules html .idea/ +.vscode/ **/node/node_modules/ **/node/package.json **/node/package-lock.json diff --git a/cmd/nbe/docker.go b/cmd/nbe/docker.go index 1c533349..44c43951 100644 --- a/cmd/nbe/docker.go +++ b/cmd/nbe/docker.go @@ -160,6 +160,7 @@ func (r *ImageBuilder) Run() (string, error) { c := exec.Command( "docker", "build", + // "--progress=plain", // Use plain output for debuggig "--tag", imageTag, buildDir, ) diff --git a/docker/c/Dockerfile b/docker/c/Dockerfile new file mode 100644 index 00000000..adc9c9ea --- /dev/null +++ b/docker/c/Dockerfile @@ -0,0 +1,23 @@ +FROM ubuntu:latest AS build + +WORKDIR /opt/app + +RUN apt-get update +RUN apt-get install -y build-essential +RUN apt-get install -y cmake +RUN apt-get install -y curl +RUN apt-get install -y git +RUN apt-get install -y golang +RUN apt-get install -y jq +RUN apt-get install -y libprotobuf-c-dev +RUN apt-get install -y libsodium-dev +RUN apt-get install -y libssl-dev +RUN apt-get install -y wget + +COPY install-nats-c.sh . +RUN bash install-nats-c.sh + +COPY . ./ +RUN mkdir build && cd build && cmake .. && make + +CMD ["/opt/app/build/app"] diff --git a/docker/c/install-nats-c.sh b/docker/c/install-nats-c.sh new file mode 100755 index 00000000..e7a4a413 --- /dev/null +++ b/docker/c/install-nats-c.sh @@ -0,0 +1,15 @@ +rel=$(curl -s https://api.github.com/repos/nats-io/nats.c/releases/latest | jq -r '.tag_name') +wget https://github.com/nats-io/nats.c/archive/refs/tags/${rel}.tar.gz +tar -xzf ${rel}.tar.gz +cd nats.c-${rel#v} +mkdir build +cd build +cmake \ + -DNATS_BUILD_TLS_USE_OPENSSL_1_1_API=ON \ + -DNATS_BUILD_STREAMING=OFF \ + -DNATS_BUILD_EXAMPLES=OFF \ + .. +make +make install + + diff --git a/examples/jetstream/pull-consumer/c/CMakeLists.txt b/examples/jetstream/pull-consumer/c/CMakeLists.txt new file mode 100644 index 00000000..7026457d --- /dev/null +++ b/examples/jetstream/pull-consumer/c/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.28) + +set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) +set(CMAKE_INSTALL_RPATH "/usr/local/lib") + +file(GLOB SOURCES "*.c") +add_executable(app ${SOURCES}) +target_link_libraries(app nats) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c new file mode 100644 index 00000000..5c9b5be7 --- /dev/null +++ b/examples/jetstream/pull-consumer/c/main.c @@ -0,0 +1,79 @@ +#include +#include + +int main() +{ + natsStatus s = NATS_OK; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + jsOptions jsOpts; + jsCtx *js = NULL; + jsStreamConfig cfg; + jsSubOptions so; + jsStreamInfo *si = NULL; + jsErrCode jerr = 0; + natsMsgList list = {0}; + int ifetch, iack; + + const char *url = getenv("NATS_URL"); + s = natsOptions_Create(&opts); + if (s == NATS_OK && url != NULL) + { + s = natsOptions_SetURL(opts, url); + } + + if (s == NATS_OK) + s = natsConnection_Connect(&nc, opts); + if (s == NATS_OK) + s = jsOptions_Init(&jsOpts); + if (s == NATS_OK) + s = natsConnection_JetStream(&js, nc, &jsOpts); + + if (s == NATS_OK) + s = jsStreamConfig_Init(&cfg); + if (s == NATS_OK) + { + cfg.Name = "EVENTS"; + cfg.Subjects = (const char *[1]){"event.>"}; + cfg.SubjectsLen = 1; + s = js_AddStream(&si, js, &cfg, &jsOpts, &jerr); + } + + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.1", NULL, 0, NULL, &jerr); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.2", NULL, 0, NULL, &jerr); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.3", NULL, 0, NULL, &jerr); + + if (s == NATS_OK) + s = jsSubOptions_Init(&so); + if (s == NATS_OK) + { + so.Stream = "EVENTS"; + s = js_PullSubscribe(&sub, js, "event.>", NULL, &jsOpts, &so, &jerr); + } + + for (ifetch = 0; (s == NATS_OK) && (ifetch < 3);) + { + s = natsSubscription_Fetch(&list, sub, 10, 5000, &jerr); + if (s == NATS_OK) + ifetch += (int64_t)list.Count; + for (iack = 0; (s == NATS_OK) && (iack < list.Count); iack++) + { + s = natsMsg_Ack(list.Msgs[iack], &jsOpts); + if (s == NATS_OK) + printf("received message on %s\n", natsMsg_GetSubject(list.Msgs[iack])); + } + natsMsgList_Destroy(&list); + } + + natsSubscription_Destroy(sub); + jsCtx_Destroy(js); + natsConnection_Destroy(nc); + natsOptions_Destroy(opts); + + return 0; +} \ No newline at end of file From 5e2671da8f45995b8eb3ac856076e6c583246e66 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 10 May 2024 23:32:43 -0700 Subject: [PATCH 2/7] Added main.c ref --- cmd/nbe/parse.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/nbe/parse.go b/cmd/nbe/parse.go index 715b3933..5623411f 100644 --- a/cmd/nbe/parse.go +++ b/cmd/nbe/parse.go @@ -65,6 +65,7 @@ var ( DotNet: "Main.cs", DotNet2: "Main.cs", Elixir: "main.exs", + C: "main.c", } languageMultiCommentDelims = map[string][2]string{ From d366a7fedb529bfe9be9af2c8a4fd650150bb63e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 12 May 2024 04:24:24 -0700 Subject: [PATCH 3/7] wip --- examples/jetstream/pull-consumer/c/main.c | 129 ++++++++++++++++++++-- 1 file changed, 117 insertions(+), 12 deletions(-) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c index 5c9b5be7..872d9c5b 100644 --- a/examples/jetstream/pull-consumer/c/main.c +++ b/examples/jetstream/pull-consumer/c/main.c @@ -1,5 +1,5 @@ #include -#include +#include int main() { @@ -7,16 +7,19 @@ int main() natsOptions *opts = NULL; natsConnection *nc = NULL; natsSubscription *sub = NULL; - natsMsg *msg = NULL; + jsOptions jsOpts; jsCtx *js = NULL; jsStreamConfig cfg; jsSubOptions so; jsStreamInfo *si = NULL; jsErrCode jerr = 0; + natsMsgList list = {0}; - int ifetch, iack; + int c, i, ibatch; + // Use the env variable if running in the container, otherwise use the + // default. const char *url = getenv("NATS_URL"); s = natsOptions_Create(&opts); if (s == NATS_OK && url != NULL) @@ -24,13 +27,18 @@ int main() s = natsOptions_SetURL(opts, url); } + // Create an unauthenticated connection to NATS. if (s == NATS_OK) s = natsConnection_Connect(&nc, opts); + + // Access JetStream for managing streams and consumers as well as for + // publishing and consuming messages to and from the stream. if (s == NATS_OK) s = jsOptions_Init(&jsOpts); if (s == NATS_OK) s = natsConnection_JetStream(&js, nc, &jsOpts); + // Add a simple limits-based stream. if (s == NATS_OK) s = jsStreamConfig_Init(&cfg); if (s == NATS_OK) @@ -41,39 +49,136 @@ int main() s = js_AddStream(&si, js, &cfg, &jsOpts, &jerr); } + // Publish a few messages for the example. if (s == NATS_OK) - s = js_Publish(NULL, js, "event.1", NULL, 0, NULL, &jerr); + printf("Publish 3 messages for the example\n"); if (s == NATS_OK) - s = js_Publish(NULL, js, "event.2", NULL, 0, NULL, &jerr); + s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); if (s == NATS_OK) - s = js_Publish(NULL, js, "event.3", NULL, 0, NULL, &jerr); + s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); + // Create a pull consumer subscription bound to the previously created + // stream. If durable name is not supplied, consumer will be removed after + // InactiveThreshold (defaults to 5 seconds) is reached when not actively + // consuming messages. `Name` is optional, if not provided it will be + // auto-generated. For this example, let's use the consumer with no options, + // which will be ephemeral with auto-generated name. if (s == NATS_OK) s = jsSubOptions_Init(&so); if (s == NATS_OK) { + printf("Create a pull consumer and use natsSubscription_Fetch to receive messages\n"); + so.Stream = "EVENTS"; + s = js_PullSubscribe(&sub, js, "event.>", NULL, &jsOpts, &so, &jerr); + } + + // Use natsSubscription_Fetch to fetch the messages. Here we attempt to + // fetch a batch of up to 2 messages with a 5 second timeout, and we stop + // trying once the expected 3 messages are successfully fetched. + // + // **Note**: natsSubscription_Fetch will not wait for the timeout while we are + // fetching pre-buffered messages. The response time is in single ms. + // + // **Note**: each fetched message must be acknowledged. + for (ibatch = 0, c = 0; (s == NATS_OK) && (c < 3); ibatch++) + { + int64_t start = nats_Now(); + s = natsSubscription_Fetch(&list, sub, 2, 5000, &jerr); + if (s == NATS_OK) + { + c += (int64_t)list.Count; + printf("natsSubscription_Fetch: batch %d of %d messages in %ldms\n", ibatch, list.Count, nats_Now() - start); + } + else + { + printf("natsSubscription_Fetch error: %d:\n", s); + nats_PrintLastErrorStack(stderr); + } + for (i = 0; (s == NATS_OK) && (i < list.Count); i++) + { + s = natsMsg_Ack(list.Msgs[i], &jsOpts); + if (s == NATS_OK) + printf("received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); + } + natsMsgList_Destroy(&list); + } + + // Attempt to fetch more messages, but this time we will wait for the + // timeout since there are no more pre-buffered messages. + if (s == NATS_OK) + { + int64_t start = nats_Now(); + s = natsSubscription_Fetch(&list, sub, 2, 500, &jerr); + printf("extra natsSubscription_Fetch returned status %d and %d messages in %ldms\n", s, list.Count, nats_Now() - start); + s = NATS_OK; + } + + // This consumer will be deleted automatically, but need to free the sub. + natsSubscription_Destroy(sub); + sub = NULL; + + // Create another similar pull consumer, same scenario but using + // `natsSubscription_FetchRequest` for precise control. + if (s == NATS_OK) + { + printf("Create another pull consumer and use natsSubscription_FetchRequest to receive messages\n"); so.Stream = "EVENTS"; s = js_PullSubscribe(&sub, js, "event.>", NULL, &jsOpts, &so, &jerr); } - for (ifetch = 0; (s == NATS_OK) && (ifetch < 3);) + // Use `natsSubscription_FetchRequest` to fetch the messages. + // + // We set the batch size to 1000, but MaxBytes of 200 so we will only get 2 + // messages at a time. Note that since we do not set NoWait, the call will + // block until the batch is filled or the timeout expires, unlike + // `natsSubscription_Fetch`. + for (ibatch = 0, c = 0; (s == NATS_OK) && (c < 3); ibatch++) { - s = natsSubscription_Fetch(&list, sub, 10, 5000, &jerr); + int64_t start = nats_Now(); + jsFetchRequest fr = { + .Batch = 1000, + // .NoWait = true, + .Expires = 500 * 1000 * 1000, + .MaxBytes = 200, + }; + s = natsSubscription_FetchRequest(&list, sub, &fr); if (s == NATS_OK) - ifetch += (int64_t)list.Count; - for (iack = 0; (s == NATS_OK) && (iack < list.Count); iack++) { - s = natsMsg_Ack(list.Msgs[iack], &jsOpts); + c += (int64_t)list.Count; + printf("natsSubscription_FetchRequest: batch %d of %d messages in %ldms\n", ibatch, list.Count, nats_Now() - start); + } + else + { + printf("FetchRequest error: %d:\n", s); + nats_PrintLastErrorStack(stderr); + } + for (i = 0; (s == NATS_OK) && (i < list.Count); i++) + { + s = natsMsg_Ack(list.Msgs[i], &jsOpts); if (s == NATS_OK) - printf("received message on %s\n", natsMsg_GetSubject(list.Msgs[iack])); + printf("received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); } natsMsgList_Destroy(&list); } + // Clean up the consumer by unsubscribing. natsSubscription_Destroy(sub); + sub = NULL; + + // Finally, create a durable pull consumer explicitly, and bind a + // subscription to it more than once. + // js_AddConsumer(&ci, js, "EVENTS", cfg, &jsOpts, &jerr); + jsCtx_Destroy(js); natsConnection_Destroy(nc); natsOptions_Destroy(opts); + jsStreamInfo_Destroy(si); + if (s != NATS_OK) + { + nats_PrintLastErrorStack(stderr); + } return 0; } \ No newline at end of file From ff219be80596cbdf7da64e539662ad2d13c98afc Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 12 May 2024 04:29:18 -0700 Subject: [PATCH 4/7] wip 1 --- examples/jetstream/pull-consumer/c/main.c | 37 +++++++++++++++++------ 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c index 872d9c5b..1d53c72f 100644 --- a/examples/jetstream/pull-consumer/c/main.c +++ b/examples/jetstream/pull-consumer/c/main.c @@ -1,6 +1,24 @@ #include #include +// Publish 3 messages for the example. +static natsStatus +publishTestMessages(jsCtx *js) +{ + natsStatus s = NATS_OK; + int jerr; + + printf("Publish 3 messages for the example\n"); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); + if (s == NATS_OK) + s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); + + return s; +} + int main() { natsStatus s = NATS_OK; @@ -50,14 +68,15 @@ int main() } // Publish a few messages for the example. - if (s == NATS_OK) - printf("Publish 3 messages for the example\n"); - if (s == NATS_OK) - s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); - if (s == NATS_OK) - s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); - if (s == NATS_OK) - s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); + // if (s == NATS_OK) + // printf("Publish 3 messages for the example\n"); + // if (s == NATS_OK) + // s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); + // if (s == NATS_OK) + // s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); + // if (s == NATS_OK) + // s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); + publishTestMessages(js); // Create a pull consumer subscription bound to the previously created // stream. If durable name is not supplied, consumer will be removed after @@ -168,7 +187,7 @@ int main() sub = NULL; // Finally, create a durable pull consumer explicitly, and bind a - // subscription to it more than once. + // subscription to it more than once. // js_AddConsumer(&ci, js, "EVENTS", cfg, &jsOpts, &jerr); jsCtx_Destroy(js); From 8278a26364060d03277db7b8c25be57fb6ffd4a6 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 12 May 2024 09:44:34 -0700 Subject: [PATCH 5/7] wip refactored into example functions --- examples/jetstream/pull-consumer/c/main.c | 309 ++++++++++++++++------ 1 file changed, 230 insertions(+), 79 deletions(-) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c index 1d53c72f..78cd0982 100644 --- a/examples/jetstream/pull-consumer/c/main.c +++ b/examples/jetstream/pull-consumer/c/main.c @@ -1,41 +1,67 @@ #include #include -// Publish 3 messages for the example. -static natsStatus -publishTestMessages(jsCtx *js) +#define STREAM_NAME "EVENTS" +#define CONSUMER_NAME "event-consumer" +#define SUBSCRIBE_SUBJECT "event.>" +#define SUBJECT_PREFIX "event." +#define NUM_MESSAGES 3 + +static natsStatus publishTestMessages(jsCtx *js); +static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts); +static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts); +static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts); +static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts); + +typedef natsStatus (*examplef)(jsCtx *js, jsOptions *jsOpts); + +int main() { natsStatus s = NATS_OK; - int jerr; + natsConnection *nc = NULL; + jsOptions jsOpts; + jsCtx *js = NULL; + examplef examples[] = {exampleFetch, exampleFetchRequest, exampleNamedConsumer}; + int i; + int N = sizeof(examples) / sizeof(examples[0]); - printf("Publish 3 messages for the example\n"); - if (s == NATS_OK) - s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); - if (s == NATS_OK) - s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); + // Initialize the NATS connection and JetStream context. + s = init(&nc, &js, &jsOpts); + + // Publish NUM_MESSAGES messages for the example. if (s == NATS_OK) - s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); + s = publishTestMessages(js); - return s; + // Run the examples. + for (i = 0; (i < N) && (s == NATS_OK); i++) + { + examplef f = examples[i]; + s = f(js, &jsOpts); + } + + jsCtx_Destroy(js); + natsConnection_Destroy(nc); + + if (s != NATS_OK) + { + nats_PrintLastErrorStack(stderr); + return 1; + } + return 0; } -int main() +// Initialize the NATS connection and JetStream context. +static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts) { natsStatus s = NATS_OK; natsOptions *opts = NULL; natsConnection *nc = NULL; - natsSubscription *sub = NULL; - jsOptions jsOpts; jsCtx *js = NULL; jsStreamConfig cfg; - jsSubOptions so; jsStreamInfo *si = NULL; jsErrCode jerr = 0; - natsMsgList list = {0}; - int c, i, ibatch; - // Use the env variable if running in the container, otherwise use the // default. const char *url = getenv("NATS_URL"); @@ -48,59 +74,91 @@ int main() // Create an unauthenticated connection to NATS. if (s == NATS_OK) s = natsConnection_Connect(&nc, opts); + natsOptions_Destroy(opts); // Access JetStream for managing streams and consumers as well as for // publishing and consuming messages to and from the stream. if (s == NATS_OK) - s = jsOptions_Init(&jsOpts); + s = jsOptions_Init(jsOpts); if (s == NATS_OK) - s = natsConnection_JetStream(&js, nc, &jsOpts); + s = natsConnection_JetStream(&js, nc, jsOpts); // Add a simple limits-based stream. if (s == NATS_OK) s = jsStreamConfig_Init(&cfg); if (s == NATS_OK) { - cfg.Name = "EVENTS"; - cfg.Subjects = (const char *[1]){"event.>"}; + cfg.Name = STREAM_NAME; + cfg.Subjects = (const char *[1]){SUBSCRIBE_SUBJECT}; cfg.SubjectsLen = 1; - s = js_AddStream(&si, js, &cfg, &jsOpts, &jerr); - } - - // Publish a few messages for the example. - // if (s == NATS_OK) - // printf("Publish 3 messages for the example\n"); - // if (s == NATS_OK) - // s = js_Publish(NULL, js, "event.1", "0123456789", 10, NULL, &jerr); - // if (s == NATS_OK) - // s = js_Publish(NULL, js, "event.2", "0123456789", 10, NULL, &jerr); - // if (s == NATS_OK) - // s = js_Publish(NULL, js, "event.3", "0123456789", 10, NULL, &jerr); - publishTestMessages(js); - - // Create a pull consumer subscription bound to the previously created - // stream. If durable name is not supplied, consumer will be removed after - // InactiveThreshold (defaults to 5 seconds) is reached when not actively - // consuming messages. `Name` is optional, if not provided it will be - // auto-generated. For this example, let's use the consumer with no options, - // which will be ephemeral with auto-generated name. + s = js_AddStream(&si, js, &cfg, jsOpts, &jerr); + jsStreamInfo_Destroy(si); + } + if (s == NATS_OK) - s = jsSubOptions_Init(&so); + { + *newnc = nc; + *newjs = js; + } + else + { + jsCtx_Destroy(js); + natsConnection_Destroy(nc); + } + + return s; +} + +// Publish NUM_MESSAGES messages for the example. +static natsStatus +publishTestMessages(jsCtx *js) +{ + natsStatus s = NATS_OK; + jsErrCode jerr; + char subject[] = SUBJECT_PREFIX "99999999999999"; + int i; + + printf("Publish %d messages for the example\n", NUM_MESSAGES); + for (i = 0; (s == NATS_OK) && (i < NUM_MESSAGES); i++) + { + sprintf(subject, "%s%d", SUBJECT_PREFIX, i + 1); + s = js_Publish(NULL, js, subject, "01234567890123456789012345678901234567890123456789", 50, NULL, &jerr); + } + return s; +} + +// Create a pull consumer subscription and use `natsSubscription_Fetch` to +// receive messages. +static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) +{ + natsStatus s = NATS_OK; + natsSubscription *sub = NULL; + jsErrCode jerr = 0; + natsMsgList list = {0}; + jsSubOptions so; + int c, i, ibatch; + + // Create a pull consumer subscription. The durable name is not supplied, so + // the consumer will be removed after `InactiveThreshold` (defaults to 5 + // seconds) is reached when not actively consuming messages. + s = jsSubOptions_Init(&so); if (s == NATS_OK) { - printf("Create a pull consumer and use natsSubscription_Fetch to receive messages\n"); - so.Stream = "EVENTS"; - s = js_PullSubscribe(&sub, js, "event.>", NULL, &jsOpts, &so, &jerr); + printf("exampleFetch: create a pull consumer and use natsSubscription_Fetch to receive messages\n"); + so.Stream = STREAM_NAME; + s = js_PullSubscribe(&sub, js, SUBSCRIBE_SUBJECT, NULL, jsOpts, &so, &jerr); } - // Use natsSubscription_Fetch to fetch the messages. Here we attempt to + // Use `natsSubscription_Fetch` to fetch the messages. Here we attempt to // fetch a batch of up to 2 messages with a 5 second timeout, and we stop // trying once the expected 3 messages are successfully fetched. // - // **Note**: natsSubscription_Fetch will not wait for the timeout while we are - // fetching pre-buffered messages. The response time is in single ms. + // **Note**: `natsSubscription_Fetch` will not wait for the timeout while we + // are fetching pre-buffered messages. The response time is in single ms. // // **Note**: each fetched message must be acknowledged. + // + // **Note**: `natsMsgList_Destroy` will destroy the fetched messages. for (ibatch = 0, c = 0; (s == NATS_OK) && (c < 3); ibatch++) { int64_t start = nats_Now(); @@ -108,48 +166,64 @@ int main() if (s == NATS_OK) { c += (int64_t)list.Count; - printf("natsSubscription_Fetch: batch %d of %d messages in %ldms\n", ibatch, list.Count, nats_Now() - start); + printf("exampleFetch: batch #%d (%d messages) in %dms\n", ibatch, list.Count, (int)(nats_Now() - start)); } else { - printf("natsSubscription_Fetch error: %d:\n", s); + printf("exampleFetch: error: %d:\n", s); nats_PrintLastErrorStack(stderr); } for (i = 0; (s == NATS_OK) && (i < list.Count); i++) { - s = natsMsg_Ack(list.Msgs[i], &jsOpts); - if (s == NATS_OK) - printf("received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); + s = natsMsg_Ack(list.Msgs[i], jsOpts); + printf("exampleFetch: received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); } natsMsgList_Destroy(&list); } // Attempt to fetch more messages, but this time we will wait for the - // timeout since there are no more pre-buffered messages. + // 500ms timeout since there are no more pre-buffered messages. if (s == NATS_OK) { int64_t start = nats_Now(); s = natsSubscription_Fetch(&list, sub, 2, 500, &jerr); - printf("extra natsSubscription_Fetch returned status %d and %d messages in %ldms\n", s, list.Count, nats_Now() - start); + printf("exampleFetch: extra natsSubscription_Fetch returned status %d and %d messages in %dms\n", + s, list.Count, (int)(nats_Now() - start)); s = NATS_OK; } - // This consumer will be deleted automatically, but need to free the sub. + // Cleanup. + natsSubscription_Drain(sub); natsSubscription_Destroy(sub); - sub = NULL; + + return s; +} + +// Create another similar pull consumer subscription and use +// `natsSubscription_FetchRequest` to receive messages with more precise +// control. +static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts) +{ + natsStatus s = NATS_OK; + natsSubscription *sub = NULL; + jsErrCode jerr = 0; + natsMsgList list = {0}; + jsSubOptions so; + int c, i, ibatch; // Create another similar pull consumer, same scenario but using // `natsSubscription_FetchRequest` for precise control. + s = jsSubOptions_Init(&so); if (s == NATS_OK) { - printf("Create another pull consumer and use natsSubscription_FetchRequest to receive messages\n"); - so.Stream = "EVENTS"; - s = js_PullSubscribe(&sub, js, "event.>", NULL, &jsOpts, &so, &jerr); + printf("exampleFetchRequest: create pull consumer and use natsSubscription_FetchRequest to receive messages\n"); + so.Stream = STREAM_NAME; + s = js_PullSubscribe(&sub, js, SUBSCRIBE_SUBJECT, NULL, jsOpts, &so, &jerr); } // Use `natsSubscription_FetchRequest` to fetch the messages. // - // We set the batch size to 1000, but MaxBytes of 200 so we will only get 2 + // We set the batch size to 1000, but MaxBytes of 300 so we will only get 2 // messages at a time. Note that since we do not set NoWait, the call will // block until the batch is filled or the timeout expires, unlike // `natsSubscription_Fetch`. @@ -160,44 +234,121 @@ int main() .Batch = 1000, // .NoWait = true, .Expires = 500 * 1000 * 1000, - .MaxBytes = 200, + .MaxBytes = 300, }; s = natsSubscription_FetchRequest(&list, sub, &fr); if (s == NATS_OK) { c += (int64_t)list.Count; - printf("natsSubscription_FetchRequest: batch %d of %d messages in %ldms\n", ibatch, list.Count, nats_Now() - start); + printf("exampleFetchRequest: batch #%d (%d messages) in %dms\n", ibatch, list.Count, (int)(nats_Now() - start)); } else { - printf("FetchRequest error: %d:\n", s); + printf("exampleFetchRequest: error: %d:\n", s); nats_PrintLastErrorStack(stderr); } for (i = 0; (s == NATS_OK) && (i < list.Count); i++) { - s = natsMsg_Ack(list.Msgs[i], &jsOpts); + s = natsMsg_Ack(list.Msgs[i], jsOpts); if (s == NATS_OK) - printf("received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); + printf("exampleFetchRequest: received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i])); } natsMsgList_Destroy(&list); } - // Clean up the consumer by unsubscribing. + natsSubscription_Drain(sub); natsSubscription_Destroy(sub); - sub = NULL; - // Finally, create a durable pull consumer explicitly, and bind a - // subscription to it more than once. - // js_AddConsumer(&ci, js, "EVENTS", cfg, &jsOpts, &jerr); + return s; +} - jsCtx_Destroy(js); - natsConnection_Destroy(nc); - natsOptions_Destroy(opts); - jsStreamInfo_Destroy(si); +// Create a pull consumer, then bind 2 subscriptions to it. +static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts) +{ + natsStatus s = NATS_OK; + jsConsumerInfo *ci = NULL; + jsConsumerConfig cfg; + natsSubscription *sub1 = NULL; + natsSubscription *sub2 = NULL; + jsErrCode jerr = 0; + natsMsgList list = {0}; + jsSubOptions so; + int i; + bool done = false; - if (s != NATS_OK) + jsConsumerConfig_Init(&cfg); + cfg.Name = CONSUMER_NAME; + s = js_AddConsumer(&ci, js, STREAM_NAME, &cfg, jsOpts, &jerr); + if (s == NATS_OK) { - nats_PrintLastErrorStack(stderr); + printf("exampleNamedConsumer: create a pull consumer named '%s'\n", CONSUMER_NAME); } - return 0; + jsConsumerInfo_Destroy(ci); + + // Create a named pull consumer explicitly and subscribe to it. + // + // **Note**: no delivery subject in `js_PullSubsccribe` since it will bind + // to the consumer by name. + if (s == NATS_OK) + s = jsSubOptions_Init(&so); + + if (s == NATS_OK) + { + printf("exampleNamedConsumer: bind 2 subscriptions to the consumer\n"); + so.Stream = STREAM_NAME; + so.Consumer = CONSUMER_NAME; + s = js_PullSubscribe(&sub1, js, NULL, NULL, jsOpts, &so, &jerr); + } + if (s == NATS_OK) + s = js_PullSubscribe(&sub2, js, NULL, NULL, jsOpts, &so, &jerr); + + int64_t start = nats_Now(); + jsFetchRequest fr = { + .Batch = 1, + .NoWait = true, + .Expires = 500 * 1000 * 1000, + }; + for (i = 0; (s == NATS_OK) && (!done); i++) + { + natsSubscription *sub = (i % 2 == 0) ? sub1 : sub2; + const char *name = (i % 2 == 0) ? "sub1" : "sub2"; + start = nats_Now(); + + s = natsSubscription_FetchRequest(&list, sub, &fr); + if ((s == NATS_OK) && (list.Count == 1)) + { + printf("exampleNamedConsumer: fetched from %s subject '%s' in %dms\n", + name, natsMsg_GetSubject(list.Msgs[0]), (int)(nats_Now() - start)); + s = natsMsg_Ack(list.Msgs[0], jsOpts); + } + else if ((s == NATS_OK) && (list.Count != 1)) + { + printf("exampleNamedConsumer: fetched wrong number of messages from %s: expected 1, got %d\n", name, list.Count); + s = NATS_ERR; + } + else if (s == NATS_TIMEOUT && list.Count == 0) + { + printf("exampleNamedConsumer: got NATS_TIMEOUT from %s, no more messages for now\n", name); + s = NATS_OK; + done = true; + } + else + { + printf("exampleNamedConsumer: error: %d:\n", s); + nats_PrintLastErrorStack(stderr); + } + + natsMsgList_Destroy(&list); + } + + natsSubscription_Destroy(sub1); + natsSubscription_Destroy(sub2); + + s = js_DeleteConsumer(js, STREAM_NAME, CONSUMER_NAME, jsOpts, &jerr); + if (s == NATS_OK) + { + printf("exampleNamedConsumer: deleted consumer '%s'\n", CONSUMER_NAME); + } + + return s; } \ No newline at end of file From 7f26cd615449e7305e03d84a5d43bac9cff680d0 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 12 May 2024 10:15:51 -0700 Subject: [PATCH 6/7] touches --- examples/jetstream/pull-consumer/c/main.c | 73 ++++++++++------------- 1 file changed, 32 insertions(+), 41 deletions(-) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c index 78cd0982..a714ae47 100644 --- a/examples/jetstream/pull-consumer/c/main.c +++ b/examples/jetstream/pull-consumer/c/main.c @@ -5,7 +5,7 @@ #define CONSUMER_NAME "event-consumer" #define SUBSCRIBE_SUBJECT "event.>" #define SUBJECT_PREFIX "event." -#define NUM_MESSAGES 3 +#define NUM_MESSAGES 5 static natsStatus publishTestMessages(jsCtx *js); static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts); @@ -28,10 +28,6 @@ int main() // Initialize the NATS connection and JetStream context. s = init(&nc, &js, &jsOpts); - // Publish NUM_MESSAGES messages for the example. - if (s == NATS_OK) - s = publishTestMessages(js); - // Run the examples. for (i = 0; (i < N) && (s == NATS_OK); i++) { @@ -39,9 +35,9 @@ int main() s = f(js, &jsOpts); } + // Cleanup and finish. jsCtx_Destroy(js); natsConnection_Destroy(nc); - if (s != NATS_OK) { nats_PrintLastErrorStack(stderr); @@ -50,7 +46,8 @@ int main() return 0; } -// Initialize the NATS connection and JetStream context. +// Initialize the NATS connection and JetStream context, publish NUM_MESSAGES +// test messages. static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts) { natsStatus s = NATS_OK; @@ -61,6 +58,8 @@ static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts) jsStreamConfig cfg; jsStreamInfo *si = NULL; jsErrCode jerr = 0; + char subject[] = SUBJECT_PREFIX "99999999999999"; + int i; // Use the env variable if running in the container, otherwise use the // default. @@ -94,6 +93,17 @@ static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts) s = js_AddStream(&si, js, &cfg, jsOpts, &jerr); jsStreamInfo_Destroy(si); } + if (s == NATS_OK) + printf("Created a stream named '%s' with 1 subject '%s'\n", STREAM_NAME, SUBSCRIBE_SUBJECT); + + // Publish NUM_MESSAGES messages for the examples. + for (i = 0; (s == NATS_OK) && (i < NUM_MESSAGES); i++) + { + sprintf(subject, "%s%d", SUBJECT_PREFIX, i + 1); + s = js_Publish(NULL, js, subject, "01234567890123456789012345678901234567890123456789", 50, NULL, &jerr); + } + if (s == NATS_OK) + printf("Published %d messages for the example\n", NUM_MESSAGES); if (s == NATS_OK) { @@ -109,24 +119,6 @@ static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts) return s; } -// Publish NUM_MESSAGES messages for the example. -static natsStatus -publishTestMessages(jsCtx *js) -{ - natsStatus s = NATS_OK; - jsErrCode jerr; - char subject[] = SUBJECT_PREFIX "99999999999999"; - int i; - - printf("Publish %d messages for the example\n", NUM_MESSAGES); - for (i = 0; (s == NATS_OK) && (i < NUM_MESSAGES); i++) - { - sprintf(subject, "%s%d", SUBJECT_PREFIX, i + 1); - s = js_Publish(NULL, js, subject, "01234567890123456789012345678901234567890123456789", 50, NULL, &jerr); - } - return s; -} - // Create a pull consumer subscription and use `natsSubscription_Fetch` to // receive messages. static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) @@ -138,9 +130,9 @@ static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) jsSubOptions so; int c, i, ibatch; - // Create a pull consumer subscription. The durable name is not supplied, so - // the consumer will be removed after `InactiveThreshold` (defaults to 5 - // seconds) is reached when not actively consuming messages. + // Create a pull consumer subscription. The durable name (4th parameter) is + // not supplied, so the consumer will be removed after `InactiveThreshold` + // (defaults to 5 seconds) is reached when not actively consuming messages. s = jsSubOptions_Init(&so); if (s == NATS_OK) { @@ -151,7 +143,7 @@ static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) // Use `natsSubscription_Fetch` to fetch the messages. Here we attempt to // fetch a batch of up to 2 messages with a 5 second timeout, and we stop - // trying once the expected 3 messages are successfully fetched. + // trying once the expected NUM_MESSAGES messages are successfully fetched. // // **Note**: `natsSubscription_Fetch` will not wait for the timeout while we // are fetching pre-buffered messages. The response time is in single ms. @@ -159,7 +151,7 @@ static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) // **Note**: each fetched message must be acknowledged. // // **Note**: `natsMsgList_Destroy` will destroy the fetched messages. - for (ibatch = 0, c = 0; (s == NATS_OK) && (c < 3); ibatch++) + for (ibatch = 0, c = 0; (s == NATS_OK) && (c < NUM_MESSAGES); ibatch++) { int64_t start = nats_Now(); s = natsSubscription_Fetch(&list, sub, 2, 5000, &jerr); @@ -193,7 +185,6 @@ static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts) } // Cleanup. - natsSubscription_Drain(sub); natsSubscription_Destroy(sub); return s; @@ -227,7 +218,7 @@ static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts) // messages at a time. Note that since we do not set NoWait, the call will // block until the batch is filled or the timeout expires, unlike // `natsSubscription_Fetch`. - for (ibatch = 0, c = 0; (s == NATS_OK) && (c < 3); ibatch++) + for (ibatch = 0, c = 0; (s == NATS_OK) && (c < NUM_MESSAGES); ibatch++) { int64_t start = nats_Now(); jsFetchRequest fr = { @@ -256,7 +247,6 @@ static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts) natsMsgList_Destroy(&list); } - natsSubscription_Drain(sub); natsSubscription_Destroy(sub); return s; @@ -285,10 +275,13 @@ static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts) } jsConsumerInfo_Destroy(ci); - // Create a named pull consumer explicitly and subscribe to it. + // Create a named pull consumer explicitly and subscribe to it twice. // // **Note**: no delivery subject in `js_PullSubsccribe` since it will bind // to the consumer by name. + // + // **Note**: subscriptions are "balanced" in that each message is processed + // by one or the other. if (s == NATS_OK) s = jsSubOptions_Init(&so); @@ -303,18 +296,13 @@ static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts) s = js_PullSubscribe(&sub2, js, NULL, NULL, jsOpts, &so, &jerr); int64_t start = nats_Now(); - jsFetchRequest fr = { - .Batch = 1, - .NoWait = true, - .Expires = 500 * 1000 * 1000, - }; for (i = 0; (s == NATS_OK) && (!done); i++) { natsSubscription *sub = (i % 2 == 0) ? sub1 : sub2; const char *name = (i % 2 == 0) ? "sub1" : "sub2"; start = nats_Now(); - s = natsSubscription_FetchRequest(&list, sub, &fr); + s = natsSubscription_Fetch(&list, sub, 1, 100, &jerr); if ((s == NATS_OK) && (list.Count == 1)) { printf("exampleNamedConsumer: fetched from %s subject '%s' in %dms\n", @@ -328,7 +316,7 @@ static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts) } else if (s == NATS_TIMEOUT && list.Count == 0) { - printf("exampleNamedConsumer: got NATS_TIMEOUT from %s, no more messages for now\n", name); + printf("exampleNamedConsumer: got NATS_TIMEOUT from %s in %dms, no more messages for now\n", name, (int)(nats_Now() - start)); s = NATS_OK; done = true; } @@ -341,6 +329,9 @@ static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts) natsMsgList_Destroy(&list); } + // Cleanup. + natsSubscription_Drain(sub1); + natsSubscription_Drain(sub2); natsSubscription_Destroy(sub1); natsSubscription_Destroy(sub2); From 7805603aa0f794890cdec7457dda843e872b6d2a Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 14 May 2024 05:17:59 -0700 Subject: [PATCH 7/7] PR feedback: NoWait comment --- examples/jetstream/pull-consumer/c/main.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/jetstream/pull-consumer/c/main.c b/examples/jetstream/pull-consumer/c/main.c index a714ae47..2ff894bd 100644 --- a/examples/jetstream/pull-consumer/c/main.c +++ b/examples/jetstream/pull-consumer/c/main.c @@ -212,18 +212,20 @@ static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts) s = js_PullSubscribe(&sub, js, SUBSCRIBE_SUBJECT, NULL, jsOpts, &so, &jerr); } - // Use `natsSubscription_FetchRequest` to fetch the messages. + // Use `natsSubscription_FetchRequest` to fetch the messages. We set the + // batch size to 1000, but MaxBytes of 300 so we will only get 2 messages at + // a time. // - // We set the batch size to 1000, but MaxBytes of 300 so we will only get 2 - // messages at a time. Note that since we do not set NoWait, the call will - // block until the batch is filled or the timeout expires, unlike - // `natsSubscription_Fetch`. + // **Note**: Setting `.NoWait` causes the request to return as soon as there + // are some messages availabe, not necessarily the entire batch. By default, + // we wait `.Expires` time if there are not enough messages to make a full + // batch. for (ibatch = 0, c = 0; (s == NATS_OK) && (c < NUM_MESSAGES); ibatch++) { int64_t start = nats_Now(); jsFetchRequest fr = { .Batch = 1000, - // .NoWait = true, + /* .NoWait = true, */ .Expires = 500 * 1000 * 1000, .MaxBytes = 300, };