Skip to content

Commit

Permalink
Updates of the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
teroxik committed Jun 21, 2020
1 parent 5bae4b2 commit 4e60a2d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2020 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.dynamodb.journal

Expand Down Expand Up @@ -30,7 +30,7 @@ trait DynamoDBHelper {
val settings: DynamoDBConfig
import settings._

// def shutdown(): Unit = dynamoDB..shutdown()
def shutdown(): Unit = dynamoDB.close()

private var reporter: ActorRef = _
def setReporter(ref: ActorRef): Unit = reporter = ref
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package akka.persistence.dynamodb.journal

import java.net.URI
import java.util
import java.util.Base64

import org.scalactic.TypeCheckedTripleEquals
Expand All @@ -12,12 +14,15 @@ import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.Regions
import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDB, AmazonDynamoDBClient }
import com.amazonaws.services.dynamodbv2.document.{ DynamoDB, Item }
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb._
import software.amazon.awssdk.services.dynamodb.model._

import com.typesafe.config.ConfigFactory

import akka.persistence.dynamodb._

class BackwardsCompatibilityV1Spec extends TestKit(ActorSystem("PartialAsyncSerializationSpec"))
with ImplicitSender
with WordSpecLike
Expand All @@ -34,12 +39,14 @@ class BackwardsCompatibilityV1Spec extends TestKit(ActorSystem("PartialAsyncSeri
val accesKey = config.getString("my-dynamodb-journal.aws-access-key-id")
val secretKey = config.getString("my-dynamodb-journal.aws-secret-access-key")

val client: AmazonDynamoDB = new AmazonDynamoDBClient(new BasicAWSCredentials(accesKey, secretKey))
.withRegion(Regions.US_EAST_1)

client.setEndpoint(endpoint)
/*
val client: DynamoDbAsyncClient = DynamoDbAsyncClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accesKey, secretKey)))
.region(Region.US_EAST_1)
.endpointOverride(new URI(endpoint))
.build()
val dynamoDB = new DynamoDB(client)
*/

val persistenceId = "journal-P-OldFormatEvents-0"

Expand All @@ -65,13 +72,13 @@ class BackwardsCompatibilityV1Spec extends TestKit(ActorSystem("PartialAsyncSeri
"ChEIARINrO0ABXQABmEtMDAxORATGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==",
"ChEIARINrO0ABXQABmEtMDAyMBAUGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==")

val table = dynamoDB.getTable(tableName)

def createItem(number: Int, data: String): Unit = {
table.putItem(
new Item()
.withPrimaryKey("par", persistenceId, "num", number)
.withBinary("pay", Base64.getDecoder.decode(data)))
val item: Item = new util.HashMap()
item.put(Key, S(persistenceId))
item.put(Sort, N(number))
item.put(Payload, B(Base64.getDecoder.decode(data)))

client.putItem(PutItemRequest.builder().tableName(tableName).item(item).build())
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
*/
package akka.persistence.dynamodb.journal

import com.amazonaws.services.dynamodbv2.model._
import software.amazon.awssdk.services.dynamodb.model._
import scala.concurrent.Await
import akka.actor.ActorSystem
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.persistence.Persistence
import akka.util.Timeout
import java.util.UUID
import akka.persistence.PersistentRepr
Expand All @@ -33,19 +32,20 @@ trait DynamoDBUtils {

def ensureJournalTableExists(read: Long = 10L, write: Long = 10L): Unit = {
val create = schema
.withTableName(JournalTable)
.withProvisionedThroughput(new ProvisionedThroughput(read, write))
.tableName(JournalTable)
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(read).writeCapacityUnits(write).build())
.build()

var names = Vector.empty[String]
lazy val complete: ListTablesResult => Future[Vector[String]] = aws =>
if (aws.getLastEvaluatedTableName == null) Future.successful(names ++ aws.getTableNames.asScala)
lazy val complete: ListTablesResponse => Future[Vector[String]] = aws =>
if (aws.lastEvaluatedTableName() == null) Future.successful(names ++ aws.tableNames().asScala)
else {
names ++= aws.getTableNames.asScala
names ++= aws.tableNames().asScala
client
.listTables(new ListTablesRequest().withExclusiveStartTableName(aws.getLastEvaluatedTableName))
.listTables(ListTablesRequest.builder().exclusiveStartTableName(aws.lastEvaluatedTableName()).build())
.flatMap(complete)
}
val list = client.listTables(new ListTablesRequest).flatMap(complete)
val list = client.listTables(ListTablesRequest.builder().build()).flatMap(complete)

val setup = for {
exists <- list.map(_ contains JournalTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import org.scalatest.concurrent.ScalaFutures

import akka.actor._
import akka.testkit._

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.persistence._
import akka.event.Logging
import akka.persistence._
import akka.persistence.JournalProtocol._

import scala.concurrent.duration._
import scala.collection.JavaConverters._

import com.amazonaws.services.dynamodbv2.model._
import com.typesafe.config.ConfigFactory

import akka.persistence.dynamodb._
import software.amazon.awssdk.services.dynamodb.model._

class FailureReportingSpec extends TestKit(ActorSystem("FailureReportingSpec"))
with ImplicitSender
Expand Down Expand Up @@ -196,47 +198,50 @@ akka.loggers = ["akka.testkit.TestEventListener"]
val key2Item = Map(Key -> S("The2Key"), Sort -> N("43")).asJava

"reporting table problems" in {
val aws = new DescribeTableRequest().withTableName("TheTable")
val aws = DescribeTableRequest.builder().tableName("TheTable").build()
desc(aws) should include("DescribeTable")
desc(aws) should include("TheTable")
}

"reporting putItem problems" in {
val aws = new PutItemRequest().withTableName("TheTable").withItem(keyItem)
val aws = PutItemRequest.builder().tableName("TheTable").item(keyItem).build()
desc(aws) should include("PutItem")
desc(aws) should include("TheTable")
desc(aws) should include("TheKey")
desc(aws) should include("42")
}

"reporting deleteItem problems" in {
val aws = new DeleteItemRequest().withTableName("TheTable").withKey(keyItem)
val aws = DeleteItemRequest.builder().tableName("TheTable").key(keyItem).build()
desc(aws) should include("DeleteItem")
desc(aws) should include("TheTable")
desc(aws) should include("TheKey")
desc(aws) should include("42")
}

"reporting query problems" in {
val aws = new QueryRequest().withTableName("TheTable").withExpressionAttributeValues(Map(":kkey" -> S("TheKey")).asJava)
val aws = QueryRequest.builder().tableName("TheTable")
.expressionAttributeValues(Map(":kkey" -> S("TheKey")).asJava).build()
desc(aws) should include("Query")
desc(aws) should include("TheTable")
desc(aws) should include("TheKey")
}

"reporting batch write problems" in {
val write = new WriteRequest().withPutRequest(new PutRequest().withItem(keyItem))
val remove = new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(key2Item))
val aws = new BatchWriteItemRequest().withRequestItems(Map("TheTable" -> Seq(write, remove).asJava).asJava)
val write = WriteRequest.builder().putRequest(
PutRequest.builder().item(keyItem).build()).build()
val remove = WriteRequest.builder().deleteRequest(
DeleteRequest.builder().key(key2Item).build()).build()
val aws = BatchWriteItemRequest.builder().requestItems(Map("TheTable" -> Seq(write, remove).asJava).asJava).build()
desc(aws) should include("BatchWriteItem")
desc(aws) should include("TheTable")
desc(aws) should include("put[par=TheKey,num=42]")
desc(aws) should include("del[par=The2Key,num=43]")
}

"reporting batch read problems" in {
val ka = new KeysAndAttributes().withKeys(keyItem, key2Item)
val aws = new BatchGetItemRequest().withRequestItems(Map("TheTable" -> ka).asJava)
val ka = KeysAndAttributes.builder().keys(keyItem, key2Item).build()
val aws = BatchGetItemRequest.builder().requestItems(Map("TheTable" -> ka).asJava).build()
desc(aws) should include("BatchGetItem")
desc(aws) should include("TheTable")
desc(aws) should include("TheKey")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import com.amazonaws.services.dynamodbv2.model._
import java.util.{ HashMap => JHMap }

import akka.persistence.dynamodb._
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest

class RecoveryConsistencySpec extends TestKit(ActorSystem("FailureReportingSpec"))
with ImplicitSender
Expand Down Expand Up @@ -151,6 +152,6 @@ class RecoveryConsistencySpec extends TestKit(ActorSystem("FailureReportingSpec"
val key: Item = new JHMap
key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}"))
key.put(Sort, N(num % 100))
client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue
client.deleteItem(DeleteItemRequest.builder().tableName(JournalTable).key(key).build())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package akka.persistence.dynamodb.snapshot
import java.util.UUID

import akka.actor.ActorSystem
import akka.persistence.PersistentRepr
import akka.util.Timeout
import com.amazonaws.services.dynamodbv2.model._

import software.amazon.awssdk.services.dynamodb.model._
import scala.collection.JavaConverters._
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
Expand All @@ -31,37 +29,37 @@ trait DynamoDBUtils {

implicit val timeout = Timeout(5.seconds)

import com.amazonaws.services.dynamodbv2.model.{ KeySchemaElement, KeyType }

val schema = new CreateTableRequest()
.withAttributeDefinitions(
new AttributeDefinition().withAttributeName(Key).withAttributeType("S"),
new AttributeDefinition().withAttributeName(SequenceNr).withAttributeType("N"),
new AttributeDefinition().withAttributeName(Timestamp).withAttributeType("N"))
.withKeySchema(
new KeySchemaElement().withAttributeName(Key).withKeyType(KeyType.HASH),
new KeySchemaElement().withAttributeName(SequenceNr).withKeyType(KeyType.RANGE))
.withLocalSecondaryIndexes(
new LocalSecondaryIndex()
.withIndexName(TimestampIndex).withKeySchema(
new KeySchemaElement().withAttributeName(Key).withKeyType(KeyType.HASH),
new KeySchemaElement().withAttributeName(Timestamp).withKeyType(KeyType.RANGE)).withProjection(new Projection().withProjectionType(ProjectionType.ALL)))
val schema = CreateTableRequest.builder()
.attributeDefinitions(
AttributeDefinition.builder().attributeName(Key).attributeType("S").build(),
AttributeDefinition.builder().attributeName(SequenceNr).attributeType("N").build(),
AttributeDefinition.builder().attributeName(Timestamp).attributeType("N").build()).keySchema(
KeySchemaElement.builder().attributeName(Key).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(SequenceNr).keyType(KeyType.RANGE).build()).localSecondaryIndexes(
LocalSecondaryIndex.builder()
.indexName(TimestampIndex)
.keySchema(
KeySchemaElement.builder().attributeName(Key).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(Timestamp).keyType(KeyType.RANGE).build())
.projection(
Projection.builder().projectionType(ProjectionType.ALL).build()).build())

def ensureSnapshotTableExists(read: Long = 10L, write: Long = 10L): Unit = {
val create = schema
.withTableName(Table)
.withProvisionedThroughput(new ProvisionedThroughput(read, write))
.tableName(Table)
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(read).writeCapacityUnits(write).build())
.build()

var names = Vector.empty[String]
lazy val complete: ListTablesResult => Future[Vector[String]] = aws =>
if (aws.getLastEvaluatedTableName == null) Future.successful(names ++ aws.getTableNames.asScala)
lazy val complete: ListTablesResponse => Future[Vector[String]] = aws =>
if (aws.lastEvaluatedTableName() == null) Future.successful(names ++ aws.tableNames().asScala)
else {
names ++= aws.getTableNames.asScala
names ++= aws.tableNames().asScala
client
.listTables(new ListTablesRequest().withExclusiveStartTableName(aws.getLastEvaluatedTableName))
.listTables(ListTablesRequest.builder().exclusiveStartTableName(aws.lastEvaluatedTableName).build())
.flatMap(complete)
}
val list = client.listTables(new ListTablesRequest).flatMap(complete)
val list = client.listTables(ListTablesRequest.builder().build()).flatMap(complete)

val setup = for {
exists <- list.map(_ contains Table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,9 @@
*/
package akka.persistence.dynamodb.snapshot

import akka.actor.{ ActorRef, ActorSystem }
import akka.persistence._
import akka.persistence.SnapshotProtocol._
import akka.persistence.scalatest.OptionalTests
import akka.persistence.snapshot.SnapshotStoreSpec
import akka.testkit.TestProbe
import com.typesafe.config.{ Config, ConfigFactory }

import scala.collection.immutable.Seq
import com.typesafe.config.ConfigFactory

class SnapshotStoreTckSpec extends SnapshotStoreSpec(
ConfigFactory.load()) with DynamoDBUtils {
Expand Down

0 comments on commit 4e60a2d

Please sign in to comment.