Skip to content

Commit

Permalink
Support sql statement execution in kafkasql storage
Browse files Browse the repository at this point in the history
  • Loading branch information
EricWittmann committed Jul 12, 2024
1 parent 1d7dc3d commit 3c8d3c6
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.apicurio.registry.storage.impl.kafkasql.messages;

import io.apicurio.registry.storage.RegistryStorage;
import io.apicurio.registry.storage.impl.kafkasql.AbstractMessage;
import io.apicurio.registry.storage.impl.sql.SqlRegistryStorage;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter
@Setter
@EqualsAndHashCode(callSuper = false)
@ToString
public class ExecuteSqlStatement1Message extends AbstractMessage {

private String sql;

/**
* @see io.apicurio.registry.storage.impl.kafkasql.KafkaSqlMessage#dispatchTo(RegistryStorage)
*/
@Override
public Object dispatchTo(RegistryStorage storage) {
SqlRegistryStorage sqlStorage = (SqlRegistryStorage) storage;
sqlStorage.executeSqlStatement(sql);
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.apicurio.registry.storage.impl.kafkasql.messages.DeleteGlobalRules0Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.DeleteGroup1Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.DeleteRoleMapping1Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.ExecuteSqlStatement1Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.ImportArtifact1Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.ImportArtifactRule1Message;
import io.apicurio.registry.storage.impl.kafkasql.messages.ImportArtifactVersion1Message;
Expand Down Expand Up @@ -94,7 +95,8 @@ private static void indexMessageClasses(Class<? extends KafkaSqlMessage>... mcla
UpdateArtifactRule4Message.class, UpdateArtifactVersionComment5Message.class,
UpdateArtifactVersionMetaData4Message.class, UpdateBranchMetaData3Message.class,
UpdateContentCanonicalHash3Message.class, UpdateGlobalRule2Message.class,
UpdateGroupMetaData2Message.class, UpdateRoleMapping2Message.class);
UpdateGroupMetaData2Message.class, UpdateRoleMapping2Message.class,
ExecuteSqlStatement1Message.class);
}

public static Class<? extends KafkaSqlMessage> lookup(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.apicurio.registry.storage.RegistryStorage;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

/**
* An in-memory SQL implementation of the {@link RegistryStorage} interface.
Expand Down Expand Up @@ -38,4 +39,9 @@ public void restoreFromSnapshot(String snapshotLocation) {
handleFactory.withHandle(handle -> handle.createUpdate(sqlStatements.restoreFromSnapshot())
.bind(0, snapshotLocation).execute());
}

@Transactional
public void executeSqlStatement(String sqlStatement) {
handleFactory.withHandle(handle -> handle.createUpdate(sqlStatement).execute());
}
}

0 comments on commit 3c8d3c6

Please sign in to comment.