Trabalhar com streaming e processamento de dados pode parecer um bicho de sete cabeças, mas pode ser mais simples do que parece. Esse artigo faz parte de uma minissérie onde vamos simplificar e construir um pipeline do zero, desde o streaming, mostrando o processamento e o armazenamento em um data lake.
Vamos começar falando sobre o Kinesis, que é um serviço exclusivo da AWS que realiza streaming de dados em tempo real. Com ele podemos fazer o stream de eventos de aplicativos, sites, bases de dados entre outras possibilidades.
Agora é hora de colocar a mão na massa. Primeiro precisamos criar um streaming do kinesis e para facilitar a didática vamos criar via console. Na aba Kinesis, dentro do console da AWS, entre em “Criar um Novo Streaming” e vocês vão ver a seguinte tela:

Nessa tela podemos realizar algumas configurações, mas como vamos utilizar apenas para estudo deixaremos a configuração padrão. Só é necessário ressaltar um ponto sobre a quantidade de shards, que se referem a capacidade de entrada e consumo de dados do streaming. No nosso caso, vamos utilizar apenas 1, mas em um ambiente de produção isso deve ser calculado dependendo da quantidade de dados que vão entrar e serem consumidos. Cada shard tem a capacidade de fazer a ingestão de 1 mb ou 1000 eventos por segundo e capacidade de leitura de 5 transações por segundo, onde cada leitura pode ter até 10 mb ou 10000 eventos.
Com o streaming de dados criado, vamos inserir dados nele para testar. Vamos precisar instalar o boto3 (pip install boto3). Com ele já instalado, vamos utilizar o script a seguir para gerar dados aleatórios:
import json
import random
import string
import time
import uuid
import boto3
client = boto3.client('kinesis')
for _ in range(1000):
data = {
'id': str(uuid.uuid4()),
'random_number': random.randint(60, 120),
'random_string': ''.join(random.choice(string.ascii_lowercase) for i in range(10))
}
print(data)
response = client.put_record(
StreamName='data_streaming',
Data=json.dumps(data),
PartitionKey="partitionkey")
time.sleep(0.3)
Fonte: insert.py hosted with ❤ by GitHub
A saída do script deve ser similar a isso:

Com isso, já temos o streaming configurado e dados dentro dele. Agora vamos consumir esses dados. Para isso, vamos utilizar o Spark Streaming, que é um framework de código aberto para computação distribuída, ele é muito poderoso e muito utilizado para processamento de grandes massas de dados. Caso seja necessário, não é difícil encontrar exemplos com ele.
Para começar, vamos instalar o PySpark em seu ambiente de desenvolvimento utilizando o comando pip:
pip install pyspark==3.1.1
Após instalar o PySpark para consumir os dados, vamos utilizar o trecho de código a seguir. No entanto, antes de executá-lo, vale a pena ressaltar alguns pontos:
1. Variável spark_batch_interval, mesmo os dados estando em streaming, o spark consome os dados do kinesis em micro batches e essa variável diz quanto tempo deve demorar entre um micro batch e outro.
2. Variável kinesis_initial_position, se refere a como vamos consumir os dados, podendo ter dois valores possíveis, como, por exemplo, TRIM_HORIZONTAL que vai começar a consumir os dados a partir do dado mais antigo ou LATEST, que vai começar a consumir da mensagem mais recente.
3. E por último, a variável kinesis_checkpoint_interval, que se refere ao intervalo de tempo em que será feito o checkpoint do ponto em que estamos no kinesis. Um lado bastante positivo de utilizar o KinesisUtils é que ele faz esse controle, quando executamos o código pela primeira vez, ele cria uma tabela no DynamoDB onde ele vai salvar e atualizar o nosso checkpoint.
import json
import logging
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.1,"
"org.apache.spark:spark-avro_2.12:3.0.1 pyspark-shell" # Adicionando as bibliotecas necessárias para o consumo dos dados
dir_path = f'{os.path.dirname(os.path.realpath(__file__))}/data' # path para armazenar os dados
def process(time, rdd):
print(f'Start process: {time}')
rdd = rdd.map(lambda v: json.loads(v))
df = rdd.toDF()
df.show()
df.write.mode("append").format("csv").save(dir_path)
if __name__ == "__main__":
try:
spark_batch_interval = 60 # Intervalo entre os micro batchs
app_name = 'stream_data' # Nome da sua aplicação de streaming
kinesis_stream_name = 'data_streaming' # Nome que foi dado ao kinesis
kinesis_endpoint = 'kinesis.us-east-1.amazonaws.com' # Endpoint do kinesis, esse é o default na região us-east-1
aws_region = 'us-east-1' # AWS Region
kinesis_initial_position = InitialPositionInStream.TRIM_HORIZON # De onde vamos iniciar a leitura dos dados
kinesis_checkpoint_interval = 60 # Intervalo que sera salvo o checkpoint da leitura do kinesis
spark_session = SparkSession.builder \
.appName(app_name) \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.getOrCreate() # Criando spark context
spark_streaming_context = StreamingContext(spark_session.sparkContext,
spark_batch_interval) # Criando streaming Context
kinesis_streams = KinesisUtils.createStream(
spark_streaming_context, app_name, kinesis_stream_name, kinesis_endpoint,
aws_region, kinesis_initial_position,
kinesis_checkpoint_interval) # Inicializando o streaming com kinesis utils
kinesis_streams.foreachRDD(process) # Processando os micro batchs
spark_streaming_context.start()
spark_streaming_context.awaitTermination()
except Exception as ex:
logging.error(ex)
Fonte: kinesis_spark.py hosted with ❤ by GitHub
O código está consumindo, exibindo e salvando o que foi consumindo. Você pode ver os dados salvos na pasta data que deve estar na mesma pasta do código. O output deve ser similar a imagem a seguir:

Pessoal, era isso que eu queria passar pra vocês hoje. Obrigado a quem chegou até aqui! No próximo artigo vamos falar um pouco de quando precisamos salvar dados e atualizá-los. Para isso, vamos utilizar o Apache Hudi, que é uma ferramenta open source que nos ajuda a fazer isso de uma forma muito simples!