Trabalhar com streaming e processamento de dados pode parecer um bicho de sete cabeças, mas pode ser mais simples do que parece. Esse artigo faz parte de uma minissérie onde vamos simplificar e construir um pipeline do zero, desde o streaming, mostrando o processamento e o armazenamento em um data lake.

Vamos começar falando sobre o Kinesis, que é um serviço exclusivo da AWS que realiza streaming de dados em tempo real. Com ele podemos fazer o stream de eventos de aplicativos, sites, bases de dados entre outras possibilidades.

Agora é hora de colocar a mão na massa. Primeiro precisamos criar um streaming do kinesis e para facilitar a didática vamos criar via console. Na aba Kinesis, dentro do console da AWS, entre em “Criar um Novo Streaming” e vocês vão ver a seguinte tela:

Nessa tela podemos realizar algumas configurações, mas como vamos utilizar apenas para estudo deixaremos a configuração padrão. Só é necessário ressaltar um ponto sobre a quantidade de shards, que se referem a capacidade de entrada e consumo de dados do streaming. No nosso caso, vamos utilizar apenas 1, mas em um ambiente de produção isso deve ser calculado dependendo da quantidade de dados que vão entrar e serem consumidos. Cada shard tem a capacidade de fazer a ingestão de 1 mb ou 1000 eventos por segundo e capacidade de leitura de 5 transações por segundo, onde cada leitura pode ter até 10 mb ou 10000 eventos.

Com o streaming de dados criado, vamos inserir dados nele para testar. Vamos precisar instalar o boto3 (pip install boto3). Com ele já instalado, vamos utilizar o script a seguir para gerar dados aleatórios:

import json
import random
import string
import time
import uuid

import boto3

client = boto3.client('kinesis')

for _ in range(1000):
    data = {
        'id': str(uuid.uuid4()),
        'random_number': random.randint(60, 120),
        'random_string': ''.join(random.choice(string.ascii_lowercase) for i in range(10))
    }
    print(data)

    response = client.put_record(
        StreamName='data_streaming',
        Data=json.dumps(data),
        PartitionKey="partitionkey")

    time.sleep(0.3)

Fonte: insert.py hosted with ❤ by GitHub

A saída do script deve ser similar a isso:

Com isso, já temos o streaming configurado e dados dentro dele. Agora vamos consumir esses dados. Para isso, vamos utilizar o Spark Streaming, que é um framework de código aberto para computação distribuída, ele é muito poderoso e muito utilizado para processamento de grandes massas de dados. Caso seja necessário, não é difícil encontrar exemplos com ele.
Para começar, vamos instalar o PySpark em seu ambiente de desenvolvimento utilizando o comando pip:

pip install pyspark==3.1.1

Após instalar o PySpark para consumir os dados, vamos utilizar o trecho de código a seguir. No entanto, antes de executá-lo, vale a pena ressaltar alguns pontos:


1. Variável spark_batch_interval, mesmo os dados estando em streaming, o spark consome os dados do kinesis em micro batches e essa variável diz quanto tempo deve demorar entre um micro batch e outro.


2. Variável kinesis_initial_position, se refere a como vamos consumir os dados, podendo ter dois valores possíveis, como, por exemplo, TRIM_HORIZONTAL que vai começar a consumir os dados a partir do dado mais antigo ou LATEST, que vai começar a consumir da mensagem mais recente.


3. E por último, a variável kinesis_checkpoint_interval, que se refere ao intervalo de tempo em que será feito o checkpoint do ponto em que estamos no kinesis. Um lado bastante positivo de utilizar o KinesisUtils é que ele faz esse controle, quando executamos o código pela primeira vez, ele cria uma tabela no DynamoDB onde ele vai salvar e atualizar o nosso checkpoint.

import json
import logging
import os

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.1,"
                                    "org.apache.spark:spark-avro_2.12:3.0.1 pyspark-shell" # Adicionando as bibliotecas necessárias para o consumo dos dados

dir_path = f'{os.path.dirname(os.path.realpath(__file__))}/data'  # path para armazenar os dados
  
def process(time, rdd):
    print(f'Start process: {time}')
    rdd = rdd.map(lambda v: json.loads(v))
    df = rdd.toDF()
    df.show()

    df.write.mode("append").format("csv").save(dir_path)

if __name__ == "__main__":
    try:
        spark_batch_interval = 60  # Intervalo entre os micro batchs
        app_name = 'stream_data'  # Nome da sua aplicação de streaming
        kinesis_stream_name = 'data_streaming'  # Nome que foi dado ao kinesis
        kinesis_endpoint = 'kinesis.us-east-1.amazonaws.com'  # Endpoint do kinesis, esse é o default na região us-east-1
        aws_region = 'us-east-1'  # AWS Region
        kinesis_initial_position = InitialPositionInStream.TRIM_HORIZON  # De onde vamos iniciar a leitura dos dados
        kinesis_checkpoint_interval = 60  # Intervalo que sera salvo o checkpoint da leitura do kinesis

        spark_session = SparkSession.builder \
            .appName(app_name) \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.hive.convertMetastoreParquet", "false") \
            .getOrCreate()  # Criando spark context

        spark_streaming_context = StreamingContext(spark_session.sparkContext,
                                                   spark_batch_interval)  # Criando streaming Context

        kinesis_streams = KinesisUtils.createStream(
            spark_streaming_context, app_name, kinesis_stream_name, kinesis_endpoint,
            aws_region, kinesis_initial_position,
            kinesis_checkpoint_interval)  # Inicializando o streaming com kinesis utils  

        kinesis_streams.foreachRDD(process)  # Processando os micro batchs 

        spark_streaming_context.start()
        spark_streaming_context.awaitTermination()
    except Exception as ex:
        logging.error(ex)

Fonte: kinesis_spark.py hosted with ❤ by GitHub

O código está consumindo, exibindo e salvando o que foi consumindo. Você pode ver os dados salvos na pasta data que deve estar na mesma pasta do código. O output deve ser similar a imagem a seguir:

Pessoal, era isso que eu queria passar pra vocês hoje. Obrigado a quem chegou até aqui! No próximo artigo vamos falar um pouco de quando precisamos salvar dados e atualizá-los. Para isso, vamos utilizar o Apache Hudi, que é uma ferramenta open source que nos ajuda a fazer isso de uma forma muito simples!