-
Notifications
You must be signed in to change notification settings - Fork 127
/
arrow_chunk.go
85 lines (69 loc) · 2.27 KB
/
arrow_chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Copyright (c) 2020-2022 Snowflake Computing Inc. All rights reserved.
package gosnowflake
import (
"bytes"
"context"
"encoding/base64"
"time"
"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/ipc"
"github.com/apache/arrow/go/v16/arrow/memory"
)
type arrowResultChunk struct {
reader *ipc.Reader
rowCount int
loc *time.Location
allocator memory.Allocator
}
func (arc *arrowResultChunk) decodeArrowChunk(ctx context.Context, rowType []execResponseRowType, highPrec bool, params map[string]*string) ([]chunkRowType, error) {
defer arc.reader.Release()
logger.Debug("Arrow Decoder")
var chunkRows []chunkRowType
for arc.reader.Next() {
record := arc.reader.Record()
start := len(chunkRows)
numRows := int(record.NumRows())
logger.Debugf("rows in current record: %v", numRows)
columns := record.Columns()
chunkRows = append(chunkRows, make([]chunkRowType, numRows)...)
for i := start; i < start+numRows; i++ {
chunkRows[i].ArrowRow = make([]snowflakeValue, len(columns))
}
for colIdx, col := range columns {
values := make([]snowflakeValue, numRows)
if err := arrowToValues(ctx, values, rowType[colIdx], col, arc.loc, highPrec, params); err != nil {
return nil, err
}
for i := range values {
chunkRows[start+i].ArrowRow[colIdx] = values[i]
}
}
arc.rowCount += numRows
}
return chunkRows, arc.reader.Err()
}
func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[]arrow.Record, error) {
var records []arrow.Record
defer arc.reader.Release()
for arc.reader.Next() {
rawRecord := arc.reader.Record()
record, err := arrowToRecord(scd.ctx, rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
if err != nil {
return nil, err
}
records = append(records, record)
}
return &records, arc.reader.Err()
}
// Build arrow chunk based on RowSet of base64
func buildFirstArrowChunk(rowsetBase64 string, loc *time.Location, alloc memory.Allocator) (arrowResultChunk, error) {
rowSetBytes, err := base64.StdEncoding.DecodeString(rowsetBase64)
if err != nil {
return arrowResultChunk{}, err
}
rr, err := ipc.NewReader(bytes.NewReader(rowSetBytes), ipc.WithAllocator(alloc))
if err != nil {
return arrowResultChunk{}, err
}
return arrowResultChunk{rr, 0, loc, alloc}, nil
}