Source
# Flow: query a Cassandra table, convert the result to CSV, and load the CSV
# into a BigQuery table.
id: cassandra-to-bigquery
namespace: company.team
tasks:
  # Step 1: run the CQL query against the Cassandra node on localhost:9042.
  - id: query_cassandra
    type: io.kestra.plugin.cassandra.Query
    session:
      endpoints:
        - hostname: localhost
          port: 9042
      localDatacenter: datacenter1
    cql: |
      SELECT salary_id, work_year, experience_level, employment_type,
      job_title, salary, salary_currency, salary_in_usd, employee_residence,
      remote_ratio, company_location, company_size
      FROM test.salary
    # The next task consumes the result through `outputs.query_cassandra.uri`.
    store: true
  # Step 2: convert the stored query result into a CSV file.
  - id: write_to_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.query_cassandra.uri }}"
  # Step 3: load the CSV file into the destination BigQuery table.
  - id: load_bigquery
    type: io.kestra.plugin.gcp.bigquery.Load
    from: "{{ outputs.write_to_csv.uri }}"
    destinationTable: my_project.my_dataset.my_table
    # Credentials are resolved from the GCP_SERVICE_ACCOUNT_JSON secret.
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: my_project
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      # Skips the first CSV row; presumably the header line written by
      # IonToCsv -- confirm against the serdes plugin defaults.
      skipLeadingRows: 1
About this blueprint
SQL GCP
This flow extracts data from a Cassandra table, writes it to a CSV file, and then loads the CSV data into BigQuery. It uses data from the following public dataset.
The GCP credentials are read from the secret
GCP_SERVICE_ACCOUNT_JSON. You need to create the table in BigQuery prior
to executing this flow, or you can choose to create it via this flow by
adding the schema to the BigQuery task.
You can set up Cassandra locally with Docker using: docker run -p 9042:9042 cassandra.
- Use `docker ps` to get the container ID where Cassandra is running.
- Then, open a `cqlsh` session in Cassandra using `docker exec -it <container_id> cqlsh`.
- Finally, run the following commands to create the table in Cassandra:
-- Create the keyspace used by the flow's query (test.salary).
-- The replication clause is mandatory in CREATE KEYSPACE; SimpleStrategy with
-- a replication factor of 1 is sufficient for a single-node local container.
CREATE KEYSPACE test
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE test;
-- Columns match the SELECT list of the flow's Cassandra query.
CREATE TABLE salary(
  salary_id int primary key,
  work_year int,
  experience_level text,
  employment_type text,
  job_title text,
  salary int,
  salary_currency text,
  salary_in_usd int,
  employee_residence text,
  remote_ratio int,
  company_location text,
  company_size text
);
Then, either use a COPY command (the file path is resolved where cqlsh runs, i.e. inside the container when started via docker exec): COPY salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size) FROM '/yourfile.csv' WITH HEADER = TRUE;
Or run a few INSERT commands:
-- Seed rows for the salary table. Each string literal is kept on a single
-- line so that no line break ends up inside a stored job_title value.
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (1, 2023, 'EN', 'FT', 'Data Analyst', 75000, 'USD', 75000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (2, 2023, 'EN', 'FT', 'Data Analyst', 60000, 'USD', 60000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (3, 2023, 'MI', 'FT', 'Analytics Engineer', 185700, 'USD', 185700, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (4, 2023, 'MI', 'FT', 'Analytics Engineer', 165000, 'USD', 165000, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (5, 2023, 'SE', 'FT', 'Data Engineer', 160000, 'USD', 160000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (6, 2023, 'SE', 'FT', 'Data Engineer', 130000, 'USD', 130000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (7, 2023, 'SE', 'FT', 'Data Analyst', 169000, 'USD', 169000, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (8, 2023, 'SE', 'FT', 'Data Analyst', 110600, 'USD', 110600, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (9, 2023, 'SE', 'FT', 'Data Operations Engineer', 193000, 'USD', 193000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (10, 2023, 'SE', 'FT', 'Data Operations Engineer', 136850, 'USD', 136850, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (11, 2023, 'SE', 'FT', 'Machine Learning Engineer', 139500, 'USD', 139500, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (12, 2023, 'SE', 'FT', 'Machine Learning Engineer', 109400, 'USD', 109400, 'US', 0, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (13, 2023, 'SE', 'FT', 'Data Engineer', 276000, 'USD', 276000, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (14, 2023, 'SE', 'FT', 'Data Engineer', 178500, 'USD', 178500, 'US', 100, 'US', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (15, 2023, 'MI', 'FT', 'Data Scientist', 55000, 'EUR', 59188, 'ES', 0, 'ES', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (16, 2023, 'MI', 'FT', 'Data Scientist', 45000, 'EUR', 48426, 'ES', 0, 'ES', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (17, 2023, 'MI', 'FT', 'Data Engineer', 70000, 'EUR', 75330, 'SI', 100, 'SI', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (18, 2023, 'MI', 'FT', 'Data Engineer', 45000, 'EUR', 48426, 'SI', 100, 'SI', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (19, 2023, 'SE', 'FT', 'Machine Learning Engineer', 161000, 'GBP', 195940, 'GB', 0, 'GB', 'M');
INSERT INTO salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size)
VALUES (20, 2023, 'SE', 'FT', 'Machine Learning Engineer', 83300, 'GBP', 101378, 'GB', 0, 'GB', 'M');
More Related Blueprints
