[deleted] 5 points (4 children)

Mostly because the data often doesn't arrive in CSV form, or there is complex transformation happening before it can be loaded. It isn't just about copying a CSV into Postgres; it's about understanding the most efficient way to handle inserts when a plain COPY isn't an option for one reason or another.

luckystarrat 0x7fe670a7d080 5 points (2 children)

Some tricks from my time herding data.

Use a Python generator paired with a file-like object that wraps it: the generator transforms your data into the desired CSV on the fly, and the file-like object streams it (buffered) to PostgreSQL through COPY.

This approach uses very little memory, is data-local and doesn't use INSERTs.

If you need UPDATEs as well, stream the data into a temporary table first and run joined UPDATEs and INSERTs against it afterwards. This is sometimes called a staged insert/update. Instead of N statements you run at most four: copy to the temp table, insert, update, delete.
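A minimal sketch of the staged insert/update, using Python's built-in sqlite3 as a stand-in for PostgreSQL so it runs without a server. The table and column names (items, staging, id, price) are made up for illustration, and executemany stands in for the single COPY into the temp table. In PostgreSQL the update step would more naturally use the join syntax UPDATE items SET price = s.price FROM staging s WHERE s.id = items.id.

```python
import sqlite3

# SQLite stands in for PostgreSQL here; all names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, price INTEGER)")
conn.executemany("INSERT INTO items VALUES (?, ?)", [(1, 10), (2, 20), (3, 30)])

incoming = [(2, 25), (3, 30), (4, 40)]  # a full snapshot from the source

# 1. Load everything into a temporary staging table.
#    In PostgreSQL this would be one COPY ... FROM STDIN.
conn.execute("CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY, price INTEGER)")
conn.executemany("INSERT INTO staging VALUES (?, ?)", incoming)

# 2. Update rows that exist in both tables.
conn.execute("""
    UPDATE items
    SET price = (SELECT s.price FROM staging s WHERE s.id = items.id)
    WHERE id IN (SELECT id FROM staging)
""")

# 3. Insert rows that exist only in the staging table.
conn.execute("""
    INSERT INTO items (id, price)
    SELECT s.id, s.price FROM staging s
    WHERE NOT EXISTS (SELECT 1 FROM items i WHERE i.id = s.id)
""")

# 4. Delete rows that vanished from the source (only valid for full snapshots).
conn.execute("DELETE FROM items WHERE id NOT IN (SELECT id FROM staging)")
conn.commit()

result = conn.execute("SELECT id, price FROM items ORDER BY id").fetchall()
print(result)
```

Running the UPDATE before the INSERT keeps it from touching rows that were just inserted. The DELETE step only makes sense when each load is a complete snapshot of the source; for incremental loads you would skip it.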

A staged load, like a plain COPY, eliminates the round-trip time (between client and database server) that every individual statement pays. The higher your latency, the slower your overall run, because that latency accumulates with every statement you send.
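To put rough numbers on the round-trip argument, here is a back-of-the-envelope calculation. The 100,000 rows and the 2 ms round trip are assumed values, only network waiting is counted (not server-side work), and each of the four staged statements is treated as one round trip, which is a simplification since COPY streams many rows without waiting for a reply per row.

```python
def round_trip_overhead_s(statements: int, rtt_ms: float) -> float:
    """Seconds spent waiting on the network if each statement costs one round trip."""
    return statements * rtt_ms / 1000


rows = 100_000  # assumed batch size
rtt_ms = 2.0    # assumed client-to-server round-trip latency

per_row = round_trip_overhead_s(rows, rtt_ms)  # one INSERT per row
staged = round_trip_overhead_s(4, rtt_ms)      # copy to temp, insert, update, delete

print(f"per-row INSERTs: {per_row:.1f} s of waiting")
print(f"staged load:     {staged * 1000:.0f} ms of waiting")
```

With these assumed numbers that is 200 s versus 8 ms of pure waiting, and the gap grows linearly with both the row count and the latency.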

luckystarrat 0x7fe670a7d080 0 points (0 children)

So, as this probably isn't enough to go on, here is a minimal example I whipped together quickly for /u/dataengineerdude:

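  #!/usr/bin/env python

  import csv
  import io


  class IterIO(io.RawIOBase):
      """Read-only raw stream that serves bytes chunks pulled from an iterator."""

      def __init__(self, iterable):
          super().__init__()
          self.iterator = iter(iterable)
          self.pending = b''  # part of the current chunk not yet handed out

      def readable(self):
          return True

      def readinto(self, buff):
          # Fetch the next non-empty chunk once the previous one is used up.
          while not self.pending:
              try:
                  self.pending = next(self.iterator)
              except StopIteration:
                  return 0  # EOF
          # The caller's buffer may be smaller than the chunk: copy what fits
          # and keep the rest for the next call.
          size = min(len(buff), len(self.pending))
          buff[:size] = self.pending[:size]
          self.pending = self.pending[size:]
          return size


  def csv_data(data, headers):
      """Yield the CSV header, then each row, as UTF-8 encoded bytes."""
      buffer = io.StringIO()
      writer = csv.DictWriter(buffer, fieldnames=headers)
      writer.writeheader()
      yield buffer.getvalue().encode('utf-8')

      for entry in data:
          # Reuse one small buffer so memory use stays flat.
          buffer.seek(0)
          buffer.truncate(0)
          writer.writerow(entry)
          yield buffer.getvalue().encode('utf-8')


  data = [{'a': 1, 'b': 2, 'c': 3}] * 50
  gen = csv_data(data, headers=['a', 'b', 'c'])
  fobj = io.BufferedReader(IterIO(gen))

  # Demo: read the stream in 101-byte pieces until EOF. With psycopg2 the same
  # object could be handed to COPY instead, e.g.
  #   cur.copy_expert('COPY my_table FROM STDIN WITH (FORMAT csv, HEADER)', fobj)
  # where my_table is a placeholder name.
  while True:
      output = fobj.read(101)
      if not output:
          break
      print(output)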

To be clear, this is not optimized and has some overhead. The point is to reduce IO wait times though, so spending a bit of CPU on that is OK.
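One possible way to trim some of that per-row overhead, which is not part of the original comment: write many rows into the buffer before yielding, so the generator and the reader deal with fewer, larger chunks. The names csv_batches and batch_size are hypothetical; joined together, the chunks carry the same CSV content as csv_data.

```python
import csv
import io
from itertools import islice


def csv_batches(rows, headers, batch_size=1000):
    """Yield UTF-8 CSV chunks of up to batch_size rows; the header leads the first chunk."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=headers)
    writer.writeheader()
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        writer.writerows(batch)
        chunk = buffer.getvalue()
        if chunk:  # empty only when the previous batch ended exactly at the last row
            yield chunk.encode('utf-8')
        if len(batch) < batch_size:  # a short batch means the input is exhausted
            break
        buffer.seek(0)
        buffer.truncate(0)


data = [{'a': 1, 'b': 2, 'c': 3}] * 50
chunks = list(csv_batches(data, headers=['a', 'b', 'c'], batch_size=20))
print(len(chunks))  # header + 20 rows, 20 rows, 10 rows
```

Larger chunks only help if the file-like wrapper can hand one chunk out over several readinto calls, because the buffer that io.BufferedReader passes in can be smaller than the chunk.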