Skip to content

How to use pyflink to read data file in S3 bucket? #22

@ericlxbgit

Description

@ericlxbgit

Hi Jaehyeon,
This is just a question about my practice, not a question about your solution. I am not sure whether is proper to ask here.
I tried to followed your solution of Lab 2. To make it simple, I just want to have the pyflink read the source file in the local docker container folder, and read the source file in S3 bucket.
Now the pyflink can read the source file in local docker container folder with the simplied pyflink code.
image

`

  from pyflink.table import EnvironmentSettings, TableEnvironment      
  env_settings = EnvironmentSettings.in_streaming_mode()
  table_env = TableEnvironment.create(env_settings)
  
  def create_source_table(table_name: str, file_path: str):
      stmt = f"""
      CREATE TABLE {table_name} (
          id                  VARCHAR,
          vendor_id           INT,
          pickup_datetime     VARCHAR,
          dropoff_datetime    VARCHAR,
          passenger_count     INT,
          pickup_longitude    VARCHAR,
          pickup_latitude     VARCHAR,
          dropoff_longitude   VARCHAR,
          dropoff_latitude    VARCHAR,
          store_and_fwd_flag  VARCHAR,
          gc_distance         DOUBLE,
          trip_duration       INT,
          google_distance     VARCHAR,
          google_duration     VARCHAR
      ) WITH (
          'connector'= 'filesystem',
          'format' = 'csv',
          'path' = '{file_path}'
      )
      """
      return stmt
  
  def main():
  
      source_table_name = "taxi_trip_source"
      source_file_path = "/etc/flink/data"
      
      table_env.execute_sql(create_source_table(source_table_name, source_file_path))
      table_env.sql_query(f'SELECT * FROM {source_table_name} LIMIT 10').execute().print()
  
  if __name__ == "__main__":
      main()

`

But I haven't figured out how to let it read the source file in S3 bucket.
I didn't successfully create the same environment with the terraform code your provide. I just simply create a S3 bucket and uploaded the source file, taxi-trips.csv. With aws_access_key_id, aws_secret_access_key, I can read the source file in S3 bucket as below.
image

In docker container, flink-s3-fs-hadoop-1.17.1.jar is in the folder "/opt/flink/plugins/s3-fs-hadoop"
image

I didn't create IAM user/role for my test, can I let pyflink read/select data in the source file in S3 bucket with aws_access_key_id and aws_secret_access_key? How can I do it?

Thank you so much!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions