Azure SQL Database로 CSV 파일 BULK INSERT — Python

Dae Woo Kim
7 min readSep 28, 2021

이번 포스팅에서는 지난 포스팅에 이어서, BULK INSERT로 900만건 CSV 파일 데이터를 처리하는 과정을 수행.

CSV 파일 BULK INSERT는 아래 과정으로 처리한다.

Azure SQL Database로 CSV 파일 BULK INSERT — Python

CSV 데이터 리뷰

Blob Storage로 CSV 파일 업로드

BULK INSERT 준비

- SAS 토큰 생성

- 테이블 생성

- SCOPED CREDENTIAL 생성

- EXTERNAL DATA SOURCE 추가

BULK INSERT 실행

기존 On-prem의 SQL과 다르게, Azure SQL Database에서 위의 과정이 필요한 이유는, Azure SQL Database는 로컬 파일시스템에 대한 접근이 제한되기 때문이다.

Azure SQL Database only supports reading from Azure Blob Storage.

BULK INSERT (Transact-SQL) — SQL Server | Microsoft Docs

따라서, bulk insert 하려는 파일을 blob에 올려 두고 처리해는 과정이 필요하다.

사용된 전체 코드는 아래 github 리포지토리에서 확인 가능.

CloudBreadPaPa/azure-sql-bulk-insert: Bulk insert massive CSV data to Azure SQL Database (github.com)

CSV 데이터 리뷰

데이터는 1만건, 100만건으로 테스트 목적으로 split을 했다. 1만건의 데이터만 repo에 추가해 두었으며, 전체 과정 진행이 가능.

?

1

2

3

4

# load CSV file

import os

import pandas as pd

df = pd.read_csv("data/train_10000.csv")

data 폴더에 CSV 파일을 올려 두었고, 이렇게 구조에 대해서 확인 가능.

파일 split은 How to split large text file in windows? — Stack Overflow 문서를 참고하면 확인 가능하다.

Blob Storage로 CSV 파일 업로드

아래의 방법으로, CSV 파일을 blob storage로 업로드 가능하다.

?

1

2

3

4

5

6

7

8

csv_file_name = "train_10000.csv"

upload_file_path = os.path.join(local_path, csv_file_name)

blob_client = blob_service_client.get_blob_client(container=container_name, blob=csv_file_name)

# Upload the created file

with open(upload_file_path, "rb") as data:

blob_client.upload_blob(data)

Python으로 blob을 처리하는 예제는 아래 링크 참조.

Quickstart: Azure Blob Storage library v12 — Python | Microsoft Docs

BULK INSERT 준비

SQL서비스가 blob에 접근할 경우, 당연히 보안 설정 과정이 필요하다. 이 과정을 미리 처리해 두어야 하며, 이때 blob 접근 시 SAS token을 이용한다.

SAS 토큰 생성

지난 포스트에서 이미 SAS 토큰 생성 방안들을 리뷰했다. 간단히 코드만 리뷰.

개발자 커뮤니티 SQLER.com — Azure Blob Storage SAS token 생성

?

1

2

3

4

5

6

7

8

9

10

11

12

13

ef generate_sas_token(file_name):

sas = generate_blob_sas(account_name=AZURE_ACC_NAME,

account_key=AZURE_PRIMARY_KEY,

container_name=container_name,

blob_name=file_name,

permission=BlobSasPermissions(read=True),

expiry=datetime.utcnow() + timedelta(hours=1)) # 1 hour expire for test

sas_url ='https://'+AZURE_ACC_NAME+'.blob.core.windows.net/'+container_name+'/'+file_name+'?'+sas

secret = sas

return sas_url, secret

sas_url, secret = generate_sas_token(csv_file_name)

이렇게 SAS 토큰을 가져오는 함수를 생성하고, SAS URL과 secret을 받는다. secret만 따로 이용되기 때문에 추가로 함수에서 리턴 받는다.

테이블 생성

bulk insert를 위해서는 미리 테이블이 SQL에 생성되어 있어야 한다. 다음 쿼리로 테이블을 생성한다.

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

