Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-28518: Iceberg: Fix ClassCastException during in-place migration to Iceberg tables with timestamp columns #5590

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

ggangadharan
Copy link
Contributor

What changes were proposed in this pull request?

This fix improves the stability and reliability of in-place migrated Iceberg tables involving timestamp data types.

Why are the changes needed?

The issue occurred due to incorrect type casting in the timestamp handling logic, which caused the migrated Iceberg tables Fetch task to fail.

Does this PR introduce any user-facing change?

No

Is the change a dependency upgrade?

No

How was this patch tested?

Qtest - iceberg_inplace_migration_with_timestamp_column.q

@Aggarwal-Raghav
Copy link
Contributor

@ggangadharan , thanks for the PR. I have 1 question though.
I tried to run the q file without the patch with PARQEUT and ORC. I am seeing the difference in behaviour. In ORC, without the patch also it is working. is it because of the way ORC and parquet store date/time type?
Attaching screenshot for same.
With ORC:
ORC
With Parquet:
Parquet

@ggangadharan
Copy link
Contributor Author

Hi @Aggarwal-Raghav

Thank you for raising this question.

Upon investigation, it appears that the issue stems from how the IcebergRecordReader interprets the timestamp column for different file formats:

  • For ORC tables, the timestamp column is read as LOCALDATETIME.
  • For Parquet tables, the same column is read as OFFSETDATETIME.

Due to this discrepancy, we are encountering a ClassCastException when working with Parquet tables.

As you mentioned, I also believe the root cause lies in the underlying file format/Iceberg level.

I’ve attached a screenshot for reference.

HIVE-28518_DEBUG

Please let me know if you need further details or if we should take any additional steps to address this.

if it looks okay , Please review the PR.

@okumin
Copy link
Contributor

okumin commented Dec 27, 2024

I wonder why everything is ok when we directly create an Iceberg + Parquet table.

> CREATE TABLE test3(`id` int,`name` string,`dt` timestamp) stored by iceberg stored as parquet
> insert into test3 values (1, "test name" , cast('2024-08-09 14:08:26.326107' as timestamp));
> select * from test3;

@ggangadharan
Copy link
Contributor Author

Hi @okumin ,

Thank you for taking the time to review the pull request.

In the Iceberg Parquet table, the timestamp column is read as LOCALDATETIME. I’ve attached a screenshot for reference.

Screenshot 2024-12-28 at 8 18 14 AM

There is a notable difference in how the timestamp column is stored at the Parquet file format level. Specifically:

  • In Iceberg Parquet tables, the timestamp column is stored as INT64 L:TIMESTAMP(MICROS,false) .
  • In standard Parquet tables, the timestamp column is stored as INT96 .

For clarity, I’ve also included the metadata from Parquet-tools for reference.

As Iceberg Parquet table

file schema: table
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
dt:          OPTIONAL INT64 L:TIMESTAMP(MICROS,false) R:0 D:1

row group 1: RC:1 TS:112 OFFSET:4
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:35/33/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
name:         BINARY SNAPPY DO:0 FPO:39 SZ:44/42/0.95 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: test name, max: test name, num_nulls: 0]
dt:           INT64 SNAPPY DO:0 FPO:83 SZ:39/37/0.95 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2024-08-09T14:08:26.326107, max: 2024-08-09T14:08:26.326107, num_nulls: 0]

As standard parquet table

file schema: hive_schema
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
dt:          OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:137 OFFSET:4
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:33/33/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
name:         BINARY UNCOMPRESSED DO:0 FPO:37 SZ:42/42/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: test name, max: test name, num_nulls: 0]
dt:           INT96 UNCOMPRESSED DO:79 FPO:110 SZ:62/62/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 0x78037C8D4C2E0000748B2500, max: 0x78037C8D4C2E0000748B2500, num_nulls: 0]

@okumin
Copy link
Contributor

okumin commented Dec 28, 2024

Thanks. I also remember Hive didn't follow the regular convention on encoding TIMESTAMP. I don't have an immediate idea on how to fix it

@ayushtkn
Copy link
Member

