Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | update nats to version 3.0 |
---|---|
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
e8a1d80965a4f16673f89188b7371ba6 |
User & Date: | chw 2024-01-30 16:50:02.851 |
Context
2024-01-30
| ||
17:09 | update sqlite to version 3.45.1 check-in: 3b2cb5e970 user: chw tags: trunk | |
16:50 | update nats to version 3.0 check-in: e8a1d80965 user: chw tags: trunk | |
12:10 | updates in pdf4tcl check-in: c3c706a281 user: chw tags: trunk | |
Changes
Name change from assets/nats2/CoreAPI.md to assets/nats3/CoreAPI.md.
1 2 3 4 5 | # Core NATS API `package require nats` All commands are defined in and exported from the `::nats` namespace. | | > | | > | > > | > | > > | | | < | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | # Core NATS API `package require nats` All commands are defined in and exported from the `::nats` namespace. # Synopsis ## Class `nats::connection` [nats::connection new ?*conn_name*? ?-logger *logger*? ?-log_chan *channel*? ?-log_level *level*?](#constructor-conn_name--logger-logger--log_chan-channel--log_level-level) <br/> [*objectName* cget *option*](#objectName-cget-option) <br/> [*objectName* configure *?option? ?value option value ...?*](#objectName-configure-option-value-option-value) <br/> [*objectName* reset *?option ... ?*](#objectname-reset-option--) <br/> [*objectName* connect ?*-async*?](#objectName-connect--async) <br/> [*objectName* disconnect](#objectName-disconnect) <br/> [*objectName* publish *subject message* ?-reply *replyTo*?](#objectname-publish-subject-message--reply-replyto) <br/> [*objectName* publish_msg *msg*](#objectname-publish_msg-msg) <br/> [*objectName* subscribe *subject ?args?*](#objectName-subscribe-subject-args) <br/> [*objectName* unsubscribe *subID* ?-max_msgs *maxMsgs*?](#objectName-unsubscribe-subID--max_msgs-maxMsgs) <br/> [*objectName* request *subject message ?args?*](#objectName-request-subject-message-args) <br/> [*objectName* request_msg *msg* ?-timeout *ms* ?-callback *cmdPrefix*? ?-dictmsg *dictmsg*?](#objectname-request_msg-msg--timeout-ms--callback-cmdprefix--dictmsg-dictmsg)<br/> [*objectName* ping ?-timeout *ms*?](#objectName-ping--timeout-ms) <br/> [*objectName* inbox](#objectName-inbox) <br/> [*objectName* current_server](#objectName-current_server) <br/> [*objectName* all_servers](#objectName-all_servers) <br/> [*objectName* server_info](#objectName-server_info) <br/> [*objectName* jet_stream *?args?*](#objectname-jet_stream-args) <br/> [*objectName* destroy](#objectName-destroy) <br/> ## Ensembles [nats::msg](#ensemble-natsmsg) <br/> [msg create *subject* ?-data *payload*? ?-reply *replyTo*?](#msg-create-subject--data-payload--reply-replysubj)<br/> [msg set *msgVariable option value*](#msg-set-msgvariable-option-value)<br/> [msg subject *msgValue*](#msg-subject-msgvalue)<br/> [msg data *msgValue*](#msg-data-msgvalue)<br/> [msg reply *msgValue*](#msg-reply-msgvalue)<br/> [msg no_responders *msgValue*](#msg-no_responders-msgvalue)<br/> [msg idle_heartbeat *msgValue*](#msg-idle_heartbeat-msgvalue)<br/> [msg flow_control *msgValue*](#msg-flow_control-msgvalue)<br/> [msg seq *msgValue*](#msg-seq-msgvalue)<br/> [msg timestamp *msgValue*](#msg-timestamp-msgvalue)<br/> [nats::header](#ensemble-natsheader)<br/> [header add *msgVariable key value*](#header-add-msgvariable-key-value)<br/> [header set *msgVariable key value ?key value?..*](#header-set-msgvariable-key-value-key-value)<br/> [header delete *msgVariable key*](#header-delete-msgvariable-key)<br/> [header values *msgValue key*](#header-values-msgvalue-key)<br/> [header get *msgValue key*](#header-get-msgvalue-key)<br/> [header keys *msgValue ?globPattern?*](#header-keys-msgvalue-globpattern)<br/> [header lookup *msgValue key default*](#header-lookup-msgValue-key-default)<br/> ## Namespace Commands [nats::timestamp](#natstimestamp)<br/> [nats::isotime_to_msec *isotime*](#natsisotime_to_msec-isotime)<br/> [nats::msec_to_isotime *msec ?tz?*](#natsmsec_to_isotime-msec-tz)<br/> [nats::mymethod *method ?args?*](#natsmymethod-method-args)<br/> # Description ## Event processing The client relies on a running event loop to send and deliver messages and uses only non-blocking sockets. Everything works in your Tcl interpreter and no background Tcl threads or interpreters are created under the hood. So, if your application might leave the event loop for a long time (e.g. a long computation without event processing), the NATS client should be created in a separate thread. Calls to blocking API (synchronous versions of `connect`, `request`, `ping`) involve `vwait` under the hood, so that other event processing can continue. If the API is called from a coroutine, `coroutine::util vwait` is used instead of a plain `vwait` to avoid nested event loops. ## Message headers When using NATS server version 2.2 and later, you can publish and receive messages with [headers](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-4.md). Please, keep in mind that: - keys are case-sensitive (unlike standard HTTP headers) - duplicate keys are allowed (just like standard HTTP headers). In Tcl this is represented as a key pointing to a *list* of values, mimicking the same API as in nats.go and nats.java. - `Status` and `Description` keys are reserved by the NATS protocol, in particular for implementation of the [no-responders](https://docs.nats.io/whats_new_22#react-quicker-with-no-responder-notifications) feature. ## Receiving a message as a Tcl dict For simplicity, an incoming message is returned by `request` or a subscription callback as a string. This is only the payload. If you need more advanced access, e.g. to get message headers, you can pass the `-dictmsg true` argument to indicate that the package should deliver `message` as a dict. Then you can work with this variable using the [nats::msg ensemble](#natsmsg). Also, instead of passing `-dictmsg true` to every call, you can `configure` your connection to return messages always as dicts. Note that the JetStream API **always** returns messages as dicts. ## Public variables The connection object exposes 3 "public" read-only variables: - `last_error` - used to deliver asynchronous errors, e.g. if the network fails. It is a dict with 2 keys similar to the arguments for `throw`: - code: error code, e.g. {NATS ErrAuthorization} - errorMessage: human-readable error message - `status` - connection status, one of `$nats::status_closed`, `$nats::status_connecting`, `$nats::status_connected` or `$nats::status_reconnecting`. Also you can query the status using `$connection cget -status`. - `serverInfo` - array with INFO from the current server. Intended only for tracing. Note there is `server_info` method that returns a dict with the same data. You can set up traces on these variables to get notified e.g. when the connection status changes or NATS server enters `ldm` - lame duck mode. See the example below in the paragraph about asynchronous error handling. ## Options The `configure` method accepts the following options. Make sure to set them *before* calling `connect`. | Option | Type | Default | Comment | | ------------- |--------|---------|---------| | -servers | list | (mandatory) | URLs of NATS servers | | -name | string | | Client name sent to a NATS server during the handshake| | -pedantic | boolean |false | Pedantic protocol mode. If true, some extra checks are performed by a NATS server| | -verbose | boolean | false | If true, every protocol message is echoed by the server with +OK. Has no effect on functioning of the client itself | |
︙ | ︙ | |||
96 97 98 99 100 101 102 103 | | -user | string | | Default username| | -password | string | | Default password| | -token | string | | Default authentication token| | -secure | boolean | false | If secure=true, connection will fail if a server can't provide a TLS connection | | -check_subjects | boolean | true | Enable client-side checking of subjects when publishing or subscribing | | -dictmsg | boolean | false | Return messages from `subscribe` and `request` as dicts by default | | -utf8_convert | boolean | false | By default, the client does not change a message body when it is sent or received. Setting this option to `true` will encode outgoing messages to UTF-8 and decode incoming messages from UTF-8. This option applies to the higher-level classes as well: `jet_stream` and `key_value`. | | > | | | | 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | | -user | string | | Default username| | -password | string | | Default password| | -token | string | | Default authentication token| | -secure | boolean | false | If secure=true, connection will fail if a server can't provide a TLS connection | | -check_subjects | boolean | true | Enable client-side checking of subjects when publishing or subscribing | | -dictmsg | boolean | false | Return messages from `subscribe` and `request` as dicts by default | | -utf8_convert | boolean | false | By default, the client does not change a message body when it is sent or received. Setting this option to `true` will encode outgoing messages to UTF-8 and decode incoming messages from UTF-8. This option applies to the higher-level classes as well: `jet_stream` and `key_value`. | -request_timeout | integer | 10000 |Default timeout (ms) for requests| # Commands ## `nats::connection` ### constructor ?*conn_name*? ?-logger *logger*? ?-log_chan *channel*? ?-log_level *level*? Creates a new instance of the TclOO object `nats::connection`. If you provide a connection name (recommended!), it is sent to NATS in the `CONNECT` message.<br/> The constructor also initializes the logging functionality. With no arguments, the default severity level is `warn` and destination is `stdout`. You can configure logging in 2 ways: - either create and configure your own [logger](https://core.tcl-lang.org/tcllib/doc/trunk/embedded/md/tcllib/files/modules/log/logger.md) object and pass it with `-logger` option - or set severity with `-log_level` and output channel with `-log_chan`. The class uses only 4 levels: debug, info, warn, error See also the [examples](examples) folder. ### objectName cget *option* Returns the current value of a NATS option as described in the table above. You can also query the connection `-status`. ### objectName configure *?option? ?value option value...?* When given no arguments, returns a dict of all options with their current values. When given one option, returns its current value (same as `cget`). When given more arguments, assigns each value to an option. The only mandatory option is `servers`, and others have reasonable defaults. ### objectName reset ?*option* ... ? Resets the option(s) to default values. |
︙ | ︙ | |||
133 134 135 136 137 138 139 | ### objectName publish *subject message* ?-reply *replyTo*? Publishes a message to the specified subject. See the NATS [documentation](https://docs.nats.io/nats-concepts/subjects) for more details about subjects and wildcards. The client will check subject's validity before sending according to [NATS Naming Rules](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md). <br/> `message` is the payload (can be a binary string). If you specify a `replyTo` subject, the receiver of your message will know where to send a reply. You can use the [inbox](#objectname-inbox) method to generate a transient [subject name](https://docs.nats.io/developing-with-nats/sending/replyto) starting with `_INBOX`. However, using asynchronous requests might accomplish the same task in an easier manner - see below. ### objectName publish_msg *msg* Publishes a message created using [nats::msg](#natsmsg) commands. This method is especially useful if you need to send a message with headers. | | | > > > > > > > > > < < > | | | | | | > > > > > | | | | > > | | | > | > | | | > > | > > | > > > > > | | > > > > > > > > > > > > > > > > > > > > > > | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 | ### objectName publish *subject message* ?-reply *replyTo*? Publishes a message to the specified subject. See the NATS [documentation](https://docs.nats.io/nats-concepts/subjects) for more details about subjects and wildcards. The client will check subject's validity before sending according to [NATS Naming Rules](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md). <br/> `message` is the payload (can be a binary string). If you specify a `replyTo` subject, the receiver of your message will know where to send a reply. You can use the [inbox](#objectname-inbox) method to generate a transient [subject name](https://docs.nats.io/developing-with-nats/sending/replyto) starting with `_INBOX`. However, using asynchronous requests might accomplish the same task in an easier manner - see below. ### objectName publish_msg *msg* Publishes a message created using [nats::msg](#natsmsg) commands. This method is especially useful if you need to send a message with headers. ### objectName subscribe *subject* ?*args*? Subscribes to a subject (possibly with wildcards) and returns a subscription ID. You can provide the following options: - `-callback cmdPrefix` (mandatory - see below) - `-queue queueGroup` - subscribe to a [queue group](https://docs.nats.io/developing-with-nats/receiving/queues) - `-max_msgs int` - automatically unsubscribe after `max_msgs` messages have been received - `-dictmsg bool` - if false (default), only a payload is delivered in `message`. Set to true to receive `message` as a dict, e.g. to access headers using the `nats::msg` ensemble. You can also `configure` the connection to have `-dictmsg` as true by default. - `-post bool` - controls how the callback is invoked, see below. Default true. Exercise with caution. Whenever a message arrives, the command prefix `cmdPrefix` will be invoked from the event loop. It must have the following signature:<br/> **subscriptionCallback** *subject message replyTo*<br/> The default invocation by posting a Tcl event ensures that user code is separated from the library code, e.g. in case the user code throws an error or takes a long time, the library can still function normally. However, posting a Tcl event has a performance cost. If your callback is trivial, e.g. only sets a variable or posts another Tcl event, you can use `-post false`. Then the library will invoke your callback directly. Note that it is done in the library's coroutine, so in your callback you can't use such functions as `publish` or `request`. ### objectName unsubscribe *subID* ?-max_msgs *maxMsgs*? Unsubscribes from a subscription with a given `subID` immediately. If `-max_msgs` is given, unsubscribes after this number of messages has been received **on this `subID`**. In other words, if you have already received 10 messages, and then you call `unsubscribe $subID -max_msgs 10`, you will be unsubscribed immediately. ### objectName request *subject message* ?*args*? Sends `message` (payload) to the specified `subject` with an automatically generated transient reply-to (inbox). You can provide the following options: - `-timeout ms` - expire the request after X ms (recommended!). Default timeout is taken from the `-request_timeout` option. - `-callback cmdPrefix` - do not block and deliver the reply to this callback. - `-dictmsg bool` - return the reply as a dict accessible to the [nats::msg](#natsmsg) ensemble. - `-max_msgs int` - gather multiple replies. If this option is not used, the 'new-style' request is triggered under the hood (uses a shared subscription for all requests), and only the first reply is returned. If this option is used (even with `maxMsgs`=1), it triggers the 'old-style' request that creates its own subscription. `-dictmsg` is always true in this case. Depending if there's a callback, the method works in a sync or async manner. If no callback is given, the request is synchronous and blocks in a (coroutine-aware) `vwait` and then returns a reply. If `-max_msgs` is used, the returned value is a list of message dicts. If the timeout has fired, this list contains only the messages received so far. If no reply arrives within `timeout`, it raises the error `ErrTimeout`. When using NATS server version 2.2+, `ErrNoResponders` is raised if nobody is subscribed to `subject`. If a callback is given, the call returns immediately. The return value is a unique ID that can be used to cancel the request. When a reply is received or a timeout fires, the callback will be invoked from the event loop. It must have the following signature:<br/> **asyncRequestCallback** *timedOut reply*<br/> `timedOut` is `true`, if the request timed out or no responders are available. In the latter case, the no-responders message is passed to the callback in `reply`.<br/> `reply` is the received message. If `-max_msgs`>1, the callback is invoked for each message. If the timeout fires before `-max_msgs` are received, the callback is invoked one last time with `timedOut=true`. If `disconnect` is called, all pending requests are cancelled. In contrast, if the connection is lost, and the client transitions to `$nats::status_closed`, all pending requests are marked as timed out without waiting for the actual timer to fire. ### objectName request_msg *msg* ?-timeout *ms*? ?-callback *cmdPrefix*? ?-dictmsg *dictmsg*? Sends a request with a message created using [nats::msg](#natsmsg). The rest of arguments work the same as in `request`. ### objectName cancel_request *reqID* Cancels the asynchronous request with the given `reqID`. ### objectName ping ?-timeout *ms*? Triggers a ping-pong exchange with the NATS server, enters (coroutine-aware) `vwait` and returns true upon success. If the server does not reply within the specified timeout (ms), it raises `ErrTimeout`. Default timeout is 10s. You can use this method to check if the server is alive or ensure all prior calls to `publish` and `subscribe` have been flushed to NATS. Note that in other NATS clients this function is usually called "flush". ### objectName inbox Returns a new inbox - random subject starting with _INBOX. ### objectName current_server Returns a 2-element list with host and port of the current NATS server. ### objectName all_servers Returns a list with all servers in the pool. ### objectName server_info Returns a dict with the INFO message from the current server. ### objectName jet_stream *?args?* This 'factory' method creates [jetStreamObject](JsAPI.md) to work with JetStream. You can provide the following options: - `-timeout ms` - timeout for all requests to JetStream API. Default is 5s. - `-domain str` - specifies the JetStream [domain](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes). - `-api_prefix str` - specifies the JetStream API prefix. This prefix is needed when JS API is [imported](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/accounts#import-export-example) from another account. You can specify either `-domain` or `-api_prefix`, but not both. - `-trace bool` - enables debug logging of every request to the JS API similar to the `--trace` option in NATS CLI. Remember to set the logging level in `connection` to `debug` as well. Remember to destroy this object when it is no longer needed - there's no built-in garbage collection in `connection`. ### objectName destroy TclOO destructor. It calls `disconnect` and then destroys the connection together with all [children objects](#TclOO-Lifecycle). ## Ensemble `nats::msg` This ensemble encapsulates all commands to work with a NATS message. Accessing it as a dict is deprecated. ### msg create *subject* ?-data *payload*? ?-reply *replySubj*? Returns a new message with the specified subject, payload and reply subject. ### msg set *msgVariable option value* Updates the message. Possible options are `-subject`, `-data` and `-reply`. ### msg subject *msgValue* Returns the message subject. ### msg data *msgValue* Returns the message payload. ### msg reply *msgValue* Returns the message reply-to subject. ### msg no_responders *msgValue* Returns true if this is a no-responders message (status 503). ### msg idle_heartbeat *msgValue* Returns true if this is an idle heartbeat. ### msg flow_control *msgValue* Returns true if this is a flow control message. ### msg seq *msgValue* Returns the message sequence number (only for messages returned by `stream_msg_get` & `stream_direct_get`). ### msg timestamp *msgValue* Returns the message timestamp in the ISO format, e.g. 2022-11-22T13:31:35.4514983Z (only for messages returned by `stream_msg_get` & `stream_direct_get`). ## Ensemble `nats::header` This ensemble encapsulates all commands to work with message headers. Accessing them as a dict is deprecated. ### header add *msgVariable key value* Appends a new value to the `key` header in the message. If this header does not exist yet, it is created. ### header set *msgVariable key value ?key value?..* Sets the `key` header to `value`. Multiple headers can be set at once by repeating key-value arguments (like in `dict create`). ### header delete *msgVariable key* Deletes the `key` header from the message. ### header values *msgValue key* Returns a list of all values of the `key` header. ### header get *msgValue key* Returns the first value of the `key` header. This is a convenient shortcut for the `values` command, since usually each header has only one value. ### header keys *msgValue ?globPattern?* Returns a list of all header keys in the message. With `globPattern`, only matching keys are returned (like in `dict keys`) ### header lookup *msgValue key default* Same as [header get](#header-get-msgvalue-key), but returns a default value if the key does not exist. ## Namespace Commands ### nats::timestamp Returns current local time in the ISO 8601 format, including milliseconds. Useful for logging. ### nats::isotime_to_msec *isotime* Converts an ISO timestamp (as used by the NATS wire format, e.g. 2022-11-22T13:31:35.4514983Z) to integer milliseconds since the epoch (note possible rounding of fractional seconds). ### nats::msec_to_isotime *msec ?tz?* Converts integer milliseconds to an ISO timestamp in the given timezone (default UTC). The local time zone can be specified as `:localtime` (see the [Tcl reference](https://www.tcl.tk/man/tcl8.6/TclCmd/clock.html#M78)). Note that the time zone designator is not included in the returned string. ### nats::mymethod *method ?args?* Same thing as [mymethod](https://core.tcl-lang.org/tcllib/doc/trunk/embedded/md/tcllib/files/modules/ooutil/ooutil.md#1), but safe to use even if the object is destroyed by the time when the callback is scheduled. # Error Handling Error codes are similar to those from the nats.go client as much as possible. A few additional error codes provide more information about failed connection attempts to the NATS server: ErrBrokenSocket, ErrTLS, ErrConnectionRefused. All synchronous errors are raised using `throw {NATS <error_code>} human-readable message`, so you can handle them using try&trap, for example: ```Tcl try { ... } trap {NATS ErrTimeout} {err opts} { # handle a request timeout } trap NATS {err opts} { # handle other NATS errors } trap {} {err opts} { # handle other (non-NATS) errors } ``` | Synchronous errors | Reason | | ------------- |--------| | ErrConnectionClosed | Attempt to subscribe or publish a message before calling `connect` | | ErrNoServers | No NATS servers available| | ErrInvalidArg | Invalid argument | | ErrBadSubject | Invalid subject for publishing or subscribing | | ErrBadQueueName | Invalid queue name | | ErrBadTimeout | Invalid timeout argument | | ErrMaxPayload | Message size is more than allowed by the server | | ErrBadSubscription | Invalid subscription ID | | ErrTimeout | Timeout of a synchronous request or ping | | ErrNoResponders | No responders are available for request | | ErrHeadersNotSupported| Headers are not supported by this server | Asynchronous errors are sent to the logger and can also be queried/traced using `$last_error`, for example: ```Tcl # the proper way to access an object's internal namespace set ns [info object namespace $conn] set err [set ${ns}::last_error] puts "Error code: [dict get $err code]" puts "Error text: [dict get $err errorMessage]" ``` | Async errors | Reason | Terminates connection | | ------------- |--------|----| | ErrBrokenSocket | TCP socket failed | yes | | ErrTLS | TLS handshake failed | yes | | ErrStaleConnection | The client or server closed the connection, because the other party did not respond to PING on time | yes | | ErrConnectionRefused | TCP connection to the server was refused, possibly due to a wrong port, DNS resolution failure, or the server was not running | yes | | ErrSecureConnWanted | Client requires TLS, but the server does not provide TLS | yes | | ErrConnectionTimeout | Connection to a server could not be established within `-connect_timeout` ms | yes| | ErrBadHeaderMsg | The client failed to parse message headers. Nevertheless, the message body is delivered | no | | ErrServer | Generic error reported by NATS | yes | | ErrBadSubject | Message had an invalid subject | no | | ErrPermissions | The user is not authorized to publish to this subject | no | | ErrAuthorization | User authorization has failed or no credentials are known for this server | yes | | ErrAuthExpired | User authorization has expired | yes | | ErrAuthRevoked | User authorization has been revoked | yes | | ErrAccountAuthExpired | NATS server account authorization has expired| yes | | ErrProtocol | Received an invalid protocol token | yes | # Connection Status and the Reconnection Process You can check the connection status as follows: ```Tcl if {[$conn cget -status] eq $nats::status_closed} { puts "Connection closed!" } ``` The connection can have one of the four statuses: - `$nats::status_closed`: initial state after creating the object. The TCP socket is closed. Calling `subscribe`, `unsubscribe`, `publish`, `request` etc raises `ErrConnectionClosed`. Calling `disconnect` is no-op. - `$nats::status_connecting`: triggered by calling `connect`. The client is trying to connect to servers in the pool one by one. All servers are tried only once regardless of `-max_reconnect_attempts`. If no servers are available, the client logs the error and transitions into `$nats::status_closed`. If the synchronous version of `connect` is used, it raises `ErrNoServers` (in case of multiple servers configured in the pool) or a more specific error if the pool has only one server. Calling `subscribe`, `unsubscribe` and `publish` is allowed - they will be flushed as soon as the client transitions into `$nats::status_connected`. You can also use `request`, if the timeout is sufficiently long. - `$nats::status_connected`: the TCP connection to a NATS server is established (including TLS upgrade and credentials verification, if needed). Calling `disconnect` transitions the client into `$nats::status_closed`. If the connection is lost, the client transitions into `$nats::status_reconnecting`. - `$nats::status_reconnecting` - triggered by any of the above asynchronous errors that terminate the connection. The client is trying to connect to servers in the pool one by one. Consecutive attempts to connect to a specific server are at least `-reconnect_time_wait` ms apart. Every failed connection to a server increments its `reconnects` counter. Once this counter exceeds `-max_reconnect_attempts`, the server is removed from the pool. Once no servers are left in the pool, or the user calls `disconnect`, the client transitions into `$nats::status_closed`. Calling `subscribe`, `unsubscribe`, `publish` etc is allowed. As soon as the client transitions into `$nats::status_connected`, they will be flushed along with restoring all current subscriptions. Calling `connect` when the status is not `$nats::status_closed`, is no-op.<br/> Calling `ping` when the status is not `$nats::status_connected`, raises `ErrConnectionClosed`.<br/> Calling `disconnect` cancels all pending requests and pings as described in the table below.<br/> After connection was lost, the server pool must be restored by calling `configure -servers` before attempting to `connect` again. Official NATS clients have a few more statuses: - They distinguish between `DISCONNECTED` (when initial connection attempts failed) and `CLOSED` (if the user called `close` or the connection was lost). I don't see any value in this, so both statuses correspond to `$nats::status_closed`. - `DRAINING_PUBS` - [drain](https://docs.nats.io/using-nats/developer/receiving/drain) function has been called. The (official) client will flush all pending data to the socket and perform the PING/PONG exchange before closing the socket. With the Tcl client, calling `disconnect` always flushes pending data before closing the socket, and there's no need in a separate status. There's no final PING/PONG though. - `DRAINING_SUBS` - the (official) client is draining all subscriptions before closing the socket, which is equivalent to sending `UNSUB` + `PING/PONG`. With the Tcl client, calling `disconnect` discards all pending input data in the socket, but already scheduled subscription callbacks will be invoked. Calling `unsubscribe` deletes the subscription immediately, so if the socket buffer still contains any `MSG` for this subscription, it will be discarded. If you have a continuous stream of incoming messages that must not be lost, you have two options: - `unsubscribe -max_msgs` + `ping` and wait until the subscription callback is no longer invoked - or use JetStream # Requests Error Handling This table summarizes how [request](#objectName-request-subject-message-args), [ping](#objectName-ping--timeout-ms) and [fetch](JsApi.md#js-fetch-stream-consumer-args) behave in case of: - timeout - no-responders message from NATS - if the connection goes into the reconnecting mode after a request has been issued - if the connection is lost (no more NATS servers to try) - if a user calls `disconnect` and the connection is closed together with references to the testing suite. Note that if a connection is closed deliberately by calling `disconnect` or `destroy`, pending synchronous requests immediately throw `ErrConnectionClosed`, while callbacks for asynchronous requests are simply not invoked. | | Timeout |No responders|Reconnecting|Connection lost|Connection closed| | ---- |--------|----|---- |--------|----| |sync request|ErrTimeout<br/>basic-7|ErrNoResponders<br/>pubsub-5 |continues ok|ErrTimeout<br/>cluster-3.2 |ErrConnectionClosed<br/>basic-18.2| |async request|$timeout=1, blank msg<br/>basic-10|$timeout=1, no-resp msg<br/> pubsub-6 |continues ok<br/>cluster-4|$timeout=1, blank msg<br/>cluster-3.1 |callback is not invoked<br/>basic-18.1| |ping|ErrTimeout<br/>basic-19|N/A|ErrConnectionClosed|ErrConnectionClosed<br/>cluster-3.1|ErrConnectionClosed<br/>basic-18.3| |sync fetch|ErrTimeout<br/>jet_stream-4.2|N/A |continues ok |ErrTimeout<br/>jet_stream-8.3 |ErrConnectionClosed | |async fetch|$timeout=1, blank msg<br/>jet_stream-5.2|N/A |continues ok |$timeout=1, blank msg |callback not invoked | Both "connection lost" and "connection closed" are represented as `$nats::status_closed`. Users can distinguish between the two cases by checking `$last_error` that is blank in the latter case. # Encrypted TLS Connections NATS can be [configured](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/tls) to encrypt connections using TLS. Note that according to the NATS protocol, the handshake always starts with plain TCP. If the [INFO](https://docs.nats.io/reference/reference-protocols/nats-protocol#info) message from NATS contains `tls_required=true`, then the client upgrades the connection to TLS before sending the `CONNECT` message. You can configure the client to require a TLS connection in two ways: - use `-secure` option (applies to all servers in the pool) - use `tls://` schema in a NATS URL (applies only to this server) Make sure that the TclTLS package is available in your system[^1]. E.g. on OpenSUSE it is called `tls` in zypper. Most likely you will need to provide additional options via `-tls_opts`: at least one of `-cadir` or `-cafile`, otherwise the client can not recognize the server's certificate. E.g. if you have installed your CA certificate on OpenSUSE system-wide, you can use ```Tcl $connection configure -tls_opts {-cadir /etc/ssl} ``` Consult the documentation of [tls::import](https://core.tcl-lang.org/tcltls/wiki?name=Documentation#tls::import) for all supported options. The client supplies default options `-require 1` and `-command ::nats::tls_callback` that you can override. The default callback does nothing. If NATS server requires clients to authenticate using TLS certificates, you need to use `-certfile` and `-keyfile` options. # TclOO Lifecycle `nats::connection` is the top-level object that can be created using both `new` and `create` TclOO commands. All other objects are created using respective factory methods and their lifecycle is limited to the lifecycle of the "parent" object. Depending on a use case, you can choose to call `destroy` explicitly or to rely on the automatic destruction of objects that this library implements as follows: ```mermaid classDiagram direction LR class connection { jet_stream() } class jet_stream { ordered_consumer() bind_kv_bucket() create_kv_bucket() create_kv_aggregate() } class key_value { watch() } connection "1" *-- "*" jet_stream jet_stream "1" *-- "*" ordered_consumer jet_stream "1" *-- "*" key_value jet_stream "1" *-- "*" kv_watcher key_value --> kv_watcher ``` Note that `kv_watcher` can outlive the `key_value` object from which it was created. [^1]: Due to a [bug](https://core.tcl-lang.org/tcltls/tktview/3c42b2ba11) in TclTLS, the client does **not** verify that the certificate from NATS matches the hostname (X509v3 Subject Alternative Name). |
Name change from assets/nats2/JsAPI.md to assets/nats3/JsAPI.md.
1 2 3 4 | # JetStream API JetStream functionality of NATS can be accessed by creating the `nats::jet_stream` TclOO object. Do not create it directly - instead, call the [jet_stream](CoreAPI.md#objectname-jet_stream--timeout-ms--domain-domain) method of your `nats::connection`. You can have multiple JS objects created from the same connection, each having its own timeout and domain. | > > | | | < > > > > | < | | < > > > > > > > > > > > | > > > | > | > > > > > > > > > > > > > | > > > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | # JetStream API JetStream functionality of NATS can be accessed by creating the `nats::jet_stream` TclOO object. Do not create it directly - instead, call the [jet_stream](CoreAPI.md#objectname-jet_stream--timeout-ms--domain-domain) method of your `nats::connection`. You can have multiple JS objects created from the same connection, each having its own timeout and domain. Key/value-related functions of `nats::jet_stream` are documented [here](KvAPI.md). # Synopsis ## Class `nats::jet_stream` [*js* publish *subject message ?args?*](#js-publish-subject-message-args)<br/> [*js* publish_msg *message ?args?*](#js-publish_msg-message-args)<br/> [*js* fetch *stream consumer ?args?*](#js-fetch-stream-consumer-args)<br/> [*js* ack *message*](#js-ack-message)<br/> [*js* ack_sync *message*](#js-ack_sync-message)<br/> [*js* nak *message* ?-delay *ms*?](#js-nak-message--delay-ms)<br/> [*js* in_progress *message*](#js-in_progress-message)<br/> [*js* term *message*](#js-term-message)<br/> [*js* cancel_pull_request *reqID*](#js-cancel_pull_request-reqID)<br/> [*js* add_stream *stream* ?-option *value*?..](#js-add_stream-stream--option-value)<br/> [*js* update_stream *stream* ?-option *value*?..](#js-update_stream-stream--option-value)<br/> [*js* add_stream_from_json *json_config*](#js-add_stream_from_json-json_config)<br/> [*js* delete_stream *stream*](#js-delete_stream-stream)<br/> [*js* purge_stream *stream* ?-filter *subject*? ?-keep *int*? ?-seq *int*?](#js-purge_stream-stream--filter-subject--keep-int--seq-int)<br/> [*js* stream_info *stream*](#js-stream_info-stream)<br/> [*js* stream_names ?-subject *subject*?](#js-stream_names--subject-subject)<br/> [*js* add_consumer *stream* ?-option *value*?..](#js-add_consumer-stream--option-value)<br/> [*js* update_consumer *stream* ?-option *value*?..](#js-update_consumer-stream--option-value)<br/> [*js* add_pull_consumer *stream consumer ?args?*](#js-add_pull_consumer-stream-consumer-args)<br/> [*js* add_push_consumer *stream consumer deliver_subject ?args?*](#js-add_push_consumer-stream-consumer-deliver_subject-args)<br/> [*js* add_consumer_from_json *stream consumer json_config*](#js-add_consumer_from_json-stream-consumer-json_config)<br/> [*js* delete_consumer *stream consumer*](#js-delete_consumer-stream-consumer)<br/> [*js* consumer_info *stream consumer*](#js-consumer_info-stream-consumer)<br/> [*js* consumer_names *stream*](#js-consumer_names-stream)<br/> [*js* ordered_consumer *stream ?args?*](#js-ordered_consumer-stream-args)<br/> [*js* stream_msg_get *stream* ?-last_by_subj *subj*? ?-next_by_subj *subj*? ?-seq *int*?](#js-stream_msg_get-stream--last_by_subj-subj--next_by_subj-subj--seq-int)<br/> [*js* stream_direct_get *stream* ?-last_by_subj *subj*? ?-next_by_subj *subj*? ?-seq *int*?](#js-stream_direct_get-stream--last_by_subj-subj--next_by_subj-subj--seq-int)<br/> [*js* stream_msg_delete *stream* -seq *int* ?-no_erase *bool*?](#js-stream_msg_delete-stream--seq-int--no_erase-bool)<br/> [*js* account_info](#js-account_info)<br/> [*js* api_prefix](#js-api_prefix)<br/> [*js* timeout](#js-timeout)<br/> [*js* destroy](#js-destroy)<br/> ## Class `nats::ordered_consumer` [*consumer* info](#consumer-info)<br/> [*consumer* name](#consumer-name)<br/> [*consumer* destroy](#consumer-destroy)<br/> ## Namespace Commands [nats::metadata *message*](#natsmetadata-message)<br/> [nats::make_stream_source ?-option *value*?..](#natsmake_stream_source--option-value)<br/> [nats::make_subject_transform ?-option *value*?..](#natsmake_subject_transform--option-value)<br/> [nats::make_republish ?-option *value*?..](#natsmake_republish--option-value) # Description The [Core NATS](CoreAPI.md) pub/sub functionality offers the at-most-once delivery guarantee based on TCP. This is sufficient for many applications, where an individual message doesn't have much value. In case of a transient network disconnection, a subscriber simply waits until the connection is restored and a new message is delivered. In some applications, however, each message has a real business value and must not be lost in transit. These applications should use [JetStream](https://docs.nats.io/nats-concepts/jetstream) that offers at-least-once and exactly-once delivery guarantees despite network disruptions or software crashes. Also, JetStream provides temporal decoupling of publishers and subscribers (consumers), i.e. each published message is persisted on disk and delivered to a consumer when it is ready. JetStream introduces no new elements in the NATS protocol, but builds on top of it: primarily the request-reply function, with special JSON messages and status headers. Unlike the core NATS server functionality that is simple, stable and well-documented, JetStream is quite large and under active development by Synadia. Not all aspects of JetStream are consistently documented. I've used these sources for development of nats-tcl: - [docs.nats.io](https://docs.nats.io/nats-concepts/jetstream) - [ADRs](https://github.com/nats-io/nats-architecture-and-design) - [nats CLI](https://github.com/nats-io/natscli) subcommand `schema` - `nats-server` in tracing mode (-DV) - and of course studying source code of nats.go, nats.c and nats.py Unfortunately, I don't have enough capacity to cover the whole JetStream functionality or keep up with Synadia's development. So, I've decided to focus on the most useful functions: - publishing messages to streams with confirmations - fetching messages from pull consumers - support for all kinds of message acknowledgement - JetStream asset management (streams and pull/push consumers) - [Key/Value store](KvAPI.md) The implementation can tolerate minor changes in JetStream API. E.g. a publish acknowledgment is returned just as a dict parsed from JSON. So, if in future the JSON schema gets a new field, it will be automatically available in this dict. If you need other JetStream functions, e.g. Object Store, you can easily implement them yourself using core NATS requests. No need to interact directly with the TCP socket. Of course, PRs are always welcome. ## Notes on the JetStream Client API v2 In June 2023 Synadia has [announced](https://nats.io/blog/preview-release-new-jetstream-client-api/) some major changes to the JetStream Client API. You can find more details in [ADR-37](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-37.md), [nats.go](https://pkg.go.dev/github.com/nats-io/nats.go/jetstream) docs and the [migration guide](https://natsbyexample.com/examples/jetstream/api-migration/go). Note that these changes are purely client-side, and there are no new server-side concepts. Of course, this announcement affects development of the Tcl client as well. A lot of effort has been invested in the design following JetStream API v1. I need to balance my workload vs keeping in line (more or less) with other client libraries. So, in this library there is no clear distinction between v1 and v2, but rather a pragmatic middle ground. Here is a list of most important design changes by Synadia together with my responses: 1. Streams and Consumers have their own classes now. So, e.g. to query the `CONSUMER.INFO` NATS API using JetStream v1 you call: ```go info, err := js.ConsumerInfo(streamName, consumerName) ``` While with JetStream v2 you call: ```go stream, err := js.Stream(ctx, streamName) consumer, err := stream.Consumer(ctx, consumerName) info, err := consumer.Info(ctx) ``` The benefit is clear for all programming languages having proper autocompletion support, because you get a comprehensible list of all functions related to a stream or consumer, instead of one huge list of functions in `JetStreamContext`. Unfortunately, Tcl doesn't have such autocompletion, so introduction of new TclOO classes for streams and consumers would only complicate the library. However, there is `nats::ordered_consumer` that is similar to [OrderedConsumer](https://pkg.go.dev/github.com/nats-io/nats.go/jetstream#readme-ordered-consumers) in JetStream v2. 2. Removal of the overly complex `JetStream.Subscribe` function. This is achieved by breaking it down to smaller specialized functions and by deprecating push consumers. <br/> This change does not affect the Tcl client. Pull and push consumers are always created explicitly by calling `add_consumer`. There is no dedicated method to subscribe to push consumers. The core NATS subscription is perfectly adequate for durable push consumers. If it is configured with idle heartbeats, you will need to filter them out by checking `nats::msg idle_heartbeat`. And for ephemeral consumers you can use `nats::ordered_consumer`. 3. Removal of `JetStream.PullSubscribe` function. `Fetch` is now a function of `Consumer` interface that performs both subscribing and receiving messages. This change actually brings nats.go closer to nats-tcl, where `fetch` (aka `consume`) has always been working like this. 4. Introduction of the new way to pull messages from a consumer using continuous polling. This API is designed to combine the best of pull and push consumers, thus helping users to move away from push consumers. The new function is called `Consume`, while the old method is called `Fetch`. <br/> Unfortunately, the Tcl client already has `$js consume` method that actually performs fetching and should have been called `fetch` from the beginning. So, to avoid future confusion, I've added a new method [fetch](#js-fetch-stream-consumer-args) that works exactly like `consume`. The old `consume` stays in place for backwards compatibility. If in future I decide to implement the real `consume` (continuous polling), it will be done in another TclOO class. 5. Having a dedicated Stream class allows the implementation to choose between `STREAM.MSG.GET` and `DIRECT.GET` API depending on the stream configuration, i.e. if it has AllowDirect=true. This is done transparently for the user - compare e.g. `Stream.GetMsg` in JetStream v2 with `JetStreamManager.GetMsg` in JetStream v1 that has an option `DirectGet`.<br/>The Tcl client provides separate methods for these APIs: `stream_msg_get` and `stream_direct_get` respectively. However, [key_value](KvAPI.md) class knows the stream configuration and chooses between these 2 methods automatically. ## JetStream wire format The JetStream wire format uses nanoseconds for timestamps and durations in all requests and replies. To be consistent with the rest of the Tcl API, the client converts them to milliseconds before returning to a user. And vice versa: all function arguments are accepted as ms and converted to ns before sending. Streams and consumers are checked according to the naming rules described in [ADR-6](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md). Paging with total/offset/limit is not supported. |
︙ | ︙ | |||
92 93 94 95 96 97 98 | This approach has 2 benefits: - Configuration of streams and consumers is kept separately from Tcl source code. It can be saved in VCS or generated on the fly, and shared with NATS CLI or other NATS clients. - It is future-proof: if the Tcl client lags behind JetStream development, you still have access to the latest JetStream features, and the library still takes care of error checking. You can find an example in [js_mgmt.tcl](examples/js_mgmt.tcl). | | > | 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | This approach has 2 benefits: - Configuration of streams and consumers is kept separately from Tcl source code. It can be saved in VCS or generated on the fly, and shared with NATS CLI or other NATS clients. - It is future-proof: if the Tcl client lags behind JetStream development, you still have access to the latest JetStream features, and the library still takes care of error checking. You can find an example in [js_mgmt.tcl](examples/js_mgmt.tcl). # Commands ## `nats::jet_stream` ### js publish *subject message ?args?* Publishes `message` (payload) to a [stream](https://docs.nats.io/jetstream/concepts/streams) on the specified `subject` and returns an acknowledgement (`pubAck`) from the NATS server. The method uses [request](CoreAPI.md#objectName-request-subject-message-args) under the hood. You can provide the following options: - `-timeout ms` - timeout for the underlying NATS request. Default timeout is taken from the `jet_stream` object. - `-callback cmdPrefix` - do not block and deliver the acknowledgement to this callback. - `-stream stream` - set the expected target stream (recommended!). If the subject does not match the stream, NATS will return an error. |
︙ | ︙ | |||
121 122 123 124 125 126 127 | - code: high-level HTTP-like code e.g. 404 if a stream wasn't found - err_code: more specific JetStream code, e.g. 10060 - errorMessage: human-readable error message, e.g. "expected stream does not match" Note that you can publish messages to a stream using [nats::connection publish](CoreAPI.md#objectname-publish-subject-message--reply-replyto) as well. But in this case you have no confirmation that the message has reached the NATS server, so it misses the whole point of using JetStream. ### js publish_msg *message ?args?* Publishes `message` (created with [nats::msg create](CoreAPI.md#msg-create-subject--data-payload--reply-replysubj)) to a stream. Other options are the same as above. Use this method to publish a message with headers. | | | | < < | | | | > > < < < < < < < < < < < | | | | | | | | | | | | | | | | | > > | | | | > > > > | | > | > > | | | 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | - code: high-level HTTP-like code e.g. 404 if a stream wasn't found - err_code: more specific JetStream code, e.g. 10060 - errorMessage: human-readable error message, e.g. "expected stream does not match" Note that you can publish messages to a stream using [nats::connection publish](CoreAPI.md#objectname-publish-subject-message--reply-replyto) as well. But in this case you have no confirmation that the message has reached the NATS server, so it misses the whole point of using JetStream. ### js publish_msg *message ?args?* Publishes `message` (created with [nats::msg create](CoreAPI.md#msg-create-subject--data-payload--reply-replysubj)) to a stream. Other options are the same as above. Use this method to publish a message with headers. ### js fetch *stream consumer ?args?* Consumes a number of messages from a [pull consumer](https://docs.nats.io/jetstream/concepts/consumers) defined on a [stream](https://docs.nats.io/jetstream/concepts/streams). This is the analogue of `PullSubscribe` + `fetch` in official NATS clients. You can provide the following options: - `-batch_size int` - number of messages to consume. Default batch is 1. - `-timeout ms` - pull request timeout - see below. - `-callback cmdPrefix` - do not block and deliver messages to this callback. The underlying JetStream API is rather intricate, so I recommend reading [ARD-13](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-13.md) for better understanding. Pulled messages are always returned as Tcl dicts irrespectively of the `-dictmsg` option. They contain metadata that can be accessed using [nats::metadata](#natsmetadata-message). If `-timeout` is omitted, the client sends a `no_wait` request, asking NATS to deliver only currently pending messages. If there are no pending messages, the method returns an empty list. If `-timeout` is given, it defines both the client-side and server-side timeouts for the pull request: - the client-side timeout is the timeout for the underlying `request` - the server-side timeout is 10ms shorter than `timeout`, and it is sent in the `expires` JSON field[^1]. This behaviour is consistent with `nats.go`. If a callback is not given, the request is synchronous and blocks in a (coroutine-aware) `vwait` until all expected messages are received or the pull request expires. If the client-side timeout fires *before* the server-side timeout, and no messages have been received, the method raises `ErrTimeout`[^2]. In all other cases it returns a list with as many messages as currently avaiable, but not more than `batch_size`. If a callback is given, the call returns immediately. Return value is a unique ID that can be used to cancel the pull request. When a message is pulled or a timeout fires, the callback will be invoked from the event loop. It must have the following signature: **cmdPrefix** *timedOut message* The client handles status messages 404 (no messages), 408 (request expired) and 409 (consumer deleted) appropriately. You can see them in the debug log, if needed. Also, they are passed to the callback together with `timedOut=1`. Overall, the synchronous form of `fetch` has clearer error reporting, because it can throw `ErrConsumerNotFound`, `ErrStreamNotFound`, `ErrJetStreamNotEnabled` etc that are not available to the asynchronous callback. Depending on the consumer's [AckPolicy](https://docs.nats.io/nats-concepts/jetstream/consumers#ackpolicy), you might need to acknowledge the received messages with one of the methods below. [This page](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability) explains all different kinds of ACKs. **NB!** This method used to be called `consume`. However, [JetStream Client API V2](https://nats.io/blog/preview-release-new-jetstream-client-api/) has introduced a new way for continuously fetching messages using a self-refilling buffer, called "consume". This method is not supported yet by this library. So, to avoid confusion for new users of the library, `consume` is now deprecated, and new Tcl code should use `fetch`. ### js ack *message* Sends a positive ACK to NATS. This method is implemented using [publish](CoreAPI.md#objectname-publish-subject-message--reply-replyto) and returns immediately. Using `ack_sync` is more reliable. ### js ack_sync *message* Sends a positive ACK to NATS and waits for a confirmation (recommended). ### js nak *message* ?-delay *ms*? Negatively acknowledges a message. This tells the server to redeliver the message either immediately or after `delay` ms. ### js in_progress *message* Sends "work in progress" ACK to NATS and resets the redelivery timer on the server ### js term *message* Sends "terminate" ACK to NATS. The message will not be redelivered. ### js cancel_pull_request *reqID* Cancels the asynchronous pull request with the given `reqID`. ### js add_stream *stream* ?-option *value*?.. Creates a new `stream` with configuration specified as option-value pairs. See the [official docs](https://docs.nats.io/nats-concepts/jetstream/streams#configuration) for explanation of these options. | Option | Type | Default | Comment | | ------------- |--------|---------|---------| | -description | string | | | | -subjects | list of strings | (required)| | | -retention | one of: limits, interest,<br/> workqueue |limits | | | -max_consumers | int | | | | -max_msgs | int | | | | -max_bytes | int | | | | -discard | one of: new, old | | | | -max_age | ms | | | | -max_msgs_per_subject | int | | | | -max_msg_size | int | | | | -storage | one of: memory, file | file | | | -num_replicas | int | | | | -no_ack | boolean | | | | -duplicate_window | ms | | | | -mirror | JSON | |use [nats::make_stream_source](#natsmake_stream_source--option-value)| | -sources | list of JSON | |use [nats::make_stream_source](#natsmake_stream_source--option-value)| | -sealed | boolean | | | | -deny_delete | boolean | | | | -deny_purge | boolean | | | | -allow_rollup_hdrs | boolean | | | | -compression | one of: none, s2 | none | | | -first_seq | int | | | | -subject_transform | JSON | |use [nats::make_subject_transform](#natsmake_subject_transform--option-value)| | -republish | JSON | |use [nats::make_republish](#natsmake_republish--option-value)| | -allow_direct | boolean | | | | -mirror_direct | boolean | | | | -metadata | dict | | | Returns a JetStream reply (same as `stream_info`). ### js update_stream *stream* ?-option *value*?.. Updates the `stream` configuration with new options. Arguments and the return value are the same as in `add_stream`.[^3] ### js add_stream_from_json *json_config* Creates a stream with configuration specified as JSON. The stream name is taken from the JSON. ### js delete_stream *stream* Deletes the stream. ### js purge_stream *stream* ?-filter *subject*? ?-keep *int*? ?-seq *int*? Purges the stream, deleting all messages or a subset based on filtering criteria. See also [ADR-10](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-10.md). ### js stream_info *stream* Returns stream information as a dict. ### js stream_names ?-subject *subject*? Returns a list of all streams or the streams matching the filter. ### js add_consumer *stream* ?-option *value*?.. Creates or updates a pull or push consumer defined on `stream`. See the [official docs](https://docs.nats.io/nats-concepts/jetstream/consumers#configuration) for explanation of these options. | Option | Type | Default | | ------------- |--------|---------| | -name[^4] | string | | | -durable_name | string | | | -description | string | | | -deliver_policy | one of: all, last, new, by_start_sequence<br/> by_start_time last_per_subject | all| | -opt_start_seq | int | | | -opt_start_time | string | | | -ack_policy |one of: none, all, explicit, | explicit | | -ack_wait | ms | | |
︙ | ︙ | |||
239 240 241 242 243 244 245 246 | | -idle_heartbeat | ms | | | -headers_only | boolean | | | -deliver_subject | string | | | -deliver_group | string | | | -inactive_threshold | ms | | | -num_replicas | int | | | -mem_storage | boolean | | | > < | > > < < < < | < < | < < > | < > > > > | | > > > | > | | | | > > > < > > > > > > | | < > | | < < | > > > > > > > > > > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > > > > > > > > > | 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 | | -idle_heartbeat | ms | | | -headers_only | boolean | | | -deliver_subject | string | | | -deliver_group | string | | | -inactive_threshold | ms | | | -num_replicas | int | | | -mem_storage | boolean | | | -metadata | dict | | Returns a JetStream reply (same as `consumer_info`). ### js update_consumer *stream* ?-option *value*?.. Updates the consumer configuration with new options[^5]. Arguments and the return value are the same as in `add_consumer`. ### js add_pull_consumer *stream consumer ?args?* A shortcut for `add_consumer` to create a durable pull consumer. Rest of `args` are the same as above. ### js add_push_consumer *stream consumer deliver_subject ?args?* A shortcut for `add_consumer` to create a durable push consumer. Rest of `args` are the same as above. ### js add_consumer_from_json *stream consumer json_config* Creates or updates a `consumer` defined on a `stream` with configuration specified as JSON. ### js delete_consumer *stream consumer* Deletes the consumer. ### js consumer_info *stream consumer* Returns consumer information as a dict. ### js consumer_names *stream* Returns a list of all consumers defined on this stream. ### js ordered_consumer *stream ?args?* Creates an [ordered](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#python) ephemeral push consumer on a `stream` and returns a new object [nats::ordered_consumer](#natsordered_consumer). You can provide the following options that have the same meaning as in `add_consumer`: - `-description string` - `-headers_only bool` default false - `-deliver_policy policy` default `all` - `-idle_heartbeat ms` default 5000 - `-filter_subject subject` - `-callback cmdPrefix` (mandatory) Whenever a message arrives, the command prefix `cmdPrefix` will be invoked from the event loop. It must have the following signature:<br/> **cmdPrefix** *message*<br/> `message` is delivered as a dict to be used with the `nats::msg` ensemble. Since ordered consumers always have `-ack_policy none`, you don't need to `ack` the message. ### js stream_msg_get *stream* ?-last_by_subj *subj*? ?-next_by_subj *subj*? ?-seq *int*? Returns a message from `stream` using the [STREAM.MSG.GET](https://docs.nats.io/reference/reference-protocols/nats_api_reference#fetching-from-a-stream-by-sequence) JS API. The following combinations of options are possible: - sequence number - last by subject - next by subject (assumes sequence = 0) - next by subject + sequence This API guarantees read-after-write coherency but may be slower than "Direct Get" in a clustered setup. ### js stream_direct_get *stream* ?-last_by_subj *subj*? ?-next_by_subj *subj*? ?-seq *int*? Returns a message from `stream` using the [DIRECT.GET](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-31.md) JS API. All options have the same meaning as for [stream_msg_get](#js-stream_msg_get-stream--last_by_subj-subj--next_by_subj-subj--seq-int). This method performs better than `stream_msg_get` if the stream has replicas or mirrors, but does not guarantee read-after-write coherency. The stream must be configured with `-allow_direct true` and/or `-mirror_direct true` respectively. ### js stream_msg_delete *stream* -seq *int* ?-no_erase *bool*? Deletes a message from `stream` with the given `sequence` number. `-no_erase` is true by default. Set it to false if NATS should overwrite the message with random data, like `SecureDeleteMsg` in nats.go. ### js account_info Returns a dict with information about the current account, e.g. used storage, number of streams, consumers, various limits etc. ### js api_prefix Returns the API prefix used for requests to JetStream API. It is based on the `-domain` and `-api_prefix` options passed to [$connection jet_stream](CoreAPI.md#objectname-jet_stream-args). Default is "$JS.API". ### js timeout Returns the timeout for all JetStream calls. ### js destroy TclOO destructor. ## `nats::ordered_consumer` Implements Ordered Consumer. Do not create it directly - instead, call the [ordered_consumer](JsAPI.md#js-ordered_consumer-stream-args) method of your `nats::jet_stream`. The ordered consumer handles idle heartbeats and flow control, and guarantees to deliver messages in the order of `consumer_seq` with no gaps. If any problem happens e.g.: - the push consumer is deleted or lost due to NATS restart - the connection to NATS is lost and the client reconnects to another server in the cluster - a message is lost - no idle heartbeats arrive for longer than 3*idle_heartbeat ms the consumer object will reset and recreate the push consumer requesting messages starting from the last known message using the `-opt_start_seq` option. Such events are logged as warnings to the connection's logger and also signalled using the "public" variable `last_error`. Ordered consumers are explained in detail in [ADR-17](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-17.md). ### Error handling `nats::ordered_consumer` reports asynchronous errors via `last_error` member variable, just like `nats::connection`: | Async Errors | Reason | Retry | | ------------- |--------|------| | ErrConsumerNotActive | Consumer received no heartbeats|yes| | ErrConsumerSequenceMismatch | Consumer received a message with unexpected `consumer_seq`|yes| |ErrTimeout| Request to recreate the push consumer timed out|yes| | ErrStreamNotFound | The stream was deleted |no| | ErrConnectionClosed| Connection to NATS was closed or lost |no| Most errors are considered transient and lead to the consumer reset, starting from the last known message. It happens automatically in background. However, it is not possible to recover from `ErrStreamNotFound` and `ErrConnectionClosed`, so in case of these errors the consumer will stop. ### consumer info Returns the cached consumer info, same as `$js consumer_info`. ### consumer name Returns the auto-generated consumer name, like `QWGBg8xp`. It is a shortcut for `dict get [$consumer info] name`. While consumer reset is in progress, `name` is an empty string. ### consumer destroy Unsubscribes from messages and destroys the object. NATS server will delete the push consumer after InactiveThreshold=2s. ## Namespace Commands ### nats::metadata *message* Returns a dict with metadata of the message that is extracted from the reply-to field. The dict has these fields: - stream - consumer - num_delivered - stream_seq - consumer_seq - timestamp (ms) - num_pending Note that when a message is received using `stream_msg_get`, this metadata is not available. Instead, you can get the stream sequence number and the timestamp using the `nats::msg` ensemble. ### nats::make_stream_source ?-option *value*?.. Returns a stream source configuration formatted as JSON to be used with `-mirror` and `-sources` arguments to [add_stream](#js-add_stream-stream--option-value). You can provide the following options: - `-name originStreamName` (required) - `-opt_start_seq int` - `-opt_start_time string` formatted as ISO time - `-filter_subject subject` - `-subject_transforms` - list of subject transforms If the source stream is in another JetStream domain or account, you need two more options: - `-api APIPrefix` (required) - the subject prefix that imports the other account/domain - `-deliver deliverySubject` (optional) - the delivery subject to use for the push consumer Example of creating a stream sourcing messages from 2 other streams `SOURCE_STREAM_1` and `SOURCE_STREAM_1` that are located in the `hub` domain: ```Tcl set source1 [nats::make_stream_source -name SOURCE_STREAM_1 -api "\$JS.hub.API"] set source2 [nats::make_stream_source -name SOURCE_STREAM_2 -api "\$JS.hub.API"] $js add_stream AGGREGATE_STREAM -sources [list $source1 $source2] ``` More details can be found in the official docs about [NATS replication](https://docs.nats.io/running-a-nats-service/nats_admin/jetstream_admin/replication). ### nats::make_subject_transform ?-option *value*?.. Returns a [subject transform](https://docs.nats.io/nats-concepts/subject_mapping) configuration formatted as JSON to be used with `-subject_transform` option in `add_stream` and `nats::make_stream_source`. You *must* provide the following options: - `-src string` - `-dest string` Example: ```Tcl set t1 [nats::make_subject_transform -src foo.* -dest "foo2.{{wildcard(1)}}"] set t2 [nats::make_subject_transform -src bar.* -dest "bar2.{{wildcard(1)}}"] set sourceConfig [nats::make_stream_source -name SOURCE_STREAM -subject_transforms [list $t1 $t2]] $js add_stream AGGREGATE -sources [list $sourceConfig] ``` Note that for plural options like `-subject_transforms` and `-sources` you *need* to use `[list]` even if it has only one element. See also [ADR-36](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-36.md). ### nats::make_republish ?-option *value*?.. Returns a [RePublish](https://docs.nats.io/nats-concepts/jetstream/streams#republish) configuration formatted as JSON to be used with `-republish` option in [add_stream](#js-add_stream-stream--option-value). You can provide the following options: - `-src string` (required) - `-dest string` (required) - `-headers_only bool` default false # Error handling in JetStream and Key/Value Store In addition to all [core NATS errors](CoreAPI.md#error-handling), the `jet_stream` and `key_value` classes may throw these errors: | Error | JS Error Code | Reason | | ------------- |--------|--------| | ErrJetStreamNotEnabled | | JetStream is not enabled in the NATS server | | ErrJetStreamNotEnabledForAccount | 503/10039 | JetStream is not enabled for this account | | ErrWrongLastSequence | 400/10071 | <ul><li>JS publish with the header Nats-Expected-Last-Subject-Sequence failed</li><li>KV `update` failed due to revision mismatch</li></ul> | | ErrStreamNotFound | 404/10059 | Stream does not exist | | ErrConsumerNotFound | 404/10014| Consumer does not exist | | ErrBucketNotFound | from ErrStreamNotFound | Bucket does not exist | | ErrMsgNotFound | 404/10037 | Message not found in a stream | | ErrKeyNotFound | from ErrMsgNotFound | Key not found in a bucket | | ErrJSResponse | any| Other JetStream error. `code` and `err_code` is passed in the Tcl error code and `description` is used for the error message. | | ErrNotJSMessage | N/A | Thrown by [nats::metadata](#natsmetadata-message) when a message didn't come from a consumer | See also "Error Response" in [ADR-1](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-1.md#error-response). [^1]: You can specify the `-expires` option explicitly (ms), but this is an advanced use case and normally should not be needed. [^2]: Throwing `ErrTimeout` might seem counter-intuitive and inconvenient, since users need to check for an empty list *and* a timeout. However, this is consistent with nats.go JS API v1. [^3]: In principle, it is enough to pass only the new option-values, and the rest of configuration is left untouched. However, if your stream is configured with an option which is non-editable and not default (e.g. storage=memory), calling `update_stream` will result in a NATS error "stream configuration update can not change ... ". In such cases you need to get the current configuration first using [stream_info](#js-stream_info-stream), update the options and pass it to `update_stream`. [^4]: `-name` and `-durable_name` are mutually exclusive. Depending on this choice, the library will invoke either `$JS.API.CONSUMER.CREATE` (default `InactiveThreshold` is 5s) or `$JS.API.CONSUMER.DURABLE.CREATE` (default `InactiveThreshold` is unlimited). `-name` is supported only by NATS>=2.9. `CONSUMER.DURABLE.CREATE` is considered [legacy API](https://github.com/nats-io/nats.go/blob/main/js.go). [^5]: Prior to NATS 2.10, the same request could create a new consumer or update its configuration. This behaviour leads to potential race conditions and has been [fixed](https://github.com/nats-io/nats.go/pull/1379) in NATS 2.10 by adding a new field "action" to the JSON request. The Tcl library detects the server version and includes this field automatically. |
Name change from assets/nats2/KvAPI.md to assets/nats3/KvAPI.md.
1 2 | # Key-Value API | | > > | > > > > > > > > > | > > > > > | | > > | < < > | | | > | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | # Key-Value API NATS server implements a Key-Value storage on top of JetStream streams. A specific KV bucket is accessed using the `nats::key_value` TclOO object. Do not create it directly - instead, call the [bind_kv_bucket](JsAPI.md#js-bind_kv_bucket-bucket) or [create_kv_bucket](JsAPI.md#js-create_kv_bucket-bucket--option-value) method of your `nats::jet_stream`. KV management functions are available in `nats::jet_stream`, but they are documented here for cohesion. Please refer to the [official documentation](https://docs.nats.io/nats-concepts/jetstream/key-value-store) for the description of general concepts and to [ADR-8](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-8.md) for implementation details. # Synopsis ## Class `nats::jet_stream` [*js* bind_kv_bucket *bucket*](#js-bind_kv_bucket-bucket)<br/> [*js* create_kv_bucket *bucket* ?-option *value*?..](#js-create_kv_bucket-bucket--option-value)<br/> [*js* create_kv_aggregate *bucket writable origins* ?-option *value*?..](#js-create_kv_aggregate-bucket-writable-origins--option-value)<br/> [*js* create_kv_mirror *name origin* ?-option *value*?..](#js-create_kv_mirror-name-origin--option-value)<br/> [*js* delete_kv_bucket *bucket*](#js-delete_kv_bucket-bucket)<br/> [*js* kv_buckets](#js-kv_buckets)<br/> [*js* empty_kv_bucket *bucket*](#js-empty_kv_bucket-bucket)<br/> ## Class `nats::key_value` [*kv* get *key* ?-revision *int*?](#kv-get-key--revision-int)<br/> [*kv* get_value *key* ?-revision *int*?](#kv-get_value-key--revision-int)<br/> [*kv* put *key value*](#kv-put-key-value)<br/> [*kv* create *key value*](#kv-create-key-value)<br/> [*kv* update *key value revision*](#kv-update-key-value-revision)<br/> [*kv* delete *key ?revision?*](#kv-delete-key-revision)<br/> [*kv* purge *key*](#kv-purge-key)<br/> [*kv* revert *key revision*](#kv-revert-key-revision)<br/> [*kv* status](#kv-status)<br/> [*kv* history *key*](#kv-history-key)<br/> [*kv* keys](#kv-keys)<br/> [*kv* watch *key* ?-option *value*...?](#kv-watch-key--option-value)<br/> [*kv* destroy](#kv-destroy)<br/> ## Class `nats::kv_watcher` [*watcher* consumer](#watcher-consumer)<br/> [*watcher* destroy](#watcher-destroy)<br/> ## Namespace Commands [nats::make_kv_origin ?-option *value*?..](#natsmake_kv_origin--option-value) # Description The `key_value` object provides access to a specific KV bucket. A bucket is merely a JS `stream` that has some default options, and its name always starts with "KV_". And a key is, in fact, a (portion of) a subject that this stream listens to. Therefore, all KV operations are implemented in terms of standard JetStream operations such as `publish`, `stream_msg_get` etc. They all block in a (coroutine-aware) `vwait` with the same timeout as in the parent `jet_stream`. [NATS by Example](https://natsbyexample.com/examples/kv/intro/go) provides a good overview of how KV buckets work on top of streams. The [naming rules](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md) of NATS subjects apply to keys as well, and keys can't start with "_kv". Keys *may* contain dots. You can access a KV bucket across JetStream domains and create KV mirrors as well. These concepts are explained in the chapters about [NATS Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes) and [Stream Replication](https://docs.nats.io/running-a-nats-service/nats_admin/jetstream_admin/replication). ## Entry A KV entry is a dict with the following fields: - `bucket` - a bucket name - `key` - a key - `value` - a value or an empty string if it is a DEL or PURGE entry - `revision` - revision number, starting with 1 (`seq` of the message in the underlying stream) - `created` - creation timestamp as milliseconds since the epoch - `delta` - distance from the latest revision, starting with 0. It is available only when using [watch](#kv-watch-key-args). - `operation` - one of `PUT`, `DEL` or `PURGE` ## Bucket status A bucket status is a dict with the following fields: - `bucket` - name - `bytes` - size of the bucket - `history` - number of history entries per key - `ttl` - for how long (ms) the bucket keeps values or 0 for unlimited time - `values` - total number of entries in the bucket including historical ones - `is_compressed` - if data is compressed on disk - `mirror_name` - name of the origin bucket (optional) - `mirror_domain` - JetStream domain of the origin bucket (optional) - `stream_config` - configuration of the backing stream - `stream_state` - state of the backing stream # Commands ## `nats::jet_stream` ### js bind_kv_bucket *bucket* This 'factory' method creates `nats::key_value` to access the `bucket`. ### js create_kv_bucket *bucket* ?-option *value*?.. Creates a Key-Value `bucket` with configuration specified as option-value pairs. | Option | Type | Default | | ------------- |--------|---------| | -description | string | | | -max_value_size | int | | | -history | int | 1 | | -ttl | ms | | | -max_bucket_size | int | | | -storage | one of: memory, file | file | | -num_replicas | int | 1 | | -compression | one of: none, s2 | | | -mirror_name[^1] | string | | | -mirror_domain | string| | | -metadata | dict | | Returns `nats::key_value` object. ### js create_kv_aggregate *bucket writable origins* ?-option *value*?.. Creates a KV aggregate that collects data from one or more origin buckets (possibly across JetStream domains). The data can be filtered based on keys. You *must* provide the following arguments: - `bucket string` - name of the aggregate - `writable bool` - if it is writable or read-only. A read-only aggregate has no ingest subject. - `origins list` - list of origins created with [make_kv_origin](#natsmake_kv_origin--option-value). Note that you need to use `[list]` even if you have only one origin. You can use all other options for a normal bucket as well, except `-mirror_name` and `-mirror_domain`. Returns `nats::key_value` object. ### js create_kv_mirror *name origin* ?-option *value*?.. Creates a read-only KV mirror of another bucket. Unlike a normal bucket or an aggregate, you can't bind to a mirror. Their main purpose is to scale `get` operations by replying to `DIRECT.GET` requests. You *must* provide the following arguments: - `name string` - name of the mirror[^2] - `origin dict` - a *single* origin created with [make_kv_origin](#natsmake_kv_origin--option-value). You can use all other options for a normal bucket as well, except `-mirror_name` and `-mirror_domain`. In order to delete a KV mirror, you need to use [delete_stream](JsAPI.md#js-delete_stream-stream) instead of `delete_kv_bucket`. ### js delete_kv_bucket *bucket* Deletes the bucket. ### js kv_buckets Returns a list of all Key-Value buckets. ### js empty_kv_bucket *bucket* Deletes all entries and history from the bucket without destroying the bucket itself. Note that it does **not** reset the bucket's revision counter. ## `nats::key_value` ### kv get *key* ?-revision *int*? Returns the latest entry for the `key` or the entry with the specified `revision`. Throws `ErrKeyNotFound` if the key doesn't exist or was deleted. `DIRECT.GET` request is used under the hood, if available. ### kv get_value *key* ?-revision *int*? A shorthand for `kv get` that returns only the value from the entry. ### kv put *key value* Puts the new `value` for the `key`. Returns the new revision number. |
︙ | ︙ | |||
75 76 77 78 79 80 81 82 83 84 | ### kv status Returns the status of the KV bucket as described above. ### kv history *key* Returns all historical entries for the `key`. A NATS wildcard pattern can be used as well, e.g. ">" to get all entries in the bucket. ### kv keys Returns all keys in the bucket. Throws `ErrKeyNotFound` if the bucket is empty. | > > | | > > > > > | | | | > > > | | | | > > | | < | > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > > > > > > > > | 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 | ### kv status Returns the status of the KV bucket as described above. ### kv history *key* Returns all historical entries for the `key`. A NATS wildcard pattern can be used as well, e.g. ">" to get all entries in the bucket. **NB!** Depending on the amount of received data and network throughput, this method may take longer than other KV operations. Since it inherits the timeout value from the parent JetStream, consider setting it to a generous value to avoid `ErrTimeout`. ### kv keys Returns all keys in the bucket. Throws `ErrKeyNotFound` if the bucket is empty. ### kv watch *key* ?-option *value*...? Starts watching the `key` (that can be a NATS wildcard) and returns a new object [nats::kv_watcher](#natskv_watcher). To watch the whole bucket, use: ```Tcl kv watch > ``` [Ordered consumer](JsAPI.md#js-ordered_consumer-stream-args) is used under the hood[^3]. KV entries can be delivered to a callback or to an array (or both): - `-callback cmdPrefix` - deliver **entries** to this callback. - `-values_array varName` - deliver **values** to this array. `varName` must be a namespace or global array variable. Usual namespace resolution rules apply, like for `trace`. At least one of `-callback` or `-values_array` must be provided. You can refine what is delivered using these options: - `-include_history bool` - deliver historical entries as well (default false). - `-meta_only bool` - deliver entries without values (default false). E.g. to watch for available keys. - `-ignore_deletes bool` - do not deliver DELETE and PURGE entries (default false). - `-updates_only bool` - deliver only updates (default false). The underlying `nats::ordered_consumer` can be configured with these options: - `-idle_heartbeat ms` If you opt for the **callback** option, it will be invoked from the event loop with the following signature: **cmdPrefix** *entry* The callback is invoked in the following order, once for each entry: 1. Historical entries for all matching keys (only with `-include_history true`). 2. Current entries for all matching keys. 3. Then it is invoked once again with an empty `entry` to signal "end of current data". 4. When a key is updated, it is invoked with a new entry. With `-updates_only true`, the watcher starts with step #3. [^4] If you opt for the **array** option: 1. Current keys and values from the bucket are inserted into this array. 2. Afterwards, updates in the bucket are delivered as they happen. 3. If a key is deleted or purged from the bucket, and `-ignore_deletes false`, the corresponding key will be removed from the array as well. Thus, you effectively have a local cache of a whole KV bucket or its portion that is always up-to-date. Depending on your use case, this might be more efficient than querying the bucket with `$kv get`. The array can't be a local variable. ### kv destroy TclOO destructor. See also the note on [automatic destruction](CoreAPI.md#TclOO-Lifecycle). ## `nats::kv_watcher` ### watcher consumer Returns the internal `nats::ordered_consumer` object (for advanced use cases, e.g. tracking its `$last_error`). ### watcher destroy Stops watching and destroys the object. ## Namespace Commands ### nats::make_kv_origin ?-option *value*?.. Returns a KV origin configuration to be used with [create_kv_aggregate](#js-create_kv_aggregate-bucket-writable-origins--option-value) and [create_kv_mirror](#js-create_kv_mirror-name-origin--option-value). You can provide the following options: - `-bucket str` (required) - the origin bucket - `-stream str` - in case the origin is not an actual bucket, but a mirror, you need to pass the stream/mirror name as well [^5] - `-keys list` - optional filter If the origin bucket is in another JetStream domain: - `-domain str` - domain name If the origin bucket is in another account: - `-api APIPrefix` - the subject prefix that imports the other account In both cases you can also specify: - `-deliver deliverySubject` (optional) - the delivery subject to use for the push consumer/KV watcher *Note*: `-domain` is a shorthand option equivalent to `-api "\$JS.$domain.API"` Example of creating a writable KV aggregate sourcing a subset `new.>` of keys/values from another bucket `HUB_BUCKET` located in the `hub` domain: ```Tcl set origin [nats::make_kv_origin -bucket HUB_BUCKET -keys new.> -domain hub] set kv [$js create_kv_aggregate LEAF_KV true [list $origin] -description "writable filtered KV aggregate"] ``` See also [nats::make_stream_source](JsApi.md#natsmake_stream_source--option-value). # Error handling KV-specific errors are listed in JsAPI.md [^1]: `-mirror_name` and `-mirror_domain` are deprecated per ADR-8 API v1.1 in favour of KV aggregates and mirrors based on subject transforms. Subject transforms are available in NATS server starting from [version 2.10](https://docs.nats.io/release-notes/whats_new/whats_new_210). [^2]: while for normal KV buckets and aggregates the name of the underlying stream always starts with "KV_", this is not the case for KV mirrors. [^3]: with all relevant implications, e.g. if the consumer [stops](JsAPI.md#error-handling), the KV watcher stops as well with no additional error reporting. [^4]: nats.go deviates from ADR-8 and does *not* send the End Of Initial Data marker. [^5]: I think, `type KVAggregateOrigin` in ADR-8 is a bit confusing in specifying `Stream` as required and `Bucket` as optional. So, users always need to pass the underlying stream name starting with "KV_", which is not very convenient. So, in the Tcl client it is other way round: `Bucket` is required and `Stream` is optional. |
Name change from assets/nats2/LICENSE to assets/nats3/LICENSE.
︙ | ︙ |
Name change from assets/nats2/README.md to assets/nats3/README.md.
︙ | ︙ | |||
42 43 44 45 46 47 48 49 50 51 | - Publish and receive messages, also with headers (NATS version 2.2+) - Synchronous and asynchronous requests (optimized: under the hood a single wildcard subscription is used for all requests) - Queue groups - Gather multiple responses to a request - Publishing and consuming messages from JetStream, providing "at least once" or "exactly once" delivery guarantee - Management of JetStream streams, consumers and Key-Value buckets - Key-Value store, including watchers - Standard `configure` method with many options - Protected connections using TLS - Automatic reconnection in case of network or server failure | > < | | > > > > > > > > > > > > > > > > > > > | > | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | - Publish and receive messages, also with headers (NATS version 2.2+) - Synchronous and asynchronous requests (optimized: under the hood a single wildcard subscription is used for all requests) - Queue groups - Gather multiple responses to a request - Publishing and consuming messages from JetStream, providing "at least once" or "exactly once" delivery guarantee - Management of JetStream streams, consumers and Key-Value buckets - Key-Value store, including watchers - Hub/leaf JetStream topology - Standard `configure` method with many options - Protected connections using TLS - Automatic reconnection in case of network or server failure - Authentication with a login+password, an authentication token or a TLS certificate - Cluster support (including receiving additional server addresses from INFO messages) - Configurable logging, compatible with the [logger](https://core.tcl-lang.org/tcllib/doc/trunk/embedded/md/tcllib/files/modules/log/logger.md) package - (Windows-specific) If the [iocp package](https://iocp.magicsplat.com/) is available, the client will use it for better TCP socket performance - Extensive test suite with 230+ unit tests, checking nominal use cases, error handling, timings and the wire protocol ensures that the Tcl client behaves in line with official NATS clients ## Examples Look into the [examples](examples) folder. ## Implemented ADRs ADRs (architecture decision records) provide a *single source of truth* that all NATS clients should follow. Thus, they have been immensely helpful in understanding the NATS design and rationale behind it. The Tcl client follows these ADRs: | ADR | Title | Comment | | ----- |--------|--------| | [ADR-1](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-1.md) | JetStream JSON API Design | | | [ADR-4](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-4.md) | NATS Message Headers | | | [ADR-6](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md) | Naming Rules | | | [ADR-8](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-8.md) | JetStream based Key-Value Stores |Including Oct 2023 updates with API v1.1| | [ADR-9](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-9.md) | JetStream Consumer Idle Heartbeats | | | [ADR-10](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-10.md) | JetStream Extended Purge | | | [ADR-13](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-13.md) | Pull Subscribe internals | | | [ADR-15](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md) | JetStream Subscribe Workflow | | | [ADR-17](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-17.md) | Ordered Consumer | | | [ADR-19](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-19.md) | API prefixes for materialized JetStream views | | | [ADR-31](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-31.md) | JetStream Direct Get | | | [ADR-33](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-33.md) | Metadata for Stream and Consumer | | | [ADR-36](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-36.md) | Subject Mapping Transforms in Streams | | API documentation in this repository focuses on specific details of the Tcl implementation. Users are expected to have sufficient understanding of NATS in general, so other information is provided through links to ADRs and other external docs. ## Missing features (in comparison to official NATS clients) - Authentication with [NKey](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nkey_auth) & JWT. - WebSocket is not supported. The only available transport is TCP. - [Object Store](https://docs.nats.io/nats-concepts/jetstream/obj_store) |
Name change from assets/nats2/jet_stream.tcl to assets/nats3/jet_stream.tcl.
1 2 3 4 5 6 7 | # Copyright (c) 2021-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2021 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. oo::class create ::nats::SyncPullRequest { | > > > | | | | | | | | | > > > > > > > > > > | > | | | | > > > | | | | | | | | | | | | | > > > > > > > > | > | > > > | | | | | | | | < | | | > > > > > > | | | | > > > > > > > > > | > > | < < > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | < > > | > | | > | > | < < < < | > > | > > > > > > > > | < > | | | | < < > > > > > | | | | | | | | > > > > > > | < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 | # Copyright (c) 2021-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2021 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. # JetStream wire API Reference https://docs.nats.io/reference/reference-protocols/nats_api_reference # JetStream JSON API Design https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-1.md oo::class create ::nats::SyncPullRequest { variable Conn MsgList Status ID constructor {} { set MsgList [list] set Status running ;# one of: running, done, timeout } method run {conn subject msg timeout batch } { set Conn $conn set ID [$conn request $subject $msg -dictmsg true -timeout $timeout -max_msgs $batch -callback [nats::mymethod OnMsg]] try { while {1} { nats::_coroVwait [self namespace]::Status ;# wait for 1 message set msgCount [llength $MsgList] switch -- $Status { timeout { # check if the connection is lost/closed. # It could have been still possible to return messages received so far, but the user wouldn't be able to ACK them anyway, # so we discard them set connNs [info object namespace $Conn] if {[set ${connNs}::status] eq $nats::status_closed} { if {[set ${connNs}::last_error] eq ""} { throw {NATS ErrConnectionClosed} "Connection closed" } throw {NATS ErrTimeout} "Connection lost" } if {$msgCount > 0} { break ;# we've received at least some messages - return them } # it might seem strange to throw ErrTimeout only in this case # but it is consistent with nats.go, see func TestPullSubscribeFetchWithHeartbeat throw {NATS ErrTimeout} "Sync pull request timeout, subject=$subject" } done { # we've received a status message, which means that the pull request is done if {$batch - $msgCount > 1} { # no need to cancel the request if this was the last expected message $Conn cancel_request $ID } break } default { if {$msgCount == $batch} { break } } } } return $MsgList } finally { my destroy } } method OnMsg {timedOut msg} { if {$timedOut} { # client-side timeout or connection lost; we may have received some messages before [info object namespace $Conn]::log::debug "Sync pull request $ID timed out" set Status timeout return } set msgStatus [nats::header lookup $msg Status ""] switch -- $msgStatus { 100 { return } 404 - 408 - 409 { [info object namespace $Conn]::log::debug "Sync pull request $ID got status message $msgStatus" set Status done } default { lappend MsgList $msg set Status running } } } } oo::class create ::nats::AsyncPullRequest { variable Conn Batch UserCb MsgCount ID constructor {cb} { set UserCb $cb set MsgCount 0 ;# only user's messages set ID 0 } method run {conn subject msg timeout batch} { set Conn $conn set Batch $batch set ID [$conn request $subject $msg -dictmsg true -timeout $timeout -max_msgs $batch -callback [nats::mymethod OnMsg]] return [self] } method OnMsg {timedOut msg} { if {$timedOut} { # client-side timeout or connection lost; we may have received some messages before [info object namespace $Conn]::log::debug "Async pull request $ID timed out" set invokeCb 1 set connNs [info object namespace $Conn] if {[set ${connNs}::status] eq $nats::status_closed} { if {[set ${connNs}::last_error] eq ""} { set invokeCb 0 ;# same as async requests } } if {$invokeCb} { after 0 [list {*}$UserCb 1 ""] } set ID 0 ;# the request has been already cancelled by OldStyleRequest my destroy return } set msgStatus [nats::header lookup $msg Status ""] switch -- $msgStatus { 100 { return } 404 - 408 - 409 { [info object namespace $Conn]::log::debug "Async pull request $ID got status message $msgStatus" after 0 [list {*}$UserCb 1 $msg] ;# just like with old-style requests, inform the user that the pull request timed out my destroy } default { incr MsgCount after 0 [list {*}$UserCb 0 $msg] if {$MsgCount == $Batch} { my destroy } } } } destructor { if {$Batch - $MsgCount <= 1 || $ID == 0} { return } $Conn cancel_request $ID } } oo::class create ::nats::jet_stream { variable Conn Timeout ApiPrefix Domain Trace ChildrenRef constructor {conn timeout api_prefix domain trace} { set Conn $conn set Timeout $timeout set Trace $trace array set ChildrenRef {} if {$api_prefix ne ""} { set ApiPrefix $api_prefix return } set Domain $domain if {$domain eq ""} { set ApiPrefix "\$JS.API" } else { set ApiPrefix "\$JS.$domain.API" } } destructor { $Conn releaseRef [self] foreach obj [array names ChildrenRef] { $obj destroy } } # internal method addRef {obj} { set ChildrenRef($obj) "" } method releaseRef {obj} { unset -nocomplain ChildrenRef($obj) } method api_prefix {} { return $ApiPrefix } method timeout {} { return $Timeout } # JetStream Direct Get https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-31.md method stream_direct_get {stream args} { set spec {last_by_subj valid_str null next_by_subj valid_str null seq int null} nats::_parse_args $args $spec if [info exists last_by_subj] { set reqSubj "$ApiPrefix.DIRECT.GET.$stream.$last_by_subj" set reqMsg "" } else { set reqSubj "$ApiPrefix.DIRECT.GET.$stream" set reqMsg [nats::_local2json $spec] } # ApiRequest assumes that the reply is always JSON which is not the case for DIRECT.GET if {$Trace} { [info object namespace $Conn]::log::debug ">>> $reqSubj $reqMsg" } set msg [$Conn request $reqSubj $reqMsg -timeout $Timeout -dictmsg 1] if {$Trace} { [info object namespace $Conn]::log::debug "<<< $reqSubj $msg" } set status [nats::header lookup $msg Status 0] switch -- $status { 404 { throw {NATS ErrMsgNotFound} "no message found" } 408 { throw {NATS ErrInvalidArg} "Invalid request" } } dict set msg seq [nats::header get $msg Nats-Sequence] dict set msg time [nats::header get $msg Nats-Time-Stamp] dict set msg subject [nats::header get $msg Nats-Subject] foreach h {Nats-Sequence Nats-Time-Stamp Nats-Subject Nats-Stream} { nats::header delete msg $h } return $msg } # nats schema info --yaml io.nats.jetstream.api.v1.stream_msg_get_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_msg_get_response # https://docs.nats.io/reference/reference-protocols/nats_api_reference#fetching-from-a-stream-by-sequence method stream_msg_get {stream args} { set spec {last_by_subj valid_str null next_by_subj valid_str null seq int null} set response [my ApiRequest "STREAM.MSG.GET.$stream" [nats::_dict2json $spec $args]] set encoded_msg [dict get $response message] set data [binary decode base64 [dict lookup $encoded_msg data]] if {[$Conn cget -utf8_convert]} { # ofc method MSG has "convertfrom" as well, but it has no effect on base64 data, so we need to call "convertfrom" again set data [encoding convertfrom utf-8 $data] } set msg [nats::msg create [dict get $encoded_msg subject] -data $data] dict set msg seq [dict get $encoded_msg seq] dict set msg time [dict get $encoded_msg time] set header [binary decode base64 [dict lookup $encoded_msg hdrs]] if {$header ne ""} { dict set msg header [nats::_parse_header $header] } return $msg } # nats schema info --yaml io.nats.jetstream.api.v1.stream_msg_delete_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_msg_delete_response method stream_msg_delete {stream args} { set spec {no_erase bool true seq int NATS_TCL_REQUIRED} set response [my ApiRequest "STREAM.MSG.DELETE.$stream" [nats::_dict2json $spec $args]] return [dict get $response success] } # for backwards compatibility; do not confuse with the new "consume" algorithm from JetStream client API 2.0 method consume {args} { return [my fetch {*}$args] } # Pull Subscribe internals https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-13.md # JetStream Subscribe Workflow https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md # nats schema info --yaml io.nats.jetstream.api.v1.consumer_getnext_request method fetch {stream consumer args} { if {![my CheckFilenameSafe $stream]} { throw {NATS ErrInvalidArg} "Invalid stream name $stream" } if {![my CheckFilenameSafe $consumer]} { throw {NATS ErrInvalidArg} "Invalid consumer name $consumer" } set subject "$ApiPrefix.CONSUMER.MSG.NEXT.$stream.$consumer" nats::_parse_args $args { timeout timeout null batch_size pos_int 1 expires timeout null callback valid_str "" } # timeout specifies the client-side timeout; if not given, this is a no_wait fetch # expires specifies the server-side timeout (undocumented option only for testing) if {[info exists timeout]} { set no_wait false if {![info exists expires]} { set expires [expr {$timeout >= 20 ? $timeout - 10 : $timeout}] ;# same as in nats.go v1, see func (sub *Subscription) Fetch # but in JS v2 they've changed default expires to 30s } } else { if {[info exists expires]} { throw {NATS ErrInvalidArg} "-expires requires -timeout" } set no_wait true set timeout $Timeout } # implementation in official clients is overly complex and is done in 2 steps: # 1. a no_wait fetch # 2. followed by a long fetch # and they have a special optimized case for batch=1. # I don't see a need for such intricacies in this client set json_spec { expires ns null batch int null no_wait bool null } set batch $batch_size # if there are no messages at all, and I send a no_wait request, I get back 404 # if there are no messages, and I send no_wait=false, I get 408 after the request expires # if there are some messages, I get them followed by 408 # if there are all needed messages, there's no additional status message # both classes self-destruct, when the pull request is done if {$callback eq ""} { set req [nats::SyncPullRequest new] } else { set req [nats::AsyncPullRequest new $callback] } set msg [nats::_local2json $json_spec] if {$Trace} { [info object namespace $Conn]::log::debug ">>> $subject $msg" } try { return [$req run $Conn $subject $msg $timeout $batch] } trap {NATS ErrTimeout} {err errOpts} { if {$err ne "Connection lost"} { # only for sync fetches: # probably wrong stream/consumer - see also https://github.com/nats-io/nats-server/issues/2107 # raise a more meaningful ErrConsumerNotFound/ErrStreamNotFound/ErrJetStreamNotEnabled my consumer_info $stream $consumer } # if consumer_info doesn't throw, rethrow the original error return -options $errOpts $err } } method cancel_pull_request {fetchID} { if {![info object isa object $fetchID]} { throw {NATS ErrInvalidArg} "Invalid fetch ID $fetchID" } $fetchID destroy } # different types of ACKs: https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability method ack {message} { $Conn publish [nats::msg reply $message] "" } method ack_sync {message} { $Conn request [nats::msg reply $message] "" -timeout $Timeout } method nak {message args} { nats::_parse_args $args { delay timeout null } set nack_msg "-NAK" if {[info exists delay]} { append nack_msg " [nats::_local2json {delay ns null}]" } $Conn publish [nats::msg reply $message] $nack_msg } method term {message} { $Conn publish [nats::msg reply $message] "+TERM" } method in_progress {message} { $Conn publish [nats::msg reply $message] "+WPI" } # nats schema info --yaml io.nats.jetstream.api.v1.pub_ack_response method publish {subject message args} { set msg [nats::msg create $subject -data $message] return [my publish_msg $msg {*}$args] } method publish_msg {msg args} { nats::_parse_args $args { timeout timeout null callback valid_str "" stream valid_str "" } if {![info exists timeout]} { set timeout $Timeout } if {$stream ne ""} { nats::header set msg Nats-Expected-Stream $stream } if {$callback ne ""} { return [$Conn request_msg $msg -callback [nats::mymethod PublishCallback $callback] -timeout $timeout -dictmsg false] } set response [json::json2dict [$Conn request_msg $msg -timeout $timeout -dictmsg false]] nats::_checkJsError $response return $response ;# fields: stream,seq,duplicate } method add_consumer {stream args} { my AddUpdateConsumer $stream create {*}$args } method update_consumer {stream args} { my AddUpdateConsumer $stream update {*}$args } # nats schema info --yaml io.nats.jetstream.api.v1.consumer_create_request # nats schema info --yaml io.nats.jetstream.api.v1.consumer_create_response method AddUpdateConsumer {stream action args} { set spec {name valid_str null durable_name valid_str null description valid_str null deliver_policy {enum all last new by_start_sequence by_start_time last_per_subject} all opt_start_seq int null opt_start_time valid_str null ack_policy {enum none all explicit} explicit |
︙ | ︙ | |||
313 314 315 316 317 318 319 | flow_control bool null idle_heartbeat ns null headers_only bool null deliver_subject valid_str null deliver_group valid_str null inactive_threshold ns null num_replicas int null | | > > > | > | | > > > > > | | | 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 | flow_control bool null idle_heartbeat ns null headers_only bool null deliver_subject valid_str null deliver_group valid_str null inactive_threshold ns null num_replicas int null mem_storage bool null metadata metadata null} nats::_parse_args $args $spec if {[info exists name]} { if {![my CheckFilenameSafe $name]} { throw {NATS ErrInvalidArg} "Invalid consumer name $name" } if {[info exists durable_name]} { throw {NATS ErrInvalidArg} "-name conflicts with -durable_name" } } # see JetStreamManager.add_consumer in nats.py set version_cmp [package vcompare 2.9 [dict get [$Conn server_info] version]] set check_subj true if {($version_cmp < 1) && [info exists name]} { if {[info exists filter_subject] && $filter_subject ne ">"} { set subject "CONSUMER.CREATE.$stream.$name.$filter_subject" set check_subj false ;# if filter_subject has * or >, it can't pass the check in CheckSubject } else { set subject "CONSUMER.CREATE.$stream.$name" } } elseif {[info exists durable_name]} { set subject "CONSUMER.DURABLE.CREATE.$stream.$durable_name" } else { set subject "CONSUMER.CREATE.$stream" } set jsonDict [dict create stream_name [json::write string $stream] config [nats::_local2json $spec]] set version_cmp [package vcompare 2.10 [dict get [$Conn server_info] version]] if {$version_cmp < 1} { # seems like older NATS servers ignore "action", but let's be safe and send it only if NATS version >= 2.10 dict set jsonDict action [json::write string $action] } set response [my ApiRequest $subject [json::write object {*}$jsonDict] $check_subj] set result_config [dict get $response config] nats::_ns2ms result_config ack_wait idle_heartbeat inactive_threshold dict set response config $result_config return $response } method add_pull_consumer {stream consumer args} { set config $args dict set config durable_name $consumer return [my add_consumer $stream {*}$config] } method add_push_consumer {stream consumer deliver_subject args} { dict set args durable_name $consumer dict set args deliver_subject $deliver_subject return [my add_consumer $stream {*}$args] } method add_consumer_from_json {stream consumer json_config} { set msg [json::write object stream_name [json::write string $stream] config $json_config] set json_response [$Conn request "$ApiPrefix.CONSUMER.DURABLE.CREATE.$stream.$consumer" $msg -timeout $Timeout -dictmsg false] set dict_response [json::json2dict $json_response] nats::_checkJsError $dict_response return $json_response } # no request body # nats schema info --yaml io.nats.jetstream.api.v1.consumer_delete_response |
︙ | ︙ | |||
390 391 392 393 394 395 396 397 398 | # the schema suggests possibility to filter by subject, but it doesn't work! # nats schema info --yaml io.nats.jetstream.api.v1.consumer_names_response method consumer_names {stream} { set response [my ApiRequest "CONSUMER.NAMES.$stream" ""] return [dict get $response consumers] } # nats schema info --yaml io.nats.jetstream.api.v1.stream_create_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_create_response | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > | | | > > > > > > | < | < < | | | | | 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 | # the schema suggests possibility to filter by subject, but it doesn't work! # nats schema info --yaml io.nats.jetstream.api.v1.consumer_names_response method consumer_names {stream} { set response [my ApiRequest "CONSUMER.NAMES.$stream" ""] return [dict get $response consumers] } method ordered_consumer {stream args} { if {![my CheckFilenameSafe $stream]} { throw {NATS ErrInvalidArg} "Invalid stream name $stream" } set spec {callback valid_str "" description valid_str null headers_only bool null deliver_policy str null idle_heartbeat ns 5000 filter_subject valid_str null post bool true} nats::_parse_args $args $spec set consumerConfig [nats::_local2dict $spec] # remove the args that do not belong to a consumer configuration dict unset consumerConfig callback dict unset consumerConfig post set ordConsumer [nats::ordered_consumer new $Conn [self] $stream $consumerConfig $callback $post] if {$post} { # if $post=false, the consumer is owned by a KV watcher set ChildrenRef($ordConsumer) "" } return $ordConsumer } method add_stream {stream args} { return [my AddUpdateStream $stream CREATE {*}$args] } method update_stream {stream args} { return [my AddUpdateStream $stream UPDATE {*}$args] } # nats schema info --yaml io.nats.jetstream.api.v1.stream_create_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_create_response # nats schema info --yaml io.nats.jetstream.api.v1.stream_update_response method AddUpdateStream {stream action args} { if {![my CheckFilenameSafe $stream]} { throw {NATS ErrInvalidArg} "Invalid stream name $stream" } # follow the same order of fields as in nats.go/jetstream/stream_config.go set spec { name valid_str null description valid_str null subjects list null retention {enum limits interest workqueue} limits max_consumers int null max_msgs int null max_bytes int null discard {enum new old} null max_age ns null max_msgs_per_subject int null max_msg_size int null storage {enum memory file} file num_replicas int null no_ack bool null duplicate_window ns null mirror json null sources json_list null sealed bool null deny_delete bool null deny_purge bool null allow_rollup_hdrs bool null compression {enum none s2} null first_seq int null subject_transform json null republish json null allow_direct bool null mirror_direct bool null metadata metadata null} dict set args name $stream ;# required by NATS despite having it already in the subject # -subjects is normally also required unless we have -mirror or -sources # rely on NATS server to check it set response [my ApiRequest "STREAM.$action.$stream" [nats::_dict2json $spec $args]] # response fields: config, created (timestamp), state, did_create set result_config [dict get $response config] nats::_ns2ms result_config duplicate_window max_age dict set response config $result_config return $response } method add_stream_from_json {json_config} { set stream_name [dict get [json::json2dict $json_config] name] set json_response [$Conn request "$ApiPrefix.STREAM.CREATE.$stream_name" $json_config -timeout $Timeout -dictmsg false] set dict_response [json::json2dict $json_response] nats::_checkJsError $dict_response return $json_response } # no request body # nats schema info --yaml io.nats.jetstream.api.v1.stream_delete_response |
︙ | ︙ | |||
480 481 482 483 484 485 486 | # nats schema info --yaml io.nats.jetstream.api.v1.stream_names_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_names_response method stream_names {args} { set spec {subject valid_str null} nats::_parse_args $args $spec set response [my ApiRequest "STREAM.NAMES" [nats::_local2json $spec]] if {[dict get $response total] == 0} { | < | > > > > > > > > > > > > > > > > > > > > > > > > > | | | | | | > | | > > > > < < < < < < | < < | > > > | > > | | > | > > > > > > > > > > > > > > > > > > > > > > > > | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 | # nats schema info --yaml io.nats.jetstream.api.v1.stream_names_request # nats schema info --yaml io.nats.jetstream.api.v1.stream_names_response method stream_names {args} { set spec {subject valid_str null} nats::_parse_args $args $spec set response [my ApiRequest "STREAM.NAMES" [nats::_local2json $spec]] if {[dict get $response total] == 0} { return [list] } return [dict get $response streams] } method bind_kv_bucket {bucket} { my CheckBucketName $bucket set stream "KV_$bucket" try { set stream_info [my stream_info $stream] } trap {NATS ErrStreamNotFound} err { throw {NATS ErrBucketNotFound} "Bucket $bucket not found" } if {[dict get $stream_info config max_msgs_per_subject] < 1} { throw {NATS ErrBucketNotFound} "Bucket $bucket not found" } set keyValue [nats::key_value new $Conn [self] $Domain $bucket [dict get $stream_info config]] set ChildrenRef($keyValue) "" return $keyValue } method create_kv_bucket {bucket args} { return [my CreateBucket $bucket "" "" false {*}$args] } method create_kv_aggregate {bucket writable origins args} { if {[llength $origins] == 0} { throw {NATS ErrInvalidArg} "List of KV origins is required" } if {![string is boolean -strict $writable]} { throw {NATS ErrInvalidArg} "writable = $writable is not a boolean" } if {"-mirror_name" in $args || "-mirror_domain" in $args} { throw {NATS ErrInvalidArg} "-mirror_name and -mirror_domain are not allowed" } return [my CreateBucket $bucket $writable $origins false {*}$args] } method create_kv_mirror {name origin args} { if {"-mirror_name" in $args || "-mirror_domain" in $args} { throw {NATS ErrInvalidArg} "-mirror_name and -mirror_domain are not allowed" } my CreateBucket $name false $origin true {*}$args return } method CreateBucket {bucket writable origins is_mirror args} { my CheckBucketName $bucket set streamName "KV_$bucket" nats::_parse_args $args { description valid_str null max_value_size int null history pos_int 1 ttl pos_int null max_bucket_size pos_int null storage {enum memory file} file num_replicas int 1 compression {enum none s2} null mirror_name valid_str null mirror_domain valid_str null metadata metadata null } if {$history < 1 || $history > 64} { throw {NATS ErrInvalidArg} "History must be between 1 and 64" } set duplicate_window 120000 ;# 2 min if {[info exists ttl] && $ttl < $duplicate_window} { set duplicate_window $ttl } set stream_config [dict create \ allow_rollup_hdrs true \ deny_delete true \ discard new \ duplicate_window $duplicate_window \ deny_purge false \ max_msgs_per_subject $history \ allow_direct true] if {[info exists ttl]} { dict set stream_config max_age $ttl } if {[info exists max_value_size]} { dict set stream_config max_msg_size $max_value_size } if {[info exists max_bucket_size]} { dict set stream_config max_bytes $max_bucket_size } foreach opt {description storage num_replicas compression metadata} { if {[info exists $opt]} { dict set stream_config $opt [set $opt] } } # -mirror_name and -mirror_domain are deprecated; use create_kv_mirror instead if {[info exists mirror_name]} { set srcArgs [list -name "KV_$mirror_name"] if {[info exists mirror_domain]} { lappend srcArgs -api "\$JS.$mirror_domain.API" } dict set stream_config mirror [nats::make_stream_source {*}$srcArgs] dict set stream_config mirror_direct true } else { dict set stream_config subjects "\$KV.$bucket.>" } if {$is_mirror} { # this is a KV mirror, it can have only one origin, and you can't bind to it # can't check for [llength $origins] != 1 because it's a dict dict set stream_config mirror_direct true dict unset stream_config subjects if [string match "KV_*" $bucket] { throw {NATS ErrInvalidArg} "Mirror name must not be KV" ;# ensure users can't bind to it } set streamName $bucket dict set stream_config mirror [my OriginToMirror $origins] } else { if [llength $origins] { # this is a KV aggregate, it can have one or more origins, and you can bind to it foreach origin $origins { lappend streamSources [my OriginToSource $origin $bucket] } dict set stream_config sources $streamSources if {$writable} { dict set stream_config deny_delete false } else { dict unset stream_config subjects } } } set stream_info [my add_stream $streamName {*}$stream_config] set keyValue [nats::key_value new $Conn [self] $Domain $bucket [dict get $stream_info config]] set ChildrenRef($keyValue) "" return $keyValue } method OriginToSource {origin new_bucket} { dict with origin { if {![info exists stream]} { set stream "KV_$bucket" } if {![info exists keys] || [llength $keys] == 0} { lappend keys > } foreach key $keys { lappend transforms [nats::make_subject_transform -src "\$KV.$bucket.$key" -dest "\$KV.$new_bucket.$key"] } set srcArgs [list -name $stream -subject_transforms $transforms] if [info exists api] { lappend srcArgs -api $api if [info exists deliver] { lappend srcArgs -deliver $deliver } } nats::make_stream_source {*}$srcArgs ;# implicit return } } method OriginToMirror {origin} { dict with origin { set srcArgs [list -name "KV_$bucket"] if {[info exists keys] && [llength $keys]} { foreach key $keys { lappend transforms [nats::make_subject_transform -src "\$KV.$bucket.$key" -dest "\$KV.$bucket.$key"] } lappend srcArgs -subject_transforms $transforms } if [info exists api] { lappend srcArgs -api $api if [info exists deliver] { lappend srcArgs -deliver $deliver } } nats::make_stream_source {*}$srcArgs ;# implicit return } } method delete_kv_bucket {bucket} { my CheckBucketName $bucket set stream "KV_$bucket" try { return [my delete_stream $stream] |
︙ | ︙ | |||
581 582 583 584 585 586 587 588 589 590 591 592 593 594 | } } return $kv_list } method empty_kv_bucket {bucket} { return [my purge_stream "KV_$bucket"] } method CheckBucketName {bucket} { if {![regexp {^[a-zA-Z0-9_-]+$} $bucket]} { throw {NATS ErrInvalidArg} "Invalid bucket name $bucket" } } | > > > > | 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 | } } return $kv_list } method empty_kv_bucket {bucket} { return [my purge_stream "KV_$bucket"] } # nats schema info --yaml io.nats.jetstream.api.v1.account_info_response method account_info {} { return [my ApiRequest "INFO" ""] } method CheckBucketName {bucket} { if {![regexp {^[a-zA-Z0-9_-]+$} $bucket]} { throw {NATS ErrInvalidArg} "Invalid bucket name $bucket" } } |
︙ | ︙ | |||
614 615 616 617 618 619 620 | # https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md # only the Unix variant, also no " [] {} method CheckFilenameSafe {str} { return [regexp -- {^[-[:alnum:]!#$%&()+,:;<=?@^_`|~]+$} $str] } method ApiRequest {subj msg {checkSubj true}} { try { | | | | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 | # https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-6.md # only the Unix variant, also no " [] {} method CheckFilenameSafe {str} { return [regexp -- {^[-[:alnum:]!#$%&()+,:;<=?@^_`|~]+$} $str] } method ApiRequest {subj msg {checkSubj true}} { try { if {$Trace} { [info object namespace $Conn]::log::debug ">>> $ApiPrefix.$subj $msg" } set replyJson [$Conn request "$ApiPrefix.$subj" $msg -timeout $Timeout -dictmsg false -check_subj $checkSubj] if {$Trace} { [info object namespace $Conn]::log::debug "<<< $ApiPrefix.$subj $replyJson" } set response [json::json2dict $replyJson] } trap {NATS ErrNoResponders} err { throw {NATS ErrJetStreamNotEnabled} "JetStream is not enabled in the server" } nats::_checkJsError $response dict unset response type ;# no-op if the key doesn't exist return $response } } # see ADR-15 and ADR-17 oo::class create ::nats::ordered_consumer { variable Conn Js Stream Config StreamSeq ConsumerSeq Name SubID UserCb Timer PostEvent RetryInterval ConsumerInfo # "public" variable variable last_error constructor {connection jet_stream streamName conf cb post} { set Conn $connection set Js $jet_stream set Stream $streamName set UserCb $cb set StreamSeq 0 set ConsumerSeq 0 set Name "" set SubID "" set Timer "" ;# used both for HB and reset retries set RetryInterval 10000 ;# same as in nats.go/jetstream/ordered.go set PostEvent $post set Config [dict replace $conf \ flow_control true \ ack_policy none \ max_deliver 1 \ ack_wait [expr {22 * 3600 * 1000}] \ num_replicas 1 \ mem_storage true \ inactive_threshold 2000] set last_error "" my Reset } destructor { after cancel $Timer if {$PostEvent} { $Js releaseRef [self] } if {$SubID eq ""} { return } try { $Conn unsubscribe $SubID ;# NATS will delete the ephemeral consumer } trap {NATS ErrConnectionClosed} err {} } method Reset {{errCode ""}} { set inbox "_INBOX.[nats::_random_string]" dict set Config deliver_subject $inbox if {$errCode ne ""} { dict set Config deliver_policy by_start_sequence set startSeq [expr {$StreamSeq + 1}] dict set Config opt_start_seq $startSeq my AsyncError $errCode "reset with opt_start_seq = $startSeq due to $errCode" } if {$errCode eq ""} { set ConsumerInfo [$Js add_consumer $Stream {*}$Config] ;# let any error propagate to the caller } else { # we are working in the background, so all errors must be reported via AsyncError try { set ConsumerInfo [$Js add_consumer $Stream {*}$Config] } trap {NATS ErrStreamNotFound} {err opts} { my AsyncError ErrStreamNotFound "stopped due to $err" return ;# can't recover from this } trap {NATS} {err opts} { # most likely ErrTimeout if we're reconnecting to NATS or ErrJetStreamNotEnabled if a JetStream cluster is electing a new leader my AsyncError [lindex [dict get $opts -errorcode] 1] "failed to reset: $err" if {[$Conn cget status] == $nats::status_closed} { my AsyncError ErrConnectionClosed "stopped" return } # default delay is 10s to avoid spamming the log with warnings my ScheduleReset $errCode $RetryInterval return } } set Config [dict get $ConsumerInfo config] set Name [dict get $ConsumerInfo name] set StreamSeq [dict get $ConsumerInfo delivered stream_seq] set ConsumerSeq [dict get $ConsumerInfo delivered consumer_seq] set SubID [$Conn subscribe $inbox -dictmsg true -callback [nats::mymethod OnMsg] -post false] my RestartHbTimer [info object namespace $Conn]::log::debug "Ordered consumer $Name subscribed to $Stream, stream_seq = $StreamSeq, filter = [dict get $Config filter_subject]" } method RestartHbTimer {} { after cancel $Timer # reset if we don't receive any message within interval*3 ms; works also if somebody deletes the consumer in NATS set Timer [after [expr {[dict get $Config idle_heartbeat] * 3}] [nats::mymethod OnMissingHb]] } method ScheduleReset {errCode {delay 0}} { set Name "" ;# make the name blank while reset in is progress after cancel $Timer ;# stop the HB timer if {$SubID ne ""} { $Conn unsubscribe $SubID ;# unsub immediately to avoid OnMsg being called again while reset is in progress set SubID "" } set Timer [after $delay [nats::mymethod Reset $errCode]] ;# due to -post=false we are inside the coroutine, so can't call reset directly } method OnMsg {subj msg reply} { my RestartHbTimer if {[nats::msg idle_heartbeat $msg]} { set flowControlReply [nats::header lookup $msg Nats-Consumer-Stalled ""] if {$flowControlReply ne ""} { $Conn publish $flowControlReply "" } set cseq [nats::header get $msg Nats-Last-Consumer] if {$cseq == $ConsumerSeq} { return } my ScheduleReset ErrConsumerSequenceMismatch return } elseif {[nats::msg flow_control $msg]} { $Js ack $msg return } set meta [nats::metadata $msg] set cseq [dict get $meta consumer_seq] if {$cseq != $ConsumerSeq + 1} { my ScheduleReset ErrConsumerSequenceMismatch return } incr ConsumerSeq set StreamSeq [dict get $meta stream_seq] if {$PostEvent} { after 0 [list {*}$UserCb $msg] } else { {*}$UserCb $msg ;# only for KV watchers } } method OnMissingHb {} { $Conn unsubscribe $SubID set SubID "" set Name "" my Reset ErrConsumerNotActive } method info {} { return $ConsumerInfo } method name {} { return $Name } method AsyncError {code msg} { set logMsg "Ordered consumer [self]: $msg" [info object namespace $Conn]::log::warn $logMsg set last_error [dict create code [list NATS $code] errorMessage $msg] } } # these clients have more specific JS errors # https://github.com/nats-io/nats.go/blob/main/jserrors.go # https://github.com/nats-io/nats.py/blob/main/nats/js/errors.py proc ::nats::_checkJsError {msg} { if {![dict exists $msg error]} { |
︙ | ︙ | |||
661 662 663 664 665 666 667 668 669 670 671 672 673 674 | throw {NATS ErrMsgNotFound} $errDescr } 10059 { throw {NATS ErrStreamNotFound} $errDescr } } } throw [list NATS ErrJSResponse [dict get $errDict code] [dict get $errDict err_code]] $errDescr } proc ::nats::_format_json {name val type} { set errMsg "Invalid value for the $type option $name : $val" switch -- $type { | > > > > > > > | 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 | throw {NATS ErrMsgNotFound} $errDescr } 10059 { throw {NATS ErrStreamNotFound} $errDescr } } } if {[dict get $errDict code] == 503} { switch -- [dict get $errDict err_code] { 10039 { throw {NATS ErrJetStreamNotEnabledForAccount} $errDescr } } } throw [list NATS ErrJSResponse [dict get $errDict code] [dict get $errDict err_code]] $errDescr } proc ::nats::_format_json {name val type} { set errMsg "Invalid value for the $type option $name : $val" switch -- $type { |
︙ | ︙ | |||
695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 | throw {NATS ErrInvalidArg} $errMsg } # assume list of strings return [json::write array {*}[lmap element $val { json::write string $element }]] } ns { # val must be in milliseconds return [expr {entier($val*1000*1000)}] } json { return $val } default { throw {NATS ErrInvalidArg} "Wrong type $type" ;# should not happen } } } proc ::nats::_format_enum {name val type} { | > > > > > > > > > > > > > > > > > > > | 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 | throw {NATS ErrInvalidArg} $errMsg } # assume list of strings return [json::write array {*}[lmap element $val { json::write string $element }]] } metadata { # see ADR-33 if {[dict size $val] == 0} { throw {NATS ErrInvalidArg} $errMsg } set formattedDict [dict map {k v} $val { if [string match "_nats*" $k] { throw {NATS ErrInvalidArg} "_nats is a reserved prefix" } json::write string $v }] return [json::write object {*}$formattedDict] } ns { # val must be in milliseconds return [expr {entier($val*1000*1000)}] } json { return $val } json_list { if {[llength $val] == 0} { throw {NATS ErrInvalidArg} $errMsg } return [json::write array {*}$val] } default { throw {NATS ErrInvalidArg} "Wrong type $type" ;# should not happen } } } proc ::nats::_format_enum {name val type} { |
︙ | ︙ | |||
752 753 754 755 756 757 758 | json::write indented false json::write aligned false return [json::write object {*}$json_dict] } else { return "" } } | > > > > > > > > > | > > > | 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 | json::write indented false json::write aligned false return [json::write object {*}$json_dict] } else { return "" } } proc ::nats::_local2dict {spec} { set result [dict create] foreach {name type def} $spec { try { set val [uplevel 1 [list set $name]] dict set result $name $val } trap {TCL READ VARNAME} {err errOpts} - \ trap {TCL LOOKUP VARNAME} {err errOpts} { # nothing to do } } return $result } proc ::nats::_dict2json {spec src} { if {[llength $src] % 2} { throw {NATS ErrInvalidArg} "Missing value for option [lindex $src end]" } if {[dict size $src] == 0} { return "" } |
︙ | ︙ | |||
799 800 801 802 803 804 805 | } } } # metadata is encoded in the reply field: # V1: $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<time>.<pending> # V2: $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<time>.<pending>.<random token> | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 | } } } # metadata is encoded in the reply field: # V1: $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<time>.<pending> # V2: $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<time>.<pending>.<random token> # NB! as of Dec 2023, V2 metadata is not implemented yet in NATS, see nats-server/server/consumer.go: const expectedNumReplyTokens = 9 proc ::nats::metadata {msg} { set mlist [split [dict get $msg reply] .] if {[llength $mlist] != 9} { throw {NATS ErrNotJSMessage} "Message with subject [dict get $msg subject] is not a JetStream message" } set mdict [dict create \ stream [lindex $mlist 2] \ consumer [lindex $mlist 3] \ num_delivered [lindex $mlist 4] \ stream_seq [lindex $mlist 5] \ consumer_seq [lindex $mlist 6] \ timestamp [lindex $mlist 7] \ num_pending [lindex $mlist 8]] nats::_ns2ms mdict timestamp return $mdict } proc ::nats::make_stream_source {args} { # the top-level JSON object StreamSource may have a nested ExternalStream object, so the easiest way is to break down the spec into 2 set streamSourceSpec { name valid_str NATS_TCL_REQUIRED opt_start_seq pos_int null opt_start_time valid_str null filter_subject valid_str null external json null subject_transforms json_list null} set externalStreamSpec { api valid_str null deliver valid_str null} nats::_parse_args $args [list {*}$streamSourceSpec {*}$externalStreamSpec] if [info exists api] { set external [nats::_local2json $externalStreamSpec] } return [nats::_local2json $streamSourceSpec] } proc ::nats::make_subject_transform {args} { set spec { src valid_str NATS_TCL_REQUIRED dest valid_str NATS_TCL_REQUIRED} return [nats::_dict2json $spec $args] } proc ::nats::make_republish {args} { set spec { src valid_str NATS_TCL_REQUIRED dest valid_str NATS_TCL_REQUIRED headers_only bool false} return [nats::_dict2json $spec $args] } |
Name change from assets/nats2/key_value.tcl to assets/nats3/key_value.tcl.
1 2 3 4 5 6 7 8 | # Copyright (c) 2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2023 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. # based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-8.md oo::class create ::nats::key_value { | | | < | | | > > > > > | > > > | > > > > | | | < | > > | | > > | | > > > > > > > > > | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | # Copyright (c) 2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2023 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. # based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-8.md oo::class create ::nats::key_value { variable Conn Js Bucket Stream ReadPrefix WritePrefix UseJsPrefix UseDirect ;# mirrored_bucket constructor {connection jet_stream domain bucket_name stream_config} { set Conn $connection set Js $jet_stream set Bucket $bucket_name set Stream "KV_$Bucket" # since keys work on top of subjects, using mirrors, JS domains or API import prefixes affects ReadPrefix and WritePrefix # see also nats.go, func mapStreamToKVS # ADR-19 https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-19.md # https://github.com/nats-io/nats-architecture-and-design/issues/167 # however, ADR-8 v1.1 deprecates this approach and uses subject transforms instead set ReadPrefix "\$KV.$Bucket" set WritePrefix $ReadPrefix set UseJsPrefix false if { [$Js api_prefix] ne "\$JS.API"} { set UseJsPrefix true } set UseDirect [dict get $stream_config allow_direct] if {[dict exists $stream_config mirror name]} { set originStream [dict get $stream_config mirror name] set originBucket [string range $originStream 3 end] ;# remove "KV_" from "KV_bucket_name" set WritePrefix "\$KV.$originBucket" if {[dict exists $stream_config mirror external api]} { set UseJsPrefix false set ReadPrefix "\$KV.$originBucket" set externalApi [dict get $stream_config mirror external api] set WritePrefix "$externalApi.\$KV.$originBucket" } } } destructor { $Js releaseRef [self] } method PublishToStream {key {value ""} {hdrs ""}} { my CheckKeyName $key if {$UseJsPrefix} { append subject "[$Js api_prefix]." } append subject "$WritePrefix.$key" set msg [nats::msg create $subject -data $value] if {$hdrs ne ""} { dict set msg header $hdrs } try { return [$Js publish_msg $msg] } trap {NATS ErrNoResponders} err { throw {NATS ErrBucketNotFound} "Bucket $Bucket not found" } } method get {key args} { my CheckKeyName $key nats::_parse_args $args { revision pos_int "" |
︙ | ︙ | |||
51 52 53 54 55 56 57 | } method get_value {key args} { dict get [my get $key {*}$args] value } method Get {key {revision ""}} { | | < < | | | | < < | | | > > | | | < < < | | | 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | } method get_value {key args} { dict get [my get $key {*}$args] value } method Get {key {revision ""}} { set subject "$ReadPrefix.$key" if {$UseDirect} { set methodName stream_direct_get } else { set methodName stream_msg_get } try { if {$revision ne ""} { set msg [$Js $methodName $Stream -seq $revision] # not sure under what conditions this may happen, but nats.go does this check if {$subject ne [nats::msg subject $msg]} { throw {NATS ErrKeyNotFound} "Expected $subject, got [nats::msg subject $msg]" } } else { set msg [$Js $methodName $Stream -last_by_subj $subject] } } trap {NATS ErrMsgNotFound} err { throw {NATS ErrKeyNotFound} "Key $key not found in bucket $Bucket" } trap {NATS ErrStreamNotFound} err { # looks like a bug or design flaw in nats-server that only STREAM.MSG.GET can reply with ErrStreamNotFound; requests to DIRECT.GET simply time out # which is not a real problem because we get here only if somebody else deletes the bucket *after* we've bound to it throw {NATS ErrBucketNotFound} "Bucket $Bucket not found" } # we know the delta only when using the KV watcher; same in nats.go set entry [dict create \ bucket $Bucket \ key $key \ value [nats::msg data $msg] \ revision [nats::msg seq $msg] \ created [nats::isotime_to_msec [nats::msg timestamp $msg]] \ operation [nats::header lookup $msg KV-Operation PUT]] if {[dict get $entry operation] in {DEL PURGE}} { throw [list NATS ErrKeyDeleted [dict get $entry revision]] "Key $key was deleted or purged" } return $entry } method put {key value} { try { return [dict get [my PublishToStream $key $value] seq] } trap {NATS ErrNoResponders} err { throw {NATS ErrBucketNotFound} "Bucket $Bucket not found" } } method create {key value} { try { return [my update $key $value 0] } trap {NATS ErrWrongLastSequence} err { |
︙ | ︙ | |||
122 123 124 125 126 127 128 | # retry with a proper revision return [my update $key $value $revision] } } } method update {key value revision} { | | < | | < < | | | | < < | < | | | > > > > > > > | | < | > > | > > > | | > | | > | | > | | | > | > | | | | | > | | > | > | | < | < < | < < > | < < < < < < < < < < | > > > | | < | > | > | > | | | | | < < < | | | | < < | < < < < | < | | | | | | | | | | | < < | | | < | | > | > > > > > > > | > > > > > > > > > > > > > > > > > > > | 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 | # retry with a proper revision return [my update $key $value $revision] } } } method update {key value revision} { # nats.go Update doesn't use WritePrefix - seems like their bug set header [dict create Nats-Expected-Last-Subject-Sequence $revision] set resp [my PublishToStream $key $value $header] ;# throws ErrWrongLastSequence in case of mismatch return [dict get $resp seq] } method delete {key {revision ""}} { set header [dict create KV-Operation DEL] if {$revision ne "" && $revision > 0} { dict set header Nats-Expected-Last-Subject-Sequence $revision } my PublishToStream $key "" $header return } method purge {key} { my PublishToStream $key "" [dict create KV-Operation PURGE Nats-Rollup sub] return } method revert {key revision} { set entry [my get $key -revision $revision] return [my put $key [dict get $entry value]] } method status {} { try { set stream_info [$Js stream_info $Stream] } trap {NATS ErrStreamNotFound} err { throw {NATS BucketNotFound} "Bucket $Bucket not found" } return [my StreamInfoToKvInfo $stream_info] } method watch {key_pattern args} { set spec {callback str "" include_history bool false meta_only bool false ignore_deletes bool false updates_only bool false values_array str "" idle_heartbeat str null} nats::_parse_args $args $spec if {$include_history && $updates_only} { throw {NATS ErrInvalidArg} "-include_history conflicts with -updates_only" } set deliver_policy [expr {$include_history ? "all" : "last_per_subject"}] if {$updates_only} { set deliver_policy "new" } set filter_subject "$ReadPrefix.$key_pattern" set consumerOpts [list -description "KV watcher" -headers_only $meta_only -deliver_policy $deliver_policy -filter_subject $filter_subject] if [info exists idle_heartbeat] { lappend consumerOpts -idle_heartbeat $idle_heartbeat } set watcher [nats::kv_watcher new [self] $consumerOpts $callback $ignore_deletes $values_array] $Js addRef $watcher return $watcher } method keys {} { set w [my watch > -ignore_deletes 1 -meta_only 1] set ns [info object namespace $w] set result [${ns}::my Gather keys] $w destroy if {[llength $result] == 0} { throw {NATS ErrKeyNotFound} "No keys found in bucket $Bucket" ;# nats.go raises ErrNoKeysFound instead } return $result } method history {key} { set w [my watch $key -include_history 1] set ns [info object namespace $w] set result [${ns}::my Gather history] $w destroy if {[llength $result] == 0} { throw {NATS ErrKeyNotFound} "Key $key not found in bucket $Bucket" } return $result } method StreamInfoToKvInfo {stream_info} { set config [dict get $stream_info config] set isCompressed [expr {[dict lookup $config compression none] ne "none"}] set kv_info [dict create \ bucket $Bucket \ bytes [dict get $stream_info state bytes] \ history [dict get $config max_msgs_per_subject] \ ttl [dict get $config max_age] \ values [dict get $stream_info state messages] \ is_compressed $isCompressed] if {[dict exists $config mirror name]} { # strip the leading "KV_" dict set kv_info mirror_name [string range [dict get $config mirror name] 3 end] if {[dict exists $config mirror external api]} { # in format "$JS.some-domain.API" unless it is imported from another account set externalApi [split [dict get $config mirror external api] .] if {[llength $externalApi] == 3 && [lindex $externalApi 2] eq "API"} { dict set kv_info mirror_domain [lindex $externalApi 1] } } } # do it here so that underlying stream config will be at the end dict set kv_info stream_config [dict get $stream_info config] dict set kv_info stream_state [dict get $stream_info state] return $kv_info } method CheckKeyName {key} { if {[string index $key 0] eq "." || \ [string index $key end] eq "." || \ [string range $key 0 2] eq "_kv" || \ ![regexp {^[-/_=\.a-zA-Z0-9]+$} $key]} { throw {NATS ErrInvalidArg} "Invalid key name $key" } } } oo::class create ::nats::kv_watcher { # watcher options/vars variable SubID UserCb InitDone IgnoreDeletes Gathering ResultList ValuesArray Consumer # copied from the parent KV bucket, so that the user can destroy it while the watcher is living variable Conn Bucket PrefixLen Js constructor {kv consumer_opts cb ignore_del arr} { set InitDone false ;# becomes true after the current/historical data has been received set Gathering "" set kvNS [info object namespace $kv] set Conn [set ${kvNS}::Conn] set Bucket [set ${kvNS}::Bucket] set stream [set ${kvNS}::Stream] set Js [set ${kvNS}::Js] set PrefixLen [string length [set ${kvNS}::ReadPrefix]] set UserCb $cb set IgnoreDeletes $ignore_del if {$arr ne ""} { upvar 2 $arr [self namespace]::ValuesArray } try { set Consumer [$Js ordered_consumer $stream -callback [nats::mymethod OnMsg] -post false {*}$consumer_opts] } trap {NATS ErrStreamNotFound} err { throw {NATS BucketNotFound} "Bucket $Bucket not found" } if {[dict get [$Consumer info] num_pending] == 0} { after 0 [nats::mymethod InitStageDone] ;# NB do not call it directly, because the user should be able to call e.g. "history" right after "watch" } } destructor { if {[info exists Consumer]} { $Consumer destroy ;# account for the case when ordered_consumer throws, like in test key_value_watchers-watch-5 } $Js releaseRef [self] } method InitStageDone {} { set InitDone true if {$UserCb ne ""} { after 0 [list {*}$UserCb ""] } } method OnMsg {msg} { set meta [::nats::metadata $msg] set delta [dict get $meta num_pending] set op [nats::header lookup $msg KV-Operation PUT] ;# note that normal PUT entries are delivered using MSG, so they can't have headers if {$IgnoreDeletes} { if {$op in {PURGE DEL}} { if {$delta == 0 && !$InitDone} { my InitStageDone } return } } set key [string range [nats::msg subject $msg] $PrefixLen+1 end] set entry [dict create \ bucket $Bucket \ key $key \ value [nats::msg data $msg] \ revision [dict get $meta stream_seq] \ created [dict get $meta timestamp] \ delta $delta \ operation $op] switch -- $Gathering { keys { lappend ResultList $key } history { lappend ResultList $entry } default { if {$UserCb ne ""} { after 0 [list {*}$UserCb $entry] } if {[info exists ValuesArray]} { if {$op eq "PUT"} { set ValuesArray($key) [dict get $entry value] } else { unset ValuesArray($key) } } } } if {$delta == 0 && !$InitDone} { my InitStageDone } } method Gather {what} { set Gathering $what ;# keys or history set ResultList [list] set timerID [after [$Js timeout] [list set [self namespace]::InitDone "timeout"]] nats::_coroVwait [self namespace]::InitDone if {$InitDone eq "timeout"} { throw {NATS ErrTimeout} "Timeout while gathering $what in bucket $Bucket" } after cancel $timerID return $ResultList } method consumer {} { return $Consumer } } proc ::nats::make_kv_origin {args} { set spec { stream valid_str null bucket valid_str NATS_TCL_REQUIRED keys str null api valid_str null deliver valid_str null domain valid_str null} nats::_parse_args $args $spec if {[info exists domain]} { if {[info exists api]} { throw {NATS ErrInvalidArg} "-domain and -api are mutually exclusive" } set api "\$JS.$domain.API" unset domain } return [nats::_local2dict $spec] } |
Name change from assets/nats2/nats_client.tcl to assets/nats3/nats_client.tcl.
1 2 3 4 5 6 7 8 9 10 11 12 | # Copyright (c) 2020-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2021 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. # References: # NATS protocol: https://docs.nats.io/reference/reference-protocols/nats-protocol # Tcllib: https://core.tcl-lang.org/tcllib/doc/trunk/embedded/md/toc.md package require json package require json::write | < > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | # Copyright (c) 2020-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Copyright (c) 2021 ANT Solutions https://antsolutions.eu/ # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. # References: # NATS protocol: https://docs.nats.io/reference/reference-protocols/nats-protocol # Tcllib: https://core.tcl-lang.org/tcllib/doc/trunk/embedded/md/toc.md package require json package require json::write package require coroutine # optional packages catch {package require tls} if {$::tcl_platform(platform) eq "windows"} { catch {package require iocp_inet} } namespace eval ::nats { # improvised enum variable status_closed "closed" variable status_connecting "connecting" variable status_connected "connected" variable status_reconnecting "reconnecting" # mymethod from oo::util does not account for a chance that the method's object can be destroyed after an event has been scheduled proc SafeCallback {mycmd method args} { if {[llength [info commands $mycmd]]} { $mycmd $method {*}$args } } proc mymethod {method args} { set mycmd [uplevel 1 {namespace which my}] list nats::SafeCallback $mycmd $method {*}$args } } # all options for "configure" set ::nats::_option_spec { servers valid_str "" name valid_str "" pedantic bool false |
︙ | ︙ | |||
44 45 46 47 48 49 50 51 52 53 54 55 | user str "" password str "" token str "" secure bool false check_subjects bool true dictmsg bool false utf8_convert bool false } oo::class create ::nats::connection { # "private" variables variable config sock coro timers counters subscriptions requests serverPool \ | > | > > > > < < < | 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | user str "" password str "" token str "" secure bool false check_subjects bool true dictmsg bool false utf8_convert bool false request_timeout timeout 10000 } oo::class create ::nats::connection { # "private" variables variable config sock coro timers counters subscriptions requests serverPool \ outBuffer requestsInboxPrefix pong ChildrenRef # "public" variables, so that users can set up traces if needed variable status last_error serverInfo constructor { { conn_name "" } args } { # if _parse_args throws, TclOO will call the destructor, so at least these vars must be valid set serverPool [nats::server_pool new [self]] set status $nats::status_closed nats::_parse_args $args { logger valid_str "" log_chan valid_str stdout log_level valid_str warn } set last_error "" # initialise default configuration foreach {name type def} $nats::_option_spec { set config($name) $def } set config(name) $conn_name set sock "" ;# the TCP socket set coro "" ;# the coroutine handling readable and writeable events on the socket |
︙ | ︙ | |||
86 87 88 89 90 91 92 | array set serverInfo {} ;# INFO from a current NATS server # all outgoing messages are put in this list before being flushed to the socket, # so that even when we are reconnecting, messages can still be sent set outBuffer [list] set requestsInboxPrefix "" set pong 0 my InitLogger $logger $log_chan $log_level | | > > > > > > > > | | 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | array set serverInfo {} ;# INFO from a current NATS server # all outgoing messages are put in this list before being flushed to the socket, # so that even when we are reconnecting, messages can still be sent set outBuffer [list] set requestsInboxPrefix "" set pong 0 my InitLogger $logger $log_chan $log_level array set ChildrenRef {} } destructor { my disconnect $serverPool destroy foreach obj [array names ChildrenRef] { $obj destroy } } # internal method releaseRef {obj} { unset -nocomplain ChildrenRef($obj) } method InitLogger {logger log_chan log_level} { set ns [self namespace] if {$logger ne ""} { # user has provided a pre-configured logger object logger::import -namespace ${ns}::log [${logger}::servicename] # log_chan and log_level have no effect in this case return } if {$log_level ni {error warn info debug}} { throw {NATS ErrInvalidArg} "Invalid -log_level $log_level" } |
︙ | ︙ | |||
122 123 124 125 126 127 128 | proc log::log {level msg} { variable logChannel variable loggerName puts $logChannel "\[[nats::timestamp] $loggerName $level\] $msg" } proc log::suppressed {level msg} {} | < | | > > > | 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | proc log::log {level msg} { variable logChannel variable loggerName puts $logChannel "\[[nats::timestamp] $loggerName $level\] $msg" } proc log::suppressed {level msg} {} set belowLogLevel 0 # provide the same interface as Tcllib's logger; we use only these 4 logging levels foreach level {error warn info debug} { if {$belowLogLevel} { interp alias {} ${ns}::log::${level} {} ${ns}::log::suppressed $level } else { interp alias {} ${ns}::log::${level} {} ${ns}::log::log $level } if {$log_level eq $level} { set belowLogLevel 1 } } } method cget {option} { set opt [string trimleft $option -] if {$opt eq "status"} { return $status } if {[info exists config($opt)]} { return $config($opt) } throw {NATS ErrInvalidArg} "Invalid option $option" } method configure {args} { |
︙ | ︙ | |||
160 161 162 163 164 165 166 | # cmdline::typedGetoptions is garbage nats::_parse_args $args $nats::_option_spec 1 set servers_opt [lsearch -exact $args "-servers"] if {$servers_opt == -1} { return } | | | 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | # cmdline::typedGetoptions is garbage nats::_parse_args $args $nats::_option_spec 1 set servers_opt [lsearch -exact $args "-servers"] if {$servers_opt == -1} { return } if {$status ne $nats::status_closed} { # in principle, most other config options can be changed on the fly # allowing -servers to be changed when connected is possible, but a bit tricky throw {NATS ErrInvalidArg} "Cannot configure servers when already connected" } # if any URL is invalid, this function will throw an error - let it propagate $serverPool set_servers [lindex $args $servers_opt+1] return |
︙ | ︙ | |||
211 212 213 214 215 216 217 | "" { set async 0 } default { throw {NATS ErrInvalidArg} "Unknown option $args" } } | | | | | | | | < < | 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 | "" { set async 0 } default { throw {NATS ErrInvalidArg} "Unknown option $args" } } if {$status ne $nats::status_closed} { return } if {[llength [$serverPool all_servers]] == 0} { throw {NATS ErrNoServers} "Server pool is empty" } set last_error "" # "reconnects" counter should be reset only once here rather than on every reconnect # e.g. if a server is in the pool, but it is down, we want to keep track of its "reconnects" counter $serverPool reset_counters set status $nats::status_connecting # this coroutine will handle all work to connect and read from the socket # current namespace is prepended to the coroutine name, so it's unique coroutine coro {*}[nats::mymethod CoroMain] if {!$async} { # $status will become "closed" straightaway # in case all calls to [socket] fail immediately and we exhaust the server pool # so we shouldn't vwait in this case if {$status eq $nats::status_connecting} { log::debug "Waiting for connection status" nats::_coroVwait [self namespace]::status log::debug "Finished waiting for connection status" } if {$status ne $nats::status_connected} { # if there's only one server in the pool, it's more user-friendly to report the actual error if {[dict exists $last_error code] && [llength [$serverPool all_servers]] == 1} { throw [dict get $last_error code] [dict get $last_error errorMessage] } throw {NATS ErrNoServers} "No servers available for connection" } } return } method disconnect {} { if {$status eq $nats::status_closed} { return } set last_error "" if {$sock eq ""} { # if a user calls disconnect while we are waiting for reconnect_time_wait, we only need to stop the coroutine $coro stop } else { my CloseSocket } # rest of cleanup is done in CoroMain |
︙ | ︙ | |||
346 347 348 349 350 351 352 | if {$queue ne "" && ![my CheckSubject $queue -queue]} { throw {NATS ErrBadQueueName} "Invalid queue $queue" } set subID [incr counters(subscription)] set subscriptions($subID) [dict create subj $subject queue $queue callback $callback maxMsgs $max_msgs recMsgs 0 isDictMsg $dictmsg post $post] | | | 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 | if {$queue ne "" && ![my CheckSubject $queue -queue]} { throw {NATS ErrBadQueueName} "Invalid queue $queue" } set subID [incr counters(subscription)] set subscriptions($subID) [dict create subj $subject queue $queue callback $callback maxMsgs $max_msgs recMsgs 0 isDictMsg $dictmsg post $post] if {$status eq $nats::status_connected} { # it will be sent anyway when we reconnect lappend outBuffer "SUB $subject $queue $subID" if {$max_msgs > 0} { lappend outBuffer "UNSUB $subID $max_msgs" } my ScheduleFlush } |
︙ | ︙ | |||
369 370 371 372 373 374 375 | if {![info exists subscriptions($subID)]} { throw {NATS ErrBadSubscription} "Invalid subscription ID $subID" } #the format is UNSUB <sid> [max_msgs] if {$max_msgs == 0 || [dict get $subscriptions($subID) recMsgs] >= $max_msgs} { | | | > | > | | | | | > > > | 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 | if {![info exists subscriptions($subID)]} { throw {NATS ErrBadSubscription} "Invalid subscription ID $subID" } #the format is UNSUB <sid> [max_msgs] if {$max_msgs == 0 || [dict get $subscriptions($subID) recMsgs] >= $max_msgs} { unset subscriptions($subID) set data "UNSUB $subID" } else { dict set subscriptions($subID) maxMsgs $max_msgs set data "UNSUB $subID $max_msgs" } if {$status eq $nats::status_connected} { lappend outBuffer $data my ScheduleFlush } return } method request_msg {msg args} { set timeout $config(request_timeout) set dictmsg $config(dictmsg) nats::_parse_args $args { timeout timeout null callback str "" dictmsg bool null } set reply [dict get $msg reply] if {$reply ne ""} { log::warn "request_msg: the reply $reply will be ignored" } return [my request [dict get $msg subject] [dict get $msg data] \ -header [dict get $msg header] \ -timeout $timeout -callback $callback -dictmsg $dictmsg] } method request {subject message args} { set timeout $config(request_timeout) set dictmsg $config(dictmsg) nats::_parse_args $args { timeout timeout null callback str "" dictmsg bool null header dict "" max_msgs pos_int null check_subj bool true } if {[info exists max_msgs]} { return [my OldStyleRequest $subject $message $header $timeout $callback $max_msgs] ;# isDictMsg always true } else { return [my NewStyleRequest $subject $message $header $timeout $callback $dictmsg $check_subj] } } method NewStyleRequest {subject message header timeout callback dictmsg check_subj} { # "new-style" request with one wildcard subscription # only the first response is delivered if {$requestsInboxPrefix eq ""} { set requestsInboxPrefix [my inbox] my subscribe "$requestsInboxPrefix.*" -dictmsg 1 -callback [nats::mymethod NewStyleRequestCb -1] -post false } set reqID [incr counters(request)] # will perform more argument checking, so it may raise an error my publish $subject $message -reply "$requestsInboxPrefix.$reqID" -header $header -check_subj $check_subj set timerID "" if {$callback ne ""} { if {$timeout != 0} { set timerID [after $timeout [nats::mymethod NewStyleRequestCb $reqID "" "" ""]] } set requests($reqID) [dict create timer $timerID callback $callback isDictMsg $dictmsg] return $reqID } # sync request # remember that we can get a reply after timeout, so vwait must wait on a specific reqID if {$timeout != 0} { set timerID [after $timeout [list dict set [self namespace]::requests($reqID) timedOut 1]] } # if connection is lost, we need to cancel this timer, see also CoroMain set requests($reqID) [dict create timer $timerID] nats::_coroVwait [self namespace]::requests($reqID) if {![info exists requests($reqID)]} { if {$last_error eq ""} { throw {NATS ErrConnectionClosed} "Connection closed" } throw {NATS ErrTimeout} "Connection lost" } set sync_req $requests($reqID) unset requests($reqID) if {[dict lookup $sync_req timedOut 0]} { throw {NATS ErrTimeout} "Request to $subject timed out" } |
︙ | ︙ | |||
469 470 471 472 473 474 475 | method OldStyleRequest {subject message header timeout callback max_msgs} { # "old-style" request with a SUB per each request is needed for JetStream, # because messages received from a stream have a subject that differs from our reply-to # we still use the same requests array to vwait on set reply [my inbox] set reqID [incr counters(request)] | | | | | > > > | 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 | method OldStyleRequest {subject message header timeout callback max_msgs} { # "old-style" request with a SUB per each request is needed for JetStream, # because messages received from a stream have a subject that differs from our reply-to # we still use the same requests array to vwait on set reply [my inbox] set reqID [incr counters(request)] set subID [my subscribe $reply -dictmsg 1 -callback [nats::mymethod OldStyleRequestCb $reqID] -max_msgs $max_msgs -post false] my publish $subject $message -reply $reply -header $header set timerID "" if {$callback ne ""} { if {$timeout != 0} { set timerID [after $timeout [nats::mymethod OldStyleRequestCb $reqID "" "" ""]] } set requests($reqID) [dict create timer $timerID callback $callback subID $subID] return $reqID } #sync request if {$timeout != 0} { set timerID [after $timeout [list dict set [self namespace]::requests($reqID) timedOut 1]] } set requests($reqID) [dict create timer $timerID] while {1} { nats::_coroVwait [self namespace]::requests($reqID) if {![info exists requests($reqID)]} { if {$last_error eq ""} { throw {NATS ErrConnectionClosed} "Connection closed" } throw {NATS ErrTimeout} "Connection lost" } set sync_req $requests($reqID) if {[dict lookup $sync_req timedOut 0]} { break } if {[llength [dict lookup $sync_req inMsgs]] == $max_msgs} { |
︙ | ︙ | |||
534 535 536 537 538 539 540 | #this function is called "flush" in all other NATS clients, but I find it confusing # default timeout in nats.go is 10s method ping { args } { nats::_parse_args $args { timeout timeout 10000 } | | | | | > > > > > > > > | > > | 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 | #this function is called "flush" in all other NATS clients, but I find it confusing # default timeout in nats.go is 10s method ping { args } { nats::_parse_args $args { timeout timeout 10000 } if {$status ne $nats::status_connected} { # this is different from nats.go (func FlushTimeout) that throws ErrConnectionClosed only if the connection is closed throw {NATS ErrConnectionClosed} "No connection to NATS server" } set timerID [after $timeout [list set [self namespace]::pong 0]] lappend outBuffer "PING" log::debug "sending PING" my ScheduleFlush nats::_coroVwait [self namespace]::pong after cancel $timerID if {$pong} { return true } if {$status eq $nats::status_closed} { # user called disconnect while we've been waiting for PONG throw {NATS ErrConnectionClosed} "Connection closed" } throw {NATS ErrTimeout} "PING timeout" } method jet_stream {args} { nats::_parse_args $args { timeout timeout 5000 domain valid_str "" trace bool false api_prefix valid_str "" } if {$domain ne "" && $api_prefix ne ""} { throw {NATS ErrInvalidArg} "-domain and -api_prefix are mutually exclusive" } set js [nats::jet_stream new [self] $timeout $api_prefix $domain $trace] set ChildrenRef($js) "" return $js } method inbox {} { # resulting inboxes look the same as in official NATS clients, but use a much simpler RNG return "_INBOX.[nats::_random_string]" } |
︙ | ︙ | |||
605 606 607 608 609 610 611 | unset requests($reqID) } # we received a message for a sync or async request # or we got a timeout for async request method OldStyleRequestCb {reqID subj msg reply} { if {![info exists requests($reqID)]} { | | | 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 | unset requests($reqID) } # we received a message for a sync or async request # or we got a timeout for async request method OldStyleRequestCb {reqID subj msg reply} { if {![info exists requests($reqID)]} { return ;# overdue message } if {$subj eq ""} { set subID [dict get $requests($reqID) subID] set callback [dict get $requests($reqID) callback] unset requests($reqID) my unsubscribe $subID # invoke the callback even if it received some messages before |
︙ | ︙ | |||
642 643 644 645 646 647 648 | method CloseSocket { {broken 0} } { # this method is only for closing an established TCP connection # it is not convenient to re-use it for all cases of close $sock (timeout or rejected connection) # because it does a lot of other work chan event $sock readable {} if {$broken} { | | | | > > | | | | > | 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 | method CloseSocket { {broken 0} } { # this method is only for closing an established TCP connection # it is not convenient to re-use it for all cases of close $sock (timeout or rejected connection) # because it does a lot of other work chan event $sock readable {} if {$broken} { if {$status ne $nats::status_connected} { # whether we are connecting or reconnecting, increment reconnect count for this server $serverPool current_server_connected false } if {$status eq $nats::status_connected} { # recall that during initial connection round we try all servers only once # method next_server relies on this status to know that set status $nats::status_reconnecting } } else { # we get here only from method disconnect lassign [my current_server] host port log::info "Closing connection to $host:$port" ;# in case of broken socket, the error will be logged in AsyncError # make sure we wait until successful flush, if the connection was not broken # but not if we're trying to connect to a new server! if {![chan configure $sock -connecting]} { chan configure $sock -blocking 1 foreach msg $outBuffer { append msg "\r\n" puts -nonewline $sock $msg } } set outBuffer [list] } try { close $sock ;# all buffered input is discarded, all buffered output is flushed } on error err { log::error "Failed to close the socket: $err" |
︙ | ︙ | |||
682 683 684 685 686 687 688 | if {!$broken} { $coro stop } } } method Pinger {} { | | | | | 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 | if {!$broken} { $coro stop } } } method Pinger {} { set timers(ping) [after $config(ping_interval) [nats::mymethod Pinger]] if {$counters(pendingPings) >= $config(max_outstanding_pings)} { my AsyncError ErrStaleConnection "The server did not respond to $counters(pendingPings) PINGs" 1 set counters(pendingPings) 0 return } lappend outBuffer "PING" log::debug "Sending PING" incr counters(pendingPings) my ScheduleFlush } method ScheduleFlush {} { if {$timers(flush) eq "" && $status eq $nats::status_connected} { set timers(flush) [after 0 [nats::mymethod Flusher]] } } method Flusher { } { set timers(flush) "" try { foreach msg $outBuffer { |
︙ | ︙ | |||
755 756 757 758 759 760 761 | # tls_required=true in CONNECT seems unnecessary to me, because TLS handshake has already happened # but nats.go does this set connectParams [list verbose $config(verbose) \ pedantic $config(pedantic) \ tls_required $tls_done \ name [json::write string $config(name)] \ lang [json::write string Tcl] \ | | | 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 | # tls_required=true in CONNECT seems unnecessary to me, because TLS handshake has already happened # but nats.go does this set connectParams [list verbose $config(verbose) \ pedantic $config(pedantic) \ tls_required $tls_done \ name [json::write string $config(name)] \ lang [json::write string Tcl] \ version [json::write string 3.0] \ protocol 1 \ echo $config(echo)] if {[info exists serverInfo(headers)] && $serverInfo(headers)} { lappend connectParams headers true no_responders true } if {[info exists serverInfo(auth_required)] && $serverInfo(auth_required)} { |
︙ | ︙ | |||
798 799 800 801 802 803 804 | set remainingMsgs [expr {$maxMsgs - $recMsgs}] lappend subsBuffer "UNSUB $subID $remainingMsgs" } } if {[llength $subsBuffer] > 0} { # ensure SUBs are sent before any pending PUBs set outBuffer [linsert $outBuffer 0 {*}$subsBuffer] | < | | 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 | set remainingMsgs [expr {$maxMsgs - $recMsgs}] lappend subsBuffer "UNSUB $subID $remainingMsgs" } } if {[llength $subsBuffer] > 0} { # ensure SUBs are sent before any pending PUBs set outBuffer [linsert $outBuffer 0 {*}$subsBuffer] } } method INFO {cmd} { if {$status eq $nats::status_connected} { # when we say "proto":1 in CONNECT, we may receive information about other servers in the cluster - add them to serverPool # and mark as discovered=true # example connect_urls : ["192.168.2.5:4222", "192.168.91.1:4222", "192.168.157.1:4223", "192.168.2.5:4223"] # by default each server will advertise IPs of all network interfaces, so the server pool may seem bigger than it really is # --client_advertise NATS option can be used to make it clearer array set serverInfo [json::json2dict $cmd] if {[info exists serverInfo(connect_urls)]} { |
︙ | ︙ | |||
958 959 960 961 962 963 964 | unset subscriptions($subID) ;# UNSUB has already been sent, no need to do it here } else { dict set subscriptions($subID) recMsgs $recMsgs } if {$postEvent} { after 0 [list {*}$callback $subject $msg $replyTo] } else { | | | > > > | | 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 | unset subscriptions($subID) ;# UNSUB has already been sent, no need to do it here } else { dict set subscriptions($subID) recMsgs $recMsgs } if {$postEvent} { after 0 [list {*}$callback $subject $msg $replyTo] } else { # direct call - exercise with caution {*}$callback $subject $msg $replyTo } # now we return back to CoroMain and enter "yield" there } method PING {cmd} { lappend outBuffer "PONG" log::debug "received PING, sending PONG" my ScheduleFlush } method PONG {cmd} { set pong 1 set counters(pendingPings) 0 log::debug "received PONG, status: $status" if {$status ne $nats::status_connected} { # auth OK: finalise the connection process $serverPool current_server_connected true lassign [my current_server] host port log::info "Connected to the server at $host:$port" set last_error "" ;# cleanup possible error messages about prior connection attempts set status $nats::status_connected ;# exit from vwait in "connect" my RestoreSubs if {[llength $outBuffer]} { my ScheduleFlush } set timers(ping) [after $config(ping_interval) [nats::mymethod Pinger]] } } method +OK {cmd} { log::debug "+OK" ;# cmd is blank } |
︙ | ︙ | |||
1037 1038 1039 1040 1041 1042 1043 | set coro [info coroutine] log::debug "Started coroutine $coro" try { my ConnectNextServer while {1} { set reason [yield] if {$reason eq "stop"} { | | | | < < < < < < < < < < < < < > > < > > > > > > > > > > > > > > > > > > > > > > > | 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 | set coro [info coroutine] log::debug "Started coroutine $coro" try { my ConnectNextServer while {1} { set reason [yield] if {$reason eq "stop"} { break ;# deliberate disconnect; the socket has been already closed } my ProcessEvent $reason } } trap {NATS STOP_CORO} {msg opts} { # deliberate disconnect; the socket has been already closed } trap {NATS ErrNoServers} {msg opts} { # connection lost; don't overwrite the real last_error - need to log this in case of "connect -async" log::error $msg } trap {} {msg opts} { log::error "Unexpected error: $msg $opts" } my CancelAllRequests set pong 0 ;# cancel pending pings array unset subscriptions ;# make sure we don't try to restore subscriptions, when we connect next time set requestsInboxPrefix "" my CancelConnectTimer set status $nats::status_closed log::debug "Finished coroutine $coro" set coro "" } method CancelAllRequests {} { foreach reqID [array names requests] { #log::debug "Force timeout of request $reqID" after cancel [dict lookup $requests($reqID) timer] set callback [dict lookup $requests($reqID) callback] if {$callback eq ""} { set requests($reqID) "" ;# leave vwait in all sync requests } else { if {[dict exists $requests($reqID) subID]} { # most probably this is JS fetch - mark it as timed out for proper cleanup after 0 [list {*}$callback 1 ""] continue } if {$last_error eq ""} { continue } # invoke other callbacks only if the connection was lost after 0 [list {*}$callback 1 ""] } } array unset requests } method ProcessEvent {reason} { switch -- $reason { connected { # this event will arrive again and again if we don't disable it chan event $sock writable {} lassign [my current_server] host port |
︙ | ︙ | |||
1116 1117 1118 1119 1120 1121 1122 | # the chan readable event will be sent again and again for as long as there's pending data # so I don't need a loop around [chan gets] to read all lines, even if they arrive together try { set readCount [chan gets $sock line] } trap {POSIX} {err errOpts} { # can be ECONNABORTED or ECONNRESET lassign [my current_server] host port | | | 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 | # the chan readable event will be sent again and again for as long as there's pending data # so I don't need a loop around [chan gets] to read all lines, even if they arrive together try { set readCount [chan gets $sock line] } trap {POSIX} {err errOpts} { # can be ECONNABORTED or ECONNRESET lassign [my current_server] host port my AsyncError ErrBrokenSocket "Connection to $host:$port broken - [lindex [dict get $errOpts -errorcode] end]" 1 return } # Tcl documentation for non-blocking gets is very misleading # checking for $readCount <= 0 is NOT enough to ensure that I never get an incomplete line # so checking for EOF must PRECEDE checking for $readCount if {[eof $sock]} { #set err [chan configure $sock -error] - no point in this, $err will be blank |
︙ | ︙ | |||
1145 1146 1147 1148 1149 1150 1151 | } # extract the first word from the line (INFO, MSG etc) # protocol_arg will be empty in case of PING/PONG/OK set protocol_arg [lassign $line protocol_op] if {$protocol_op in {MSG HMSG INFO -ERR +OK PING PONG}} { my $protocol_op $protocol_arg } else { | | | 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 | } # extract the first word from the line (INFO, MSG etc) # protocol_arg will be empty in case of PING/PONG/OK set protocol_arg [lassign $line protocol_op] if {$protocol_op in {MSG HMSG INFO -ERR +OK PING PONG}} { my $protocol_op $protocol_arg } else { my AsyncError ErrProtocol "Invalid protocol $protocol_op $protocol_arg" 1 } } default { log::error "CoroMain: unknown reason $reason" } } } |
︙ | ︙ | |||
1199 1200 1201 1202 1203 1204 1205 | throw {NATS ErrConnectionClosed} "No connection to NATS server" } } method AsyncError {code msg { doReconnect 0 }} { # lower severity than "error", because the client can recover and connect to another NATS log::warn $msg | | | 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 | throw {NATS ErrConnectionClosed} "No connection to NATS server" } } method AsyncError {code msg { doReconnect 0 }} { # lower severity than "error", because the client can recover and connect to another NATS log::warn $msg set last_error [dict create code [list NATS $code] errorMessage $msg] if {$doReconnect} { my CloseSocket 1 my ConnectNextServer ;# can be done only in the coro } } method StartConnectTimer {} { |
︙ | ︙ | |||
1256 1257 1258 1259 1260 1261 1262 | proc data {msg} { return [dict get $msg data] } proc reply {msg} { return [dict get $msg reply] } proc no_responders {msg} { | | > > > > > > > > > > > | > > > | | 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 | proc data {msg} { return [dict get $msg data] } proc reply {msg} { return [dict get $msg reply] } proc no_responders {msg} { return [expr {[nats::header lookup $msg Status 0] == 503}] } proc IsCtrlMsg {msg descr} { if {[string length [dict get $msg data]]} { return 0 } ::set s [nats::header lookup $msg Status 0] if {$s != 100} { return 0 } ::set d [nats::header lookup $msg Description ""] return [string match $descr $d] } proc idle_heartbeat {msg} { return [IsCtrlMsg $msg "Idle*"] } proc flow_control {msg} { return [IsCtrlMsg $msg "Flow*"] } # only messages fetched using STREAM.MSG.GET will have it proc seq {msg} { if {[dict exists $msg seq]} { return [dict get $msg seq] } else { throw {NATS ErrInvalidArg} "Invalid field 'seq'" } } proc timestamp {msg} { if {[dict exists $msg time]} { return [dict get $msg time] ;# ISO timestamp like 2022-11-22T13:31:35.4514983Z ; [clock scan] doesn't understand it } else { throw {NATS ErrInvalidArg} "Invalid field 'timestamp'" } } namespace export {[a-z]*} namespace ensemble create } namespace eval ::nats::header { proc add {msgVar key value} { upvar 1 $msgVar msg if {[dict exists $msg header $key]} { dict with msg header { |
︙ | ︙ | |||
1336 1337 1338 1339 1340 1341 1342 | proc lookup {msg key def} { ::set h [dict get $msg header] if {![dict exists $h $key]} { return $def } return [lindex [dict get $h $key] 0] } | | | 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 | proc lookup {msg key def} { ::set h [dict get $msg header] if {![dict exists $h $key]} { return $def } return [lindex [dict get $h $key] 0] } namespace export {[a-z]*} namespace ensemble create } proc ::nats::isotime_to_msec {isotime} { # parse into date-time, fractional seconds and timezone (optional) if {[scan $isotime {%[^.].%d%s} datetime fs tz] < 2} { throw {NATS ErrInvalidArg} "Invalid time $isotime" |
︙ | ︙ | |||
1372 1373 1374 1375 1376 1377 1378 | if {[info coroutine] eq ""} { vwait $var } else { coroutine::util vwait $var } } # returns a dict, where each key points to a list of values | | > | | | | | > > < < | < | 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 | if {[info coroutine] eq ""} { vwait $var } else { coroutine::util vwait $var } } # returns a dict, where each key points to a list of values # NB! unlike HTTP headers, in NATS header keys are case-sensitive # NATS Message Headers https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-4.md proc ::nats::_parse_header {header} { set result [dict create] # textutil::split::splitx is slower than [string map]+split and RFC 5322 doesn't allow LF in a field body set split_headers [split [string map {\r\n \n} $header] \n] # the first line is always NATS status like NATS/1.0 404 No Messages set split_headers [lassign $split_headers first_line] # the code and description are optional set descr [lassign $first_line headerVersion status_code] if {![string match "NATS/1.0" $headerVersion]} { throw {NATS ErrBadHeaderMsg} "Unknown header version $headerVersion" } if {[string is integer -strict $status_code]} { dict set result Status $status_code ;# non-int status is allowed but ignored } if {$descr ne ""} { dict set result Description [list $descr] } # process remaining fields foreach line $split_headers { if {![regexp -nocase {^([^:]+):(.+)$} $line -> k v]} { continue } set k [string trim $k] set v [string trim $v] dict lappend result $k $v } return $result } proc ::nats::_format_header { header } { # other official clients accept inline status & description in the first line when *parsing* headers # but when serializing headers, status & description are treated just like usual headers set result "NATS/1.0\r\n" |
︙ | ︙ | |||
1452 1453 1454 1455 1456 1457 1458 | } proc ::nats::_validate {name val type} { if {[lindex $type 0] eq "enum"} { return true } switch -- $type { | | | 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 | } proc ::nats::_validate {name val type} { if {[lindex $type 0] eq "enum"} { return true } switch -- $type { str - ns - int - json - json_list { # some types used only in JetStream JSON generation don't need to be validated here } valid_str { if {[string length $val] == 0} { return false } } |
︙ | ︙ |
Name change from assets/nats2/pkgIndex.tcl to assets/nats3/pkgIndex.tcl.
|
| | | | 1 2 3 4 5 6 7 8 | package ifneeded nats 3.0 \ [list apply {{dir} { source [file join $dir nats_client.tcl] source [file join $dir server_pool.tcl] source [file join $dir jet_stream.tcl] source [file join $dir key_value.tcl] package provide nats 3.0 }} $dir] |
Name change from assets/nats2/server_pool.tcl to assets/nats3/server_pool.tcl.
1 2 3 4 5 6 7 8 9 | # Copyright (c) 2021-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. package require uri package require struct::list oo::class create ::nats::server_pool { | | | | | | | | < | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | # Copyright (c) 2021-2023 Petro Kazmirchuk https://github.com/Kazmirchuk # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. package require uri package require struct::list oo::class create ::nats::server_pool { variable Servers Conn constructor {c} { set Servers [list] ;# list of dicts working as FIFO queue # each dict contains: host port scheme discovered reconnects last_attempt (ms, mandatory), user password auth_token (optional) set Conn $c } # used only for URL discovered from the INFO message # remember that it carries only IP:port, so no scheme etc method add {url} { set ns [info object namespace $Conn] try { set newServer [my parse $url] } trap {NATS INVALID_ARG} err { ${ns}::log::warn $err ;# very unlikely return } foreach s $Servers { if {[dict get $s host] eq [dict get $newServer host] && [dict get $s port] == [dict get $newServer port]} { return ;# we already know this server } } dict set newServer discovered true set Servers [linsert $Servers 0 $newServer] ;# the current server is always at the end of the list ${ns}::log::debug "Added $url to the server pool" } # used by "configure". All or nothing: if at least one URL is invalid, the old configuration stays intact method set_servers {urls} { set result [list] foreach url $urls { lappend result [my parse $url] ;# will throw ErrInvalidArg in case of invalid URL - let it propagate } if {[$Conn cget randomize]} { # ofc lsort will mess up the URL list if randomize=false # interestingly, it seems that official NATS clients don't check the server list for duplicates set result [lsort -unique $result] # IMHO official clients do shuffling too often, at least in 3 places! I do it only once set result [struct::list shuffle $result] } set Servers $result } method parse {url} { # replace nats/tls scheme with http and delegate parsing to the uri package set scheme nats if {[string equal -length 7 $url "nats://"]} { set dummy_url [string range $url 7 end] |
︙ | ︙ | |||
89 90 91 92 93 94 95 | dict set newServer auth_token $parsed(user) } } return $newServer } method next_server {} { | | | | | | | | | | | | | | | | | | | | | | | 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | dict set newServer auth_token $parsed(user) } } return $newServer } method next_server {} { set ns [info object namespace $Conn] while {1} { if { [llength $Servers] == 0 } { throw {NATS ErrNoServers} "Server pool is empty" } set attempts [$Conn cget max_reconnect_attempts] set wait [$Conn cget reconnect_time_wait] #"pop" a server; using struct::queue seems like an overkill for such a small list set s [lindex $Servers 0] # during initial connecting process we go through the pool only once if {[$Conn cget status] eq $nats::status_connecting && [dict get $s reconnects]} { throw {NATS ErrNoServers} "No servers available for connection" } set Servers [lreplace $Servers 0 0] # max_reconnect_attempts == -1 means "unlimited". See also selectNextServer in nats.go if {$attempts >= 0 && [dict get $s reconnects] >= $attempts} { ${ns}::log::debug "Removed [dict get $s host]:[dict get $s port] from the server pool" continue } set now [clock milliseconds] set last_attempt [dict get $s last_attempt] if {$now < $last_attempt + $wait} { # other clients simply wait for reconnect_time_wait, but this approach is more precise set waiting_time [expr {$wait - ($now - $last_attempt)}] ${ns}::log::debug "Waiting for $waiting_time ms before connecting to the next server" set timer [after $waiting_time [info coroutine]] set reason [yield] if {$reason eq "stop" } { # user called "disconnect" after cancel $timer dict set s last_attempt [clock milliseconds] lappend Servers $s throw {NATS STOP_CORO} "Stop coroutine" ;# break from the main loop } } lappend Servers $s break } return [my current_server] } method current_server_connected {ok} { [info object namespace $Conn]::my CancelConnectTimer set s [lindex $Servers end] dict set s last_attempt [clock milliseconds] if {$ok} { dict set s reconnects 0 } else { dict incr s reconnects } lset Servers end $s } method format_credentials {} { set s [lindex $Servers end] set def_user [$Conn cget user] set def_pass [$Conn cget password] set def_token [$Conn cget token] if {[dict exists $s user] && [dict exists $s password]} { return [list user [json::write string [dict get $s user]] pass [json::write string [dict get $s password]]] } if {[dict exists $s auth_token]} { return [list auth_token [json::write string [dict get $s auth_token]]] } if {$def_user ne "" && $def_pass ne ""} { return [list user [json::write string $def_user] pass [json::write string $def_pass]] } if {$def_token ne ""} { return [list auth_token [json::write string $def_token]] } throw {NATS ErrAuthorization} "No credentials known for NATS server at [dict get $s host]:[dict get $s port]" } method current_server {} { set s [lindex $Servers end] return [list [dict get $s host] [dict get $s port] [dict get $s scheme]] } method all_servers {} { return $Servers } method clear {} { set Servers [list] } method reset_counters {} { set new_list [list] foreach s $Servers { dict set s last_attempt 0 dict set s reconnects 0 lappend new_list $s } set Servers $new_list } } |
Changes to undroid/luck/cgi-bin/luck.tips.
︙ | ︙ | |||
139 140 141 142 143 144 145 | mkappimg {Scripts to make AppImage binaries from vanillawish} modbus0.1 {Tcl modbus interface using Ffidl and TclOO} Mpexpr12 {Multi precision math package} mqtt2.0 {MQTT library including simple broker by Schelte Bron} mqtt3.0 {MQTT library including simple broker by Schelte Bron} mqtt3.1 {MQTT library including simple broker by Schelte Bron} msgpack2 {A pure Tcl implementation of the MessagePack object serialization library} | | | | 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | mkappimg {Scripts to make AppImage binaries from vanillawish} modbus0.1 {Tcl modbus interface using Ffidl and TclOO} Mpexpr12 {Multi precision math package} mqtt2.0 {MQTT library including simple broker by Schelte Bron} mqtt3.0 {MQTT library including simple broker by Schelte Bron} mqtt3.1 {MQTT library including simple broker by Schelte Bron} msgpack2 {A pure Tcl implementation of the MessagePack object serialization library} nats2 {Tcl client library for the NATS message broker} nats3 {Tcl client library for the NATS message broker} nccompat1.0 {ncurses compatibility helper} notebook2.2 {Will Duquette's notebook app} nsf2.3.0 {New Scripting Framework} nsf2.4.0 {New Scripting Framework} odbcsyms2.5 {Support library for tclodbc on Linux} ooxml1 {Read and write Office Open XML "XLSX" since Excel 2007} opencv0.11 {Tcl interface to OpenCV} |
︙ | ︙ |