create table malware(

MachineIdentifier varchar(256),

ProductName varchar(256),

EngineVersion varchar(256),

AppVersion varchar(256),

AvSigVersion varchar(256),

IsBeta int,

RtpStateBitfield float(24),

IsSxsPassiveMode int,

DefaultBrowsersIdentifier float(24),

AVProductStatesIdentifier float(24),

AVProductsInstalled float(24),

AVProductsEnabled float(24),

HasTpm int,

CountryIdentifier int,

CityIdentifier float(24),

OrganizationIdentifier float(24),

GeoNameIdentifier float(24),

LocaleEnglishNameIdentifier int,

Platform varchar(256),

Processor varchar(256),

OsVer varchar(256),

OsBuild int,

OsSuite int,

OsPlatformSubRelease varchar(256),

OsBuildLab varchar(256),

SkuEdition varchar(256),

IsProtected float(24),

AutoSampleOptIn int,

PuaMode varchar(256),

SMode float(24),

IeVerIdentifier float(24),

SmartScreen varchar(256),

Firewall float(24),

UacLuaenable float(24),

Census_MDC2FormFactor varchar(256),

Census_DeviceFamily varchar(256),

Census_OEMNameIdentifier float(24),

Census_OEMModelIdentifier float(24),

Census_ProcessorCoreCount float(24),

Census_ProcessorManufacturerIdentifier float(24),

Census_ProcessorModelIdentifier float(24),

Census_ProcessorClass varchar(256),

Census_PrimaryDiskTotalCapacity float(24),

Census_PrimaryDiskTypeName varchar(256),

Census_SystemVolumeTotalCapacity float(24),

Census_HasOpticalDiskDrive int,

Census_TotalPhysicalRAM float(24),

Census_ChassisTypeName varchar(256),

Census_InternalPrimaryDiagonalDisplaySizeInInches float(24),

Census_InternalPrimaryDisplayResolutionHorizontal float(24),

Census_InternalPrimaryDisplayResolutionVertical float(24),

Census_PowerPlatformRoleName varchar(256),

Census_InternalBatteryType varchar(256),

Census_InternalBatteryNumberOfCharges float(24),

Census_OSVersion varchar(256),

Census_OSArchitecture varchar(256),

Census_OSBranch varchar(256),

Census_OSBuildNumber int,

Census_OSBuildRevision bigint,

Census_OSEdition varchar(256),

Census_OSSkuName varchar(256),

Census_OSInstallTypeName varchar(256),

Census_OSInstallLanguageIdentifier float(24),

Census_OSUILocaleIdentifier int,

Census_OSWUAutoUpdateOptionsName varchar(256),

Census_IsPortableOperatingSystem int,

Census_GenuineStateName varchar(256),

Census_ActivationChannel varchar(256),

Census_IsFlightingInternal float(24),

Census_IsFlightsDisabled float(24),

Census_FlightRing varchar(256),

Census_ThresholdOptIn float(24),

Census_FirmwareManufacturerIdentifier float(24),

Census_FirmwareVersionIdentifier float(24),

Census_IsSecureBootEnabled int,

Census_IsWIMBootEnabled float(24),

Census_IsVirtualDevice float(24),

Census_IsTouchEnabled int,

Census_IsPenCapable int,

Census_IsAlwaysOnAlwaysConnectedCapable float(24),

Wdft_IsGamer float(24),

Wdft_RegionIdentifier float(24),

HasDetections int

)

GO

이렇게 미리 쿼리를 통해 테이블을 생성해 두고 데이터를 추가한다. 쿼리 수행은 SSMS를 이용하거나, Azure Portal에서 수행도 가능하다.

Query a SQL Database using the query editor in the Azure portal (preview) — Azure SQL Database | Microsoft Docs

테이블 생성이 완료되면, Python으로 데이터베이스 접근을 테스트한다.

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

# install pyodbc in Ubuntu : https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15#linux

# pip install pyodbc

import pyodbc

server = os.environ['SQLSERVER']

database = os.environ['DATABASE']

username = os.environ['USERNAME']

password = os.environ['PASSWORD']

driver = '{ODBC Driver 17 for SQL Server}'

