Os commits são uma parte essencial do processo de desenvolvimento de software. Eles nos permitem rastrear as alterações feitas em um projeto e fornecem um histórico de tudo o que aconteceu. No entanto, nem todos os commits são úteis, tenho certeza que você já viu algumas mensagens de commits completamente inúteis… tenho certeza que você já comitou mensagens inúteis… rsrs.. admita… estamos entre amigos aqui ;).
O que são Conventional Commits?
Os Conventional Commits são uma convenção de nomenclatura para mensagens de commit. Eles seguem um formato específico, consistindo de um cabeçalho, um corpo opcional e um rodapé também opcional, separados por uma linha em branco. O cabeçalho é composto por um tipo, um escopo (opcional) e uma descrição concisa do commit.
A estrutura básica de um commit convencional é a seguinte:
Tipo(escopo opcional): descrição
Corpo opcional
Signed-by: fulano@fulano.com.br
Refs: #123
Tipos de Commits
Existem vários tipos de commits já convencionados, mas nada impede que você crie os seus pr´óprios tipos.
Aqui estão alguns dos tipos mais comuns:
feat: Usado para adicionar uma nova funcionalidade ao projeto.
fix: Usado para corrigir um bug ou problema existente.
docs: Usado para adicionar ou atualizar a documentação do projeto.
style: Usado para fazer alterações de formatação, como espaços em branco, indentação, etc.
refactor: Usado para fazer alterações no código que não adicionam uma nova funcionalidade ou corrigem um bug.
test: Usado para adicionar ou modificar testes no projeto.
chore: Usado para fazer alterações na configuração do projeto, scripts auxiliares, etc.
Usos e Benefícios
Ao adotar os Conventional Commits, você estabelece uma estrutura clara e consistente para seus commits. Isso torna mais fácil para você e outros desenvolvedores entenderem o propósito de cada commit e acompanhar as mudanças feitas ao longo do tempo.
Outro benefício é a possibilidade de integração com ferramentas de automação, como sistemas de integração contínua. Por exemplo, você pode configurar seu sistema de CI/CD para executar automaticamente testes e implantar seu código em diferentes ambientes com base nos tipos de commits.
Outro benefício é a geração automática de changelogs. Com base nas mensagens de commit, você pode gerar um log de alterações que lista todas as funcionalidades adicionadas, bugs corrigidos e outras alterações relevantes em cada versão do seu projeto.
E por fim, ao adotar um padrão de commits, você terá uma melhor comunicação entre os membros da equipe. Ao seguir a convenção, todos os desenvolvedores têm uma compreensão clara do que cada commit representa e podem colaborar de forma mais eficiente.
Conclusão
Não há motivos para não adotar o uso de Conventional Commits, trata-se de uma abordagem simples e eficaz para padronizar a forma como escrevemos nossos commits. Eles fornecem clareza, organização e facilidade de entendimento para o histórico de alterações de um projeto. Além disso, eles trazem benefícios adicionais, como integração com ferramentas de automação e geração automática de changelogs.
Portanto, se você ainda não está usando os Conventional Commits em seus projetos, considere adotá-los. Segundo um estudo realizado por mim com “migo” mesmo, as chances de dor de cabeça caem 50% só de utilizar padrões nos seus projetos ;).
Se quiser conhecer em mais detalhes, indico a leitura do site oficial:
Que o Python é uma linguagem fantástica e simples de trabalhar, todo mundo já sabe. Mas ele pode ser ainda mais legal =D.
Abaixo vou mostrar algumas dicas simples, e que deixarão seu código bem mais conciso e otimizado. Bora ver =).
Evite loops aninhados com função product()
Essa descobri recentemente, e curti demais. Trata-se de uma função built-in do módulo itertools.
As vezes precisamos criar loops aninhados, o que pode deixar nosso código mais lento e difícil de ser lido, e essa função vem para nos ajudar nisso.
Segue um exemplo simples:
def nested_loop():
start_time = time.perf_counter()
for a in list_a:
for b in list_b:
for c in list_c:
v = a + b + c
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f">>> Nested loops execution time: {execution_time:.2f}")
Mesmo código, mas agora usando a função product():
def product_function():
start_time = time.perf_counter()
from itertools import product
for a, b, c in product(list_a, list_b, list_c):
v = a + b + c
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f">>> Product function execution time: {execution_time:.2f}")
Aqui o código completo com teste de performance. Essa performance aumenta conforme a complexidade do que seu código aumenta:
import numpy as np
import time
list_a = np.random.randint(1000, size=300)
list_b = np.random.randint(1000, size=300)
list_c = np.random.randint(1000, size=300)
def nested_loop():
start_time = time.perf_counter()
for a in list_a:
for b in list_b:
for c in list_c:
v = a + b + c
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f">>> Nested loops execution time: {execution_time:.2f}")
def product_function():
start_time = time.perf_counter()
from itertools import product
for a, b, c in product(list_a, list_b, list_c):
v = a + b + c
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f">>> Product function execution time: {execution_time:.2f}")
if __name__ == "__main__":
nested_loop()
product_function()
Assignment Expressions
Essa é uma forma bem legal de você atribuir um valor à variável enquanto utiliza-o. Da uma olhada:
O código abaixo est´á sem o uso de expressão de atribuição
Legal né… e vc pode usar de outras formas… por exemplo, em um loop:
while (texto := input("Digite um texto: ")) != "sair":
print("Texto digitado: ", texto)
Operator condicional ternário, ou simplesmente… if ternário
Para resolver condições simples que envolvam apenas 2 saídas, mais que isso não recomendo o uso de if ternários, pois podem deixar seu código muito difícil de entender:
A sintax é simples:
min = a if a < b else b
Aplicando no if do exemplo anterior, ficaria assim:
Esse é uma das coisas mais legais do python. A criação de funções anônimas para operações simples. O legal que elas não precisam ser tão anônimas assim, já que conseguimos atribuir uma função lambda a uma variável, e usar essa variável como se fosse uma função definida. Assim:
soma = lambda x,y: x+y
print(soma(1,2))
Outro exemplo legal… uma função para retornar os números da sequência fibonacci
def fib(x):
if x<=1:
return x
else:
return fib(x-1) + fib(x-2)
Com o uso de lambda:
fib = lambda x: x if x <= 1 else fib(x - 1) + fib(x - 2)
List Comprehensions
Este é sem dúvidas uma das coisas mais legais do Python, a compreensão de listas. É uma forma bem simples e prática de trabalhar com listas.
A sintaxe é simples:
lista_b = [item for item in lista_a [condições] ]
Abaixo um exemplo utilizando list comprehensions com a função lambda que criamos log acima:
fib = lambda x: x if x <= 1 else fib(x - 1) + fib(x - 2)
lista = [fib(i) for i in range(1,11)]
print(lista)
O que fizemos aqui foi simplesmente criar uma lista com 10 números da sequência fibonacci… legal né?! mas da pra fazer muito mais… olha esse outro exemplo:
# Filtrando somente frutas que comecem
# com a letra M da lista 'frutas'
frutas = ['Banana', 'Maça', 'Pera', 'Abacaxi', 'Uva', 'Melão', 'Melancia', 'Mamão', 'Jaca']
fm = [fruta for fruta in frutas if fruta.startswith('M')]
print(frutas)
print(fm)
Funções de ordem superior
O python possui algumas funções de ordem superior que são verdadeiras jóias quando trabalhamos com dados em larga escala.
Uma função de ordem superior são aquelas funções que recebem outras funções como parâmetros e retornam funções e/ou iterators.
Recentemente precisei desenvolver uma tabela agregada para servir a um dashboard e um dos dados a serem calculados era a previsão de entrega de um pedido com base em um SLA.
Só tinha um detalhe: no cálculo deveria ser considerado apenas os dias úteis.
Utilizando uma lógica parecida com a que foi usada na function de cálculo de dias úteis, resolvi desenvolver uma nova function para encontrar o próximo dia útil dado um intervalo de tempo e uma lista de datas a desconsiderar.
A função é bastante simples, recebe apenas 3 parametros: Data de inicio da contagem, intervalo e a lista de datas a serem desconsideradas no calculo. O resultado foi esse:
CREATE TEMP FUNCTION calc_proxima_data_util(DATA_INICIO DATE, SLA INT64, LISTA_FERIADOS ARRAY<DATE>) AS (
(
SELECT MAX(dt) FROM (
SELECT dt, RANK() OVER(ORDER BY dt) as id
FROM UNNEST(GENERATE_DATE_ARRAY(DATE_ADD(DATA_INICIO, INTERVAL 1 DAY), DATE_ADD(DATA_INICIO,INTERVAL SLA*10 DAY))) dt
WHERE EXTRACT(DAYOFWEEK from dt) between 2 and 6 and dt not in UNNEST(LISTA_FERIADOS)
QUALIFY id <= SLA)
)
);
A lógica é bastante simples: é gerado um range de datas partindo da data_inicio (mas não contabilizando-a) e indo até o intervalo * 10. Isso para ter uma margem de segurança e não correr o risco de perder alguma data.
Após a geração do range, com a cláusula where tiramos os finais de semana bem como os feriados.
Do resultado, criei um sequencial para cada linha e com a instrução QUALIFY retornei apenas as linhas com sequencial <= SLA. Precisei fazer dessa forma pois o BigQuery não aceitou que eu usasse o LIMIT passando o valor como um parâmetro… infelizmente =/
Depois só precisei fazer um MAX() na data. Legal né?! =D Eu achei =D.
Segue um exemplo de uso completo:
DECLARE DATA_INICIO DATE DEFAULT '2022-09-01';
DECLARE SLA INT64 DEFAULT 10;
DECLARE lista_feriados ARRAY<DATE> DEFAULT NULL;
SET lista_feriados = [
DATE('2022-01-01'),
DATE('2022-03-01'),
DATE('2022-04-15'),
DATE('2022-04-17'),
DATE('2022-04-21'),
DATE('2022-05-01'),
DATE('2022-06-16'),
DATE('2022-09-07'),
DATE('2022-10-12'),
DATE('2022-11-02'),
DATE('2022-11-15'),
DATE('2022-12-25')
];
CREATE TEMP FUNCTION calc_proxima_data_util(DATA_INICIO DATE, SLA INT64, LISTA_FERIADOS ARRAY<DATE>) AS (
(
SELECT MAX(dt) FROM (
SELECT dt, RANK() OVER(ORDER BY dt) as id
FROM UNNEST(GENERATE_DATE_ARRAY(DATE_ADD(DATA_INICIO, INTERVAL 1 DAY), DATE_ADD(DATA_INICIO,INTERVAL SLA*10 DAY))) dt
WHERE EXTRACT(DAYOFWEEK from dt) between 2 and 6 and dt not in UNNEST(LISTA_FERIADOS)
QUALIFY id <= SLA)
)
);
SELECT
dt_inicio,
sla,
calc_proxima_data_util(dt_inicio, sla, lista_feriados) as previsao_entrega
FROM (
SELECT date('2022-09-01') as dt_inicio, 10 as sla union all
SELECT date('2022-02-25'), 7 union all
SELECT date('2022-04-07'), 20
);
Recentemente encarei um grande desafio junto com um colega de trabalho, no qual descobrimos o multiprocessing do Python, e o quão poderoso ele é.
Para vocês entenderem melhor o problema, tínhamos alguns scripts em python, que foram escritos para serem executados em servidores on premises, sem computação distribuída, e utilizando a linguagem na sua forma mais básica e pura. Era o que dava pra ser feito com os recursos disponíveis e o cenário da época.
Quando recebemos esse problema, a primeira reação foi: “Cara… isso aqui é trabalho para o Spark… tem que ser reescrito.”. A questão é que não tínhamos tempo para reescrever todos os scripts utilizando a arquitetura que considerávamos a ideal.
Após várias conversas com várias pessoas, todos tínhamos o mesmo sentimento: “Precisamos de uma versão 2.0 onde tudo será reescrito na melhor arquitetura possível”.
Mas e até lá, o que pode ser feito com a versão atual para que rodem num tempo aceitável?!
O quão ruim estava a execução?! 6 scripts com tempos entre 8h30m e 18h. Isso ai, um dos scripts levava 3/4 de um dia inteiro para ser executado.
Esse era o desafio: Melhorar a performance no processamento de vários milhões de registros sem reescrever o código.
Primeiros passos…
Começamos com alguns pequenos refatoramentos no código, com objetivo de deixar mais limpo e na tentativa de liberar memória durante o processamento. Porém isso não surtiu muito efeito.
Segunda ação, foi quebrar o dataset original em datasets menores para serem processados em sequência. Dessa forma ganhamos mais memória para ser usada durante o processamento.
A terceira ação, foi utilizarmos mais maquinas no processamento. Ainda não estamos trabalhando com o processamento distribuido, mas dessa forma conseguimos executar cada dataset em uma maquina e trabalhar com processamento paralelo. Criamos um orquestrador, que ficou responsável por buscar cada dataset, e disparar a chamada para cada script.
E a quarta ação, talvez seja a mais importante delas, e a que me motivou a escrever esse post: A utilização do multiprocessing do python para executar o código em todas as cpus da maquina de forma PARALELA.
Notamos que ao executar o código na sua forma normal, a máquina muitas vezes utilizava apenas 1 core para a execução do processamento, e quando usava mais de 1 core, não fazia de forma paralela.
Após essas 4 ações, conseguimos baixar o tempo de execução de cada script para menos de 20 minutos cada.
IMPORTANTE Antes de entrar em mais detalhes sobre o multiprocessing, vale lembrar que Multiprocessing É DIFERENTE das threads. Notei que ao conversar sobre Multiprocessing com algumas pessoas, todas citavam as Threads do Python.
Vou deixar dois links bem interessantes que explicam de forma bem simples e prática a diferença entre ambas:
Existe algo chamado Global Interpreter Lock, que é um mecanismo usado pelo CPython(que é a implementação padrão do Python) para garantir que apenas 1 thread por vez execute o Python. Mas por que isso existe? Beeeeeem resumidamente: Para controle de concorrência, dessa forma, ele garante que não terá outros processos tratando os mesmos endereços de memória.
E é ai que entra o Multiprocessing.
Como a própria documentação oficial diz:
“O pacote de multiprocessamento oferece simultaneidade local e remota, efetivamente contornando o Global Interpreter Lock usando subprocessos em vez de threads.”
Python.org
E por que usar o multiprocessing? novamente, a documentação oficial:
“… o módulo de multiprocessamento permite que o programador aproveite totalmente vários processadores em uma determinada máquina”.
Python.org
É isso ai, ele “dibra” o GIL(Global Interpreter Lock) usando subprocessos, dessa forma conseguimos utilizar todos os cpus da maquina de forma PARALELA :D.
Lindo isso né?! 😀
Vou mostrar 2 exemplos bem simples que eu fiz apenas para exemplificar o uso e a execução do multiprocessing.
Análisando a execução do Multiprocessing
A primeira coisa que quero mostrar, é justamente o diferencial dele, a execução em vários processos:
Código normal:
import time
import os
def spawn(num):
print("PID SPAWN: ", os.getpid(), "NUM: ", num)
calc = num**2
if __name__ == '__main__':
print("PID MAIN: ", os.getpid())
start = time.time()
for i in range(10):
spawn(i)
end = time.time()
print("Tempo em segundos: ", int(end - start))
Resultado da execução do código acima
Notem que no resultado da execução, tanto o PID principal quanto o PID de cada execução da função spawn são os mesmos, e os números foram executados na ordem.
Código com Multiprocessing utilizando Pool
import multiprocessing
import time
import os
def spawn(num):
print("PID SPAWN: ", os.getpid(), "NUM: ", num)
calc = num**2
if __name__ == '__main__':
start = time.time()
print("PID MAIN: ", os.getpid())
with multiprocessing.Pool(processes=8) as pool:
data = pool.map(spawn, [x for x in range(10)])
pool.close()
end = time.time()
print("Tempo em segundos: ", int(end - start))
Resultado da execução utilizando multiprocessing
Notem que agora temos PIDs diferentes, e que os números não estão mais em ordem. Isso é muito legal :D!
Teste de performance
Vamos continuar com os mesmos códigos, mas vamos remover os prints dos PIDs, e vamos aumentar o numero de registros para 100.000.000 com o objetivo de medir em quanto tempo eles serão processados.
Código normal
import multiprocessing
import time
import os
def spawn(num):
#print("PID SPAWN: ", os.getpid(), "NUM: ", num)
calc = num**2
if __name__ == '__main__':
print("PID MAIN: ", os.getpid())
start = time.time()
for i in range(100000000):
spawn(i)
end = time.time()
print("Tempo em segundos: ", int(end - start))
Resultado da execução do código normalEstado dos processadores na execução do código normal
Notem que com o código normal, tivemos 100.000.000 de registros processados em 35 segundos e analisando os processadores, vimos apenas 1 sendo usado 100%.
Código com Multiprocessing utilizando Pool
import multiprocessing
import time
import os
def spawn(num):
#print("PID SPAWN: ", os.getpid(), "NUM: ", num)
calc = num**2
if __name__ == '__main__':
start = time.time()
print("PID MAIN: ", os.getpid())
with multiprocessing.Pool(processes=8) as pool:
data = pool.map(spawn, [x for x in range(100000000)])
pool.close()
end = time.time()
print("Tempo em segundos: ", int(end - start))
Resultado da execução com multiprocessingEstado dos processadores na execução com multiprocessing
Aqui podemos ver bem a diferença, para o mesmo número de registros, a execução levou apenas 19 segundos, e utilizou todos os processadores.
Demais né?! 😀
Porém é importante ressaltar, que se o volume de dados a ser processado não for muito grande, você não conseguirá ver ganhos utilizando o multiprocessing. O real ganho vai aparecendo conforme o volume de dados aumenta.
Conclusão
Bom, esse problema que conseguimos resolver com multiprocessing nos trouxe uma lição muito importante: olhar o hardware. As vezes não analisamos a infraestrutura que irá suportar os nossos códigos, e quando olhamos, fazemos de forma superficial.
As vezes os problemas não estão no código propriamente dito, mas na forma como ele é executado.
Antes de sair… dá uma conferida na série de posts para o Projeto Video Creator que eu postei aqui no blog. Pra quem curte automatizar o trabalho, vai gostar do conteúdo =D.
Essa é a décima e última parte da série sobre o projeto Video Creator, onde mostrarei como fiz o upload automático do video no o Youtube.
Se você entrou direto nesse post, te aconselho a começar por aqui, para entender exatamente do que se trata essa série.
Sobre essa parte
Para fazer o upload do video no Youtube, usei apenas algumas configurações básicas, mas vale muito a pena dar uma olhada na documentação da API, porque tem muita opção para configurar. Para esse projeto, segui o mais básico.
Requisitos:
Para essa parte, precisaremos habilitar a API do Youtube na sua conta Google Cloud, baixar o arquivo com a secret key que será gerada, fazer alguns imports e está quase tudo pronto. Isso porque 90% do código já está pronto no próprio site da API, e é ele que vamos usar como base:
Após logado, você precisará criar um novo projeto, ao lado de Google Cloud Platform, clique no menu dropdown e a modal abaixo aparecerá, clique em Novo Projeto, preencha com o nome desejado e clique em criar. Se o projeto não ficar selecionado automaticamente, clique novamente no menu dropdown e selecione o projeto recém criado.
layout fev/2022
Agora clique nas barrinhas horizontais no canto superior esquerdo da tela, depois vá em APIs e serviços, e por fim selecione Biblioteca.
layout fev/2022
Na tela que abrir, selecione Youtube Data Api v(x):
Na tela que abrir, clique no botão ativar, e sua tela ficará assim:
layout fev/2022
Para visitar a documentação da API, clique em TESTAR ESTA API.
Clique em Gerenciar, e na tela que abrir, selecione a aba Credenciais e depois em Criar credenciais.
layout fev/2022
Selecione ID do cliente OAuth.
Em tipo de aplicativo, selecione Aplicativo da WEB:
layout fev/2022
Digite um nome, e depois em criar, aparecerá um modal com seu ID de cliente e Chave secreta de cliente. Você pode salvar esses dados em um arquivo de config, ou fazer como eu fiz, que é na listagem de credenciais, clicar no botão de download, e baixar o arquivo com esses dados.
Hora de codificar:
Com a API ativada e o arquivo com a secret key já salvo na pasta credentials, é hora de pegar o sample code e fazer algumas personalizações.
O robô inicia com a função start(), que carrega o conteúdo do arquivo content.json para dentro do objeto video_content, cria o objeto youtube, e nesse momento, o navegador abrirá perguntando para qual conta do Youtube você deseja fazer o upload. Após isso, ele chama a função create_thumbnail(), que cria as thumbnails e inicia o processo de Upload.
A função start() ficou assim:
def start():
logging.info('--- Starting Youtube robot ---')
video_content = rcontent.load()
youtube = get_authenticated_service()
try:
create_thumbnail(video_content)
initialize_upload(youtube, video_content)
result = "Upload concluído!"
except HttpError as e:
result = "An HTTP error %d occurred:\n%s" % (e.resp.status, e.content)
except Exception as ex:
result = ex
print(result)
A função create_thumbnail() ficou assim:
def create_thumbnail(video_content):
logging.info("Creating Thumbnail...")
# apagando as thumbs existentes
os.system("rm -rf {}/thumb*".format(CONTENT_IMAGES_PATH))
# criando os comandos para execução.
# Importante: Refatorar para que a montagem do comando esteja dentro de um for
command_thumb_default = "convert -size {}x{} -font helvetica -pointsize 24 -background 'black' -fill white -gravity center caption:'{}' ./content/images/thumb_default.png"\
.format(YOUTUBE_VIDEO_THUMBNAILS['default']['width'],YOUTUBE_VIDEO_THUMBNAILS['default']['height'], video_content['youtube_details']['title'])
command_thumb_medium = "convert -size {}x{} -font helvetica -pointsize 26 -background 'black' -fill white -gravity center caption:'{}' ./content/images/thumb_medium.png"\
.format(YOUTUBE_VIDEO_THUMBNAILS['medium']['width'],YOUTUBE_VIDEO_THUMBNAILS['medium']['height'], video_content['youtube_details']['title'])
command_thumb_high = "convert -size {}x{} -font helvetica -pointsize 30 -background 'black' -fill white -gravity center caption:'{}' ./content/images/thumb_high.png"\
.format(YOUTUBE_VIDEO_THUMBNAILS['high']['width'],YOUTUBE_VIDEO_THUMBNAILS['high']['height'], video_content['youtube_details']['title'])
command_thumb_standard = "convert -size {}x{} -font helvetica -pointsize 50 -background 'black' -fill white -gravity center caption:'{}' ./content/images/thumb_standard.png"\
.format(YOUTUBE_VIDEO_THUMBNAILS['standard']['width'],YOUTUBE_VIDEO_THUMBNAILS['high']['height'], video_content['youtube_details']['title'])
command_thumb_maxres = "convert -size {}x{} -font helvetica -pointsize 70 -background 'black' -fill white -gravity center caption:'{}' ./content/images/thumb_maxres.png"\
.format(YOUTUBE_VIDEO_THUMBNAILS['maxres']['width'],YOUTUBE_VIDEO_THUMBNAILS['high']['height'], video_content['youtube_details']['title'])
os.system(command_thumb_default)
os.system(command_thumb_medium)
os.system(command_thumb_high)
os.system(command_thumb_standard)
os.system(command_thumb_maxres)
Uma informação importante, é que nessa função eu faço todas as possíveis thumbnails que o Youtube permite, porém não consegui fazer funcionar o upload das thumbnails junto com o video. Precisei utilizar a função set do objeto thumbnails após o upload de video.
Abaixo, as 2 funções que fazem o processo de upload do video:
def initialize_upload(youtube, video_content):
list_of_used_images = '\n'.join('✅ ' + img for img in video_content['images_used'])
list_of_sentences = '\n'.join([ s['text'] for s in video_content['sentences']])
YOUTUBE_VIDEO_DESCRIPTION = """
Conheça um pouco mais sobre {}:
{}
👉 Referências:
💥 Wikipedia
💥 Custom Search API - Bing.
👉 Imagens utilizadas no vídeo:
{}
""".format(video_content['search_term'], list_of_sentences, list_of_used_images)
YOUTUBE_VIDEO_FILE = CONTENT_PATH + '/{}'.format(video_content['video_filename'])
body= {
"snippet": {
"title": video_content['youtube_details']['title'],
"description": YOUTUBE_VIDEO_DESCRIPTION,
"tags": get_tags(video_content),
"categoryId": video_content['youtube_details']['category_id']
},
"status": {
"privacyStatus": YOUTUBE_VIDEO_PRIVACY_STATUS
}
}
# Call the API's videos.insert method to create and upload the video.
insert_request = youtube.videos().insert(
part=",".join(body.keys()),
body=body,
# The chunksize parameter specifies the size of each chunk of data, in
# bytes, that will be uploaded at a time. Set a higher value for
# reliable connections as fewer chunks lead to faster uploads. Set a lower
# value for better recovery on less reliable connections.
#
# Setting "chunksize" equal to -1 in the code below means that the entire
# file will be uploaded in a single HTTP request. (If the upload fails,
# it will still be retried where it left off.) This is usually a best
# practice, but if you're using Python older than 2.6 or if you're
# running on App Engine, you should set the chunksize to something like
# 1024 * 1024 (1 megabyte).
media_body=MediaFileUpload(YOUTUBE_VIDEO_FILE, chunksize=-1, resumable=True)
)
insert_response = resumable_upload(insert_request)
# set thumbnail
set_thumbnail(youtube, insert_response['id'])
# This method implements an exponential backoff strategy to resume a
# failed upload.
def resumable_upload(insert_request):
response = None
error = None
retry = 0
while response is None:
try:
print ("Uploading file...")
status, response = insert_request.next_chunk()
if response is not None:
if 'id' in response:
print ("Video id '%s' was successfully uploaded." % response['id'])
else:
exit("The upload failed with an unexpected response: %s" % response)
except HttpError as e:
if e.resp.status in RETRIABLE_STATUS_CODES:
error = "A retriable HTTP error %d occurred:\n%s" % (e.resp.status,e.content)
else:
raise
except RETRIABLE_EXCEPTIONS as e:
error = "A retriable error occurred: %s" % e
if error is not None:
print (error)
retry += 1
if retry > MAX_RETRIES:
exit("No longer attempting to retry.")
max_sleep = 2 ** retry
sleep_seconds = random.random() * max_sleep
print ("Sleeping %f seconds and then retrying..." % sleep_seconds)
time.sleep(sleep_seconds)
return response
Dentro da primeiro função initialize_upload(), comecei montando os dados que eu precisaria para fazer o upload do video, utilizei o assunto pesquisado, as sentenças e a lista de imagens usadas para compor a descrição do video e dar os devidos créditos para os sites detentores das imagens usadas.
Na hora de montar as tags, utilizei as keywords das sentenças para. Para isso, criei uma função para buscar as keywords e devolvê-las numa lista.
Ficou assim:
def get_tags(video_content):
keywords = []
for sentence in video_content['sentences']:
for k in sentence['keywords']:
if k not in keywords:
keywords.append(k)
return keywords
E após o upload, faço o set da thumbnail:
def set_thumbnail(youtube, video_id):
print('Setting the thumbnail: {} for video: {}'.format(CONTENT_IMAGES_PATH + "/thumb_maxres.png", video_id))
youtube.thumbnails().set(
videoId=video_id,
media_body=CONTENT_IMAGES_PATH + "/thumb_maxres.png"
).execute()
É isso!
Conclusão!
E assim encerro essa série de posts sobre o projeto Video-Creator. Muito feliz com o resultado final.
Sei que tem muitas coisas pra refatorar, pra evoluir, muitas oportunidades de novas implementações, mas o objetivo principal foi atingido: Receber um assunto e gerar um video para o Youtube automaticamente.
Espero que tenham gostado da série, qualquer dúvida, sugestão ou crítica, por favor entrem em contato. Se tiver algum assunto relacionado a essa série de posts que vocês queiram que eu me aprofunde um pouco mais na explicação, mandem um comentário pedindo, prometo trazer assim que possível.
Para a renderização do video, precisaremos importar as libs do MoviePy.
from moviepy.editor import *
from moviepy.video.tools.segmenting import findObjects
MoviePy:
O MoviePy utiliza algumas features do ImageMagick, e precisamos fazer alguns ajustes nas configurações.
No linux, o MoviePy detecta automaticamente o Image Magick. No windows é preciso criar uma variável de ambiente chamada IMAGEMAGICK_BINARY contendo o caminho do binário. Como utilizei Ubuntu, não tive nenhum problema quanto a isso.
Para que ele funcione corretamente, é preciso configurar uma política de segurança, do contrário, dará erro na hora de compilar o video.
Como isso é somente uma POC, apenas comentei no arquivo /etc/ImageMagick-x/policy.xml algumas linhas:
No início da função já criei alguns dados que serão úteis mais pra frente, e salvei no arquivo content.json.
Depois criei um objeto contendo a música que utilizei de fundo. Para esse projeto, deixei uma música fixa de fundo, a mesma para todos os videos.
Depois setei algumas variáveis contendo as configurações do video, como resolução, tamanho, cor e fonte do texto, tamanho da tarja preto onde posicionei o texto.
Todas as configurações relacionadas ao texto foram para chegar nesse resultado:
Após o ajuste da configuração do video, criei os 3 primeiros slides do video com textos padrões de apresentação e uma imagem fixa.
Após, comecei a iterar nas sentenças para incluir o texto, e buscando a imagem correspondente aquela sentença para incluir de background.
Para conseguir chegar numa duração adequada para cada slide, cronometrei quanto tempo eu levava para ler tranquilamente um texto com 150 caracteres, +- 10 segundos. Então para setar a duração do slide contei o número de caracteres de cada sentença e dividi por 15:
duration = int(len(sentence['text'])/15)
Com isso, cada slide ficou com uma duração adequada de acordo com o tamanho da sentença que será exibida.
Por fim, inclui um texto padrão de encerramento do video, inclui a música padrão, e mandei compilar o video. O trecho que compila o video é esse abaixo, e encontra-se no finalzinho da function:
Essa é a oitava parte da série sobre o projeto Video Creator, onde mostrarei como fiz o tratamento das imagens que serão utilizadas nos videos.
Se você entrou direto nesse post, te aconselho a começar por aqui, para entender exatamente do que se trata essa série.
Sobre essa parte
Aqui começamos a desenvolver o penúltimo robô, o rvideo.py, que será responsável por tratar as imagens e compilar o video.
Nesse post mostrarei o tratamento das imagens com uma ferramenta fantástica chamada ImageMagick.
Existe uma outra ferramenta fantástica que cheguei a testar, mas na hora acabei mantendo o ImageMagick por já estar um pouco mais familiarizado. Mas para quem quiser conhecer, segue o link da lib:
A função é bem simples, mas ela faz muita coisa legal.
Primeira ela elimina da pasta de imagens todas as imagens que contenham _composite no nome.
Depois carrega o conteúdo do arquivo content.json para dentro do objeto video_content.
Depois itera nas imagens da pasta com as imagens baixadas, e converte todas elas para o tamanho de 1280 x 720px.
Após, cria uma montagem esticando a imagem principal para cobrir todo o frame e cria o efeito embaçado. Depois coloca no centro da imagem a foto redimensionada anteriormente.
O resultado é bem legal 😀
Imagem originalImagem editada
Notem que no final da function, existe um trecho onde precisei utilizar a lib Pillow para tratar um problema de imagens em grayscale na hora de compilar o video.
Bom, agora que temos as imagens tratadas, hora de compilar o video!
Essa é a sétima parte da série sobre o projeto Video Creator, onde mostrarei como fiz a busca pelas imagens.
Se você entrou direto nesse post, te aconselho a começar por aqui, para entender exatamente do que se trata essa série.
Sobre essa parte
Basicamente o que fiz foi concatenar o assunto pesquisado com a primeira keyword da lista de keywords de cada sentença.
Bem simples!
Requisitos:
O principal requisito, é claro, é óbvio… criação do Bing Resource na Azure e a criação da instância do Bing Custom Search. Tudo isso eu mostrei no último post!
Utilizei também 2 imports praticamente pradrões nesse projeto:
import json
import requests
rimage.py
A parte da busca e do download das imagens, ficou no robô rimage.py.
A chamada ao endpoint do ping resolvi colocar dentro de uma função que recebe 2 parâmetros: a query de pesquisa e a quantidade de imagens que o serviço irá devolver. Ficou assim:
# implementando busca de imagens pelo Bing
def search_images_on_bing(query, count="1"):
logging.info("Searching images on Bing with custom bing image search")
url = credentials['bing_endpoint'] + "?q=" + query + "&count="+ count +"&customconfig=" + credentials['bing_custom_config_id'] +"&licence=ShareCommercially&size=Large"
r = requests.get(url, headers={'Ocp-Apim-Subscription-Key': credentials['azure_subscription_key']})
response = json.loads(r.text)
if 'value' in response:
result = [d['contentUrl'] for d in response['value']]
else:
result = []
return result
Notem que essa função é bem simples, recebe uma query e a quantidade que ela deve retornar de resultado, ai foi só bater na api passando esses parâmetros, fazer o parser do json e extrair somente os links das imagens para enviar como retorno.
Para esse projeto eu utilizei os seguintes parâmetros:
licence=ShareCommercially
Retornará imagens que podem ser usadas com propósito pessoal ou comercial
size=Large
Retornará imagens de no mínimo 500×500 pixels
A Api do bing é realmente fantástica. Existem inúmeros parâmetros que podem ser usados para refinar a busca. Quem quiser personalizar ainda mais a busca, só ver os parametros disponíveis na página oficial da api:
Criei uma outra função responsável por iterar nas sentenças, montar a query de pesquisa e chamar a função acima. Ficou assim:
def fetch_images_from_sentences():
logging.info("Fetching images from sentences...")
print("Fetching images from sentences", end='\n\n')
# loading content
logging.info("Get sentences from object saved")
print("Loading content from content.json")
video_content = rcontent.load()
for sentence in video_content['sentences']:
if len(sentence['keywords'])>0:
if video_content['search_term'] != sentence['keywords'][0]:
sentence['image_search_query'] = "{} {}".format(video_content['search_term'], sentence['keywords'][0])
sentence['images'] = search_images_on_bing(sentence['image_search_query'], "5")
time.sleep(1.5)
rcontent.save(video_content)
Três pontos importantes aqui:
O primeiro é sobre o atributo sentence['image_search_query'], onde eu armazeno o termo que eu utilizei para buscar as imagens, e ele é composto pelo assunto do video + a primeira keyword da lista. Isso para evitar que seja buscado imagens com uma keyword muito genérica que não tenha ligação com o assunto principal.
O segundo é sobre como foi construído o fluxo das coisas. O robô de imagem funciona completamente independente dos demais. O que ele precisa é que exista um arquivo content.json na estrutura esperada. Por isso quando rodamos o rimage.py, ele utiliza a função load() do rcontent.py para buscar o conteúdo do video e efetuar a busca das imagens. Após encontrar as imagens, ele gera uma lista com os links e grava dentro do atributo images, dentro de sentences.
O terceiro ponto é otime.sleep(1.5)no final da função, que precisei incluir para respeitar os limites de Free Tier do Bing Resource. Com isso, eu garanto que só vou fazer 1 chamada por segundo.
Download das imagens
Para o download das imagens, inclui todos os passos dentro de uma função que eu chamei de download_images(). Ficou assim:
def download_images():
logging.info("Downloading images from each sentences")
#os.chdir('./')
path = CONTENT_IMAGES_PATH
# create directory
os.makedirs(path, exist_ok=True)
# limpando a pasta
os.system("rm -rf {}/*".format(path))
video_content = rcontent.load()
list_img = []
for idx_s, sentence in enumerate(video_content['sentences']):
for idx_i, image in enumerate(sentence['images']):
if image not in list_img:
# if an image doesn't downloaded, try another one
try:
print("Trying to download: ", image)
image_filename = "{}/{}_original.jpg".format(path,idx_s)
wget.download(image,image_filename)
list_img.append(image)
print("")
break
except Exception as ex:
logging.error(ex)
continue
video_content['images_used'] = list_img
rcontent.save(video_content)
O que essa função faz é basicamente:
verificar se a pasta onde ficará as imagens já existe, se não… cria…
Remover todas as imagens existentes na pasta
Carrega o conteúdo da content.json para buscar os links das imagens buscadas
Itera nas sentenças e nas imagens encontradas.
Faz uma verificação se a imagem já não foi baixada
Tenta realizar o download da imagem, se por algum motivo ele não conseguir, ele tenta a próxima imagem.
Grava o link das imagens baixadas dentro do atributo images_used e salva o objeto em content.json
Next steps?!
Com conteúdo textual e imagens devidamente providenciadas, é hora de começar a montar o video.
Essa é a sexta parte da série sobre o projeto Video Creator, onde mostrarei um pouco sobre o Bing Custom Search.
Se você entrou direto nesse post, te aconselho a começar por aqui, para entender exatamente do que se trata essa série.
Sobre essa parte
Aqui resolvi seguir um caminho um pouco diferente do projeto original, que utilizou o Custom Search do Google, e resolvi utilizar o Bing Custom Search para realizar a busca pelas imagens.
Nos testes que realizei, os resultados do Google não foram satisfatórios, trazendo imagens completamente sem ligação com o conteúdo. Não sei o porquê!
Então resolvi pesquisar outras formas de buscar imagens e gostei muito do Bing, tanto na acurácia dos resultados, quando nas opções de filtros disponíveis.
Requisitos:
O caminho não é muito diferente do Google, você precisará de:
Este passo é muito simples, basta entrar nesse endereço: https://portal.azure.com e clicar em criar conta:
layout fev/2022
Os próximos passos, é o mesmo de qualquer serviço web.
Criar uma subscription pay as you go
Ao finalizar a criação da conta e realizar o login, você verá as seguinte tela:
layout fev/2022
Clique em Subscriptions e depois, na opção Add +. Na tela que abrir, escolha Pay as you Go. Você pode criar uma Free trial se quiser. Como o recurso possui um Free tier, se você ficar dentro dos limites, você não será cobrado.
layout fev/2022
Agora vem uma parte um pouco mais chata, que é onde você precisa preencher alguns dados pessoais, e os dados da forma de pagamento.
Reforçando: Para este recurso que utilizei no projeto, vamos usar a camada de Free tier. Se você respeitar os limites dessa camada, você não será cobrado.
layout fev/2022
Criar o Bing Custom Search
Após a criação da subscription, busque por bing resource no topo da pagina, e escolha a opção Bing Resources na seção Services
Na tela que abrir, clique em Add+ e depois em Bing Custom Search.
Na nova tela, dê um nome para o recurso, selecione a subscription, e no Pricing Tier, selecione a opção F0, essa opção é referente ao Free Tier. Lembre-se de respeitar os limites de chamada.
layout fev/2022
Após tudo preenchido, clique em criar. Pronto :D.
Agora entre na página do recurso que você acabou de criar, e procure por Manage keys e clique em click here to manage keys.
Na tela que abrir você terá acesso a 2 chaves, você precisará delas para fazer as chamadas.
layout fev/2022
Eu salvei a Key 1 dentro do atributo azure_subscription_key no meu arquivo credentials.yml.
Clique em New Instance. Na tela que abrir, digite um nome para a nova instancia.
layout fev/2022
Após a nova instancia criada, clique nela e vc entrará na tela de configuração dessa instancia. No topo da página, clique em Production:
layout fev/2022
Precisaremos apenas guardar a Custom Configuration ID. Criei uma variável dentro do arquivo credentials.yml chamada bing_custom_config_id, onde eu guardei esse ID.
E agora?!
Nesse momento, já temos os recursos necessários para realizar a pesquisa das imagens.
Mas se quiser testar, na página do recurso no Portal Azure, vai ter uma aba chamada Sample code, onde você terá alguns exemplos de código para testar.
Você precisará apenas da subscription key, que é a Key do recurso, o endpoint e o id da instancia do Custom Search que criamos logo acima.
Essa é a quinta parte da série sobre o projeto Video Creator, onde mostrarei a persistência do conteúdo textual em disco.
Se você entrou direto nesse post, te aconselho a começar por aqui, para entender exatamente do que se trata essa série.
Sobre essa parte
Assim como no projeto original, optei por não utilizar nenhuma persistência em banco de dados, e resolvi persistir o conteúdo em um arquivo .json dentro da pasta content para agilizar o processo de desenvolvimento do projeto e diminuir a complexidade, um vez que não preciso prover nenhuma estrutura adicional de persistência.
Com certeza, a parte mais simples e tranquila do projeto! 🙂
Requisitos
Utilizei os seguintes imports:
import json
import o
Para realizar a persistência em disco, criei a função save(). Ela ficou assim:
def save(content):
logging.info("Saving content...")
print("Saving content...", end='\n\n')
# creating name folder
path = "content"
# create directory
os.makedirs(os.path.dirname("{}/content.json".format(path)), exist_ok=True)
with open("{}/content.json".format(path), "w", encoding='utf8') as f:
try:
json.dump(content, f, indent=4, ensure_ascii=False)
except Exception as ex:
print(ex)
Bem simples, a função recebe um objeto, verifica se o mesmo já existe, e cria o arquivo.
Com isso já temos todo o nosso conteúdo textual.
No próximo post, vou mostrar como fiz a busca e o download das imagens.