ÿØÿà JFIF    ÿÛ „ !.%+&8&+/1555$;@;4?.451 4,$,44444444444414444444444444444444444444444444444444ÿÀ  á á" ÿÄ     ÿÄ ?    !1AQaq"2‘¡±ÁðBRbrÑá#‚’¢²3S CñÿÄ   ÿÄ !    !1QAa‘2ÿÚ   ? 5˜Z¯V¦cø)›t/? z¨±>Õ5€¶‹Á¤·¼z¼Ü¬+ñ®v¤¨_ˆR­BFn©—˜ý®ç̝P8gýt·ÉSTŦˆìät?þé¼íìN/Þa)ì–í6ô… Ï¿øÃj´¿KÇü]ÿ ªô¹-eKànëÕHTx}ýSÜ›ÿ ”7Ø×&µ<¦  ¥ÑO¶[Ù¯ä¨ÞÃÿ PZ-¬;#õ|•oaÿ ©CìÞz3˜öː/¤­ñTûIØ}š^ mÓ%ªxˆ¥ÉŸu=Z+ISe¿45™¼u;ú&WØ÷€æßQ™®{|íx*TC“#ZŠìZ§²‹ 6pv…³¿¡äª*áZÐ%ÒOáˆo"x«OHk w±æ+¬V(kMúŸ5Vö«$ ÁrÏbàb57/luR ¸ÑÛj Òµì`Мq­û žICÀÊ•©4€Âcà¨Ï€O´<èÐ:›ù(Ë^L8þ‘ÍÌ#¸Ð_Ì©ÙK(Öz 4¬û+¸;ü’V’84‘¬ÃŽ:[â‡ÔÌáõp¢~§ªlæ£ö{®G>J¼"°‡7¯ÆÉèßû ‹É‹§ÁòÃýâßî ^ƾÙõ‹×óH#«LP½ïX=xÑÍ$|W?•~• îëÔ©ª‹ {ÝT…Kÿ ”hûâá)J*ö˜–ÔU;iÇ€/ ÆþjóZ\ýwØ=Ìm ºèËL9 ýèÆð/¨’¥öo=nË.%Îì ŽÕ¯È|{Oj²ƒE6e/ßdÄõ²Ìâ1O®ò×TsəԸhOMýíMˆ¿¼H˜l²,7Â¥#MF/Úf°Ö½± ¸–dr‹NýÊ íjqx{œÉ ä-È ¦ øÄër¨q°ð †nцýÑÄÆ’mä…n<0È™;ÁÝá¯ÁZƒ7FÀmì­ É&9ˆîéi¶ùN§Y• ÃZãAâ?•‡©‰ , ó¾IŸŠc1 4â&y­&pŠ­6;M À 0¹qç»p.á …ŸÅáK@%6·y6ƒ‰3?”úºŽ‰éX5ªPT §µ!=Mž«Ú½‹ÅgÂSâÉaþÓoö–¯ÁÔìR>5éÿ üs¶ÆUcÌ kÇR ]ÿ ù¬¼«VŽ;Â|‡~¢¦”ÏŰæ {L™Õ°Óv¹ò¸írޡעCÃ!íVÕ {¶»sŒNPg/ "uÕbkm²“$ďå¿é¹§°½æz¯6 †s¿!s–wÚÝ“™Œ °.ûj>·+™Òa…©Œ&rÝÎtÛë긪Ît’LAVp%c Úý[ÄzJ¾ÇàXXç@˜ó<êL]·T˜¾¥1Ó©V‡g´æ½¦Ý@¹óø!_@´ÞâSÁ —S3™•& ]@JHÚý©ZŽ €×æÔr»Áf!‡yÞ4Mv*èÓã_{‘åóUuљØ«Oïé*®EvÑ Œ÷‡U \"㪒ÍK+À 4“M¡ï:0¥5í!'<@î´”>Ç»&Z–ïCCV˜Ì5Šo&îhè.žû |ÓK©h$s6KìŒëã)¹hI¦GïOåóI;ììü#É$Š0…Ææ¥TØ.5­¾gn´ “ÂÖ\:hœ89G)J@„}œ:’Ò{/Š"¦_Æ×7Æ3VÇŠÊa]ÚŒÙ€Ä–=®uÁßâACZƒ§§£ Qnâ:«,×{tyø¬iÛcœÜÄ€H½ÄÍCk´÷šß .W'b¤Íåh]÷€=,Žv×cÚEÚHXJX¶îo¨FÒtèöŸ>ªª6[J®Fµ£sGÁeqõfe\íjÒÐïÄÐGˆe1Ø‹.Ø”‘Ëuø Y­ˆÜ ŽG|zùªüMpDnQWÄ”%JŠ™)â*p@Örš«ÕT2Ð%ˆG#ª„ ·¤!°ŸOTÂT¸aÚ%4&h™LµšØüÐ.F¿²ÐÞ_Ç‚¾ÅÃaÜ÷09Æ q€öy˜v‡85õN÷]¬äѼóS{°_MެúÔ#°Ç¸0åÞè2ëôPcvÆw9®ií1Ä8F™˜à‰´+‰Ik1òÝ7“Ñ×ÒsÝ\x‚h`ÞÑ`ó"|µEcý£n˜h`}GÞ !±ù²Ápü²ß6 0ïi󜵩SÈÇ7˜-ÕURO˜¦´f$ªž-Í6(œ}<„ éc øs]ŽŽ„*—¾ ìdŽ„)méª\¿êÎIg¾ØÞ~I#C/¼¼´EÁÈŽi8“©õådô·>euä ƒ'Ê×लR1ÉJE1ÐAát`t;ÇР%Ý<‡¥„ÍÆ`×Oyó)õiI€ñQaŸ4Ûù\áàaÃÔ¹HÃu¹*k€¦<„e S‡&õÏ B!ŽhüÞ`yj}mªf×\¿ Ç~æ­9‡û\՞Ǖg²1Žû5V7 !àöšm° c`ܬøÇìµÒ'P"?…´Ö,"§^•õލsÔ)6˜sæéÍR¼ ò|Sl”‹7 nPW Gòú÷½§O¯‡„l¡kSÞŒr½PÊ@æ¢pŽ-mÿ #Ÿ˜Àº¶Áä¦;ïÔæ$1££`“Õ>„—·ž)ßð³ñ#Ï Ô$¶œ‰ÊE‹À;÷º ¯«P:Ñ”8–IÊtpÞ3ª“>ê“þës4ò2OÏÕ­±zô†Õ§‰.÷ä¸;¿˜“'œ›žª}«Œ{ª±Ì 9ÔóÞÕ‡0 $íWV3Üì¬ —@kÝ4@¿r¼±½¬™›?øØæ´'Áé®CË3-g$˜ö‡×auÚi´Žp/êÛ æF›Ú2v‹ã¿¿,nB1̨ƃqÞa5͝@&Æû“él÷ \C²½UÍc ¯k×¢U ÖéQå™—-r wô ÞÏ<Ò=&=ÿ Ôê Òêˈt,i—;LîÜ á¸*ÚÃ1$êL•LÍ <É)ýÐà’ ;F™{ƒ™˜€&'}‚ãÄK`¡ÞT@I;®žZóè‚s’7®°›+§O­Åq©é»²9<Ô J ¼9O’HL»Ùïì¸rk¼Ž_ý‘TŸu[²ßÚŒ·ü÷B%¯E ŸÔX5êO´ Ç•€’I0 ÉJX` ñ¹õ%;µŸD‘«´€àwÒ™U ûئžÖö\×®×´8 ½‡ºÐÆÓ§?Àkmœ=;d5*@-ì0F Rªýš[Ü6âö̃ڸr*KA9· u*µæ£?U¸Âêí†8@¦X4 e-ò„0s{ HâUpU?¼mñRa°®a%Ð'tÉ×’\¾ÊÉ]t›h>·(Ë@R¼¡Ãt h}’O÷au<+nT…Ö…MӐ??Óe95 q>í/;&JSû °¯ÊéÞ øƒ*Ã2½Ài&:nôUl=¾¿5eˆ3”ñc|Ú2V”>„»&eE;«ÚäC p¢Û úy 9š[ŒÌx¼擼A&DåÒ¯ˆ¤ÀÌ;"˜ ÏQä¸åhÊ}Ûq«Û0WžÒ|»€ø®öCm5•\ÇÀ§Pe3£]0ÃàLDÉ‰1øªxjgwT‚÷¿LΨK‹›ùs—xˆÜ±µ kæ¸f‰‰ÜGk/LÛØ6d9ò¶ùA{ƒA3š/¬D¬khÓk‰`˜"㯒r¿±Óã jx‡°e}<Ñø\3y:'À•/h½Í€Ç4~g ?Û(¼]v‘ªlKÎâ~?O‚W%{Ì:“'©úNq¾›úo(X’¥¯ˆ nFê{Ç€ü?º'ë ø‹ì Þ09ŒÌç9Æ —ËC`j@ÓÄ(+a‹un¸#ÂꟋ{K`‘ÑÍÍ'à´»/Û,KW;Þ4²þð ï Nm|~fGÏ(…³Ã)«1ö­Õ ¥‡¨©ƒÃ™ü-s=à=U66Ï«Ýc蓦W¹íž®›nÔ%êÇìŒ<#Ü×84ån®Ð ÒåOC` ñânÑs‡¢ç 1õ%Îhì½Ã½® e:ݼUZo™`  ÅZŸŒÊ«ê1ÏÄo$q¹Þ€©ˆhÐÉä¯ñ[!…Ú˜àJ:x2$Íß&PåT£6ç— ‡Í*4Ýšçjÿ ‰É nófÐ ó(L5C•åÆ\rMÒ@ò }y-W}™üýVù—ú¢=Ù”c®‘< M ž ´Phr ¦©TD ‘ù.$´÷O‡‘V2Æò.=IUŒ=ž‡â¬i™aþÓåÙ?òUø'ØÖ•.~* šTŒ!•-×áºTâ®ä#õü'´ eýlYÅÓeÕKÂrT"CÚ@u!Óxƒ{š3€}1¿(r}%«nËamjÑ%ÑNEò v ˜à  σöK³,*º.àzù¨™Ó ÚçâU¦*¿ 9{%Ö¹ njûdaXöb) kÛÆ±ûÓ\°M7ˆÂ=û›ç¿Ã‚­V»Cg–8ÙêE- j)k$º`Ã-ùEýeBÆÇ]c¡°ñty&Òd0nõ'¡W+ƒ*|–øµFa\GQªEAÔp5\Ǽ·¼Ç8·õ -â§Ú[ ‡ uZeÖ 3}×d'+¹:ð+K†Û®s!Ï$úe€<Û”x)1»a­¡LC]¸µík…ÚàA»AYº{†ªS[¦5HÒ7ù --,ísòDØ€èk ÞÀîÜ ò@â( ËNˆë›4ô½•/¦o‡€Û7 ê•ÆêòðÜy'Án½µ á˜ݦ ndeo…[ì¶Ê,¥R³Ä=À±—–ß;£™´ñSâ*g§”ïaið‘Jå~™ÓÞ ß³Õ¢»8x埒²52>AÊb&-÷\7´éÄù€T˜,w;3{ï˜k…à¹ÄqÀ«œ{€\ ˆ¾[´¨јr &Úé„Ívˆ±8†¿]|¬ņ4I×pÞS1ÈÖz‰#Ìv‡G!YNògñ:màTz¢Ý1ô©^O=~ë|5Bã™ç•¼µõ•bÆ@úÕS¬ÈŒ#¬zünrŸ û” Z²•èðV"ÁHÚý©wÝ €7¼Ìu1hÑa3Éä û f$o¿É ™Ú›ÝçnpÒ3äÌ3†Í§,Äï]$‰/pê †«À¼¸e9­Æê_C]žƒ·ý·frÁN«, E=›Çq -‰öŒ:aÏ¿±í&£Í:-} 84‘ÿ eƒQÑeëSsuiA ³g㟥ú£?ÿ ʼn*”“÷aühe:ÊWa@ÒÞk±eØ] F Ô—r.åä˜ @ö¥ªZoÐýYL·¥S²G/‡ñ <~*ZÆ´è>JlòàÛÆ½ÿ 窘ìGN¢:I®KšJp/`íIÁÀõ#Ä-€ö­šµŒoF4|ÆQØÆ@Ì|£Ô…¢À{9˜è½Üó›€ôYÒÎYsið;ís¤€à²ˆ‚4qÉVŒI$ ‰"° æµ8cXGjœˏ¡Aâý•ËÜ¢ûï e·çLx']á"oÅÎê3¯Ç—¹”ó0nå‚âg{Œñ> S´˜îè°g238‚ãköÝfÚd´6Ò€;ò÷±¢™¼›º ¢Æ'¥Ðx'e¬ç ]bÈÆV¢ó‹kýBO ðÊâ$Ÿ!×T 3Mýמ žìٍàÌü‘8÷€àæØ8æ©6‰©L´«…oãpð„~Çk‰!ñ;‹”ÛžÍ àž±z Ÿôû øŸÝužÏ;ÿ #|u6™Þ¬ÚˆÐõA4¶â|ôl|Ê2ŽÇ¤ÝÅÇY.<#Aí.k§hóF‚”Y; M½Ö4hŸ4&›­¿tès´%FìL¥£Ãk‰ÇT¤haÁ¤ÚxfÉ`ÑìË›>i 3t‚:,–+^÷´–{Û–Nxi"x‘Ûg î¨>¥Õ܁ùZH,2Û“:8xÊ¢Çí9.É-Ìâã-=çjwµS˜dütžçwýGòú®®ûº_ˆýx$–¡ãøO EÚÛÏ÷R„×w+3£Á£öUMyR²¹âŒ°š›¸Ñãò9§Ó_Dl+Ùßc›úšGÅÌc†Ž!Ko=¶.‘Îÿ c²(2®V mª.ÿ ¹B›¹å ù„öŸSV>™ü¯$y:G¢Z×àøúdî¹û­·ýÇ´:•c LÍõi_‹ö+ÎæGÊè>OŠ•äž´§Þ{X}¨1ÚTc›»Qþ•êô°t¿OP?eæ~É{5]•ÙR£r5†nZ\ã@ &îJõ ¾àC°þV>fé¥/ü5ñÊIº_é5 ;e­h<@ Ä&æÃëE%;X,ÒãÆÞ`Oò¦kŸm#˜!ÀyÄ¢| óLšò¥Ä` ¶R=|ÈCâh5ò3DˆïF†ðÒ#ÅìÛœ?¸yhBãœí ZxßÎÄhºRK„`Þödvײ™ÀÈÑÒgŒuY w³%†ƒÓzõ ÖÏp‚dH®¦A´ù§»ÓÇMæ~)ˆð‡û:ù&Ä •vGD´À n ݇¼Ö8Fö óáà£~Ë¥x`oK|Ä?fxiØü%pìR>éò+Û±éÎ>núlFŤ'tq8LZÏvÃ?„¡ß±È⽆¯³íü@x|PöUäèØã¡ð‚ŒAìÏ"vÍwóŸÍ{ ý0.z È•Ö{,N¡£¡ŸKÕÙž>Ýœþ ÍÀ°<×EA!Å‚D™IúOÍ¡>ôG}Â` ÍßkÜL™Ž Þð™ {IøF²¹òQ3&!ÃÂÞz.d&Ï-sH¸,Ôõ˜ŽP€ 77ˆÝ¼ÊëÜw =cÕ Ú,ØÐ5ÎYÐ)ì´öœgŒ[¤ßv㙑8心>h]§µháYš£²ºÑ.{Ï7Sð•?´~×SÃKýJÛ˜ ™Íäiúu<µX¶1õ^kâçIÑ£sZ4h>j*ÔšD:4­¿_ ÷¸ Õxæÿ ¸?Mù _•­ÊÐ ä ÷ý ÑwL œ­ïnTkÛUÍN©ë:¦fV ¶ÜÔÜMªÅâA½–¿R×TXš-%iTÊT•‡Ù‚JôϐZxWÑè‰f‰òG º ×Õû2aZ7OU3[“×AT–ÞŒ…-‘¤”Ì ì&(ˆ¿­•ƒkï’:ðY¦W‘ Å)“†‘˜³Åtcø˜ñTÂwÚÇ4|üLÇªí–v- qˆèU qPE.†â‘˜µ Æ,ÐÅs]8¾„oúÑ i>ÜxxÈó)ƒ ´æÁâØ$À‰vžŸf$Ž |ãw;ÀÁIJ»b` {¦Ó¤Ú$©YÀ‘n@Óïž«9J¼êG m¤ ܯ¹ÌW4€ÐÒÅÛ‡#褕Ÿn-?í|с¥÷Ú¹¬'´ÞÜ9ÓK `hê£SÄSà?7—Wí_´…óB›»:=Ãïq`<8ñÓŒÑlú2d¬ê³£hÖ[l|$vÝro~'R®‰§°ñmY ͧäP |PUª¹·:3Œ[Û{Xÿ ºâ@‚W–Äé u‚ ¯´*=íή.pûÒdt @G‰¬ s¸ ëÉücr ÞæÑ¨Ê@>¤¢Ö±. Þ'¯°ÌME[YéïĵÂCå½ Ué©Áû'Ê9%eÔðNU”ë‘ÌsD3/®+UI˜9h.WC”빓$#:pz:YÓ ¿xž* ³$Í +$kñAŠ‹†¢ Uê>¸)_š¬÷©ßAÂÔb9ÇU ¯¾á•9¯ÏÏ÷O÷¼¼Fähal1‰3Ì[Ïr•´UCksNÐ] R‘¸¥H+§Šé†c©vÖÞ0iÓ76s†î!§=ß ¼~Ô'°Ãmäoäš³ªøi1úÉ)³yV8 CLÄØÁ‘WYïi€H6ÖÑiámø^ÈY´°Ñ7¥Û*—Ñ©L«Qƒï—Ùrÿ ›£Ð*š¸ˆL©ˆ$ˆ ÷¾D§9È®«qbqC)–ˆïv´çñsÑVT­Ø, <àïºÀO«Jý·õ àfPìð .wFšir´þ’2_Y *Æ€x\« ì€9š@ Ž|F⇥ˆkZ@hÖÄ0t¿-<“‹qµ¾*ZL¤Ú)&BJpÓF5=$„at*Zš$’ÑtdûÝRI1 2މ$€$I$#‰SÞ’Hë¬ï;Á$¡t$’`<(ñÇt)$‡Ð.Êf¢X’Kt=Éé$‚ˆªè¢oÝëòI%Rgcª÷ŠyI%¡‰ÿ !ñ)´õ $¤ Ô’IIGÿÙimport types import asyncio import socket import warnings import sys from functools import partial from collections import deque from contextlib import contextmanager from .util import ( encode_command, wait_ok, _NOTSET, _set_result, _set_exception, coerced_keys_dict, decode, parse_url, get_event_loop, ) from .parser import Reader from .stream import open_connection, open_unix_connection from .errors import ( ConnectionClosedError, ConnectionForcedCloseError, RedisError, ProtocolError, ReplyError, WatchVariableError, ReadOnlyError, MaxClientsError ) from .pubsub import Channel from .abc import AbcChannel from .abc import AbcConnection from .log import logger __all__ = ['create_connection', 'RedisConnection'] MAX_CHUNK_SIZE = 65536 _PUBSUB_COMMANDS = ( 'SUBSCRIBE', b'SUBSCRIBE', 'PSUBSCRIBE', b'PSUBSCRIBE', 'UNSUBSCRIBE', b'UNSUBSCRIBE', 'PUNSUBSCRIBE', b'PUNSUBSCRIBE', ) async def create_connection(address, *, db=None, password=None, ssl=None, encoding=None, parser=None, loop=None, timeout=None, connection_cls=None): """Creates redis connection. Opens connection to Redis server specified by address argument. Address argument can be one of the following: * A tuple representing (host, port) pair for TCP connections; * A string representing either Redis URI or unix domain socket path. SSL argument is passed through to asyncio.create_connection. By default SSL/TLS is not used. By default any timeout is applied at the connection stage, however you can set a limitted time used trying to open a connection via the `timeout` Kw. Encoding argument can be used to decode byte-replies to strings. By default no decoding is done. Parser parameter can be used to pass custom Redis protocol parser class. By default hiredis.Reader is used (unless it is missing or platform is not CPython). Return value is RedisConnection instance or a connection_cls if it is given. This function is a coroutine. """ assert isinstance(address, (tuple, list, str)), "tuple or str expected" if isinstance(address, str): address, options = parse_url(address) logger.debug("Parsed Redis URI %r", address) db = options.setdefault('db', db) password = options.setdefault('password', password) encoding = options.setdefault('encoding', encoding) timeout = options.setdefault('timeout', timeout) if 'ssl' in options: assert options['ssl'] or (not options['ssl'] and not ssl), ( "Conflicting ssl options are set", options['ssl'], ssl) ssl = ssl or options['ssl'] if timeout is not None and timeout <= 0: raise ValueError("Timeout has to be None or a number greater than 0") if connection_cls: assert issubclass(connection_cls, AbcConnection),\ "connection_class does not meet the AbcConnection contract" cls = connection_cls else: cls = RedisConnection if loop is not None and sys.version_info >= (3, 8, 0): warnings.warn("The loop argument is deprecated", DeprecationWarning) if isinstance(address, (list, tuple)): host, port = address logger.debug("Creating tcp connection to %r", address) reader, writer = await asyncio.wait_for(open_connection( host, port, limit=MAX_CHUNK_SIZE, ssl=ssl), timeout) sock = writer.transport.get_extra_info('socket') if sock is not None: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) address = sock.getpeername() address = tuple(address[:2]) else: logger.debug("Creating unix connection to %r", address) reader, writer = await asyncio.wait_for(open_unix_connection( address, ssl=ssl, limit=MAX_CHUNK_SIZE), timeout) sock = writer.transport.get_extra_info('socket') if sock is not None: address = sock.getpeername() conn = cls(reader, writer, encoding=encoding, address=address, parser=parser) try: if password is not None: await conn.auth(password) if db is not None: await conn.select(db) except Exception: conn.close() await conn.wait_closed() raise return conn class RedisConnection(AbcConnection): """Redis connection.""" def __init__(self, reader, writer, *, address, encoding=None, parser=None, loop=None): if loop is not None and sys.version_info >= (3, 8): warnings.warn("The loop argument is deprecated", DeprecationWarning) if parser is None: parser = Reader assert callable(parser), ( "Parser argument is not callable", parser) self._reader = reader self._writer = writer self._address = address self._waiters = deque() self._reader.set_parser( parser(protocolError=ProtocolError, replyError=ReplyError) ) self._reader_task = asyncio.ensure_future(self._read_data()) self._close_msg = None self._db = 0 self._closing = False self._closed = False self._close_state = asyncio.Event() self._reader_task.add_done_callback(lambda x: self._close_state.set()) self._in_transaction = None self._transaction_error = None # XXX: never used? self._in_pubsub = 0 self._pubsub_channels = coerced_keys_dict() self._pubsub_patterns = coerced_keys_dict() self._encoding = encoding self._pipeline_buffer = None def __repr__(self): return ''.format(self._db) async def _read_data(self): """Response reader task.""" last_error = ConnectionClosedError( "Connection has been closed by server") while not self._reader.at_eof(): try: obj = await self._reader.readobj() except asyncio.CancelledError: # NOTE: reader can get cancelled from `close()` method only. last_error = RuntimeError('this is unexpected') break except ProtocolError as exc: # ProtocolError is fatal # so connection must be closed if self._in_transaction is not None: self._transaction_error = exc last_error = exc break except Exception as exc: # NOTE: for QUIT command connection error can be received # before response last_error = exc break else: if (obj == b'' or obj is None) and self._reader.at_eof(): logger.debug("Connection has been closed by server," " response: %r", obj) last_error = ConnectionClosedError("Reader at end of file") break if isinstance(obj, MaxClientsError): last_error = obj break if self._in_pubsub: self._process_pubsub(obj) else: self._process_data(obj) self._closing = True get_event_loop().call_soon(self._do_close, last_error) def _process_data(self, obj): """Processes command results.""" assert len(self._waiters) > 0, (type(obj), obj) waiter, encoding, cb = self._waiters.popleft() if isinstance(obj, RedisError): if isinstance(obj, ReplyError): if obj.args[0].startswith('READONLY'): obj = ReadOnlyError(obj.args[0]) _set_exception(waiter, obj) if self._in_transaction is not None: self._transaction_error = obj else: if encoding is not None: try: obj = decode(obj, encoding) except Exception as exc: _set_exception(waiter, exc) return if cb is not None: try: obj = cb(obj) except Exception as exc: _set_exception(waiter, exc) return _set_result(waiter, obj) if self._in_transaction is not None: self._in_transaction.append((encoding, cb)) def _process_pubsub(self, obj, *, process_waiters=True): """Processes pubsub messages.""" kind, *args, data = obj if kind in (b'subscribe', b'unsubscribe'): chan, = args if process_waiters and self._in_pubsub and self._waiters: self._process_data(obj) if kind == b'unsubscribe': ch = self._pubsub_channels.pop(chan, None) if ch: ch.close() self._in_pubsub = data elif kind in (b'psubscribe', b'punsubscribe'): chan, = args if process_waiters and self._in_pubsub and self._waiters: self._process_data(obj) if kind == b'punsubscribe': ch = self._pubsub_patterns.pop(chan, None) if ch: ch.close() self._in_pubsub = data elif kind == b'message': chan, = args self._pubsub_channels[chan].put_nowait(data) elif kind == b'pmessage': pattern, chan = args self._pubsub_patterns[pattern].put_nowait((chan, data)) elif kind == b'pong': if process_waiters and self._in_pubsub and self._waiters: self._process_data(data or b'PONG') else: logger.warning("Unknown pubsub message received %r", obj) @contextmanager def _buffered(self): # XXX: we must ensure that no await happens # as long as we buffer commands. # Probably we can set some error-raising callback on enter # and remove it on exit # if some await happens in between -> throw an error. # This is creepy solution, 'cause some one might want to await # on some other source except redis. # So we must only raise error we someone tries to await # pending aioredis future # One of solutions is to return coroutine instead of a future # in `execute` method. # In a coroutine we can check if buffering is enabled and raise error. # TODO: describe in docs difference in pipeline mode for # conn.execute vs pipeline.execute() if self._pipeline_buffer is None: self._pipeline_buffer = bytearray() try: yield self buf = self._pipeline_buffer self._writer.write(buf) finally: self._pipeline_buffer = None else: yield self def execute(self, command, *args, encoding=_NOTSET): """Executes redis command and returns Future waiting for the answer. Raises: * TypeError if any of args can not be encoded as bytes. * ReplyError on redis '-ERR' responses. * ProtocolError when response can not be decoded meaning connection is broken. * ConnectionClosedError when either client or server has closed the connection. """ if self._reader is None or self._reader.at_eof(): msg = self._close_msg or "Connection closed or corrupted" raise ConnectionClosedError(msg) if command is None: raise TypeError("command must not be None") if None in args: raise TypeError("args must not contain None") command = command.upper().strip() is_pubsub = command in _PUBSUB_COMMANDS is_ping = command in ('PING', b'PING') if self._in_pubsub and not (is_pubsub or is_ping): raise RedisError("Connection in SUBSCRIBE mode") elif is_pubsub: logger.warning("Deprecated. Use `execute_pubsub` method directly") return self.execute_pubsub(command, *args) if command in ('SELECT', b'SELECT'): cb = partial(self._set_db, args=args) elif command in ('MULTI', b'MULTI'): cb = self._start_transaction elif command in ('EXEC', b'EXEC'): cb = partial(self._end_transaction, discard=False) encoding = None elif command in ('DISCARD', b'DISCARD'): cb = partial(self._end_transaction, discard=True) else: cb = None if encoding is _NOTSET: encoding = self._encoding fut = get_event_loop().create_future() if self._pipeline_buffer is None: self._writer.write(encode_command(command, *args)) else: encode_command(command, *args, buf=self._pipeline_buffer) self._waiters.append((fut, encoding, cb)) return fut def execute_pubsub(self, command, *channels): """Executes redis (p)subscribe/(p)unsubscribe commands. Returns asyncio.gather coroutine waiting for all channels/patterns to receive answers. """ command = command.upper().strip() assert command in _PUBSUB_COMMANDS, ( "Pub/Sub command expected", command) if self._reader is None or self._reader.at_eof(): raise ConnectionClosedError("Connection closed or corrupted") if None in set(channels): raise TypeError("args must not contain None") if not len(channels): raise TypeError("No channels/patterns supplied") is_pattern = len(command) in (10, 12) mkchannel = partial(Channel, is_pattern=is_pattern) channels = [ch if isinstance(ch, AbcChannel) else mkchannel(ch) for ch in channels] if not all(ch.is_pattern == is_pattern for ch in channels): raise ValueError("Not all channels {} match command {}" .format(channels, command)) cmd = encode_command(command, *(ch.name for ch in channels)) res = [] for ch in channels: fut = get_event_loop().create_future() res.append(fut) cb = partial(self._update_pubsub, ch=ch) self._waiters.append((fut, None, cb)) if self._pipeline_buffer is None: self._writer.write(cmd) else: self._pipeline_buffer.extend(cmd) return asyncio.gather(*res) def close(self): """Close connection.""" self._do_close(ConnectionForcedCloseError()) def _do_close(self, exc): if self._closed: return self._closed = True self._closing = False self._writer.transport.close() self._reader_task.cancel() self._reader_task = None self._writer = None self._reader = None self._pipeline_buffer = None if exc is not None: self._close_msg = str(exc) while self._waiters: waiter, *spam = self._waiters.popleft() logger.debug("Cancelling waiter %r", (waiter, spam)) if exc is None: _set_exception(waiter, ConnectionForcedCloseError()) else: _set_exception(waiter, exc) while self._pubsub_channels: _, ch = self._pubsub_channels.popitem() logger.debug("Closing pubsub channel %r", ch) ch.close(exc) while self._pubsub_patterns: _, ch = self._pubsub_patterns.popitem() logger.debug("Closing pubsub pattern %r", ch) ch.close(exc) @property def closed(self): """True if connection is closed.""" closed = self._closing or self._closed if not closed and self._reader and self._reader.at_eof(): self._closing = closed = True get_event_loop().call_soon(self._do_close, None) return closed async def wait_closed(self): """Coroutine waiting until connection is closed.""" await self._close_state.wait() @property def db(self): """Currently selected db index.""" return self._db @property def encoding(self): """Current set codec or None.""" return self._encoding @property def address(self): """Redis server address, either host-port tuple or str.""" return self._address def select(self, db): """Change the selected database for the current connection.""" if not isinstance(db, int): raise TypeError("DB must be of int type, not {!r}".format(db)) if db < 0: raise ValueError("DB must be greater or equal 0, got {!r}" .format(db)) fut = self.execute('SELECT', db) return wait_ok(fut) def _set_db(self, ok, args): assert ok in {b'OK', 'OK'}, ("Unexpected result of SELECT", ok) self._db = args[0] return ok def _start_transaction(self, ok): assert self._in_transaction is None, ( "Connection is already in transaction", self._in_transaction) self._in_transaction = deque() self._transaction_error = None return ok def _end_transaction(self, obj, discard): assert self._in_transaction is not None, ( "Connection is not in transaction", obj) self._transaction_error = None recall, self._in_transaction = self._in_transaction, None recall.popleft() # ignore first (its _start_transaction) if discard: return obj assert isinstance(obj, list) or (obj is None and not discard), ( "Unexpected MULTI/EXEC result", obj, recall) # TODO: need to be able to re-try transaction if obj is None: err = WatchVariableError("WATCH variable has changed") obj = [err] * len(recall) assert len(obj) == len(recall), ( "Wrong number of result items in mutli-exec", obj, recall) res = [] for o, (encoding, cb) in zip(obj, recall): if not isinstance(o, RedisError): try: if encoding: o = decode(o, encoding) if cb: o = cb(o) except Exception as err: res.append(err) continue res.append(o) return res def _update_pubsub(self, obj, *, ch): kind, *pattern, channel, subscriptions = obj self._in_pubsub, was_in_pubsub = subscriptions, self._in_pubsub # XXX: the channels/patterns storage should be refactored. # if code which supposed to read from channel/pattern # failed (exception in reader or else) than # the channel object will still reside in memory # and leak memory (messages will be put in queue). if kind == b'subscribe' and channel not in self._pubsub_channels: self._pubsub_channels[channel] = ch elif kind == b'psubscribe' and channel not in self._pubsub_patterns: self._pubsub_patterns[channel] = ch if not was_in_pubsub: self._process_pubsub(obj, process_waiters=False) return obj @property def in_transaction(self): """Set to True when MULTI command was issued.""" return self._in_transaction is not None @property def in_pubsub(self): """Indicates that connection is in PUB/SUB mode. Provides the number of subscribed channels. """ return self._in_pubsub @property def pubsub_channels(self): """Returns read-only channels dict.""" return types.MappingProxyType(self._pubsub_channels) @property def pubsub_patterns(self): """Returns read-only patterns dict.""" return types.MappingProxyType(self._pubsub_patterns) def auth(self, password): """Authenticate to server.""" fut = self.execute('AUTH', password) return wait_ok(fut)