db_con_str = 'DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password

with pyodbc.connect(db_con_str) as conn:

with conn.cursor() as cursor:

cursor.execute("SELECT TOP 3 name, collation_name FROM sys.databases") # test run

row = cursor.fetchone()

while row:

print (str(row[0]) + " " + str(row[1]))

row = cursor.fetchone()

DB를 다룰 때는 주로 SQLAlchemy만 이용했는데, bulk insert 과정이고, 조회 처리가 없어 pyodbc를 이용해 쿼리만 처리한다.

Use Python to query a database — Azure SQL Database & SQL Managed Instance | Microsoft Docs

문서에서 python 구성과 Azure SQL Database 접속을 위한 여러 정보를 확인 가능하다.

SCOPED CREDENTIAL 생성

위에서 언급한 것처럼, SQL 서비스가 blob에 접근할 때 이용하게 되며, SAS token을 사용해 구성한다.

전반적인 흐름을 아래 문서에서 확인 가능.

Bulk access to data in Azure Blob storage — SQL Server | Microsoft Docs

?

1

2

3

4

5

6

7

8

sec_cred = "scred_bulk_ins"

query = "CREATE DATABASE SCOPED CREDENTIAL " + sec_cred + " WITH IDENTITY = 'SHARED ACCESS SIGNATURE', SECRET = '" + secret + "';"

print(query)

# set "SCOPED CREDENTIAL"

with pyodbc.connect(db_con_str) as conn:

with conn.cursor() as cursor:

cursor.execute(query)

이 과정에서 다음 오류가 발생할 수 있다.

