From e36b3fefdf26c3602cacda3381fdd6ca726cb75b Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Tue, 19 Dec 2023 16:39:51 -0800 Subject: [PATCH] bug: copy over the channels when migrating the user (#539) This will attempt to copy the existing channels for a user. This is important because desktop does not check and rebuild channels if the server doesn't have them. (Might want to make that a standardized feature in the future.) Closes: SYNC-4050 --- Cargo.lock | 367 +++++++++--------- autopush-common/Cargo.toml | 2 +- .../src/db/bigtable/bigtable_client/error.rs | 86 ++++ .../src/db/bigtable/bigtable_client/mod.rs | 108 +++++- autopush-common/src/db/client.rs | 3 + autopush-common/src/db/dual/mod.rs | 7 + autopush-common/src/db/dynamodb/mod.rs | 8 + autopush-common/src/db/mock.rs | 4 + 8 files changed, 399 insertions(+), 186 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38ae49331..d6320a120 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,7 +10,7 @@ dependencies = [ "base64 0.20.0", "erased-serde", "http 0.2.11", - "hyper 0.14.27", + "hyper 0.14.28", "hyper-alpn", "openssl", "serde", @@ -39,7 +39,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite 0.2.13", "smallvec 1.11.2", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-util", ] @@ -55,16 +55,16 @@ dependencies = [ "futures-sink", "memchr", "pin-project-lite 0.2.13", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-util", "tracing", ] [[package]] name = "actix-cors" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b340e9cfa5b08690aae90fb61beb44e9b06f44fe3d0f93781aaa58cfba86245e" +checksum = "0346d8c1f762b41b458ed3145eea914966bb9ad20b9be0d6d463b20d45586370" dependencies = [ "actix-utils", "actix-web", @@ -99,7 +99,7 @@ dependencies = [ "http 0.2.11", "httparse", "httpdate", - "itoa 1.0.9", + "itoa 1.0.10", "language-tags", "local-channel", "mime", @@ -108,7 +108,7 @@ dependencies = [ "rand 0.8.5", "sha1", "smallvec 1.11.2", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-util", "tracing", "zstd", @@ -136,7 +136,7 @@ dependencies = [ "serde_urlencoded 0.7.1", "slab", "socket2 0.4.10", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -146,7 +146,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -170,7 +170,7 @@ checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" dependencies = [ "actix-macros", "futures-core", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -184,9 +184,9 @@ dependencies = [ "actix-utils", "futures-core", "futures-util", - "mio 0.8.9", + "mio 0.8.10", "socket2 0.5.5", - "tokio 1.34.0", + "tokio 1.35.1", "tracing", ] @@ -221,7 +221,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded 0.7.1", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -237,9 +237,9 @@ dependencies = [ "http 0.2.11", "impl-more", "pin-project-lite 0.2.13", - "rustls 0.21.9", + "rustls 0.21.10", "rustls-webpki", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-util", "tracing", ] @@ -278,7 +278,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "itoa 1.0.9", + "itoa 1.0.10", "language-tags", "log", "mime", @@ -290,7 +290,7 @@ dependencies = [ "serde_urlencoded 0.7.1", "smallvec 1.11.2", "socket2 0.5.5", - "time 0.3.30", + "time 0.3.31", "url 2.5.0", ] @@ -303,7 +303,7 @@ dependencies = [ "actix-router", "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -316,7 +316,7 @@ dependencies = [ "actix-http", "actix-web", "futures-core", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -327,7 +327,7 @@ checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -481,7 +481,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -492,7 +492,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -552,7 +552,7 @@ dependencies = [ "lazy_static", "log", "mozsvc-common", - "reqwest 0.11.22", + "reqwest 0.11.23", "sentry", "sentry-actix", "sentry-core", @@ -577,15 +577,15 @@ dependencies = [ "cadence", "futures 0.3.29", "futures-locks 0.7.1", - "hyper 0.14.27", - "reqwest 0.11.22", + "hyper 0.14.28", + "reqwest 0.11.23", "sentry", "serde", "serde_derive", "serde_json", "slog", "slog-scope", - "tokio 1.34.0", + "tokio 1.35.1", "uuid 1.6.1", ] @@ -600,13 +600,13 @@ dependencies = [ "fernet", "lazy_static", "mozsvc-common", - "reqwest 0.11.22", + "reqwest 0.11.23", "serde", "serde_derive", "serde_json", "slog", "slog-scope", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -629,11 +629,11 @@ dependencies = [ "cadence", "ctor", "futures-util", - "reqwest 0.11.22", + "reqwest 0.11.23", "serde_json", "slog-scope", "thiserror", - "tokio 1.34.0", + "tokio 1.35.1", "uuid 1.6.1", ] @@ -660,7 +660,7 @@ dependencies = [ "slog-scope", "strum", "thiserror", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -678,11 +678,11 @@ dependencies = [ "ctor", "futures 0.3.29", "mockall", - "reqwest 0.11.22", + "reqwest 0.11.23", "sentry", "slog-scope", "thiserror", - "tokio 1.34.0", + "tokio 1.35.1", "uuid 1.6.1", ] @@ -716,7 +716,7 @@ dependencies = [ "openssl", "rand 0.8.5", "regex", - "reqwest 0.11.22", + "reqwest 0.11.23", "rusoto_core 0.47.0", "rusoto_dynamodb 0.47.0", "sentry", @@ -734,7 +734,7 @@ dependencies = [ "slog-term", "tempfile", "thiserror", - "tokio 1.34.0", + "tokio 1.35.1", "url 2.5.0", "uuid 1.6.1", "validator", @@ -819,7 +819,7 @@ dependencies = [ "grpcio", "hex", "httparse", - "hyper 0.14.27", + "hyper 0.14.28", "lazy_static", "log", "mockall", @@ -828,7 +828,7 @@ dependencies = [ "protobuf", "rand 0.8.5", "regex", - "reqwest 0.11.22", + "reqwest 0.11.23", "rusoto_core 0.47.0", "rusoto_credential 0.47.0", "rusoto_dynamodb 0.47.0", @@ -849,7 +849,7 @@ dependencies = [ "tempfile", "thiserror", "tokio 0.2.25", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-core", "tungstenite", "url 2.5.0", @@ -878,7 +878,7 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "itoa 1.0.9", + "itoa 1.0.10", "log", "mime", "percent-encoding 2.3.1", @@ -887,7 +887,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded 0.7.1", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -1213,11 +1213,10 @@ dependencies = [ [[package]] name = "colored" -version = "2.0.4" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6" +checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" dependencies = [ - "is-terminal", "lazy_static", "windows-sys 0.48.0", ] @@ -1270,7 +1269,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" dependencies = [ "percent-encoding 2.3.1", - "time 0.3.30", + "time 0.3.31", "version_check", ] @@ -1328,12 +1327,12 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "14c3242926edf34aec4ac3a77108ad4854bffaa2e4ddc1824124ce59231302d5" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils 0.8.16", + "crossbeam-utils 0.8.17", ] [[package]] @@ -1405,9 +1404,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if 1.0.0", ] @@ -1453,12 +1452,12 @@ dependencies = [ [[package]] name = "ctor" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e366bff8cd32dd8754b0991fb66b279dc48f598c3a18914852a6673deef583" +checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -1504,7 +1503,7 @@ dependencies = [ "async-trait", "deadpool-runtime", "num_cpus", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -1525,9 +1524,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" dependencies = [ "powerfmt", "serde", @@ -1958,7 +1957,7 @@ checksum = "45ec6fe3675af967e67c5536c0b9d44e34e6c52f86bedc4ea49c5317b8e94d06" dependencies = [ "futures-channel", "futures-task", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -1969,7 +1968,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -2077,9 +2076,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "google-cloud-rust-raw" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cfec9367aefac6e90f42db5dee124b64fa5bcf84adf4236a861453373d851c" +checksum = "864a48916c62ddbd1dc289be6d041d8ca61160c9c6169298e5cf3da11baf8370" dependencies = [ "futures 0.3.29", "grpcio", @@ -2149,7 +2148,7 @@ dependencies = [ "http 0.2.11", "indexmap 2.1.0", "slab", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-util", "tracing", ] @@ -2227,11 +2226,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.5" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2264,7 +2263,7 @@ checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes 1.5.0", "fnv", - "itoa 1.0.9", + "itoa 1.0.10", ] [[package]] @@ -2281,9 +2280,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes 1.5.0", "http 0.2.11", @@ -2340,9 +2339,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes 1.5.0", "futures-channel", @@ -2350,13 +2349,13 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "httparse", "httpdate", - "itoa 1.0.9", + "itoa 1.0.10", "pin-project-lite 0.2.13", - "socket2 0.4.10", - "tokio 1.34.0", + "socket2 0.5.5", + "tokio 1.35.1", "tower-service", "tracing", "want 0.3.1", @@ -2367,11 +2366,11 @@ name = "hyper-alpn" version = "0.4.1" source = "git+https://github.com/WalletConnect/hyper-alpn#9761c744b8ba274dfaea04613bb4c39c1a97c141" dependencies = [ - "hyper 0.14.27", + "hyper 0.14.28", "log", "rustls 0.20.9", "rustls-pemfile", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-rustls 0.23.4", "webpki-roots", ] @@ -2384,11 +2383,11 @@ checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" dependencies = [ "ct-logs", "futures-util", - "hyper 0.14.27", + "hyper 0.14.28", "log", "rustls 0.19.1", "rustls-native-certs 0.5.0", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-rustls 0.22.0", "webpki 0.21.4", ] @@ -2401,11 +2400,11 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.11", - "hyper 0.14.27", + "hyper 0.14.28", "log", - "rustls 0.21.9", + "rustls 0.21.10", "rustls-native-certs 0.6.3", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-rustls 0.24.1", ] @@ -2429,9 +2428,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes 1.5.0", - "hyper 0.14.27", + "hyper 0.14.28", "native-tls", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-native-tls", ] @@ -2599,9 +2598,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "jobserver" @@ -2676,9 +2675,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.150" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libloading" @@ -2869,9 +2868,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "log", @@ -2976,7 +2975,7 @@ checksum = "51fba38c7ded23ca88a409f72277d177170b3eadb5e283741182fd3cae60ecdf" dependencies = [ "hostname", "lazy_static", - "reqwest 0.11.22", + "reqwest 0.11.23", ] [[package]] @@ -3084,9 +3083,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" @@ -3102,9 +3101,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.60" +version = "0.10.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" dependencies = [ "bitflags 2.4.1", "cfg-if 1.0.0", @@ -3123,7 +3122,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -3134,9 +3133,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.96" +version = "0.9.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", @@ -3315,7 +3314,7 @@ dependencies = [ "pest_meta", "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -3797,9 +3796,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64 0.21.5", "bytes 1.5.0", @@ -3808,8 +3807,8 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", - "hyper 0.14.27", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-tls 0.5.0", "ipnet", "js-sys", @@ -3823,7 +3822,7 @@ dependencies = [ "serde_json", "serde_urlencoded 0.7.1", "system-configuration", - "tokio 1.34.0", + "tokio 1.35.1", "tokio-native-tls", "tower-service", "url 2.5.0", @@ -3850,9 +3849,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.6" +version = "0.17.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" dependencies = [ "cc", "getrandom 0.2.11", @@ -3911,7 +3910,7 @@ dependencies = [ "crc32fast", "futures 0.3.29", "http 0.2.11", - "hyper 0.14.27", + "hyper 0.14.28", "hyper-rustls 0.22.1", "lazy_static", "log", @@ -3920,7 +3919,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "serde_json", - "tokio 1.34.0", + "tokio 1.35.1", "xml-rs", ] @@ -3954,11 +3953,11 @@ dependencies = [ "chrono", "dirs-next", "futures 0.3.29", - "hyper 0.14.27", + "hyper 0.14.28", "serde", "serde_json", "shlex 1.2.0", - "tokio 1.34.0", + "tokio 1.35.1", "zeroize", ] @@ -4028,7 +4027,7 @@ dependencies = [ "hex", "hmac 0.11.0", "http 0.2.11", - "hyper 0.14.27", + "hyper 0.14.28", "log", "md-5", "percent-encoding 2.3.1", @@ -4037,7 +4036,7 @@ dependencies = [ "rustc_version 0.4.0", "serde", "sha2 0.9.9", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -4049,7 +4048,7 @@ dependencies = [ "base64 0.13.1", "blake2b_simd", "constant_time_eq", - "crossbeam-utils 0.8.16", + "crossbeam-utils 0.8.17", ] [[package]] @@ -4094,9 +4093,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.26" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", @@ -4132,12 +4131,12 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.9" +version = "0.21.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", - "ring 0.17.6", + "ring 0.17.7", "rustls-webpki", "sct 0.7.1", ] @@ -4181,7 +4180,7 @@ version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] @@ -4193,9 +4192,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "same-file" @@ -4243,7 +4242,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] @@ -4299,29 +4298,29 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "sentry" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b596ee5f4e76638de6063ca96fd3d923675416461fc7f1b77406dc2f32d1979" +checksum = "ab18211f62fb890f27c9bb04861f76e4be35e4c2fcbfc2d98afa37aadebb16f1" dependencies = [ "httpdate", "log", "native-tls", - "reqwest 0.11.22", + "reqwest 0.11.23", "sentry-backtrace", "sentry-contexts", "sentry-core", "sentry-debug-images", "sentry-panic", "sentry-tracing", - "tokio 1.34.0", + "tokio 1.35.1", "ureq", ] [[package]] name = "sentry-actix" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e059d10e7d467721131f63393a85e8d24525bbb981f1daa4c5c874162b8311f" +checksum = "a2576f4d8380c3a5baa1f2f3796e442b6afceb178b6a6e573760e95d07dbb3dd" dependencies = [ "actix-web", "futures-util", @@ -4330,9 +4329,9 @@ dependencies = [ [[package]] name = "sentry-backtrace" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6510a97162c288989a6310766bcadfc83ec98ad73121674463b055c42189e85" +checksum = "cf018ff7d5ce5b23165a9cbfee60b270a55ae219bc9eebef2a3b6039356dd7e5" dependencies = [ "backtrace", "once_cell", @@ -4342,9 +4341,9 @@ dependencies = [ [[package]] name = "sentry-contexts" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e2552a4a578aade01bd44691e6805c32bac34fc918f1675739fbbf2add8460" +checksum = "1d934df6f9a17b8c15b829860d9d6d39e78126b5b970b365ccbd817bc0fe82c9" dependencies = [ "hostname", "libc", @@ -4356,9 +4355,9 @@ dependencies = [ [[package]] name = "sentry-core" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebb7a6ad833035f6b36db3e61e450643eec8a3c5f2839b8e41c74a73e57c6bae" +checksum = "5e362d3fb1c5de5124bf1681086eaca7adf6a8c4283a7e1545359c729f9128ff" dependencies = [ "log", "once_cell", @@ -4370,9 +4369,9 @@ dependencies = [ [[package]] name = "sentry-debug-images" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bcd02214397892a3ec25372cc68c210d858f39314535f5d640bdf41294fd441" +checksum = "d8bca420d75d9e7a8e54a4806bf4fa8a7e9a804e8f2ff05c7c80234168c6ca66" dependencies = [ "findshlibs", "once_cell", @@ -4381,9 +4380,9 @@ dependencies = [ [[package]] name = "sentry-panic" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0866e2ba7615fe37e0e485f2373bf9dabbb255f82637b5fe47902095790bbbc9" +checksum = "e0224e7a8e2bd8a32d96804acb8243d6d6e073fed55618afbdabae8249a964d8" dependencies = [ "sentry-backtrace", "sentry-core", @@ -4391,9 +4390,9 @@ dependencies = [ [[package]] name = "sentry-tracing" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53ef38653386354058f30b3c6d0bf764c59ee6270cd769ac4620a2d2fd60c8fe" +checksum = "087bed8c616d176a9c6b662a8155e5f23b40dc9e1fa96d0bd5fb56e8636a9275" dependencies = [ "sentry-backtrace", "sentry-core", @@ -4403,9 +4402,9 @@ dependencies = [ [[package]] name = "sentry-types" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26342e85c6b3332273b820d5be6b93027fe991ded23a2aa6fb88a5a28c845c40" +checksum = "fb4f0e37945b7a8ce7faebc310af92442e2d7c5aa7ef5b42fe6daa98ee133f65" dependencies = [ "debugid", "hex", @@ -4413,7 +4412,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "time 0.3.30", + "time 0.3.31", "url 2.5.0", "uuid 1.6.1", ] @@ -4435,7 +4434,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -4465,7 +4464,7 @@ version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ - "itoa 1.0.9", + "itoa 1.0.10", "ryu", "serde", ] @@ -4489,7 +4488,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.9", + "itoa 1.0.10", "ryu", "serde", ] @@ -4599,7 +4598,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time 0.3.30", + "time 0.3.31", ] [[package]] @@ -4691,7 +4690,7 @@ dependencies = [ "slog", "term", "thread_local", - "time 0.3.30", + "time 0.3.31", ] [[package]] @@ -4831,9 +4830,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", @@ -4929,22 +4928,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -4970,12 +4969,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", - "itoa 1.0.9", + "itoa 1.0.10", "libc", "num_threads", "powerfmt", @@ -4992,9 +4991,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" dependencies = [ "time-core", ] @@ -5051,14 +5050,14 @@ dependencies = [ [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes 1.5.0", "libc", - "mio 0.8.9", + "mio 0.8.10", "num_cpus", "parking_lot 0.12.1", "pin-project-lite 0.2.13", @@ -5170,7 +5169,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -5180,7 +5179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" dependencies = [ "native-tls", - "tokio 1.34.0", + "tokio 1.35.1", ] [[package]] @@ -5239,7 +5238,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", - "tokio 1.34.0", + "tokio 1.35.1", "webpki 0.21.4", ] @@ -5250,7 +5249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls 0.20.9", - "tokio 1.34.0", + "tokio 1.35.1", "webpki 0.22.4", ] @@ -5260,8 +5259,8 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.9", - "tokio 1.34.0", + "rustls 0.21.10", + "tokio 1.35.1", ] [[package]] @@ -5388,7 +5387,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite 0.2.13", - "tokio 1.34.0", + "tokio 1.35.1", "tracing", ] @@ -5439,9 +5438,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "try_from" @@ -5503,9 +5502,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -5753,7 +5752,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", "wasm-bindgen-shared", ] @@ -5787,7 +5786,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5839,7 +5838,7 @@ version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring 0.17.6", + "ring 0.17.7", "untrusted 0.9.0", ] @@ -6113,40 +6112,40 @@ dependencies = [ "base64 0.13.1", "futures 0.3.29", "http 0.2.11", - "hyper 0.14.27", + "hyper 0.14.28", "hyper-rustls 0.24.2", "itertools", "log", "percent-encoding 2.3.1", - "rustls 0.21.9", + "rustls 0.21.10", "rustls-pemfile", "seahash", "serde", "serde_json", - "time 0.3.30", - "tokio 1.34.0", + "time 0.3.31", + "tokio 1.35.1", "tower-service", "url 2.5.0", ] [[package]] name = "zerocopy" -version = "0.7.28" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e" +checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.28" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" +checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] @@ -6166,7 +6165,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2 1.0.70", "quote 1.0.33", - "syn 2.0.39", + "syn 2.0.41", ] [[package]] diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index e681e4033..aeaec5194 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -76,7 +76,7 @@ actix-rt = "2.8" [features] # for testing: -# default = ["emulator"] +# default = ["emulator", "bigtable", "dynamodb"] default = ["dynamodb"] bigtable = ["dep:google-cloud-rust-raw", "dep:grpcio", "dep:protobuf"] dynamodb = ["dep:rusoto_core", "dep:rusoto_credential", "dep:rusoto_dynamodb"] diff --git a/autopush-common/src/db/bigtable/bigtable_client/error.rs b/autopush-common/src/db/bigtable/bigtable_client/error.rs index 56441be68..29fd9b105 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/error.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/error.rs @@ -3,6 +3,83 @@ use thiserror::Error; use crate::errors::ReportableError; +#[derive(PartialEq, Eq, Debug)] +pub enum MutateRowStatus { + OK, + Cancelled, + Unknown, + InvalidArgument, + DeadlineExceeded, + NotFound, + AlreadyExists, + PermissionDenied, + ResourceExhausted, + FailedPrecondition, + Aborted, + OutOfRange, + Unimplemented, + Internal, + Unavailable, + DataLoss, + Unauthenticated, +} + +impl MutateRowStatus { + pub fn is_ok(&self) -> bool { + self == &Self::OK + } +} + +impl From for MutateRowStatus { + fn from(v: i32) -> Self { + match v { + 0 => Self::OK, + 1 => Self::Cancelled, + 2 => Self::Unknown, + 3 => Self::InvalidArgument, + 4 => Self::DeadlineExceeded, + 5 => Self::NotFound, + 6 => Self::AlreadyExists, + 7 => Self::PermissionDenied, + 8 => Self::ResourceExhausted, + 9 => Self::FailedPrecondition, + 10 => Self::Aborted, + 11 => Self::OutOfRange, + 12 => Self::Unimplemented, + 13 => Self::Internal, + 14 => Self::Unavailable, + 15 => Self::DataLoss, + 16 => Self::Unauthenticated, + _ => Self::Unknown, + } + } +} + +impl ToString for MutateRowStatus { + fn to_string(&self) -> String { + match self { + MutateRowStatus::OK => "Ok", + MutateRowStatus::Cancelled => "Cancelled", + MutateRowStatus::Unknown => "Unknown", + MutateRowStatus::InvalidArgument => "Invalid Argument", + MutateRowStatus::DeadlineExceeded => "Deadline Exceeded", + MutateRowStatus::NotFound => "Not Found", + MutateRowStatus::AlreadyExists => "Already Exists", + MutateRowStatus::PermissionDenied => "Permission Denied", + MutateRowStatus::ResourceExhausted => "Resource Exhausted", + MutateRowStatus::FailedPrecondition => "Failed Precondition", + MutateRowStatus::Aborted => "Aborted", + MutateRowStatus::OutOfRange => "Out of Range", + MutateRowStatus::Unimplemented => "Unimplemented", + MutateRowStatus::Internal => "Internal", + MutateRowStatus::Unavailable => "Unavailable", + MutateRowStatus::DataLoss => "Data Loss", + MutateRowStatus::Unauthenticated => "Unauthenticated", + } + .to_owned() + } +} + #[derive(Debug, Error)] pub enum BigTableError { #[error("Invalid Row Response")] @@ -20,6 +97,11 @@ pub enum BigTableError { #[error("Bigtable write error")] Write(grpcio::Error), + /// Return a GRPC status code and any message. + /// See https://grpc.github.io/grpc/core/md_doc_statuscodes.html + #[error("Bigtable status response")] + Status(MutateRowStatus, String), + #[error("BigTable Admin Error")] Admin(String, Option), @@ -52,6 +134,7 @@ impl ReportableError for BigTableError { BigTableError::InvalidChunk(_) => "storage.bigtable.error.invalid_chunk", BigTableError::Read(_) => "storage.bigtable.error.read", BigTableError::Write(_) => "storage.bigtable.error.write", + BigTableError::Status(_, _) => "storage.bigtable.error.status", BigTableError::WriteTime(_) => "storage.bigtable.error.writetime", BigTableError::Admin(_, _) => "storage.bigtable.error.admin", BigTableError::Recycle => "storage.bigtable.error.recycle", @@ -66,6 +149,9 @@ impl ReportableError for BigTableError { BigTableError::InvalidChunk(s) => vec![("error", s.to_string())], BigTableError::Read(s) => vec![("error", s.to_string())], BigTableError::Write(s) => vec![("error", s.to_string())], + BigTableError::Status(code, s) => { + vec![("code", code.to_string()), ("error", s.to_string())] + } BigTableError::WriteTime(s) => vec![("error", s.to_string())], BigTableError::Admin(s, raw) => { let mut x = vec![("error", s.to_owned())]; diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 0bf53fb88..e6f4c4961 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -7,6 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use cadence::StatsdClient; +use futures_util::StreamExt; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient; use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest; @@ -650,6 +651,98 @@ impl DbClient for BigTableClientImpl { self.write_row(row).await.map_err(|e| e.into()) } + /// Add channels in bulk (used mostly during migration) + /// + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| DbError::General(e.to_string()))? + .as_millis(); + let mut entries = protobuf::RepeatedField::default(); + let mut req = bigtable::MutateRowsRequest::default(); + let mut limit: u32 = 0; + req.set_table_name(self.settings.table_name.clone()); + + // Create entries that define rows that contain mutations to hold the updated value which + // will create/update the channels. + for channel in channels { + let mut entry = bigtable::MutateRowsRequest_Entry::default(); + let key = as_key(uaid, Some(&channel), None); + entry.set_row_key(key.into_bytes()); + + let mut cell_mutations = protobuf::RepeatedField::default(); + let mut mutation = data::Mutation::default(); + let mut set_cell = data::Mutation_SetCell { + family_name: ROUTER_FAMILY.to_owned(), + ..Default::default() + }; + set_cell.set_column_qualifier("updated".to_owned().into_bytes().to_vec()); + set_cell.set_value(now.to_be_bytes().to_vec()); + + mutation.set_set_cell(set_cell); + cell_mutations.push(mutation); + entry.set_mutations(cell_mutations); + entries.push(entry); + // There is a limit of 100,000 mutations per batch for bigtable. + // https://cloud.google.com/bigtable/quotas + // If you have 100,000 channels, you have too many. + limit += 1; + if limit >= 100_000 { + break; + } + } + req.set_entries(entries); + + let bigtable = self.pool.get().await?; + + // ClientSStreamReceiver will cancel an operation if it's dropped before it's done. + let resp = bigtable + .conn + .mutate_rows(&req) + .map_err(error::BigTableError::Write)?; + + // Scan the returned stream looking for errors. + // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each + // struct contains the result of the row mutation, and contains a `status` (non-zero on error) + // and an optional message string (empty if none). + // The structure also contains an overall `status` but that does not appear to be exposed. + // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html + let mut stream = Box::pin(resp); + let mut cnt = 0; + loop { + let (result, remainder) = stream.into_future().await; + if let Some(result) = result { + debug!("🎏 Result block: {}", cnt); + match result { + Ok(r) => { + for e in r.get_entries() { + if e.has_status() { + let status = e.get_status(); + // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html + let code = error::MutateRowStatus::from(status.get_code()); + if !code.is_ok() { + return Err(error::BigTableError::Status( + code, + status.get_message().to_owned(), + ) + .into()); + } + debug!("🎏 Response: {} OK", e.index); + } + } + } + Err(e) => return Err(error::BigTableError::Write(e).into()), + }; + cnt += 1; + } else { + debug!("🎏 Done!"); + break; + } + stream = remainder; + } + Ok(()) + } + /// Delete all the rows that start with the given prefix. NOTE: this may be metered and should /// be used with caution. async fn get_channels(&self, uaid: &Uuid) -> DbResult> { @@ -1070,6 +1163,7 @@ mod tests { //! use std::sync::Arc; use std::time::SystemTime; + use uuid; use super::*; use cadence::StatsdClient; @@ -1153,6 +1247,19 @@ mod tests { let channels = client.get_channels(&uaid).await; assert!(channels.unwrap().contains(&chid)); + // can we add lots of channels? + let mut new_channels: HashSet = HashSet::new(); + new_channels.insert(chid); + for _ in 1..10 { + new_channels.insert(uuid::Uuid::new_v4()); + } + client + .add_channels(&uaid, new_channels.clone()) + .await + .unwrap(); + let channels = client.get_channels(&uaid).await.unwrap(); + assert_eq!(channels, new_channels); + // can we modify the user record? let updated = User { connected_at: now() + 3, @@ -1222,7 +1329,6 @@ mod tests { .await .unwrap() .messages; - print!("Messages: {:?}", &msgs); assert!(msgs.is_empty()); assert!(client.remove_user(&uaid).await.is_ok()); diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index 6b642612a..062a2ede4 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -41,6 +41,9 @@ pub trait DbClient: Send + Sync { /// Add a channel to a user async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()>; + /// Add a batch of channels to a user + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()>; + /// Get the set of channel IDs for a user async fn get_channels(&self, uaid: &Uuid) -> DbResult>; diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 7135b0a37..a513274d8 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -140,6 +140,8 @@ impl DbClient for DualClientImpl { // copy the user record over to the new data store. debug!("⚖ Found user record in secondary, moving to primary"); self.primary.add_user(&user).await?; + let channels = self.secondary.get_channels(uaid).await?; + self.primary.add_channels(uaid, channels).await?; return Ok(Some(user)); } } @@ -165,6 +167,11 @@ impl DbClient for DualClientImpl { target.add_channel(uaid, channel_id).await } + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + let (target, _) = self.allot(uaid).await?; + target.add_channels(uaid, channels).await + } + async fn get_channels(&self, uaid: &Uuid) -> DbResult> { let (target, is_primary) = self.allot(uaid).await?; let mut channels = target.get_channels(uaid).await?; diff --git a/autopush-common/src/db/dynamodb/mod.rs b/autopush-common/src/db/dynamodb/mod.rs index 815a30b2f..6f81f4248 100644 --- a/autopush-common/src/db/dynamodb/mod.rs +++ b/autopush-common/src/db/dynamodb/mod.rs @@ -267,6 +267,14 @@ impl DbClient for DdbClientImpl { Ok(()) } + /// Hopefully, this is never called. It is provided for completion sake. + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + for channel_id in channels { + self.add_channel(uaid, &channel_id).await?; + } + Ok(()) + } + async fn get_channels(&self, uaid: &Uuid) -> DbResult> { // Channel IDs are stored in a special row in the message table, where // chidmessageid = " " diff --git a/autopush-common/src/db/mock.rs b/autopush-common/src/db/mock.rs index 391b74f4f..8c3cdfdea 100644 --- a/autopush-common/src/db/mock.rs +++ b/autopush-common/src/db/mock.rs @@ -36,6 +36,10 @@ impl DbClient for Arc { Arc::as_ref(self).add_channel(uaid, channel_id).await } + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + Arc::as_ref(self).add_channels(uaid, channels).await + } + async fn get_channels(&self, uaid: &Uuid) -> DbResult> { Arc::as_ref(self).get_channels(uaid).await }