We should try to fix the regular convention on encoding TIMESTAMP, but that might not fix the case with existing tables, For those the fix in the current PR seems ok to me

@ggangadharan
Copy link
Contributor Author

@ayushtkn Thank you for the feedback.

Based on this, I believe the changes are in a good state to proceed unless there are further concerns.

@Aggarwal-Raghav @okumin Could you kindly review the code and share your feedback? Your insights would be greatly appreciated to help move this forward. If there are any questions or blockers, feel free to let me know.

Thank you for your time and support!

@okumin
Copy link
Contributor

okumin commented Jan 2, 2025

Thank you! It is obvious. So, is the remaining problem to verify INT96 can be compatible with Iceberg's TIMESTAMP, which means verifying other query engines or tools can read it as a timestamp. I am trying to check it.

@ggangadharan
Copy link
Contributor Author

ggangadharan commented Jan 2, 2025

@okumin Thanks for the update

Successfully read the migrated ICEBERG table (previously migrated from Hive) using spark.sql in Spark , and it worked as expected. Spark is reading the timestamp column as TimestampNTZType.

As per documentation - TimestampNTZType : Timestamp without time zone(TIMESTAMP_NTZ). It represents values comprising values of fields year, month, day, hour, minute, and second. All operations are performed without taking any time zone into account.

Ref - https://spark.apache.org/docs/latest/sql-ref-datatypes.html

Attaching spark3-shell output for a reference.

scala> spark.sql("DESCRIBE  TABLE formatted default.hive_28518_test").show(false)
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |comment|
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |null   |
|name                        |string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |null   |
|dt                          |timestamp_ntz                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |null   |
|                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|# Metadata Columns          |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|_spec_id                    |int                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |       |
|_partition                  |struct<>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |       |
|_file                       |string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |       |
|_pos                        |bigint                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |       |
|_deleted                    |boolean                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|# Detailed Table Information|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|Name                        |spark_catalog.default.hive_28518_test                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |       |
|Type                        |MANAGED                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|Location                    |hdfs://ns1/warehouse/tablespace/external/hive/hive_28518_test                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |       |
|Provider                    |iceberg                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|Table Properties            |[EXTERNAL=TRUE,MIGRATED_TO_ICEBERG=true,OBJCAPABILITIES=EXTREAD,EXTWRITE,current-snapshot-id=4868016704679265240,engine.hive.enabled=true,format=iceberg/parquet,format-version=2,iceberg.orc.files.only=false,last_modified_by=hive,last_modified_time=1735822103,schema.name-mapping.default=[ {\n  "field-id" : 1,\n  "names" : [ "id" ]\n}, {\n  "field-id" : 2,\n  "names" : [ "name" ]\n}, {\n  "field-id" : 3,\n  "names" : [ "dt" ]\n} ],storage_handler=org.apache.iceberg.mr.hive.HiveIcebergStorageHandler,table_type=ICEBERG,write.delete.mode=merge-on-read,write.format.default=parquet,write.merge.mode=merge-on-read,write.update.mode=merge-on-read]|       |
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+


scala> spark.sql("select dt from default.hive_28518_test").show(10,false)
+--------------------------+
|dt                        |
+--------------------------+
|2024-08-09 14:08:26.326107|
+--------------------------+

FYI

While reading the string column name, I encountered an error that has been reported here . Since it is related to a spark/Iceberg issue, we can ignore it for now

scala> spark.sql("select name from default.hive_28518_test").show()
25/01/02 12:59:29 WARN  scheduler.TaskSetManager: [task-result-getter-3]: Lost task 0.0 in stage 5.0 (TID 11) (ccycloud-2.nightly7310-ec.root.comops.site executor 2): java.lang.UnsupportedOperationException: Unsupported type: UTF8String
	at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
	at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

25/01/02 12:59:29 ERROR scheduler.TaskSetManager: [task-result-getter-2]: Task 0 in stage 5.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14) (ccycloud-2.nightly7310-ec.root.comops.site executor 2): java.lang.UnsupportedOperationException: Unsupported type: UTF8String
	at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
	at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2300)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2319)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4183)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3167)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4173)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:527)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4171)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4171)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:3167)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:3388)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:290)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:329)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:815)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
  ... 47 elided