ProgrammingError: (‘42000’, ‘[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Please create a master key in the database or open the master key in the session before performing this operation. (15581) (SQLExecDirectW)’)

아래 문서에서 관련 내용을 찾을 수 있다.

Create a Database Master Key — SQL Server | Microsoft Docs

?

1

CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'YOUR-MASTER-KEY';

이렇게, master key를 추가하면 해결된다.

이렇게 scoped credential을 생성했으면, 다음 과정인 external data source를 추가한다.

EXTERNAL DATA SOURCE 추가

일종의 mount 처리 과정이다. blob storage를 위의 credential로 SQL database가 접근 가능하도록 구성한다.

?

1

2

3

4

5

6

7

8

ext_ds = "malware_ds"

query = "CREATE EXTERNAL DATA SOURCE " + ext_ds + " WITH (TYPE = BLOB_STORAGE, LOCATION = 'https://" + AZURE_ACC_NAME + ".blob.core.windows.net', CREDENTIAL = " + sec_cred + ");"

print(query)

# set "EXTERNAL DATA SOURCE"

with pyodbc.connect(db_con_str) as conn:

with conn.cursor() as cursor:

cursor.execute(query)

credential과 생성하다가 data source를 추가하다가 수정할 일이 생기면 중복 불가라 약간 귀찮다.

오류가 생겨 재생성 할 경우, 아래 쿼리를 적절히 이용해 drop하고 재생성 하는 것을 권장한다.

?

1

2

--DROP DATABASE SCOPED CREDENTIAL scred_bulk_ins

--DROP EXTERNAL DATA SOURCE malware_ds

이제, bulk insert를 수행하기 위한 모든 준비가 완료되었다. bulk insert를 수행한다.

BULK INSERT 실행

다음과 같은 python 코드로 수행 가능하다.

?

1

2

3

4

5

6

7

8

sql_table_name = "malware"

query = "BULK INSERT " + sql_table_name + " FROM '" + container_name + "/" + csv_file_name + "' WITH (DATA_SOURCE = '" + ext_ds + "', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', TABLOCK, CODEPAGE = 'UTF-8', FIRSTROW=2);"

print(query)

# run "BULK INSERT"

with pyodbc.connect(db_con_str) as conn:

with conn.cursor() as cursor:

cursor.execute(query)

SQL 쿼리를 먼저 살펴보면 다음과 같다.

?

1

BULK INSERT malware FROM 'bulk-insert/train_1000000.csv' WITH (DATA_SOURCE = 'malware_ds', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', TABLOCK, CODEPAGE = 'UTF-8', FIRSTROW=2);

여러 시행착오와 오류가 발생했다. Windows의 CRLF가 아닌 UNIX LF 이슈가 있었으며, encoding 관련 이슈로 판단된다.

오류가 발생한 쿼리 #1

?

1

BULK INSERT malware FROM 'bulk-insert/train_10000.csv' WITH (DATA_SOURCE = 'malware_ds', FORMAT = 'CSV', FIRSTROW=2);

위의 쿼리를 최초 이용했으나, 다음 오류가 발생했다.

Msg 7301, Level 16, State 2, Line 98
Cannot obtain the required interface (“IID_IColumnsInfo”) from OLE DB provider “BULK” for linked server “(null)”.

Completion time: 2021–09–27T15:26:53.6117081+09:00

Cannot obtain the required interface (“IID_IColumnsInfo”) from OLE DB provider “BULK” for linked server “(null)”. · Issue #408 · microsoft/sql-server-samples (github.com)

내용을 리뷰해 보완하였다.

오류 발생 쿼리 #2

?

1

BULK INSERT malware FROM 'bulk-insert/train_10000.csv' WITH (DATA_SOURCE = 'malware_ds', DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '\n', FIRSTROW=2);

Msg 4866, Level 16, State 8, Line 117
The bulk load failed. The column is too long in the data file for row 1, column 83. Verify that the field terminator and row terminator are specified correctly.
Msg 7301, Level 16, State 2, Line 117
Cannot obtain the required interface (“IID_IColumnsInfo”) from OLE DB provider “BULK” for linked server “(null)”.

아마도, 필드 터미네이터 관련 이슈로, UNIX LF 관련 이슈로 예상했다.

Bulk insert, SQL Server 2000, unix linebreaks — Stack Overflow 내용을 참조해 ROWTERMINATOR를 수정했고 해결되었다.

참고로, “FIRSTROW=2”는 첫 줄 header를 건너뛰고, 데이터만 가져오는 속성이다.

이렇게 시행착오가 있었으나, 모두 잘 해결되었다.

BULK INSERT 실행 결과 및 속도

우선, 1만건의 데이터를 가장 작은 Azure SQL Database인 basic에서 검토했을 때

- 1만건(4.7MByte)에 약 11초가 소요되었다.

100만건 테스트는 기존 basic 사이즈에서 DTUs(S3) / 20GB Data 사이즈로 변경 후 수행했으며,
100만건에 267초가 소요되었다.

?

1

sp_spaceused

?

1

2

3

reserved data index_size unused

------------ ------------------ ------------------ ------------------

638048 KB 634448 KB 1784 KB 1816 KB

CSV는 480M이며, 대략적으로 630M 정도의 DB 사이즈가 소요되었다.

최종 900만건 테스트는

?

1

2

3

reserved data index_size unused

---------- ------------------ ------------------ ------------------

5522608 KB 5518976 KB 1856 KB 1776 KB

2219초 = 36분이 소요되었고, DB는 약 5.5G, CSV 파일은 약 4.1G 크기였다.

지난 포스팅에서 테스트했던 ADF와 비교할 때 ADF는 약 1시간 20분 이상 소요되어, 절반 이하의 시간에 완료되었으며, Azure SQL Database의 스케일을 조절하거나 일부 퍼포먼스 관련 튜닝을 수행하면 조금 더 속도가 나올 것으로 예상.

아직 ADX에서 CSV로 추출하는 과정은 수행하지 않았으나, 이 과정을 고려해도 충분히 좋은 성능을 제공할 것으로 판단됨.

참고링크

개발자 커뮤니티 SQLER.com — Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송

개발자 커뮤니티 SQLER.com — Azure Data Explorer에서 SQL서버 데이베이스 테이블 조회/삽입 — sql_request plugin

개발자 커뮤니티 SQLER.com — Azure Data Explorer에 대량 CSV 파일 ingest

개발자 커뮤니티 SQLER.com — Azure Blob Storage SAS token 생성

Bulk access to data in Azure Blob storage — SQL Server | Microsoft Docs

--

--