Skip to content

Commit

Permalink
SNOW-1000283: Add support for Structured Types. (#1853)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jrose authored Jan 26, 2024
1 parent 3cced62 commit 6a2a5b6
Show file tree
Hide file tree
Showing 13 changed files with 513 additions and 6 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
- Fixed PyArrow Table type hinting
- Added support for connecting using an existing connection via the session and master token.
- Added support for connecting to Snowflake by authenticating with multiple SAML IDP using external browser.
- Added support for structured types (OBJECT, MAP, ARRAY) to nanoarrow converters.
- Fixed compilation issue due to missing cstdint header on gcc13.
- Improved config permissions warning message.

Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def build_extension(self, ext):
*((file,) if isinstance(file, str) else file)
)
for file in {
"ArrayConverter.cpp",
"BinaryConverter.cpp",
"BooleanConverter.cpp",
"CArrowChunkIterator.cpp",
Expand All @@ -104,6 +105,8 @@ def build_extension(self, ext):
"FixedSizeListConverter.cpp",
"FloatConverter.cpp",
"IntConverter.cpp",
"MapConverter.cpp",
"ObjectConverter.cpp",
"SnowflakeType.cpp",
"StringConverter.cpp",
"TimeConverter.cpp",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
//

#include "ArrayConverter.hpp"

#include <memory>

#include "CArrowChunkIterator.hpp"
#include "CArrowIterator.hpp"
#include "SnowflakeType.hpp"

namespace sf {
// Logger shared by all ArrayConverter instances, registered under the name
// "snowflake.connector.ArrayConverter". It is heap-allocated during static
// initialization; no matching delete appears in this file.
Logger* ArrayConverter::logger =
new Logger("snowflake.connector.ArrayConverter");

void ArrayConverter::generateError(const std::string& msg) const {
logger->error(__FILE__, __func__, __LINE__, msg.c_str());
PyErr_SetString(PyExc_Exception, msg.c_str());
}

// Builds a converter for an Arrow list column.
//
// schemaView: schema view of the list column; it must have exactly one child
//             schema describing the list items.
// array:      array view of the list column; its first child holds the items.
// context, useNumpy: forwarded unchanged to getConverterFromSchema when the
//             nested item converter is created.
//
// When the schema does not have exactly one child, the error is logged, a
// Python exception is set, and the item converter is left unset.
ArrayConverter::ArrayConverter(ArrowSchemaView* schemaView,
                               ArrowArrayView* array, PyObject* context,
                               bool useNumpy) {
  m_array = array;

  const auto childCount = schemaView->schema->n_children;
  if (childCount != 1) {
    // n_children is a 64-bit integer in the Arrow C data interface; passing
    // it to a printf-style "%d" is a specifier/argument mismatch, so widen it
    // explicitly and format it with "%lld".
    std::string errorInfo = Logger::formatString(
        "[Snowflake Exception] invalid arrow schema for array items expected 1 "
        "schema child, but got %lld",
        static_cast<long long>(childCount));
    this->generateError(errorInfo);
    return;
  }

  ArrowSchema* item_schema = schemaView->schema->children[0];
  ArrowArrayView* item_array = array->children[0];
  m_item_converter = getConverterFromSchema(item_schema, item_array, context,
                                            useNumpy, logger);
}

// Converts one row of the list column to a Python object.
//
// Returns None (new reference) for a null row, otherwise a new Python list
// holding the converted items of the row. Returns nullptr with a Python
// exception set if the list cannot be allocated, if an item fails to convert,
// or if the item converter was never created.
PyObject* ArrayConverter::toPyObject(int64_t rowIndex) const {
  if (ArrowArrayViewIsNull(m_array, rowIndex)) {
    Py_RETURN_NONE;
  }

  if (!m_item_converter) {
    // The constructor returns before creating the item converter when the
    // schema is malformed; fail the row instead of dereferencing null.
    if (PyErr_Occurred() == nullptr) {
      PyErr_SetString(
          PyExc_Exception,
          "[Snowflake Exception] array item converter was not initialized");
    }
    return nullptr;
  }

  // Array item offsets are stored in the second array buffer.
  // Infer the start and end of this row's slice by looking at the
  // current and next offset. If there isn't another offset use
  // the end of the item array instead.
  const int32_t* offsets = m_array->buffer_views[1].data.as_int32;
  const int32_t start = offsets[rowIndex];
  int32_t end = static_cast<int32_t>(m_array->children[0]->length);
  if (rowIndex + 1 < m_array->length) {
    end = offsets[rowIndex + 1];
  }

  PyObject* list = PyList_New(end - start);
  if (list == nullptr) {
    // Allocation failed (or the size was invalid); the exception is already
    // set by PyList_New.
    return nullptr;
  }

  for (int32_t i = start; i < end; i++) {
    PyObject* item = m_item_converter->toPyObject(i);
    if (item == nullptr) {
      // Do not hand a half-filled list containing NULL slots back to Python.
      Py_DECREF(list);
      return nullptr;
    }
    // PyList_SetItem steals the reference to item.
    PyList_SetItem(list, i - start, item);
  }
  return list;
}

} // namespace sf
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
//

#ifndef PC_ARRAYCONVERTER_HPP
#define PC_ARRAYCONVERTER_HPP

#include <memory>

#include "IColumnConverter.hpp"
#include "logging.hpp"
#include "nanoarrow.h"
#include "nanoarrow.hpp"

namespace sf {

// Column converter for ARRAY values stored as an Arrow list array. Each
// non-null row becomes a Python list whose elements are produced by a nested
// converter built from the single child of the list schema.
class ArrayConverter : public IColumnConverter {
public:
// Stores `array` and builds the nested item converter from the first child of
// `schemaView` / `array`. If the schema does not have exactly one child, the
// error is logged, a Python exception is set, and the item converter is left
// unset.
explicit ArrayConverter(ArrowSchemaView* schemaView, ArrowArrayView* array,
PyObject* context, bool useNumpy);

// Returns None for a null row, otherwise a new Python list holding the
// converted items of row `rowIndex`.
PyObject* toPyObject(int64_t rowIndex) const override;

private:
// Logs `msg` through `logger` and sets it as the pending Python exception.
void generateError(const std::string& msg) const;

// Array view of the list column. Held as a raw pointer; this class declares
// no destructor, so it never frees it.
ArrowArrayView* m_array;
// Converter for the list items; stays empty if construction failed.
std::shared_ptr<sf::IColumnConverter> m_item_converter;
// Logger shared by all instances of this class.
static Logger* logger;
};

} // namespace sf
#endif // PC_ARRAYCONVERTER_HPP
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
#include <string>
#include <vector>

#include "ArrayConverter.hpp"
#include "BinaryConverter.hpp"
#include "BooleanConverter.hpp"
#include "DateConverter.hpp"
#include "DecimalConverter.hpp"
#include "FixedSizeListConverter.hpp"
#include "FloatConverter.hpp"
#include "IntConverter.hpp"
#include "MapConverter.hpp"
#include "ObjectConverter.hpp"
#include "StringConverter.hpp"
#include "TimeConverter.hpp"
#include "TimeStampConverter.hpp"
Expand Down Expand Up @@ -193,9 +196,7 @@ std::shared_ptr<sf::IColumnConverter> getConverterFromSchema(
}

case SnowflakeType::Type::ANY:
case SnowflakeType::Type::ARRAY:
case SnowflakeType::Type::CHAR:
case SnowflakeType::Type::OBJECT:
case SnowflakeType::Type::TEXT:
case SnowflakeType::Type::VARIANT: {
converter = std::make_shared<sf::StringConverter>(array);
Expand Down Expand Up @@ -378,8 +379,13 @@ std::shared_ptr<sf::IColumnConverter> getConverterFromSchema(
returnCode);
scale =
std::stoi(std::string(scaleString.data, scaleString.size_bytes));
byteLength = std::stoi(
std::string(byteLengthString.data, byteLengthString.size_bytes));

// Byte Length may be unset if TIMESTAMP_TZ is the child of a structured
// type. In this case rely on the default value.
if (byteLengthString.data != nullptr) {
byteLength = std::stoi(
std::string(byteLengthString.data, byteLengthString.size_bytes));
}
}
switch (byteLength) {
case 8: {
Expand Down Expand Up @@ -408,6 +414,58 @@ std::shared_ptr<sf::IColumnConverter> getConverterFromSchema(
break;
}

case SnowflakeType::Type::ARRAY: {
switch (schemaView.type) {
case NANOARROW_TYPE_STRING:
converter = std::make_shared<sf::StringConverter>(array);
break;
case NANOARROW_TYPE_LIST:
converter = std::make_shared<sf::ArrayConverter>(&schemaView, array,
context, useNumpy);
break;
default: {
std::string errorInfo = Logger::formatString(
"[Snowflake Exception] unknown arrow internal data type(%d) "
"for ARRAY data in %s",
NANOARROW_TYPE_ENUM_STRING[schemaView.type],
schemaView.schema->name);
logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
PyErr_SetString(PyExc_Exception, errorInfo.c_str());
break;
}
}
break;
}

case SnowflakeType::Type::MAP: {
converter = std::make_shared<sf::MapConverter>(&schemaView, array,
context, useNumpy);
break;
}

case SnowflakeType::Type::OBJECT: {
switch (schemaView.type) {
case NANOARROW_TYPE_STRING:
converter = std::make_shared<sf::StringConverter>(array);
break;
case NANOARROW_TYPE_STRUCT:
converter = std::make_shared<sf::ObjectConverter>(&schemaView, array,
context, useNumpy);
break;
default: {
std::string errorInfo = Logger::formatString(
"[Snowflake Exception] unknown arrow internal data type(%d) "
"for OBJECT data in %s",
NANOARROW_TYPE_ENUM_STRING[schemaView.type],
schemaView.schema->name);
logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
PyErr_SetString(PyExc_Exception, errorInfo.c_str());
break;
}
}
break;
}

case SnowflakeType::Type::VECTOR: {
converter = std::make_shared<sf::FixedSizeListConverter>(array);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void CArrowTableIterator::reconstructRecordBatches_nanoarrow() {
case SnowflakeType::Type::BOOLEAN:
case SnowflakeType::Type::CHAR:
case SnowflakeType::Type::DATE:
case SnowflakeType::Type::MAP:
case SnowflakeType::Type::OBJECT:
case SnowflakeType::Type::REAL:
case SnowflakeType::Type::TEXT:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
//

#include "MapConverter.hpp"

#include <memory>

#include "CArrowChunkIterator.hpp"
#include "CArrowIterator.hpp"
#include "SnowflakeType.hpp"

namespace sf {
// Logger shared by all MapConverter instances, registered under the name
// "snowflake.connector.MapConverter". It is heap-allocated during static
// initialization; no matching delete appears in this file.
Logger* MapConverter::logger = new Logger("snowflake.connector.MapConverter");

void MapConverter::generateError(const std::string& msg) const {
logger->error(__FILE__, __func__, __LINE__, msg.c_str());
PyErr_SetString(PyExc_Exception, msg.c_str());
}

// Builds a converter for an Arrow map column.
//
// schemaView: schema view of the map column; it must have exactly one child
//             (the entries), which in turn must have exactly two children
//             (key schema, then value schema).
// array:      array view of the map column; array->children[0] holds the
//             entries, whose children are the key and value arrays.
// context, useNumpy: forwarded unchanged to getConverterFromSchema when the
//             nested key and value converters are created.
//
// When the schema shape is not as described, the error is logged, a Python
// exception is set, and the key/value converters are left unset.
MapConverter::MapConverter(ArrowSchemaView* schemaView, ArrowArrayView* array,
                           PyObject* context, bool useNumpy) {
  m_array = array;

  // n_children is a 64-bit integer in the Arrow C data interface; passing it
  // to a printf-style "%d" is a specifier/argument mismatch, so the counts
  // below are widened explicitly and formatted with "%lld".
  const auto entriesCount = schemaView->schema->n_children;
  if (entriesCount != 1) {
    std::string errorInfo = Logger::formatString(
        "[Snowflake Exception] invalid arrow schema for map entries expected 1 "
        "schema child, but got %lld",
        static_cast<long long>(entriesCount));
    this->generateError(errorInfo);
    return;
  }

  ArrowSchema* entries = schemaView->schema->children[0];

  const auto pairCount = entries->n_children;
  if (pairCount != 2) {
    std::string errorInfo = Logger::formatString(
        "[Snowflake Exception] invalid arrow schema for map key/value pair "
        "expected 2 entries, but got %lld",
        static_cast<long long>(pairCount));
    this->generateError(errorInfo);
    return;
  }

  ArrowSchema* key_schema = entries->children[0];
  ArrowArrayView* key_array = array->children[0]->children[0];
  m_key_converter =
      getConverterFromSchema(key_schema, key_array, context, useNumpy, logger);

  ArrowSchema* value_schema = entries->children[1];
  ArrowArrayView* value_array = array->children[0]->children[1];
  m_value_converter = getConverterFromSchema(value_schema, value_array, context,
                                             useNumpy, logger);
}

// Converts one row of the map column to a Python object.
//
// Returns None (new reference) for a null row, otherwise a new Python dict
// built from the row's key/value entries. Returns nullptr with a Python
// exception set if the dict cannot be allocated, if a key or value fails to
// convert, if inserting an entry fails, or if the key/value converters were
// never created.
PyObject* MapConverter::toPyObject(int64_t rowIndex) const {
  if (ArrowArrayViewIsNull(m_array, rowIndex)) {
    Py_RETURN_NONE;
  }

  if (!m_key_converter || !m_value_converter) {
    // The constructor returns before creating the converters when the schema
    // is malformed; fail the row instead of dereferencing null.
    if (PyErr_Occurred() == nullptr) {
      PyErr_SetString(
          PyExc_Exception,
          "[Snowflake Exception] map key/value converters were not "
          "initialized");
    }
    return nullptr;
  }

  // Map ArrowArrays have two child arrays that contain the keys and values.
  // The offsets for how many items belong to each row are stored in the parent
  // array offset buffer. The start and end of a row slice have to be inferred
  // from the offsets for each row.
  const int32_t* offsets = m_array->buffer_views[1].data.as_int32;
  const int32_t start = offsets[rowIndex];
  int32_t end = static_cast<int32_t>(m_array->children[0]->length);
  if (rowIndex + 1 < m_array->length) {
    end = offsets[rowIndex + 1];
  }

  PyObject* dict = PyDict_New();
  if (dict == nullptr) {
    return nullptr;
  }

  for (int32_t i = start; i < end; i++) {
    PyObject* key = m_key_converter->toPyObject(i);
    PyObject* value =
        (key != nullptr) ? m_value_converter->toPyObject(i) : nullptr;

    const bool stored = key != nullptr && value != nullptr &&
                        PyDict_SetItem(dict, key, value) == 0;

    // PyDict_SetItem does not steal references: the dict takes its own, so
    // the references returned by the converters must be released here or
    // every key and value would leak.
    Py_XDECREF(key);
    Py_XDECREF(value);

    if (!stored) {
      Py_DECREF(dict);
      return nullptr;
    }
  }
  return dict;
}

} // namespace sf
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
//

#ifndef PC_MAPCONVERTER_HPP
#define PC_MAPCONVERTER_HPP

#include <memory>

#include "IColumnConverter.hpp"
#include "logging.hpp"
#include "nanoarrow.h"
#include "nanoarrow.hpp"

namespace sf {

// Column converter for MAP values stored as an Arrow map array. Each non-null
// row becomes a Python dict whose keys and values are produced by two nested
// converters built from the key and value children of the map's entries
// schema.
class MapConverter : public IColumnConverter {
public:
// Stores `array` and builds the key and value converters. The schema must
// have exactly one child (the entries) and that child must have exactly two
// children (key, value); otherwise the error is logged, a Python exception is
// set, and the converters are left unset.
explicit MapConverter(ArrowSchemaView* schemaView, ArrowArrayView* array,
PyObject* context, bool useNumpy);

// Returns None for a null row, otherwise a new Python dict holding the
// converted key/value entries of row `rowIndex`.
PyObject* toPyObject(int64_t rowIndex) const override;

private:
// Logs `msg` through `logger` and sets it as the pending Python exception.
void generateError(const std::string& msg) const;

// Array view of the map column. Held as a raw pointer; this class declares
// no destructor, so it never frees it.
ArrowArrayView* m_array;
// Converter for the map keys; stays empty if construction failed.
std::shared_ptr<sf::IColumnConverter> m_key_converter;
// Converter for the map values; stays empty if construction failed.
std::shared_ptr<sf::IColumnConverter> m_value_converter;
// Logger shared by all instances of this class.
static Logger* logger;
};

} // namespace sf
#endif // PC_MAPCONVERTER_HPP
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
// Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
//

#include "ObjectConverter.hpp"

#include <memory>

#include "CArrowChunkIterator.hpp"
#include "CArrowIterator.hpp"
#include "SnowflakeType.hpp"

namespace sf {
// Logger shared by all ObjectConverter instances. It is registered under this
// class's own name so its messages are not attributed to BinaryConverter.
// Heap-allocated during static initialization; no matching delete appears in
// this file.
Logger* ObjectConverter::logger =
    new Logger("snowflake.connector.ObjectConverter");

// Builds a converter for an Arrow struct column representing an OBJECT.
//
// For every child of the struct schema, the child's name is recorded as a
// property name and a nested converter is created from the child schema and
// the matching child array. `context` and `useNumpy` are forwarded unchanged
// to getConverterFromSchema.
ObjectConverter::ObjectConverter(ArrowSchemaView* schemaView,
                                 ArrowArrayView* array, PyObject* context,
                                 bool useNumpy) {
  const auto* objectSchema = schemaView->schema;

  m_array = array;
  m_converters.clear();
  m_property_names.clear();
  m_propertyCount = objectSchema->n_children;

  for (int idx = 0; idx < objectSchema->n_children; ++idx) {
    ArrowSchema* fieldSchema = objectSchema->children[idx];
    ArrowArrayView* fieldArray = array->children[idx];

    // Names and converters are appended in lockstep so that index idx refers
    // to the same property in both containers.
    m_property_names.push_back(fieldSchema->name);
    m_converters.push_back(getConverterFromSchema(fieldSchema, fieldArray,
                                                  context, useNumpy, logger));
  }
}

// Converts one row of the struct column to a Python object.
//
// Returns None (new reference) for a null row, otherwise a new Python dict
// mapping each property name to the converted value of that property for row
// `rowIndex`. Returns nullptr with a Python exception set if the dict cannot
// be allocated, if a property value fails to convert, or if inserting a
// property fails.
PyObject* ObjectConverter::toPyObject(int64_t rowIndex) const {
  if (ArrowArrayViewIsNull(m_array, rowIndex)) {
    Py_RETURN_NONE;
  }

  PyObject* dict = PyDict_New();
  if (dict == nullptr) {
    return nullptr;
  }

  for (int i = 0; i < m_propertyCount; i++) {
    PyObject* value = m_converters[i]->toPyObject(rowIndex);
    if (value == nullptr) {
      Py_DECREF(dict);
      return nullptr;
    }

    const int status = PyDict_SetItemString(dict, m_property_names[i], value);
    // PyDict_SetItemString does not steal the reference to value: the dict
    // takes its own, so the reference returned by the converter must be
    // released here or every property value would leak.
    Py_DECREF(value);

    if (status != 0) {
      Py_DECREF(dict);
      return nullptr;
    }
  }
  return dict;
}

} // namespace sf
Loading

0 comments on commit 6a2a5b6

Please sign in to comment.