Caused by: java.lang.UnsupportedOperationException: Unsupported type: UTF8String
  at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
  at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

@ggangadharan
Copy link
Contributor Author

@okumin I have shared the output snippets from SPARK side in my previous response. Please let me know if there's anything else required from my side to help resolve this issue.

@okumin
Copy link
Contributor

okumin commented Jan 6, 2025

@ggangadharan Thanks. I think you provided sufficient information. I was testing Spark and Trino, but the progress has not been so good. I faced another problem while testing them, and it's been a holiday season in my country. I would appreciate it if you could give us time

@ggangadharan
Copy link
Contributor Author

@okumin Thanks for the update. Let me know if there's anything needed from my end. Will check and do the needful from my side.

OffsetDateTime odt = (OffsetDateTime) o;
time = odt.atZoneSameInstant(TypeInfoFactory.timestampLocalTZTypeInfo.getTimeZone()).toLocalDateTime();
} else {
time = (LocalDateTime) o;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be safe to check if it is LocalDateTime before casting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zratkai Thanks for taking a look into this.

Sure, I will make the necessary changes. Besides LocalDateTime and OffsetDateTime, are there any other classes that might be expected here? Please let me know so I can handle them appropriately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should always be LocalDateTime, ideally. But the Iceberg + Parquet reader constructs OffsetDateTime. So, we should support only LocalDateTime and OffsetDateTime.

Though we expected never fail, it could be a little kinder if we had a better error message like String.format("An unexpected type %s was passed as timestamp", o.getClass)

Copy link
Contributor Author

@ggangadharan ggangadharan Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed update @okumin

if it's okay. I will make the below code changes.

 public Timestamp getPrimitiveJavaObject(Object o) {
    if (o == null) {
      return null;
    }
    LocalDateTime time;
    if (o instanceof LocalDateTime) {
      time = (LocalDateTime) o;
    } else if (o instanceof OffsetDateTime) {
      OffsetDateTime odt = (OffsetDateTime) o;
      time = odt.atZoneSameInstant(TypeInfoFactory.timestampLocalTZTypeInfo.getTimeZone()).toLocalDateTime();
    } else {
      throw new ClassCastException(String.format("An unexpected type %s was passed as timestamp. " +
              "Expected LocalDateTime/OffsetDateTime", o.getClass().getName()));
    }
    return Timestamp.ofEpochMilli(time.toInstant(ZoneOffset.UTC).toEpochMilli(), time.getNano());
  }

Please check and let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @ayushtkn

@okumin
Copy link
Contributor

okumin commented Jan 9, 2025

I have checked how we handle timestamps for the past few days, and this is the summary.
https://gist.github.com/okumin/b33876398a106068b4fd7ddd3049ae8c

As far as I tested, I didn't find any patterns where this doesn't work so far. So, I am thinking to review this and give +1. I might ask someone for one more look because I might not have comprehensive information.

Copy link
Contributor

@okumin okumin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ayushtkn @zabetak While investigating the history of TIMESTAMP and related tickets, I thought you would likely be knowledgeable about this area. I'd appreciate it if you could take another look at it.

I have two mysteries.

  1. I am not sure if it is very valid that the Iceberg reader returns OffsetDateTime
  2. INT96 might have historically broken or strange timestamps

OffsetDateTime odt = (OffsetDateTime) o;
time = odt.atZoneSameInstant(TypeInfoFactory.timestampLocalTZTypeInfo.getTimeZone()).toLocalDateTime();
} else {
time = (LocalDateTime) o;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should always be LocalDateTime, ideally. But the Iceberg + Parquet reader constructs OffsetDateTime. So, we should support only LocalDateTime and OffsetDateTime.

Though we expected never fail, it could be a little kinder if we had a better error message like String.format("An unexpected type %s was passed as timestamp", o.getClass)

…Iceberg tables, Changes after Review Comments
@ggangadharan
Copy link
Contributor Author

@ayushtkn Updated the changes in the recent commit, and all tests have passed. Please review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants